Streaming C++ API
|
The StreamBase client API. More...
#include <StreamBaseClient.hpp>
Public Types | |
enum | ListEntitiesFlags { FULLY_QUALIFIED_NAMES = 1 , INCLUDE_MODULES = 2 , ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES } |
Flags for the listEntities call This must be consistent with what is found on the Java side of the house in StreamBaseClient.java. More... | |
Public Member Functions | |
StreamBaseClient (const StreamBaseURI &uri=StreamBaseURI::fromEnvironment(), int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL) | |
Creates an StreamBase client and establishes a connection to a remote server. More... | |
StreamBaseClient (const std::string &uris, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL) | |
StreamBaseClient (const std::vector< StreamBaseURI > &uris, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL) | |
StreamBaseClient (const std::vector< StreamBaseURI > &uris, sb::ClientSettings &settings, int buffer_size=0, int flush_interval=DEFAULT_FLUSH_INTERVAL) | |
~StreamBaseClient () | |
Destroys a session. More... | |
void | listEntities (const std::string &type, int flags, std::vector< std::string > &names) |
Lists all entities of a particular type. More... | |
void | listEntities (StreamBaseEntityType::Type type, int flags, std::vector< std::string > &names) |
Lists all entities of a particular type. More... | |
void | listEntities (StreamBaseEntityType::Type type, std::vector< std::string > &names) |
Lists all entities of a particular type. More... | |
std::string | describe (const std::string &entity) |
Returns an XML description of an entity, or an empty string if the method does not exist. More... | |
bool | isStreamSubscribed (const std::string &entity) |
Returns true if this stream has been subscribed to. More... | |
StreamProperties | getStreamProperties (const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN) |
Returns a description of a stream, throwing an exception if the stream does not exist. More... | |
StreamProperties | getStreamPropertiesByHash (const std::string &hex) |
Returns a description of a stream, throwing an exception if the stream does not exist. More... | |
Schema | getSchemaForStream (const std::string &entity, StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN) |
Returns the schema of a stream, throwing an exception if the stream does not exist. More... | |
void | typecheck (const std::string &sbapp, std::map< std::string, Schema > &streams, bool full=false) |
Typechecks a potential modification to the application, outputting properties of all streams in the application. More... | |
StreamProperties | subscribe (const std::string &stream_name) |
Subscribes to a stream. More... | |
StreamProperties | subscribe (const StreamProperties &props) |
Subscribes to a stream. More... | |
StreamProperties | subscribe (const std::string &stream_name, const std::string &logical_stream, const std::string &predicate) |
Subscribes to a stream with a predicate to apply to output tuples. More... | |
StreamProperties | subscribe (const StreamProperties &props, const std::string &logical_stream, const std::string &predicate) |
Subscribes to a stream with a predicate to apply to output tuples. More... | |
StreamProperties | resubscribe (const std::string &stream_name, const std::string &logical_stream, const std::string &predicate) |
Resubscribes to an already subscribed stream. More... | |
StreamProperties | resubscribe (const StreamProperties &props, const std::string &logical_stream, const std::string &predicate) |
Resubscribes to an already subscribed stream. More... | |
void | unsubscribe (const std::string &stream_name) |
UnSubscribes to a stream. More... | |
void | unsubscribe (const StreamProperties &props) |
UnSubscribes to a stream. More... | |
void | setDequeueResultInterceptor (DequeueResult::Interceptor *dri) |
Set the dequeue results interceptor for this client connection. More... | |
DequeueResult::Interceptor * | getDequeueResultsInterceptor () |
Get the current dequeue results interceptor, or null if there is no current processor. More... | |
TupleList | dequeue (StreamProperties &streamProperties) |
Dequeue tuples from any subscribed stream. More... | |
TupleList | dequeue (std::string &stream_name) |
Dequeue tuples from any subscribed stream. More... | |
DequeueResult | dequeue () |
Dequeue tuples from any subscribed stream. More... | |
DequeueResult | dequeue (int timeout_ms) |
Dequeue tuples from any subscribed stream. More... | |
void | enqueue (const std::string &stream_name, Tuple &tuple) |
Enqueue tuples to a stream. More... | |
void | enqueue (const StreamProperties &props, Tuple &tuple) |
Enqueue tuples to a stream. More... | |
void | enqueue (const std::string &stream_name, TupleList &tuples) |
Enqueue tuples to a stream. More... | |
void | enqueue (const StreamProperties &props, TupleList &tuples) |
Enqueue tuples to a stream. More... | |
Schema | getSchemaByHash (const std::string &hash) |
Returns a schema with a particular hash. More... | |
Schema | getSchemaByName (const std::string &name) |
Returns a schema by name. More... | |
void | operatorStatus (const std::string &containerName, std::vector< std::string > &aStatus) |
status of java operators and adapters More... | |
void | status (std::vector< std::string > &aStatus, bool verbose=false) |
status the streambase daemons More... | |
const StreamBaseURI & | getURI () const |
Return the URI we're talking to. More... | |
const std::vector< StreamBaseURI > & | getURIs () const |
void | close () |
Terminate the client. More... | |
bool | isClosed () |
Return true if the client connection is closed. More... | |
bool | canDequeue () |
Return true if we can call dequeue without blocking. More... | |
void | enableHeartbeating () |
Enable heartbeating for this client. Generally this is only for enqueue-only clients. More... | |
bool | isConnected () |
Returns true if this TupleConnections has any live connections to a server. More... | |
void | enableBuffering (int buffer_size, int flush_interval=DEFAULT_FLUSH_INTERVAL) |
Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0. More... | |
void | flushAllBuffers () |
Flush any pending enqueue buffers. More... | |
void | flushBuffer (const StreamProperties &props) |
Flush any pending enqueue buffer for the StreamProperties provided. More... | |
void | flushBuffer (const std::string &stream_name) |
Flush any pending enqueue buffer for the stream name provided. More... | |
bool | getIsBuffering () const |
Return whether buffering is turned on or off. More... | |
int | getBufferSize () const |
Return buffer size (in tuples) More... | |
std::string | getConnectionID () const |
Return the Connection ID for this Client Connection. More... | |
Tuple | getDynamicVariables (const std::string &modulePath) |
Get all the dynamic variables in the given module, and return them as a Tuple where the field names are the names of the dynamic variables, and the field values are the current values of the dynamic variables. More... | |
void | setDynamicVariable (const std::string &dynamicVariablePath, const std::string &value) |
Set the dynamic variable at the given path to the given value, expressed as a string in CSV style. More... | |
std::string | getConnectionIdAsHexString () const |
Return the Connection ID for this Client Connection. More... | |
TupleList | readTable (const std::string &tablePath, int limit, const std::string &predicate="") |
Read rows out of the table at this path. More... | |
int | setQuiescentLimit (int timeoutMS) |
Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart beat from the StreamBase server that it is connected to. More... | |
std::string | getVersion () |
Returns the client version as a string. More... | |
sb::ClientSettings & | getSettings () |
Get the settings for the client. More... | |
long long | getTupleEnqueueCount () const |
Returns the number of tuples this client has actually enqueued to the server. More... | |
long long | getTupleDequeueCount () const |
Returns the number of tuples this client has dequeued from the server. More... | |
Static Public Attributes | |
static const unsigned int | DEFAULT_FLUSH_INTERVAL = 250 |
The StreamBase client API.
Connects to an StreamBase node over the network, via XML/RPC and the tuple wire protocol. With the exception of the close() method, the StreamBaseClient object is not thread safe, so access to a single object needs to be synchronized between threads. If multiple threads subscribe to streams, it is recommended that they use separate instances of this class.
Flags for the listEntities call This must be consistent with what is found on the Java side of the house in StreamBaseClient.java.
sb::StreamBaseClient::StreamBaseClient | ( | const StreamBaseURI & | uri = StreamBaseURI::fromEnvironment() , |
int | buffer_size = 0 , |
||
int | flush_interval = DEFAULT_FLUSH_INTERVAL |
||
) |
Creates an StreamBase client and establishes a connection to a remote server.
Optional parameter buffer_size specifies the number of tuples to buffer before enqueueing. Optional parameter flush_interval specifies the interval in milliseconds between wakeups of the wakeAndFlushBuffer. The wakeAndFlushBuffer thread is only started if buffer_size > 0.
sb::StreamBaseClient::~StreamBaseClient | ( | ) |
Destroys a session.
bool sb::StreamBaseClient::canDequeue | ( | ) |
Return true if we can call dequeue without blocking.
This means that there is something to dequeue from the server. This dequeued item could be Tuples, a null (server shutdown) or an exception.
void sb::StreamBaseClient::close | ( | ) |
Terminate the client.
May be invoked from a different thread. StreamBaseClient memory, network, and thread resources are not released until close() is called.
Returns immediately.
DequeueResult sb::StreamBaseClient::dequeue | ( | ) |
Dequeue tuples from any subscribed stream.
This method blocks until
DequeueResult sb::StreamBaseClient::dequeue | ( | int | timeout_ms | ) |
Dequeue tuples from any subscribed stream.
This method blocks until
A timeout_ms of zero will block indefinitely, or until a tuple arrives. Note that the actual dequeue timeout may vary based on normal thread scheduling, plus network interactions with the server.
TupleList sb::StreamBaseClient::dequeue | ( | std::string & | stream_name | ) |
Dequeue tuples from any subscribed stream.
This method blocks until
stream_name | (output) set to the name of the stream from which tuples have been dequeued. Note this does not contain the container name |
TupleList sb::StreamBaseClient::dequeue | ( | StreamProperties & | streamProperties | ) |
Dequeue tuples from any subscribed stream.
This method blocks until
streamProperties | (output) set to the StreamProperties of the stream from which tuples have been dequeued |
std::string sb::StreamBaseClient::describe | ( | const std::string & | entity | ) |
Returns an XML description of an entity, or an empty string if the method does not exist.
void sb::StreamBaseClient::enableBuffering | ( | int | buffer_size, |
int | flush_interval = DEFAULT_FLUSH_INTERVAL |
||
) |
Turn on buffering. The WakeAndFlushBuffer thread is only started if flush_interval > 0.
void sb::StreamBaseClient::enableHeartbeating | ( | ) |
Enable heartbeating for this client. Generally this is only for enqueue-only clients.
void sb::StreamBaseClient::enqueue | ( | const std::string & | stream_name, |
Tuple & | tuple | ||
) |
Enqueue tuples to a stream.
The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.
Note: enqueue() will modify the tuple as it is enqueued. If you do not want the tuple modified you must make a copy of it before calling enqueue.
stream_name | the name of the stream to which to enqueue |
tuple | a tuple to enqueue |
void sb::StreamBaseClient::enqueue | ( | const std::string & | stream_name, |
TupleList & | tuples | ||
) |
Enqueue tuples to a stream.
The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.
Note: enqueue() will modify the tuples as they are enqueued. If you do not want the tuples modified you must make copies before calling enqueue.
stream_name | the name of the stream to which to enqueue |
tuples | a list of tuples to enqueue |
void sb::StreamBaseClient::enqueue | ( | const StreamProperties & | props, |
Tuple & | tuple | ||
) |
Enqueue tuples to a stream.
The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.
Note: enqueue() will modify the tuple as it is enqueued. If you do not want the tuple modified you must make a copy of it before calling enqueue.
props | the StreamProperties to enqueue the tuple to |
tuple | a tuple to enqueue |
void sb::StreamBaseClient::enqueue | ( | const StreamProperties & | props, |
TupleList & | tuples | ||
) |
Enqueue tuples to a stream.
The stream must not be tied to the output of any operator in the application. This method can block depending on network, or StreamBase server, congestion.
Note: enqueue() will modify the tuples as they are enqueued. If you do not want the tuples modified you must make copies before calling enqueue.
props | the StreamProperties to enqueue the tuples to |
tuples | a list of tuples to enqueue |
void sb::StreamBaseClient::flushAllBuffers | ( | ) |
Flush any pending enqueue buffers.
This operation has no effect if buffering is not enabled.
StreamBaseException | if there is an IO error while flushing the buffer |
void sb::StreamBaseClient::flushBuffer | ( | const std::string & | stream_name | ) |
Flush any pending enqueue buffer for the stream name provided.
This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.
Note: Note that this will cause inter-stream ordering to be interrupted.
stream_name | the stream whose enqueue buffers to flush, if not empty |
StreamBaseException | if there is an IO error while flushing the buffer |
stream_name use StreamProperties based flushBuffer
use flushAllBuffers() to preserve inter-stream ordering
void sb::StreamBaseClient::flushBuffer | ( | const StreamProperties & | props | ) |
Flush any pending enqueue buffer for the StreamProperties provided.
This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.
Note: Note that this will cause inter-stream ordering to be interrupted.
props | the stream whose enqueue buffers to flush, if not empty |
StreamBaseException | if there is an IO error while flushing the buffer |
int sb::StreamBaseClient::getBufferSize | ( | ) | const |
Return buffer size (in tuples)
std::string sb::StreamBaseClient::getConnectionID | ( | ) | const |
Return the Connection ID for this Client Connection.
Only Valid once an enqueue/dequeue has been attempted.
std::string sb::StreamBaseClient::getConnectionIdAsHexString | ( | ) | const |
Return the Connection ID for this Client Connection.
Only Valid once an enqueue/dequeue has been attempted.
DequeueResult::Interceptor * sb::StreamBaseClient::getDequeueResultsInterceptor | ( | ) |
Get the current dequeue results interceptor, or null if there is no current processor.
Tuple sb::StreamBaseClient::getDynamicVariables | ( | const std::string & | modulePath | ) |
Get all the dynamic variables in the given module, and return them as a Tuple where the field names are the names of the dynamic variables, and the field values are the current values of the dynamic variables.
StreamBaseException | on network or other errors, or if there are no dynamic variables in the named module |
bool sb::StreamBaseClient::getIsBuffering | ( | ) | const |
Return whether buffering is turned on or off.
Schema sb::StreamBaseClient::getSchemaByHash | ( | const std::string & | hash | ) |
Returns a schema with a particular hash.
Schema sb::StreamBaseClient::getSchemaByName | ( | const std::string & | name | ) |
Returns a schema by name.
This will only succeed for named schemas; anonymous schemas assigned to a port should instead be looked up via getStreamProperties().getSchema().
Schema sb::StreamBaseClient::getSchemaForStream | ( | const std::string & | entity, |
StreamProperties::CaptureTransformStrategy | strategy = StreamProperties::FLATTEN |
||
) |
Returns the schema of a stream, throwing an exception if the stream does not exist.
|
inline |
Get the settings for the client.
StreamProperties sb::StreamBaseClient::getStreamProperties | ( | const std::string & | entity, |
StreamProperties::CaptureTransformStrategy | strategy = StreamProperties::FLATTEN |
||
) |
Returns a description of a stream, throwing an exception if the stream does not exist.
entity | the path of the stream |
strategy | how to handle capture fields on the stream |
StreamProperties sb::StreamBaseClient::getStreamPropertiesByHash | ( | const std::string & | hex | ) |
Returns a description of a stream, throwing an exception if the stream does not exist.
long long sb::StreamBaseClient::getTupleDequeueCount | ( | ) | const |
Returns the number of tuples this client has dequeued from the server.
This number does not include tuples that may be dequeued as part of system services such as heartbeating. This number does however include tuples that have been dequeued but have been intercepted by the Interceptor.
long long sb::StreamBaseClient::getTupleEnqueueCount | ( | ) | const |
Returns the number of tuples this client has actually enqueued to the server.
Note that this number will not include any tuples that have been buffered but have not actually been enqueued to the server.
const StreamBaseURI & sb::StreamBaseClient::getURI | ( | ) | const |
Return the URI we're talking to.
|
inline |
Returns the client version as a string.
bool sb::StreamBaseClient::isClosed | ( | ) |
Return true if the client connection is closed.
The client connection can be closed by calling the close() method, or by a server shutdown, or a network error.
bool sb::StreamBaseClient::isConnected | ( | ) |
Returns true if this TupleConnections has any live connections to a server.
Note that a return of "false" doesn't necessarily indicate an error since the TupleConnection's state (e.g., lack of subscriptions) might mean no connections are needed.
bool sb::StreamBaseClient::isStreamSubscribed | ( | const std::string & | entity | ) |
Returns true if this stream has been subscribed to.
entity | the path of the stream |
void sb::StreamBaseClient::listEntities | ( | const std::string & | type, |
int | flags, | ||
std::vector< std::string > & | names | ||
) |
Lists all entities of a particular type.
(One can then use "describe" to obtain a description of an entity.) Names is cleared first. Use container.entity-type to resolve across containers. use the given flags to list the entities
void sb::StreamBaseClient::listEntities | ( | StreamBaseEntityType::Type | type, |
int | flags, | ||
std::vector< std::string > & | names | ||
) |
Lists all entities of a particular type.
(One can then use "describe" to obtain a description of an entity.) names is cleared first. use the given flags to list the entities
void sb::StreamBaseClient::listEntities | ( | StreamBaseEntityType::Type | type, |
std::vector< std::string > & | names | ||
) |
Lists all entities of a particular type.
(One can then use "describe" to obtain a description of an entity.) names is cleared first.
void sb::StreamBaseClient::operatorStatus | ( | const std::string & | containerName, |
std::vector< std::string > & | aStatus | ||
) |
status of java operators and adapters
TupleList sb::StreamBaseClient::readTable | ( | const std::string & | tablePath, |
int | limit, | ||
const std::string & | predicate = "" |
||
) |
Read rows out of the table at this path.
Limit to the specified number of rows returned, or -1 for no limit. Filter rows according to predicate, or return all rows if predicate is empty.
StreamProperties sb::StreamBaseClient::resubscribe | ( | const std::string & | stream_name, |
const std::string & | logical_stream, | ||
const std::string & | predicate | ||
) |
Resubscribes to an already subscribed stream.
Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.
StreamProperties sb::StreamBaseClient::resubscribe | ( | const StreamProperties & | props, |
const std::string & | logical_stream, | ||
const std::string & | predicate | ||
) |
Resubscribes to an already subscribed stream.
void sb::StreamBaseClient::setDequeueResultInterceptor | ( | DequeueResult::Interceptor * | dri | ) |
Set the dequeue results interceptor for this client connection.
This results interceptor replaces any existing results processor. To disable pre-processing of results, set the processor to null.
This method cannot be safely called while another thread is calling dequeue().
NOTE: Once the DequeueResult.Interceptor is passed to StreamBaseClient, StreamBaseClient will manage the lifecycle of the object. StreamBaseClient will delete the object when the interceptor is reset or when the StreamBaseClient is deleted. Do not delete this passed object!!
void sb::StreamBaseClient::setDynamicVariable | ( | const std::string & | dynamicVariablePath, |
const std::string & | value | ||
) |
Set the dynamic variable at the given path to the given value, expressed as a string in CSV style.
StreamBaseException | if the dynamic variable does not exist, or the value is not appropriate for setting the dynamic variable |
int sb::StreamBaseClient::setQuiescentLimit | ( | int | timeoutMS | ) |
Sets how many milliseconds a dequeueing client will tolerate not receiving a client heart beat from the StreamBase server that it is connected to.
The default value is 120 seconds (120000). By default, StreamBase servers emit "client heart beats" every 10 seconds so StreamBase applications have no requirement to send data regularly.
timeoutMS | The number of milliseconds that is allowed to pass without receiving a message from the StreamBase server client heart beat stream. |
void sb::StreamBaseClient::status | ( | std::vector< std::string > & | aStatus, |
bool | verbose = false |
||
) |
status the streambase daemons
StreamProperties sb::StreamBaseClient::subscribe | ( | const std::string & | stream_name | ) |
Subscribes to a stream.
Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.
StreamProperties sb::StreamBaseClient::subscribe | ( | const std::string & | stream_name, |
const std::string & | logical_stream, | ||
const std::string & | predicate | ||
) |
Subscribes to a stream with a predicate to apply to output tuples.
The stream name of dequeued tuples is logical_stream. When unsubscribing, use logical_stream.
Uses the default capture transform strategy of flatten. To specify capture strategy, use getStreamProperties(stream_name, StreamProperties::NEST) and subscribe using the resultant StreamProperties.
stream_name | the stream to subscribe to, error if empty |
logical_stream | the name of the logical stream to associate with this predicate (if empty, defaults to streamname) |
predicate | a predicate to apply to subset the stream, error if empty |
StreamProperties sb::StreamBaseClient::subscribe | ( | const StreamProperties & | props | ) |
Subscribes to a stream.
StreamProperties sb::StreamBaseClient::subscribe | ( | const StreamProperties & | props, |
const std::string & | logical_stream, | ||
const std::string & | predicate | ||
) |
Subscribes to a stream with a predicate to apply to output tuples.
The stream name of dequeued tuples is logical_stream. When unsubscribing, use logical_stream.
props | the stream to subscribe to, error if empty |
logical_stream | the name of the logical stream to associate with this predicate (if empty, defaults to streamname) |
predicate | a predicate to apply to subset the stream, error if empty |
void sb::StreamBaseClient::typecheck | ( | const std::string & | sbapp, |
std::map< std::string, Schema > & | streams, | ||
bool | full = false |
||
) |
Typechecks a potential modification to the application, outputting properties of all streams in the application.
streams is cleared first.
sbapp | the application |
streams | the schema defs of the streams |
full | do a full typecheck |
void sb::StreamBaseClient::unsubscribe | ( | const std::string & | stream_name | ) |
UnSubscribes to a stream.
Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
void sb::StreamBaseClient::unsubscribe | ( | const StreamProperties & | props | ) |
UnSubscribes to a stream.
Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.