Streaming C++ API
StreamBaseClient.hpp
1// Copyright (c) 2004-2023 TIBCO Software Inc. All rights reserved.
2
3#ifndef STREAMBASE_CLIENT_H
4#define STREAMBASE_CLIENT_H
5
6#include "StreamBase.hpp"
7#include "StreamBaseVersion.hpp"
8#include "Exceptions.hpp"
9#include "Schema.hpp"
10#include "Field.hpp"
11#include "StreamProperties.hpp"
12#include "StreamBaseEntityType.hpp"
13#include "StreamBaseURI.hpp"
14#include "TupleList.hpp"
15#include "DequeueResult.hpp"
16#include "ClientSettings.hpp"
17
18SB_INTERNAL_FWD(StreamBaseClientImpl)
19
20SB_NAMESPACE_BEGIN;
21
22/// The StreamBase client API. Connects to an StreamBase node over the network,
23/// via XML/RPC and the tuple wire protocol. With the exception of the close() method,
24/// the StreamBaseClient object is not thread safe, so access to a single object
25/// needs to be synchronized between threads. If multiple threads subscribe to streams,
26/// it is recommended that they use separate instances of this class.
27///
29 public:
30 static const unsigned int DEFAULT_FLUSH_INTERVAL = 250;
31
32#ifndef DOXYGEN_SKIP
33 /// This tells the rest of the API that tuples want to remain in serialized form,
34 /// ostensibly for consumption by the .NET API.
35 static bool useRawTuples;
36#endif
37
38 /// Creates an StreamBase client and establishes a connection to a remote
39 /// server. Optional parameter buffer_size specifies the number of tuples
40 /// to buffer before enqueueing. Optional parameter flush_interval specifies
41 /// the interval in milliseconds between wakeups of the wakeAndFlushBuffer.
42 /// The wakeAndFlushBuffer thread is only started if buffer_size > 0.
43 StreamBaseClient(const StreamBaseURI& uri = StreamBaseURI::fromEnvironment(),
44 int buffer_size = 0,
45 int flush_interval=DEFAULT_FLUSH_INTERVAL);
46
47 StreamBaseClient(const std::string& uris,
48 int buffer_size = 0,
49 int flush_interval=DEFAULT_FLUSH_INTERVAL);
50
51 StreamBaseClient(const std::vector<StreamBaseURI>& uris,
52 int buffer_size = 0,
53 int flush_interval=DEFAULT_FLUSH_INTERVAL);
54
55 StreamBaseClient(const std::vector<StreamBaseURI>& uris,
56 sb::ClientSettings &settings,
57 int buffer_size = 0,
58 int flush_interval=DEFAULT_FLUSH_INTERVAL);
59
60 /// Destroys a session.
62
63 /// Flags for the listEntities call
64 /// This must be consistent with what is found on the Java side of the
65 /// house in StreamBaseClient.java
67 /// set FULLY_QUALIFIED_NAMES if you want to
68 /// include container names for all entities. If not set then the returned
69 /// names will contain the module name (if any) and the name of the entity.
70 FULLY_QUALIFIED_NAMES = 1,
71 /// set INCLUDE_MODULES if you want to include all modules in the output
72 INCLUDE_MODULES = 2,
73 /// set ALL_CONTAINERS if you want to include all
74 /// user containers in the output. This does not include the "system"
75 /// container.
76 ALL_CONTAINERS = 4 | FULLY_QUALIFIED_NAMES
77 };
78
79 /// Lists all entities of a particular type. (One can then use
80 /// "describe" to obtain a description of an entity.) Names is
81 /// cleared first. Use container.entity-type to resolve across containers.
82 /// use the given flags to list the entities
83 void listEntities(const std::string &type, int flags, std::vector<std::string>& names);
84
85 /// Lists all entities of a particular type. (One can then use
86 /// "describe" to obtain a description of an entity.) names is
87 /// cleared first.
88 /// use the given flags to list the entities
89 void listEntities(StreamBaseEntityType::Type type, int flags, std::vector<std::string>& names);
90
91 /// Lists all entities of a particular type. (One can then use
92 /// "describe" to obtain a description of an entity.) names is
93 /// cleared first.
94 void listEntities(StreamBaseEntityType::Type type, std::vector<std::string>& names);
95
96 /// Returns an XML description of an entity, or an empty string if
97 /// the method does not exist.
98 std::string describe(const std::string &entity);
99
100#ifndef DOXYGEN_SKIP
101 /// Returns true if the license for the specified feature is available and false otherwise.
102 bool checkLicense(const std::string &featureName);
103#endif
104
105 /// Returns true if this stream has been subscribed to.
106 /// @param entity the path of the stream
107 bool isStreamSubscribed(const std::string &entity);
108
109 /// Returns a description of a stream, throwing an exception if the
110 /// stream does not exist.
111 /// @param entity the path of the stream
112 /// @param strategy how to handle capture fields on the stream
113 StreamProperties getStreamProperties(const std::string &entity,
114 StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
115
116 /// Returns a description of a stream, throwing an exception if the
117 /// stream does not exist.
119
120 /// Returns the schema of a stream, throwing an exception if the
121 /// stream does not exist.
122 Schema getSchemaForStream(const std::string &entity,
123 StreamProperties::CaptureTransformStrategy strategy=StreamProperties::FLATTEN);
124
125 /// Typechecks a potential modification to the application, outputting
126 /// properties of all streams in the application. streams is cleared
127 /// first.
128 /// @param sbapp the application
129 /// @param streams the schema defs of the streams
130 /// @param full do a full typecheck
131 void typecheck(const std::string &sbapp,
132 std::map<std::string, Schema>& streams,
133 bool full = false);
134
135 /// Subscribes to a stream.
136 ///
137 /// Uses the default capture transform strategy of flatten. To specify
138 /// capture strategy, use getStreamProperties(stream_name,
139 /// StreamProperties::NEST) and subscribe using the resultant
140 /// StreamProperties.
141 ///
142 /// @return the StreamProperties for the stream
143 StreamProperties subscribe(const std::string &stream_name);
144 /// Subscribes to a stream.
145 /// @return the StreamProperties for the stream
147
148 /// Subscribes to a stream with a predicate to apply to output tuples.
149 /// The stream name of dequeued tuples is logical_stream.
150 /// When unsubscribing, use logical_stream.
151 ///
152 /// Uses the default capture transform strategy of flatten. To specify
153 /// capture strategy, use getStreamProperties(stream_name,
154 /// StreamProperties::NEST) and subscribe using the resultant
155 /// StreamProperties.
156 ///
157 /// @param stream_name the stream to subscribe to, error if empty
158 /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to streamname)
159 /// @param predicate a predicate to apply to subset the stream, error if empty
160 /// @return the StreamProperties for the stream
161 StreamProperties subscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
162
163 /// Subscribes to a stream with a predicate to apply to output tuples.
164 /// The stream name of dequeued tuples is logical_stream.
165 /// When unsubscribing, use logical_stream.
166 /// @param props the stream to subscribe to, error if empty
167 /// @param logical_stream the name of the logical stream to associate with this predicate (if empty, defaults to streamname)
168 /// @param predicate a predicate to apply to subset the stream, error if empty
169 /// @return the StreamProperties for the stream
170 StreamProperties subscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
171
172 /// Resubscribes to an already subscribed stream
173 ///
174 /// Uses the default capture transform strategy of flatten. To specify
175 /// capture strategy, use getStreamProperties(stream_name,
176 /// StreamProperties::NEST) and subscribe using the resultant
177 /// StreamProperties.
178 ///
179 /// @return the StreamProperties for the stream
180 StreamProperties resubscribe(const std::string &stream_name, const std::string &logical_stream, const std::string &predicate);
181
182 /// Resubscribes to an already subscribed stream
183 /// @return the StreamProperties for the stream
184 StreamProperties resubscribe(const StreamProperties &props, const std::string &logical_stream, const std::string &predicate);
185
186 /// UnSubscribes to a stream. Note: Any tuples that are in-flight during
187 /// an unsubscribe request will be dequeued until the stream is fully drained.
188 void unsubscribe(const std::string &stream_name);
189
190 /// UnSubscribes to a stream. Note: Any tuples that are in-flight during
191 /// an unsubscribe request will be dequeued until the stream is fully drained.
192 void unsubscribe(const StreamProperties &props);
193
194#ifndef SWIG
195 ///
196 /// Set the dequeue results interceptor for this client connection.
197 ///
198 /// This results interceptor replaces any existing results processor. To
199 /// disable pre-processing of results, set the processor to null.
200 ///
201 /// This method cannot be safely called while another thread is calling
202 /// dequeue().
203 ///
204 /// NOTE: Once the DequeueResult.Interceptor is passed to StreamBaseClient,
205 /// StreamBaseClient will manage the lifecycle of the object. StreamBaseClient
206 /// will delete the object when the interceptor is reset or when the StreamBaseClient
207 /// is deleted.
208 /// Do not delete this passed object!!
210
211 ///
212 /// Get the current dequeue results interceptor, or null if there is no
213 /// current processor.
214 ///
216
217 /// Dequeue tuples from any subscribed stream. This method
218 /// blocks until
219 ///
220 /// - at least one tuple is available on any subscribed stream, or
221 /// - the node is shut down.
222 ///
223 /// @deprecated Use DequeueResult based dequeue
224 /// @param streamProperties (output) set to the StreamProperties of the stream from
225 /// which tuples have been dequeued
226 /// @return a list of the tuples dequeued; empty if the node has been
227 /// shut down
229
230 /// Dequeue tuples from any subscribed stream. This method
231 /// blocks until
232 ///
233 /// - at least one tuple is available on any subscribed stream, or
234 /// - the node is shut down.
235 ///
236 /// @deprecated Use DequeueResult based dequeue
237 /// @param stream_name (output) set to the name of the stream from
238 /// which tuples have been dequeued. Note this does not contain the container name
239 /// @return a list of the tuples dequeued; empty if the node has been
240 /// shut down
241 TupleList dequeue(std::string &stream_name);
242
243 /// Dequeue tuples from any subscribed stream. This method
244 /// blocks until
245 ///
246 /// - At least one tuple is available on any subscribed stream (DequeueResult will have a status of GOOD)
247 /// - The StreamBase server the client is connected to is shut down (DequeueResult will have a status of CLOSED)
248 /// - A network error occurs (StreamBaseException will be thrown)
249 ///
250 /// @return a DequeueResult containing the r