Streaming C++ API
StreamBaseClient.hpp
1// Copyright (c) 2004-2023 TIBCO Software Inc. All rights reserved.
2
3#ifndef STREAMBASE_CLIENT_H
4#define STREAMBASE_CLIENT_H
5
6#include "StreamBase.hpp"
7#include "StreamBaseVersion.hpp"
8#include "Exceptions.hpp"
9#include "Schema.hpp"
10#include "Field.hpp"
11#include "StreamProperties.hpp"
12#include "StreamBaseEntityType.hpp"
13#include "StreamBaseURI.hpp"
14#include "TupleList.hpp"
15#include "DequeueResult.hpp"
16#include "ClientSettings.hpp"
17
18SB_INTERNAL_FWD(StreamBaseClientImpl)
19
20SB_NAMESPACE_BEGIN;
21
22/// The StreamBase client API. Connects to a StreamBase node over the network,
23/// via XML/RPC and the tuple wire protocol. With the exception of the close() method,
24/// the StreamBaseClient object is not thread safe, so access to a single object
25/// needs to be synchronized between threads. If multiple threads subscribe to streams,
26/// it is recommended that they use separate instances of this class.
27///
28class StreamBaseClient {
29 public:
30 static const unsigned int DEFAULT_FLUSH_INTERVAL = 250;
31
32#ifndef DOXYGEN_SKIP
33 /// This tells the rest of the API that tuples want to remain in serialized form,
34 /// ostensibly for consumption by the .NET API.
35 static bool useRawTuples;
36#endif
37
38 /// Creates a StreamBase client and establishes a connection to a remote
39 /// server. Optional parameter buffer_size specifies the number of tuples
40 /// to buffer before enqueueing. Optional parameter flush_interval specifies
41 /// the interval in milliseconds between wakeups of the wakeAndFlushBuffer
42 /// thread. The wakeAndFlushBuffer thread is only started if buffer_size > 0.
43 StreamBaseClient(const StreamBaseURI& uri = StreamBaseURI::fromEnvironment(),
44 int buffer_size = 0,
45 int flush_interval=DEFAULT_FLUSH_INTERVAL);
46
47 StreamBaseClient(const std::string& uris,
48 int buffer_size = 0,
49 int flush_interval=DEFAULT_FLUSH_INTERVAL);
50
51 StreamBaseClient(const std::vector<StreamBaseURI>& uris,
52 int buffer_size = 0,
53 int flush_interval=DEFAULT_FLUSH_INTERVAL);
54
55 StreamBaseClient(const std::vector<StreamBaseURI>& uris,
56 sb::ClientSettings &settings,
57 int buffer_size = 0,
58 int flush_interval=DEFAULT_FLUSH_INTERVAL);
59
60 /// Destroys a session.
61 ~StreamBaseClient();
62
63 /// Flags for the listEntities call
64 /// This must be consistent with what is found on the Java side of the
65 /// house in StreamBaseClient.java
66 enum ListEntitiesFlags {
67 /// set FULLY_QUALIFIED_NAMES if you want to
68 /// include container names for all entities. If not set then the returned
69 /// names will contain the module name (if any) and the name of the entity.
70 FULLY_QUALIFIED_NAMES = 1,
71 /// set INCLUDE_MODULES if you want to include all modules in the output
72 INCLUDE_MODULES = 2,
73 /// set ALL_CONTAINERS if you want to include all
74 /// user containers in the output. This does not include the "system"
75 /// container.
76 ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES
77 };
78
79 /// Lists all entities of a particular type. (One can then use
80 /// "describe" to obtain a description of an entity.) Names is
81 /// cleared first. Use container.entity-type to resolve across containers.
82 /// use the given flags to list the entities
83 void listEntities(const std::string &type, int flags, std::vector<std::string>& names);
84
85 /// Lists all entities of a particular type. (One can then use
86 /// "describe" to obtain a description of an entity.) names is
87 /// cleared first.
88 /// use the given flags to list the entities
89 void listEntities(StreamBaseEntityType::Type type, int flags, std::vector<std::string>& names);
90
91 /// Lists all entities of a particular type. (One can then use
92 /// "describe" to obtain a description of an entity.) names is
93 /// cleared first.
94 void listEntities(StreamBaseEntityType::Type type, std::vector<std::string>& names);
95
96 /// Returns an XML description of an entity, or an empty string if
97 /// the method does not exist.
98 std::string describe(const std::string &entity);
99
100#ifndef DOXYGEN_SKIP
101 /// Returns true if the license for the specified feature is available and false otherwise.
102 bool checkLicense(const std::string &featureName);
103#endif
104
105 /// Returns true if this stream has been subscribed to.
106 /// @param entity the path of the stream
107 bool isStreamSubscribed(const std::string &entity);
108
109 /// Returns a description of a stream, throwing an exception if the
110 /// stream does not exist.
111 /// @param entity the path of the stream
112 /// @param strategy how to handle capture fields on the stream
113 StreamProperties getStreamProperties(const std::string &entity,
114 StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
115
116 /// Returns a description of a stream, throwing an exception if the
117 /// stream does not exist.
118 StreamProperties getStreamPropertiesByHash(const std::string &hex);
119
120 /// Returns the schema of a stream, throwing an exception if the
121 /// stream does not exist.
122 Schema getSchemaForStream(const std::string &entity,
123 StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
124
125 /// Typechecks a potential modification to the application, outputting
126 /// properties of all streams in the application. streams is cleared
127 /// first.
128 /// @param sbapp the application
129 /// @param streams the schema defs of the streams
130 /// @param full do a full typecheck
131 void typecheck(const std::string &sbapp,
132 std::map<std::string, Schema>& streams,
133 bool full = false);
134
135 /// Subscribes to a stream.
136 ///
137 /// Uses the default capture transform strategy of flatten. To specify
138 /// capture strategy, use getStreamProperties(stream_name,
139 /// StreamProperties::NEST) and subscribe using the resultant
140 /// StreamProperties.
141 ///
142 /// @return the StreamProperties for the stream
143 StreamProperties subscribe(const std::string &stream_name);
144 /// Subscribes to a stream.
145 /// @return the StreamProperties for the stream
146 StreamProperties subscribe(const StreamProperties &props);
147
148 /// Subscribes to a stream with a predicate to apply to output tuples.
149 /// The stream name of dequeued tuples is logical_stream.
150 /// When unsubscribing, use logical_stream.
151 ///
152 /// Uses the default capture transform strategy of flatten. To specify
153 /// capture strategy, use getStreamProperties(stream_name,
154 /// StreamProperties::NEST) and subscribe using the resultant
155 /// StreamProperties.
156 ///
157 /// @param stream_name the stream to subscribe to, error if empty
158 /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to streamname)
159 /// @param predicate a predicate to apply to subset the stream, error if empty
160 /// @return the StreamProperties for the stream
161 StreamProperties subscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
162
163 /// Subscribes to a stream with a predicate to apply to output tuples.
164 /// The stream name of dequeued tuples is logical_stream.
165 /// When unsubscribing, use logical_stream.
166 /// @param props the stream to subscribe to, error if empty
167 /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to streamname)
168 /// @param predicate a predicate to apply to subset the stream, error if empty
169 /// @return the StreamProperties for the stream
170 StreamProperties subscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
171
172 /// Resubscribes to an already subscribed stream
173 ///
174 /// Uses the default capture transform strategy of flatten. To specify
175 /// capture strategy, use getStreamProperties(stream_name,
176 /// StreamProperties::NEST) and subscribe using the resultant
177 /// StreamProperties.
178 ///
179 /// @return the StreamProperties for the stream
180 StreamProperties resubscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
181
182 /// Resubscribes to an already subscribed stream
183 /// @return the StreamProperties for the stream
184 StreamProperties resubscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
185
186 /// Unsubscribes from a stream. Note: Any tuples that are in-flight during
187 /// an unsubscribe request will be dequeued until the stream is fully drained.
188 void unsubscribe(const std::string &stream_name);
189
190 /// Unsubscribes from a stream. Note: Any tuples that are in-flight during
191 /// an unsubscribe request will be dequeued until the stream is fully drained.
192 void unsubscribe(const StreamProperties &props);
193
194#ifndef SWIG
195 ///
196 /// Set the dequeue results interceptor for this client connection.
197 ///
198 /// This results interceptor replaces any existing results processor. To
199 /// disable pre-processing of results, set the processor to null.
200 ///
201 /// This method cannot be safely called while another thread is calling
202 /// dequeue().
203 ///
204 /// NOTE: Once the DequeueResult::Interceptor is passed to StreamBaseClient,
205 /// StreamBaseClient will manage the lifecycle of the object. StreamBaseClient
206 /// will delete the object when the interceptor is reset or when the StreamBaseClient
207 /// is deleted.
208 /// Do not delete this passed object!!
209 void setDequeueResultInterceptor(DequeueResult::Interceptor *dri);
210
211 ///
212 /// Get the current dequeue results interceptor, or null if there is no
213 /// current processor.
214 ///
215 DequeueResult::Interceptor * getDequeueResultsInterceptor();
216
217 /// Dequeue tuples from any subscribed stream. This method
218 /// blocks until
219 ///
220 /// - at least one tuple is available on any subscribed stream, or
221 /// - the node is shut down.
222 ///
223 /// @deprecated Use DequeueResult based dequeue
224 /// @param streamProperties (output) set to the StreamProperties of the stream from
225 /// which tuples have been dequeued
226 /// @return a list of the tuples dequeued; empty if the node has been
227 /// shut down
228 TupleList dequeue(StreamProperties &streamProperties);
229
230 /// Dequeue tuples from any subscribed stream. This method
231 /// blocks until
232 ///
233 /// - at least one tuple is available on any subscribed stream, or
234 /// - the node is shut down.
235 ///
236 /// @deprecated Use DequeueResult based dequeue
237 /// @param stream_name (output) set to the name of the stream from
238 /// which tuples have been dequeued. Note this does not contain the container name
239 /// @return a list of the tuples dequeued; empty if the node has been
240 /// shut down
241 TupleList dequeue(std::string &stream_name);
242
243 /// Dequeue tuples from any subscribed stream. This method
244 /// blocks until
245 ///
246 /// - At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
247 /// - The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
248 /// - A network error occurs (StreamBaseException will be thrown)
249 ///
250 /// @return a DequeueResult containing the results of the operation
251 DequeueResult dequeue();
252
253 /// Dequeue tuples from any subscribed stream. This method
254 /// blocks until
255 ///
256 /// - At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
257 /// - The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
258 /// - The number of Milliseconds specified has elapsed (DequeueResult will have a status of TIMEOUT)
259 /// - A network error occurs (StreamBaseException will be thrown)
260 ///
261 /// A timeout_ms of zero will block indefinitely, or until a tuple arrives.
262 /// Note that the actual dequeue timeout may vary based on normal thread scheduling,
263 /// plus network interactions with the server.
264 ///
265 /// @return a DequeueResult containing the results of the operation
266 DequeueResult dequeue(int timeout_ms);
267
268 ///
269 /// Enqueue tuples to a stream. The stream must not be tied to
270 /// the output of any operator in the application. This method can block
271 /// depending on network, or StreamBase server, congestion.
272 ///
273 /// Note: enqueue() will modify the tuple as it is enqueued. If you
274 /// do not want the tuple modified you must make a copy of it before
275 /// calling enqueue.
276 ///
277 /// @deprecated Use StreamProperties based enqueue
278 /// @param stream_name the name of the stream to which to enqueue
279 /// @param tuple a tuple to enqueue
280 void enqueue(const std::string &stream_name, Tuple& tuple) ;
281
282 ///
283 /// Enqueue tuples to a stream. The stream must not be tied to
284 /// the output of any operator in the application. This method can block
285 /// depending on network, or StreamBase server, congestion.
286 ///
287 /// Note: enqueue() will modify the tuple as it is enqueued. If you
288 /// do not want the tuple modified you must make a copy of it before
289 /// calling enqueue.
290 ///
291 /// @param props the StreamProperties to enqueue the tuple to
292 /// @param tuple a tuple to enqueue
293 void enqueue(const StreamProperties &props, Tuple& tuple) ;
294
295 /// Enqueue tuples to a stream. The stream must not be tied to
296 /// the output of any operator in the application. This method can block
297 /// depending on network, or StreamBase server, congestion.
298 ///
299 /// Note: enqueue() will modify the tuples as they are enqueued. If you
300 /// do not want the tuples modified you must make copies before
301 /// calling enqueue.
302 ///
303 /// @deprecated Use StreamProperties based enqueue
304 /// @param stream_name the name of the stream to which to enqueue
305 /// @param tuples a list of tuples to enqueue
306 void enqueue(const std::string &stream_name, TupleList& tuples) ;
307
308 /// Enqueue tuples to a stream. The stream must not be tied to
309 /// the output of any operator in the application. This method can block
310 /// depending on network, or StreamBase server, congestion.
311 ///
312 /// Note: enqueue() will modify the tuples as they are enqueued. If you
313 /// do not want the tuples modified you must make copies before
314 /// calling enqueue.
315 ///
316 /// @param props the StreamProperties to enqueue the tuples to
317 /// @param tuples a list of tuples to enqueue
318 void enqueue(const StreamProperties &props, TupleList& tuples) ;
319#endif
320
321 /// Returns a schema with a particular hash.
322 Schema getSchemaByHash(const std::string &hash);
323
324 /// Returns a schema by name. This will only succeed for named schemas;
325 /// anonymous schemas assigned to a port should instead be looked up
326 /// via getStreamProperties().getSchema().
327 Schema getSchemaByName(const std::string &name);
328
329 /// Retrieves the status of Java operators and adapters.
330 void operatorStatus(const std::string &containerName, std::vector<std::string> &aStatus);
331
332 /// Retrieves the status of the StreamBase daemons.
333 void status(std::vector<std::string> &aStatus, bool verbose = false);
334
335 /// Return the URI we're talking to.
336 const StreamBaseURI& getURI() const;
337
338 const std::vector<StreamBaseURI>& getURIs() const;
339
340 /// Terminate the client. May be invoked from a different thread.
341 /// StreamBaseClient memory, network, and thread resources are not released
342 /// until close() is called.
343 ///
344 /// Returns immediately.
345 void close();
346
347 /// Return true if the client connection is closed.
348 /// The client connection can be closed by calling the close() method,
349 /// or by a server shutdown, or a network error.
350 bool isClosed();
351
352
353 /// Return true if we can call dequeue without blocking. This means
354 /// that there is something to dequeue from the server. This dequeued item
355 /// could be Tuples, a null (server shutdown) or an exception.
356 /// @return true if we can dequeue without blocking
357 bool canDequeue();
358
359 /// Enable heartbeating for this client. Generally this is only for enqueue-only clients.
360 void enableHeartbeating();
361
362
363 /// Returns true if this TupleConnections has any live connections to a
364 /// server. Note that a return of "false" doesn't necessarily indicate an
365 /// error since the TupleConnection's state (e.g., lack of subscriptions)
366 /// might mean no connections are needed.
367 /// @return true if we are connected
368 bool isConnected();
369
370 /// Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0.
371 void enableBuffering(int buffer_size, int flush_interval = DEFAULT_FLUSH_INTERVAL);
372
373#ifndef DOXYGEN_SKIP
374 /// Enable connection less enqueue. This enables a mode where tuples are enqueued
375 /// over an XMLRPC request rather than a binary connection. It is useful for low
376 /// volume enqueue of tuples. It may reduce the load on the server as the connection
377 /// is transient. Setting has no effect upon dequeue.
378 /// @param enable enable connection less enqueue of tuples. Default is off.
379 /// excluded until server performance improves
380 /// <em>Note: this method is not public API, and is for internal StreamBase use only</em>
381 void enableConnectionlessEnqueue(bool enable);
382#endif
383
384#ifndef DOXYGEN_SKIP
385 /// Return state of connection less enqueue setting
386 /// excluded until server performance improves
387 /// <em>Note: this method is not public API, and is for internal StreamBase use only</em>
388 bool getIsConnectionlessEnqueue();
389#endif
390
391 /// Flush any pending enqueue buffers.
392 /// This operation has no effect if buffering is not enabled.
393 ///
394 /// @throws StreamBaseException if there is an IO error while flushing the buffer
395 void flushAllBuffers();
396
397 /// Flush any pending enqueue buffer for the StreamProperties provided.
398 /// This operation has no effect if buffering is not enabled or
399 /// there is no buffer to flush for the given stream.
400 /// <br/>
401 /// <b>Note:</b> Note that this will cause inter-stream ordering to be interrupted.
402 /// <br/>
403 /// @param props the stream whose enqueue buffers to flush, if not empty
404 /// @throws StreamBaseException if there is an IO error while flushing the buffer
405 /// @deprecated use flushAllBuffers() to preserve inter-stream ordering
406 void flushBuffer(const StreamProperties &props);
407
408 /// Flush any pending enqueue buffer for the stream name provided.
409 /// This operation has no effect if buffering is not enabled or
410 /// there is no buffer to flush for the given stream.
411 /// <br/>
412 /// <b>Note:</b> Note that this will cause inter-stream ordering to be interrupted.
413 /// <br/>
414 /// @param stream_name the stream whose enqueue buffers to flush, if not empty
415 /// @throws StreamBaseException if there is an IO error while flushing the buffer
416 /// @deprecated Use StreamProperties based flushBuffer
417 /// @deprecated use flushAllBuffers() to preserve inter-stream ordering
418 void flushBuffer(const std::string &stream_name);
419
420 /// Return whether buffering is turned on or off
421 bool getIsBuffering() const;
422
423 /// Return buffer size (in tuples)
424 int getBufferSize() const;
425
426 /// Return the Connection ID for this Client Connection. Only Valid once
427 /// an enqueue/dequeue has been attempted.
428 /// @returns connection ID in binary format
429 std::string getConnectionID() const;
430
431 /// Get all the dynamic variables in the given module, and return them
432 /// as a Tuple where the field names are the names of the dynamic variables,
433 /// and the field values are the current values of the dynamic variables.
434 ///
435 /// @throws StreamBaseException on network or other errors, or if there are
436 /// no dynamic variables in the named module
437 Tuple getDynamicVariables(const std::string& modulePath);
438
439 /// Set the dynamic variable at the given path to the given value, expressed
440 /// as a string in CSV style.
441 /// @throws StreamBaseException if the dynamic variable does not exist, or the
442 /// value is not appropriate for setting the dynamic variable
443 void setDynamicVariable(const std::string& dynamicVariablePath, const std::string& value);
444
445 /// Return the Connection ID for this Client Connection. Only Valid once
446 /// an enqueue/dequeue has been attempted.
447 /// @returns connection ID in Hexadecimal format
448 std::string getConnectionIdAsHexString() const;
449
450 /// Read rows out of the table at this path. Limit to the
451 /// specified number of rows returned, or -1 for no limit. Filter
452 /// rows according to predicate, or return all rows if predicate
453 /// is empty.
454 /// @returns a list of matching rows.
455 TupleList readTable(const std::string &tablePath, int limit, const std::string& predicate="");
456
457#ifndef DOXYGEN_SKIP
458 /// Execute an internal command. Internal commands are subject to change and
459 /// for StreamBase internal use only.
460 void internalCommand(const std::vector<std::string>& arguments, std::vector<std::string> &result);
461
462 bool doByteSwap();
463#endif
464
465 /// Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart
466 /// beat from the StreamBase server that it is connected to. The default value is
467 /// 120 seconds (120000). By default, StreamBase servers emit "client heart beats"
468 /// every 10 seconds so StreamBase applications have no requirement to send data regularly.
469 ///
470 /// @param timeoutMS The number of milliseconds that is allowed to pass without
471 /// receiving a message from the StreamBase server client heart beat stream.
472 int setQuiescentLimit(int timeoutMS);
473
474 /// Returns the client version as a string, constructed from StreamBaseVersion::INFO_LINE.
475 std::string getVersion() {
476 return std::string(StreamBaseVersion::INFO_LINE);
477 }
478
479 ///
480 /// Get the settings for the client
481 ///
482 sb::ClientSettings & getSettings() {
483 return _clientSettings;
484 }
485
486 /// Returns the number of tuples this client has actually enqueued to the server.
487 /// Note that this number will not include any tuples that have been buffered but
488 /// have not actually been enqueued to the server.
489 long long getTupleEnqueueCount() const;
490
491 /// Returns the number of tuples this client has dequeued from the server. This number
492 /// does not include tuples that may be dequeued as part of system services such
493 /// as heartbeating. This number does however include tuples that have been dequeued
494 /// but have been intercepted by the Interceptor.
495 long long getTupleDequeueCount() const;
496
497private:
498 typedef std::map<std::string, StreamProperties> StreamMap;
499
500 std::shared_ptr<sb_internal::StreamBaseClientImpl> _impl;
501
502 // No copying.
503 StreamBaseClient(const StreamBaseClient&);
504
505 // No copying.
506 StreamBaseClient& operator = (const StreamBaseClient&);
507
508 void init();
509
510 // keep track of logical and associated real streams
511 StreamMap _streamMap;
512
513 sb::ClientSettings _clientSettings;
514};
515
516
517SB_NAMESPACE_END;
518#endif
This class loads all the environment variables used by the C++ client API.
Definition: ClientSettings.hpp:20
A callback interface that can be implemented by an object that is associated with a StreamBaseClient ...
Definition: DequeueResult.hpp:30
Encapsulates the data returned from a dequeue() operation.
Definition: DequeueResult.hpp:23
A type of tuple, containing zero or more fields (each encapsulated as a Schema::Field object).
Definition: Schema.hpp:62
The StreamBase client API.
Definition: StreamBaseClient.hpp:28
std::string getVersion()
Returns the client version as a string.
Definition: StreamBaseClient.hpp:475
DequeueResult dequeue(int timeout_ms)
Dequeue tuples from any subscribed stream.
void enqueue(const StreamProperties &props, TupleList &tuples)
Enqueue tuples to a stream.
bool isStreamSubscribed(const std::string &entity)
Returns true if this stream has been subscribed to.
Schema getSchemaForStream(const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN)
Returns the schema of a stream, throwing an exception if the stream does not exist.
long long getTupleEnqueueCount() const
Returns the number of tuples this client has actually enqueued to the server.
void enqueue(const std::string &stream_name, Tuple &tuple)
Enqueue tuples to a stream.
long long getTupleDequeueCount() const
Returns the number of tuples this client has dequeued from the server.
DequeueResult::Interceptor * getDequeueResultsInterceptor()
Get the current dequeue results interceptor, or null if there is no current processor.
void setDynamicVariable(const std::string &dynamicVariablePath, const std::string &value)
Set the dynamic variable at the given path to the given value, expressed as a string in CSV style.
TupleList dequeue(std::string &stream_name)
Dequeue tuples from any subscribed stream.
TupleList readTable(const std::string &tablePath, int limit, const std::string &predicate="")
Read rows out of the table at this path.
StreamProperties subscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate)
Subscribes to a stream with a predicate to apply to output tuples.
void typecheck(const std::string &sbapp, std::map< std::string, Schema > &streams, bool full=false)
Typechecks a potential modification to the application, outputting properties of all streams in the a...
bool getIsBuffering() const
Return whether buffering is turned on or off.
bool isClosed()
Return true if the client connection is closed.
void enqueue(const StreamProperties &props, Tuple &tuple)
Enqueue tuples to a stream.
void operatorStatus(const std::string &containerName, std::vector< std::string > &aStatus)
status of java operators and adapters
void flushBuffer(const std::string &stream_name)
Flush any pending enqueue buffer for the stream name provided.
void enableBuffering(int buffer_size, int flush_interval=DEFAULT_FLUSH_INTERVAL)
Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0.
void flushBuffer(const StreamProperties &props)
Flush any pending enqueue buffer for the StreamProperties provided.
void close()
Terminate the client.
StreamProperties resubscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate)
Resubscribes to an already subscribed stream.
StreamProperties subscribe(const std::string &stream_name)
Subscribes to a stream.
Schema getSchemaByHash(const std::string &hash)
Returns a schema with a particular hash.
StreamProperties subscribe(const StreamProperties &props)
Subscribes to a stream.
std::string describe(const std::string &entity)
Returns an XML description of an entity, or an empty string if the method does not exist.
void listEntities(StreamBaseEntityType::Type type, int flags, std::vector< std::string > &names)
Lists all entities of a particular type.
TupleList dequeue(StreamProperties &streamProperties)
Dequeue tuples from any subscribed stream.
bool canDequeue()
Return true if we can call dequeue without blocking.
StreamProperties getStreamProperties(const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN)
Returns a description of a stream, throwing an exception if the stream does not exist.
int setQuiescentLimit(int timeoutMS)
Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart beat from t...
const StreamBaseURI & getURI() const
Return the URI we're talking to.
std::string getConnectionID() const
Return the Connection ID for this Client Connection.
StreamProperties resubscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate)
Resubscribes to an already subscribed stream.
void enableHeartbeating()
Enable heartbeating for this client. Generally this is only for enqueue-only clients.
StreamProperties subscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate)
Subscribes to a stream with a predicate to apply to output tuples.
bool isConnected()
Returns true if this TupleConnections has any live connections to a server.
StreamBaseClient(const StreamBaseURI &uri=StreamBaseURI::fromEnvironment(), int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL)
Creates a StreamBase client and establishes a connection to a remote server.
std::string getConnectionIdAsHexString() const
Return the Connection ID for this Client Connection.
StreamProperties getStreamPropertiesByHash(const std::string &hex)
Returns a description of a stream, throwing an exception if the stream does not exist.
void setDequeueResultInterceptor(DequeueResult::Interceptor *dri)
Set the dequeue results interceptor for this client connection.
int getBufferSize() const
Return buffer size (in tuples)
void unsubscribe(const std::string &stream_name)
UnSubscribes to a stream.
sb::ClientSettings & getSettings()
Get the settings for the client.
Definition: StreamBaseClient.hpp:482
Tuple getDynamicVariables(const std::string &modulePath)
Get all the dynamic variables in the given module, and return them as a Tuple where the field names a...
void unsubscribe(const StreamProperties &props)
UnSubscribes to a stream.
void listEntities(StreamBaseEntityType::Type type, std::vector< std::string > &names)
Lists all entities of a particular type.
Schema getSchemaByName(const std::string &name)
Returns a schema by name.
void enqueue(const std::string &stream_name, TupleList &tuples)
Enqueue tuples to a stream.
DequeueResult dequeue()
Dequeue tuples from any subscribed stream.
void flushAllBuffers()
Flush any pending enqueue buffers.
void listEntities(const std::string &type, int flags, std::vector< std::string > &names)
Lists all entities of a particular type.
void status(std::vector< std::string > &aStatus, bool verbose=false)
status the streambase daemons
~StreamBaseClient()
Destroys a session.
ListEntitiesFlags
Flags for the listEntities call This must be consistent with what is found on the Java side of the ho...
Definition: StreamBaseClient.hpp:66
Type
An enumeration of the types of entities stored in a catalog.
Definition: StreamBaseEntityType.hpp:20
A URI for a StreamBase client connection.
Definition: StreamBaseURI.hpp:36
Properties of a single stream.
Definition: StreamProperties.hpp:17
CaptureTransformStrategy
Determines the method StreamBase will use to expose capture fields to client dequeuers.
Definition: StreamProperties.hpp:21
TupleLists are value types that can be copied and modified separately thus.
Definition: TupleList.hpp:17
Tuples are value types that can be copied and modified separately thus.
Definition: Tuple.hpp:47