Package com.streambase.sb.client
Class DeMUXStreamBaseClient
java.lang.Object
    com.streambase.sb.client.StreamBaseClient
        com.streambase.sb.client.DeMUXStreamBaseClient
- All Implemented Interfaces:
AutoCloseable
public class DeMUXStreamBaseClient extends StreamBaseClient
This is a StreamBaseClient that is intended for use by several independent queries. Rather than polling for data, you register a DequeueListener when you subscribe, and that listener is handed only the results for your own subscribe call.
-
-
Nested Class Summary
static interface DeMUXStreamBaseClient.DequeueListener
    Implement this interface to get calls back with data when using a DeMUXStreamBaseClient
Nested classes/interfaces inherited from class com.streambase.sb.client.StreamBaseClient
StreamBaseClient.ListEntityFlags, StreamBaseClient.SerializedTupleBuffer, StreamBaseClient.TupleBuffer
-
-
Field Summary
protected com.streambase.sb.client.Admin _admin
protected static long DEQUEUE_WAIT
static int VARIABLE_RESPONSE
    Used to signal that a command will return a variable number of responses
-
Constructor Summary
DeMUXStreamBaseClient(StreamBaseURI streamBaseURI)
DeMUXStreamBaseClient(String uri)
DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings)
-
Method Summary
boolean canDequeue()
    Return true if we can call dequeue without blocking.
DequeueResult dequeue()
    Dequeue a batch of tuples from a subscribed stream.
DequeueResult dequeue(long timeoutMS)
    Dequeue a batch of tuples from a subscribed stream.
int getDequeueBufferSize()
protected static String getFullLogicalName(StreamProperties streamProperties)
ClientSettings getSettings()
    Return the settings for this client.
String getUniqueSuffix()
    This manages a threadsafe counter which will give a different value each time it is called.
StreamBaseURI getURI()
    Return the URI used by this Client.
List<StreamBaseURI> getURIs()
    Get all of the URIs for this client.
protected boolean haModeOn()
    Whether HA mode is on.
boolean isSubscribed(StreamProperties stream)
    Return whether we are subscribed to the given stream.
String[] operatorStatus(String containerName)
    Return the status of all the operators in the specified container.
StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate)
    Resubscribe to a stream with a predicate.
StreamProperties resubscribe(String streamname, String logicalstream, String predicate)
    Resubscribe to a stream with a predicate.
DequeueResultsQueue setDequeueQueue(DequeueResultsQueue dequeuer)
DequeueResult.Interceptor setDequeueResultInterceptor(DequeueResult.Interceptor dri)
    Set the dequeue results interceptor for this client connection.
String[] status()
    Return the status of the StreamBase Server.
String[] status(boolean verbose)
    Return the status of the StreamBase Server.
StreamProperties subscribe(StreamProperties props)
    Subscribe to a stream.
StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname)
    Subscribe to a stream.
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy)
    Subscribe to a stream.
StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname, String logicalstream, String predicate)
    Subscribe to a stream with a predicate.
StreamProperties subscribe(String streamname, String logicalstream, String predicate, DeMUXStreamBaseClient.DequeueListener listener)
    This is the only subscribe operation that is supported by the DeMUXStreamBaseClient.
void unsubscribe(StreamProperties logicalProps)
    Unsubscribe from the given stream name.
void unsubscribe(String logicalStreamName)
    Unsubscribe from the given stream name.
Methods inherited from class com.streambase.sb.client.StreamBaseClient
addConnectionStatusCallback, assureNameStartsWithContainer, checkValidStreamname, close, close, describe, enableBuffering, enableBuffering, enqueue, enqueue, enqueue, enqueue, flushAllBuffers, flushBuffer, flushBuffer, getAllStreamProperties, getAllStreamProperties, getConnectionError, getConnectionID, getConnectionStatus, getDequeueResultsInterceptor, getDynamicVariable, getDynamicVariables, getEnqueueBufferSize, getSchemaByHash, getSchemaByName, getSchemaForStream, getStreamProperties, getStreamProperties, getStreamPropertiesByHash, getStreamPropertiesByHash, getSubscribedStreamNames, getTupleDequeueCount, getTupleEnqueueCount, getVersion, hasSchema, hasStream, hasStreamProperties, isClosed, listEntities, listEntities, listEntities, listEntities, listEntities, listEntities, readTable, readTable, removeConnectionStatusCallback, setConnectionStatus, setDynamicVariable, setQuiescentLimit, typecheck, typecheck, unsubscribeInternal
-
-
-
-
Field Detail
-
DEQUEUE_WAIT
protected static final long DEQUEUE_WAIT
- See Also:
- Constant Field Values
-
VARIABLE_RESPONSE
public static final int VARIABLE_RESPONSE
Used to signal that a command will return a variable number of responses
- See Also:
- Constant Field Values
-
_admin
protected final com.streambase.sb.client.Admin _admin
-
-
Constructor Detail
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(List<StreamBaseURI> uris, ClientSettings settings) throws URIException, StreamBaseException
- Throws:
StreamBaseException
URIException
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(String uri) throws URIException, StreamBaseException
- Throws:
URIException
StreamBaseException
-
DeMUXStreamBaseClient
public DeMUXStreamBaseClient(StreamBaseURI streamBaseURI) throws URIException, StreamBaseException
- Throws:
URIException
StreamBaseException
-
-
Method Detail
-
subscribe
public StreamProperties subscribe(String streamname, String logicalstream, String predicate, DeMUXStreamBaseClient.DequeueListener listener) throws StreamBaseException
This is the only subscribe operation that is supported by the DeMUXStreamBaseClient.
- Parameters:
streamname - Qualified or partial stream name. If partial, it is relative to the connection base (usually default).
logicalstream - Logical name for this stream. If you specify a predicate, this is required and must be different from streamname. Also, only one listener per logicalstream name is allowed for a given DeMUXStreamBaseClient.
predicate - Your query
listener - Implement this interface to be called when your data arrives (and on other events)
- Returns:
- StreamProperties for the stream being listened to.
- Throws:
StreamBaseException
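The routing rule described above (each listener sees only the results for its own logical stream, and a logical stream name accepts only one listener) can be illustrated with a small stand-alone sketch. This is not the StreamBase implementation and does not use the StreamBase API: the class DemuxSketch, its methods, the use of plain strings in place of tuples, and the choice of IllegalStateException for a duplicate registration are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical demultiplexer sketch; strings stand in for tuples. */
final class DemuxSketch {
    // At most one listener per logical stream name.
    private final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();

    /** Registers a listener and rejects a second one for the same logical name. */
    void subscribe(String logicalStream, Consumer<String> listener) {
        if (listeners.putIfAbsent(logicalStream, listener) != null) {
            throw new IllegalStateException(
                    "a listener is already registered for " + logicalStream);
        }
    }

    /** Removes the listener for the given logical name, if any. */
    void unsubscribe(String logicalStream) {
        listeners.remove(logicalStream);
    }

    /** Hands one result to the matching listener; returns false if none is registered. */
    boolean dispatch(String logicalStream, String tuple) {
        Consumer<String> listener = listeners.get(logicalStream);
        if (listener == null) {
            return false;
        }
        listener.accept(tuple);
        return true;
    }

    public static void main(String[] args) {
        DemuxSketch demux = new DemuxSketch();
        List<String> big = new ArrayList<>();
        List<String> small = new ArrayList<>();
        demux.subscribe("bigTrades", big::add);     // hypothetical logical names
        demux.subscribe("smallTrades", small::add);
        demux.dispatch("bigTrades", "tuple-1");
        demux.dispatch("smallTrades", "tuple-2");
        System.out.println(big + " " + small);
    }
}
```

In this sketch putIfAbsent makes the "one listener per logical name" check and the registration a single atomic step, so two threads racing to register the same name cannot both succeed.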
-
getFullLogicalName
protected static String getFullLogicalName(StreamProperties streamProperties)
-
getUniqueSuffix
public String getUniqueSuffix()
This manages a threadsafe counter which will give a different value each time it is called.
- Returns:
- a string consisting of an underscore and a number. Each call returns a different value.
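One common way to build a threadsafe counter of this kind is an AtomicLong. The sketch below is an assumption about how such a suffix could be produced, not the actual implementation of getUniqueSuffix; the class UniqueSuffixSketch, the method nextSuffix, and the starting value of the counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a threadsafe "underscore plus number" generator. */
final class UniqueSuffixSketch {
    // incrementAndGet is atomic, so concurrent callers never see the same value.
    private final AtomicLong counter = new AtomicLong();

    /** Returns an underscore followed by the next counter value. */
    String nextSuffix() {
        return "_" + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        UniqueSuffixSketch suffixes = new UniqueSuffixSketch();
        // A suffix like this can make a logical stream name unique per subscription.
        System.out.println("trades" + suffixes.nextSuffix());
        System.out.println("trades" + suffixes.nextSuffix());
    }
}
```

Appending such a suffix to a base name is one way to satisfy the requirement that each logical stream name has only one listener.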
-
canDequeue
public boolean canDequeue()
Description copied from class: StreamBaseClient
Return true if we can call dequeue without blocking. This means that there is something to dequeue from the server. The dequeued item could be tuples, a null (server shutdown), or an exception.
- Overrides:
canDequeue in class StreamBaseClient
- Returns:
- true if we can dequeue without blocking
-
dequeue
public DequeueResult dequeue() throws StreamBaseException
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block. It will return null if the connection is closed.
- Overrides:
dequeue in class StreamBaseClient
- Returns:
- a DequeueResult (or null) from any subscribed stream (only one stream per DequeueResult)
- Throws:
StreamBaseException - thrown on network or other errors
-
dequeue
public DequeueResult dequeue(long timeoutMS) throws StreamBaseException
Description copied from class: StreamBaseClient
Dequeue a batch of tuples from a subscribed stream. This method will block until there are tuples available or timeoutMS milliseconds have passed. It will return null if the connection is closed. It will return an empty DequeueResult if timeoutMS milliseconds have passed and no tuples have been returned. A timeoutMS of zero will block indefinitely, or until a tuple arrives.
- Overrides:
dequeue in class StreamBaseClient
- Parameters:
timeoutMS - timeout in milliseconds
- Returns:
- a DequeueResult from any subscribed stream (only one stream per DequeueResult)
- Throws:
StreamBaseException - thrown on network or other errors
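The three outcomes described for the timed dequeue (a batch, an empty result on timeout, null when closed, with zero meaning "wait indefinitely") can be modeled with a java.util.concurrent.BlockingQueue. The sketch below only mirrors those documented semantics; it is not StreamBase code. The class TimedDequeueSketch and its methods are hypothetical, lists of strings stand in for DequeueResult, and, as a simplification, closing does not wake a caller that is already blocked.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the documented timeout semantics. */
final class TimedDequeueSketch {
    private final BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();
    private volatile boolean closed;

    void offer(List<String> batch) {
        queue.add(batch);
    }

    void close() {
        closed = true;
    }

    /**
     * Returns null if closed, a batch if one arrives in time, or an empty
     * list on timeout. A timeout of zero waits until a batch arrives.
     */
    List<String> dequeue(long timeoutMS) {
        if (closed) {
            return null;
        }
        try {
            List<String> batch = (timeoutMS == 0)
                    ? queue.take()
                    : queue.poll(timeoutMS, TimeUnit.MILLISECONDS);
            return batch != null ? batch : Collections.emptyList();
        } catch (InterruptedException e) {
            // Simplification: restore the interrupt flag and report "nothing arrived".
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        TimedDequeueSketch client = new TimedDequeueSketch();
        client.offer(Arrays.asList("tuple-1", "tuple-2"));
        System.out.println(client.dequeue(100)); // the queued batch
        System.out.println(client.dequeue(100)); // empty: nothing arrived in time
        client.close();
        System.out.println(client.dequeue(100)); // null: connection closed
    }
}
```

A caller therefore has to distinguish three cases: null (stop reading), an empty result (try again), and a non-empty result (process it).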
-
setDequeueQueue
public DequeueResultsQueue setDequeueQueue(DequeueResultsQueue dequeuer)
-
setDequeueResultInterceptor
public DequeueResult.Interceptor setDequeueResultInterceptor(DequeueResult.Interceptor dri)
Description copied from class: StreamBaseClient
Set the dequeue results interceptor for this client connection. This results interceptor replaces any existing results processor. To disable pre-processing of results, set the processor to null. This method cannot be safely called while another thread is calling dequeue().
- Overrides:
setDequeueResultInterceptor in class StreamBaseClient
- Parameters:
dri - dequeue results interceptor for this client connection
- Returns:
- the old dequeue results interceptor
-
subscribe
public StreamProperties subscribe(String streamname) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamName - the name of a stream, which may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used; otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the capture transform strategy
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(StreamProperties props) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
subscribe
public StreamProperties subscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
subscribe
public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
streamName - the name of a stream, which may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used; otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
strategy - the CaptureTransformStrategy to use if there are capture fields in the stream
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
subscribe
public StreamProperties subscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
- Overrides:
subscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error, including empty or null streamname or predicate
-
resubscribe
public StreamProperties resubscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
- Overrides:
resubscribe in class StreamBaseClient
- Parameters:
streamname - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
resubscribe
public StreamProperties resubscribe(StreamProperties props, String logicalstream, String predicate) throws StreamBaseException
Description copied from class: StreamBaseClient
Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
- Overrides:
resubscribe in class StreamBaseClient
- Parameters:
props - the stream to subscribe to
logicalstream - the name of the logical stream to associate with this predicate
predicate - a predicate to apply to subset the stream
- Returns:
- the stream properties
- Throws:
StreamBaseException - thrown on error
-
unsubscribe
public void unsubscribe(String logicalStreamName) throws StreamBaseException
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
- Overrides:
unsubscribe in class StreamBaseClient
- Parameters:
logicalStreamName - the name of the stream to unsubscribe from; for a filtered subscribe, this is the logical stream name
- Throws:
StreamBaseException - thrown on network or other errors
-
unsubscribe
public void unsubscribe(StreamProperties logicalProps) throws StreamBaseException
Description copied from class: StreamBaseClient
Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
- Overrides:
unsubscribe in class StreamBaseClient
- Parameters:
logicalProps - the StreamProperties for this logical subscription
- Throws:
StreamBaseException - thrown on network or other errors
-
isSubscribed
public boolean isSubscribed(StreamProperties stream)
Description copied from class: StreamBaseClient
Return whether we are subscribed to the given stream.
- Overrides:
isSubscribed in class StreamBaseClient
- Parameters:
stream - the stream we want to check
- Returns:
- true if we are subscribed to the given stream
-
getDequeueBufferSize
public int getDequeueBufferSize()
-
haModeOn
protected boolean haModeOn()
Whether HA mode is on.
- Returns:
- true if HA mode is on
-
getSettings
public ClientSettings getSettings()
Return the settings for this client.
- Returns:
- settings for this client
-
status
public String[] status() throws StreamBaseException
Return the status of the StreamBase Server.
- Returns:
- the status
- Throws:
StreamBaseException - thrown on network and other errors
-
status
public String[] status(boolean verbose) throws StreamBaseException
Return the status of the StreamBase Server.
- Parameters:
verbose - return a verbose status
- Returns:
- the status
- Throws:
StreamBaseException - thrown on network and other errors
-
operatorStatus
public String[] operatorStatus(String containerName) throws StreamBaseException
Return the status of all the operators in the specified container.
- Parameters:
containerName - the name of the container
- Returns:
- an array of strings of the form "operatorname=status"
- Throws:
StreamBaseException - error getting status
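Because the result is an array of "operatorname=status" strings, callers will often want it as a map. The helper below is a minimal sketch of that conversion in plain Java. The class OperatorStatusParser is hypothetical, the sample operator names and status values are invented for illustration, and splitting at the first '=' assumes operator names never contain that character.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns "operatorname=status" strings into a map. */
final class OperatorStatusParser {
    private OperatorStatusParser() {
    }

    /** Splits each entry at its first '='; entries without '=' are skipped. */
    static Map<String, String> parse(String[] entries) {
        // LinkedHashMap keeps the order in which the entries were returned.
        Map<String, String> statusByOperator = new LinkedHashMap<>();
        for (String entry : entries) {
            int eq = entry.indexOf('=');
            if (eq < 0) {
                continue; // not in the documented form, ignore it
            }
            statusByOperator.put(entry.substring(0, eq), entry.substring(eq + 1));
        }
        return statusByOperator;
    }

    public static void main(String[] args) {
        // Invented sample data in the documented "operatorname=status" form.
        String[] sample = {"default.Map1=STARTED", "default.Filter1=STOPPED"};
        parse(sample).forEach((name, status) -> System.out.println(name + " is " + status));
    }
}
```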
-
getURI
public StreamBaseURI getURI()
Return the URI used by this Client. In HA mode this just returns the first URI for the client.
- Returns:
- the URI for this client
-
getURIs
public List<StreamBaseURI> getURIs()
Get all of the URIs for this client.
- Returns:
- all of the URIs for this client
-
-