Class StreamBaseClient

java.lang.Object
com.streambase.sb.client.BaseClient
com.streambase.sb.client.StreamBaseClient
All Implemented Interfaces:
AutoCloseable
Direct Known Subclasses:
DeMUXStreamBaseClient

public class StreamBaseClient extends BaseClient
A client used to subscribe to StreamBase streams and to send commands to the StreamBase Server. Note that this implementation is not synchronized. If instances of this class are accessed from multiple threads, access must be synchronized. With the exception of close(), the single threaded restriction dictates that no thread may reference the object - including enqueueing - while another thread is in dequeue(). If multiple threads subscribe to streams, it is recommended that they use separate instances of this class.
  • Constructor Details

  • Method Details

    • close

      public void close() throws StreamBaseException
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in class BaseClient
      Throws:
      StreamBaseException
    • close

      public void close(String message) throws StreamBaseException
      Flush all buffers and close the connection to the StreamBase server. StreamBaseClient memory, network, and thread resources are not released until close() is called.
      Parameters:
      message - close message
      Throws:
      StreamBaseException - close error
    • canDequeue

      public boolean canDequeue()
      Return true if we can call dequeue without blocking. This means that there is something to dequeue from the server. This dequeued item could be Tuples, a null (server shutdown) or an exception.
      Returns:
      boolean if we can dequeue without blocking
    • isClosed

      public boolean isClosed()
      Return true if the client connection is closed. Once reported as closed, clients will never be reported as open again. Calling dequeue, enqueue, etc. on a closed client will result in an exception. This method is provided as a convenience, there is no need to determine if a client is closed before calling any of it's methods.
      Returns:
      boolean if we can dequeue without blocking
    • dequeue

      public DequeueResult dequeue() throws StreamBaseException
      Dequeue a batch of tuples from a subscribed stream. This method will block. Will return null if the connection is closed.
      Returns:
      a DequeueResult (or null) from any subscribed stream (only one stream per DequeueResult)
      Throws:
      StreamBaseException - thrown on network or other errors
    • dequeue

      public DequeueResult dequeue(long timeoutMS) throws StreamBaseException
      Dequeue a batch of tuples from a subscribed stream. This method will block until there are tuples available or timeoutMS milliseconds have past. It will return null if the connection is closed. It will return an empty DequeueResult if timeoutMS milliseconds have past and no tuples have returned. A timeout_ms of zero will block indefinitely, or until a tuple arrives.
      Parameters:
      timeoutMS - timeout in milliseconds
      Returns:
      a DequeueResult from any subscribed stream (only one stream per DequeueResult)
      Throws:
      StreamBaseException - thrown on network or other errors
    • setDequeueResultInterceptor

      public DequeueResult.Interceptor setDequeueResultInterceptor(DequeueResult.Interceptor dri)
      Set the dequeue results interceptor for this client connection. This results interceptor replaces any existing results processor. To disable pre-processing of results, set the processor to null. This method cannot be safely called while another thread is calling dequeue().
      Parameters:
      dri - dequeue results interceptor for this client connection
      Returns:
      the old dequeue results interceptor
    • getDequeueResultsInterceptor

      public DequeueResult.Interceptor getDequeueResultsInterceptor()
      Get the current dequeue results interceptor, or null if there is no current processor.
      Returns:
      the current dequeue results interceptor
    • describe

      public String describe(String entityName) throws StreamBaseException
      Return an XML description of a StreamBase entity
      Parameters:
      entityName - the name of the entity to describe
      Returns:
      an XML description of a StreamBase entity
      Throws:
      StreamBaseException - thrown on network or other errors
    • enableBuffering

      public void enableBuffering(int bufferSize, long flushIntervalMilliSeconds) throws StreamBaseException
      Turn on buffering. A WakeAndFlushBuffer thread is only started if flushInterval > 0.
      Parameters:
      bufferSize - specifies the number of tuples to buffer before enqueueing. If set to a non-positive value, this call has no effect
      flushIntervalMilliSeconds - specifies the interval in milliseconds between wakeups of WakeAndFlushBuffer
      Throws:
      StreamBaseException - thrown on network or other errors
    • enableBuffering

      public void enableBuffering(int bufferSize) throws StreamBaseException
      Turn on buffering with a default WakeAndFlushBuffer thread set to 250ms
      Parameters:
      bufferSize - specifies the number of tuples to buffer before enqueueing. if a non-positive value is given, this call has no effect
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • enqueue

      public void enqueue(String stream, Tuple tuple) throws StreamBaseException
      Enqueue a single Tuple onto a stream. This method can block depending on network, or StreamBase server, congestion.

      Performance note: this method should be avoided where possible. Use enqueue(StreamProperties, Tuple) instead.

      Parameters:
      stream - the name of the stream on which to enqueue the provided tuple.
      tuple - the Tuple to enqueue
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • enqueue

      public void enqueue(StreamProperties props, Tuple tuple) throws StreamBaseException
      Enqueue a single Tuple onto a stream. This method can block depending on network, or StreamBase server, congestion.
      Parameters:
      props - the StreamProperties for the stream to enqueue the tuple to
      tuple - the Tuple to enqueue
      Throws:
      StreamBaseException - thrown on network or other errors
    • enqueue

      public void enqueue(String stream, Collection<Tuple> tuples) throws StreamBaseException
      Enqueue a collection of Tuples onto a stream. All tuples must be for the same stream. This method can block depending on network, or StreamBase server, congestion.

      Performance note: this method should be avoided where possible. Use enqueue(StreamProperties, Collection) instead.

      Parameters:
      stream - the name of the stream on which to enqueue the provided tuples.
      tuples - a Collection of Tuples to enqueue. Note that the tuples will be modified as part of enqueue
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • enqueue

      public void enqueue(StreamProperties props, Collection<Tuple> tuples) throws StreamBaseException
      Enqueue a collection of Tuples onto a stream. All tuples must be for the same stream. This method can block depending on network, or StreamBase server, congestion.
      Parameters:
      props - the StreamProperties for the stream to enqueue the tuples to
      tuples - a Collection of Tuples to enqueue. Note that the tuples will be modified as part of enqueue
      Throws:
      StreamBaseException - thrown on network or other errors
    • flushAllBuffers

      public void flushAllBuffers() throws StreamBaseException
      Flush any pending enqueue buffers. This operation has no effect if buffering is not enabled.
      Throws:
      StreamBaseException - if there is an IO error while flushing the buffer
    • flushBuffer

      @Deprecated public void flushBuffer(String stream_name) throws StreamBaseException
      Deprecated.
      use flushAllBuffers() to preserve inter-stream ordering
      Flush any pending enqueue buffer for the stream name provided. This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.

      Note: Note that this will cause inter-stream ordering to be interrupted.

      Parameters:
      stream_name - the stream whose enqueue buffers to flush, if not empty
      Throws:
      StreamBaseException - if there is an IO error while flushing the buffer
    • flushBuffer

      @Deprecated public void flushBuffer(StreamProperties props) throws StreamBaseException
      Deprecated.
      use flushAllBuffers() to preserve inter-stream ordering
      Flush any pending enqueue buffer for the StreamProperties provided. This operation has no effect if buffering is not enabled or there is no buffer to flush for the given stream.

      Note: Note that this will cause inter-stream ordering to be interrupted.

      Parameters:
      props - the stream whose enqueue buffers to flush, if not empty
      Throws:
      StreamBaseException - if there is an IO error while flushing the buffer
    • setQuiescentLimit

      public void setQuiescentLimit(long timeoutMS) throws StreamBaseException
      If more then timeoutMS milliseconds elapse without receiving data or a heart beat from the server, the client will close.
      Parameters:
      timeoutMS - - If no heart beat is received from the server in timeoutMS milliseconds, the server is considered unavailable. This only has effect if the server is configured to emit client heart beats. This is not a real-time detection mechanism, so generous timeout limits should be used. 0 disables quiescent server detection.
      Throws:
      StreamBaseException - thrown if the request limit is not at least two times the servers configured client heart beat rate.
    • getSchemaByHash

      public Schema getSchemaByHash(byte[] hash) throws StreamBaseException
      Return the Schema object for the given hash value
      Parameters:
      hash - a byte array that contains a hash value
      Returns:
      The Schema object associated with the given hash value
      Throws:
      StreamBaseException - thrown on network or other errors
    • hasSchema

      public boolean hasSchema(String schemaName)
      Returns whether or not a schema with the passed in schemaName is available; you can get this Schema by calling getSchemaByName.
      Parameters:
      schemaName - the name of the Schema to look up
      Returns:
      true if a schema was found
      Since:
      5.0
      See Also:
    • getSchemaByName

      public Schema getSchemaByName(String name) throws StreamBaseException
      Return the Schema object for the given name. This will only succeed for named schemas. For anonymous schemas assigned to streams use getStreamProperties(String).
      Parameters:
      name - the name of the Schema to lookup
      Returns:
      the Schema for the given name
      Throws:
      StreamBaseException - thrown on network or other errors
    • hasStreamProperties

      public boolean hasStreamProperties(String streamName)
      Returns whether or not a stream with the given name exists. You can get a StreamProperties for this stream by calling getStreamProperties.
      Parameters:
      streamName - the stream name to look for
      Returns:
      true if a stream with the given name exists, false otherwise
      Since:
      5.0
      See Also:
    • getStreamProperties

      public StreamProperties getStreamProperties(String streamName, CaptureTransformStrategy strategy) throws StreamBaseException
      Return the StreamProperties for the given stream name
      Parameters:
      streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
      strategy - capture transform strategy
      Returns:
      The StreamProperties object for the given stream name
      Throws:
      StreamBaseException - thrown on network or other errors
      Since:
      7.2.6
    • getStreamProperties

      public StreamProperties getStreamProperties(String streamName) throws StreamBaseException
      Return the StreamProperties for the given stream name
      Parameters:
      streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
      Returns:
      The StreamProperties object for the given stream name
      Throws:
      StreamBaseException - thrown on network or other errors
    • assureNameStartsWithContainer

      protected static String assureNameStartsWithContainer(StreamProperties base, String logicalName)
      Ensure name starts with container
      Parameters:
      base - Properties
      logicalName - Name to validate
      Returns:
      Fully qualified name
    • getStreamPropertiesByHash

      public StreamProperties getStreamPropertiesByHash(String hashHexString) throws StreamBaseException
      Return the StreamProperties for the given hash StreamProperties are cached locally in the client.
      Parameters:
      hashHexString - The Hash as a HexString
      Returns:
      The StreamProperties object for the given name
      Throws:
      StreamBaseException - thrown on network or other errors
    • getStreamPropertiesByHash

      public StreamProperties getStreamPropertiesByHash(byte[] hash) throws StreamBaseException
      Return the StreamProperties for the given hash StreamProperties are cached locally in the client.
      Parameters:
      hash - The Hash to lookup
      Returns:
      The StreamProperties object for the given name
      Throws:
      StreamBaseException - thrown on network or other errors
    • getAllStreamProperties

      public Set<StreamProperties> getAllStreamProperties(EntityType type, CaptureTransformStrategy strategy) throws StreamBaseException
      Return all the StreamProperties available
      Parameters:
      type - Can be STREAM, INPUT_STREAMS, or OUTPUT_STREAMS
      strategy - the CaptureTransformStrategy to use. Can be FLATTEN or NEST
      Returns:
      A Set of StreamProperties
      Throws:
      StreamBaseException - thrown on network or other errors
      Since:
      7.2.6
    • getAllStreamProperties

      public Set<StreamProperties> getAllStreamProperties(EntityType type) throws StreamBaseException
      Return all the StreamProperties available. Uses CaptureTransformStrategy.FLATTEN
      Parameters:
      type - Can be STREAM, INPUT_STREAMS, or OUTPUT_STREAMS
      Returns:
      A Set of StreamProperties
      Throws:
      StreamBaseException - thrown on network or other errors
    • hasStream

      public boolean hasStream(String streamName)
      Returns whether or not the client has a stream with the passed in name. You can get the schema for this stream by calling getSchemaForStream
      Parameters:
      streamName - stream name
      Returns:
      true if the client has streamName
      Since:
      5.0
      See Also:
    • getSchemaForStream

      public Schema getSchemaForStream(String streamName) throws StreamBaseException
      Return the schema of a stream.
      Parameters:
      streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
      Returns:
      the schema of the stream
      Throws:
      StreamBaseException - if the stream is not found, or on network errors
    • getVersion

      public static String getVersion()
      Get the version of this client, returned in the format specified by Version.INFO_LINE
      Returns:
      a version string
    • listEntities

      public String[] listEntities(EntityType entityType, CaptureTransformStrategy strategy) throws StreamBaseException
      Return an array of entity names for the given entity type. Equivalent to listEntities(EntityType, int) with StreamBaseClient.ListEntityFlags.NO_FLAGS as the second argument. Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.
      Parameters:
      entityType - the entity type that the looking is being done for
      strategy - the strategy to use for capture fields
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      Since:
      7.2.6
      See Also:
    • listEntities

      public String[] listEntities(EntityType entityType) throws StreamBaseException
      Return an array of entity names for the given entity type. Equivalent to listEntities(EntityType, int) with StreamBaseClient.ListEntityFlags.NO_FLAGS as the second argument. Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.
      Parameters:
      entityType - the entity type that the looking is being done for
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • listEntities

      public String[] listEntities(EntityType entityType, int flags, CaptureTransformStrategy strategy) throws StreamBaseException
      Return an array of entity names for the given entity type and flags.

      Examples are as follows:

       
        // list streams on container specified in StreamBaseURI given to StreamBaseClient
        client.listEntities(EntityType.STREAM, StreamBaseClient.ListEntityFlags.NO_FLAGS) 
        // list operators on ALL containers
        client.listEntities(EntityType.OPERATOR, StreamBaseClient.ListEntityFlags.ALL_CONTAINERS)
       

      Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.

      Parameters:
      entityType - the entity type that the looking is being done for
      flags - a combination of flags from StreamBaseClient.ListEntityFlags
      strategy - the strategy to use for capture fields
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      Since:
      7.2.6
      See Also:
    • listEntities

      public String[] listEntities(EntityType entityType, int flags) throws StreamBaseException
      Return an array of entity names for the given entity type and flags.

      Examples are as follows:

       
        // list streams on container specified in StreamBaseURI given to StreamBaseClient
        client.listEntities(EntityType.STREAM, StreamBaseClient.ListEntityFlags.NO_FLAGS) 
        // list operators on ALL containers
        client.listEntities(EntityType.OPERATOR, StreamBaseClient.ListEntityFlags.ALL_CONTAINERS)
       

      Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.

      Parameters:
      entityType - the entity type that the looking is being done for
      flags - a combination of flags from StreamBaseClient.ListEntityFlags
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • listEntities

      public String[] listEntities(String entityPath, int flags, CaptureTransformStrategy strategy) throws StreamBaseException
      Return an array of entity names for the given entity path. The entityPath can be either a simple entity type: "streams" or a path that includes a container such as: "mycontainer.streams". Valid entity types are from the EntityType class

      Examples are as follows:

       
        // list streams on container specified in StreamBaseURI given to StreamBaseClient
        client.listEntities("streams", StreamBaseClient.ListEntityFlags.NO_FLAGS) 
        // list streams on container "other"
        client.listEntities("other.streams", StreamBaseClient.ListEntityFlags.NO_FLAGS)
        // list streams on ALL containers
        client.listEntities("streams", StreamBaseClient.ListEntityFlags.ALL_CONTAINERS)
       

      Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.

      Parameters:
      entityPath - the entity type to do the lookup. Use container.entity-type to resolve across containers
      flags - a combination of flags from StreamBaseClient.ListEntityFlags
      strategy - the strategy to use for capture fields
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      Since:
      7.2.6
      See Also:
    • listEntities

      public String[] listEntities(String entityPath, int flags) throws StreamBaseException
      Return an array of entity names for the given entity path. The entityPath can be either a simple entity type: "streams" or a path that includes a container such as: "mycontainer.streams". Valid entity types are from the EntityType class

      Examples are as follows:

       
        // list streams on container specified in StreamBaseURI given to StreamBaseClient
        client.listEntities("streams", StreamBaseClient.ListEntityFlags.NO_FLAGS) 
        // list streams on container "other"
        client.listEntities("other.streams", StreamBaseClient.ListEntityFlags.NO_FLAGS)
        // list streams on ALL containers
        client.listEntities("streams", StreamBaseClient.ListEntityFlags.ALL_CONTAINERS)
       

      Note: if the container you are connected to does not exist when issuing this request, and this call is made without the StreamBaseClient.ListEntityFlags.ALL_CONTAINERS flag, the request will fail.

      Parameters:
      entityPath - the entity type to do the lookup. Use container.entity-type to resolve across containers
      flags - a combination of flags from StreamBaseClient.ListEntityFlags
      Returns:
      an array of entity names for the given entity type
      Throws:
      StreamBaseException - thrown on network or other errors
      See Also:
    • subscribe

      public StreamProperties subscribe(String streamname) throws StreamBaseException
      Subscribe to a stream
      Parameters:
      streamname - the stream to subscribe to
      Returns:
      the stream properties
      Throws:
      StreamBaseException - thrown on error
    • subscribe

      public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy) throws StreamBaseException
      Subscribe to a stream
      Parameters:
      streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
      strategy - the capture transform strategy
      Returns:
      the stream properties
      Throws:
      StreamBaseException - thrown on error
      Since:
      7.2.6
    • subscribe

      public StreamProperties subscribe(StreamProperties props) throws StreamBaseException
      Subscribe to a stream
      Parameters:
      props - the stream to subscribe to
      Returns:
      the stream properties
      Throws:
      StreamBaseException - thrown on error
    • subscribe

      public StreamProperties subscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
      Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
      Parameters:
      streamname - the stream to subscribe to
      logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
      predicate - a predicate to apply to subset the stream
      Returns:
      the stream properties
      Throws:
      StreamBaseException - thrown on error, including empty or null streamname or predicate
    • subscribe

      public StreamProperties subscribe(String streamName, CaptureTransformStrategy strategy, String logicalstream, String predicate) throws StreamBaseException
      Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
      Parameters:
      streamName - the name of a stream, that may be fully qualified (for example, mycontainer.MyInputSchema). When unqualified, the container from the URI the connection was established with is used, otherwise StreamBaseURI.DEFAULT_CONTAINER is used.
      strategy - the CaptureTransformStrategy to use if there are capture fields in the stream
      logicalstream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
      predicate - a predicate to apply to subset the stream
      Returns:
      Stream properties
      Throws:
      StreamBaseException - thrown on error, including empty or null streamname or predicate
      Since:
      7.2.6
    • subscribe

      public StreamProperties subscribe(StreamProperties props, String logicalStream, String predicate) throws StreamBaseException
      Subscribe to a stream with a predicate. The stream name of dequeued tuples is logicalstream. When unsubscribing, use logicalstream.
      Parameters:
      props - the stream to subscribe to
      logicalStream - the name of the logical stream to associate with this predicate (if empty or null, defaults to streamname)
      predicate - a predicate to apply to subset the stream
      Returns:
      stream properties
      Throws:
      StreamBaseException - thrown on error, including empty or null streamname or predicate
    • resubscribe

      public StreamProperties resubscribe(String streamname, String logicalstream, String predicate) throws StreamBaseException
      Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
      Parameters:
      streamname - the stream to subscribe to
      logicalstream - the name of the logical stream to associate with this predicate
      predicate - a predicate to apply to subset the stream
      Returns:
      stream properties
      Throws:
      StreamBaseException - thrown on error
    • resubscribe

      public StreamProperties resubscribe(StreamProperties props, String logicalStream, String predicate) throws StreamBaseException
      Resubscribe to a stream with a predicate. Allows the client to change the filtered subscribe predicate atomically so that currently buffered data is not lost (as would happen if you unsubscribed, then subscribed again with a different predicate).
      Parameters:
      props - the stream to subscribe to
      logicalStream - the name of the logical stream to associate with this predicate
      predicate - a predicate to apply to subset the stream
      Returns:
      stream properties
      Throws:
      StreamBaseException - thrown on error
    • typecheck

      public StreamProperties[] typecheck(String application) throws StreamBaseException
      Typecheck (validate) the given StreamBase Application. Return a list of StreamProperties for that Application
      Parameters:
      application - contents of an application. Text sbapp or ssql.
      Returns:
      a list of StreamProperties for the StreamBase application
      Throws:
      StreamBaseException - on network or typecheck error
    • typecheck

      public StreamProperties[] typecheck(String application, boolean full) throws StreamBaseException
      Typecheck (validate) the given StreamBase Application. Return a list of StreamProperties for that Application
      Parameters:
      application - contents of an application. Text sbapp or ssql.
      full - do a full typecheck
      Returns:
      a list of StreamProperties for the StreamBase application
      Throws:
      StreamBaseException - on network or typecheck error
    • unsubscribe

      public void unsubscribe(String logicalStreamName) throws StreamBaseException
      Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
      Parameters:
      logicalStreamName - the name of the stream to unsubscribe from, which is logical streamname on filtered subscribe
      Throws:
      StreamBaseException - thrown on network or other errors
    • unsubscribe

      public void unsubscribe(StreamProperties logicalProps) throws StreamBaseException
      Unsubscribe from the given stream name. Note: Any tuples that are in-flight during an unsubscribe request will be dequeued until the stream is fully drained.
      Parameters:
      logicalProps - the StreamProperties for this logical subscription
      Throws:
      StreamBaseException - thrown on network or other errors
    • unsubscribeInternal

      protected void unsubscribeInternal(StreamProperties logicalProps) throws StreamBaseException
      Un-subscribe from stream
      Parameters:
      logicalProps - Properties
      Throws:
      StreamBaseException - Error un-subscribing
    • getConnectionID

      public byte[] getConnectionID()
      Return the Connection ID for this Client Connection. Only Valid once an enqueue/dequeue has been attempted.
      Returns:
      binary connection id or null if connection hasn't been established
    • getSubscribedStreamNames

      public Set<String> getSubscribedStreamNames()
      Returns a Set of names of the Streams this client is currently subscribed to. This list is cached locally.
      Returns:
      Set of Stream names as Strings
    • isSubscribed

      public boolean isSubscribed(StreamProperties stream)
      Return status if we are subscribed to the given stream
      Parameters:
      stream - the stream we want to check
      Returns:
      status if we are subscribed to the given stream
      Since:
      6.4
    • connectionIDObject

      com.streambase.sb.util.ByteString connectionIDObject()
      return the connectionId as a ByteString
      Returns:
      connectionId as a ByteString
    • setConnectionID

      void setConnectionID(com.streambase.sb.util.ByteString connectionID)
      set the connection id
      Parameters:
      connectionID - connection identifier
    • getSubscribedStreams

      Map<String,SubscriptionEntry> getSubscribedStreams()
      Get subscribed streams
      Returns:
      All subscribed streams
    • checkValidStreamname

      protected static void checkValidStreamname(String streamname) throws StreamBaseException
      Validate stream name
      Parameters:
      streamname - Stream name
      Throws:
      StreamBaseException - Invalid stream name
    • getDynamicVariable

      public Object getDynamicVariable(String dynamicVariablePath) throws StreamBaseException
      Get the given dynamic variable
      Parameters:
      dynamicVariablePath - the path to the dynamic variable, expressed as the path to the module containing the dynamic variable, and then a dot, and then the name of the dynamic variable.
      Returns:
      the value of the dynamic variable, with the same Java type as would be returned by Tuple.getField(String)
      Throws:
      StreamBaseException - if the dynamic variable path does not exist or an error occurs while communicating with the server
      Since:
      7.0
    • getDynamicVariables

      public Tuple getDynamicVariables(String modulePath) throws StreamBaseException
      Get a Tuple of all the dynamic variables on the given module
      Parameters:
      modulePath - the module to get the dynamic variables of
      Returns:
      a Tuple of all the dynamic variables in the given module. The fields in the tuple are the names of the dynamic variables, the values are the current values of the dynamic variables.
      Throws:
      StreamBaseException - if the module does not exist, or an error occurs while communicating with the server
      Since:
      7.0
    • setDynamicVariable

      public void setDynamicVariable(String dynvarPath, Object value) throws StreamBaseException
      Set the given dynamic variable to a new value
      Parameters:
      dynvarPath - the path to the dynamic variable to set. This consists of the dotted path to the module that contains the dynamic variable, followed by a dot, followed by the name of the dynamic variable.
      value - the value to set the dynamic variable to. It should be the same type that getDynamicVariable(String) would return for this dynamic variable.
      Throws:
      StreamBaseException - if the dynamic variable does not exist, or the value is not appropriate for setting the dynamic variable
      Since:
      7.0
    • getTupleEnqueueCount

      public long getTupleEnqueueCount()
      Returns the number of tuples this client has actually enqueued to the server. Note that this number will not include any tuples that have been buffered but have not actually been enqueued to the server.
      Returns:
      number of tuples enqueued
    • getTupleDequeueCount

      public long getTupleDequeueCount()
      Returns the number of tuples this client has dequeued from the server. This number does not include tuples that may be dequeued as part of system services such as heart beat. This number does however include tuples that have been dequeued but have been intercepted by the Intercepter.
      Returns:
      number of tuples dequeued
    • incrementTuplesEnqueued

      void incrementTuplesEnqueued(long additionalTuples)
      Increment enqueued tuple count
      Parameters:
      additionalTuples - Increment value
    • incrementTuplesDequeued

      void incrementTuplesDequeued(long additionalTuples)
      Increment dequeued tuple count
      Parameters:
      additionalTuples - Increment value
    • getDequeuer

      com.streambase.sb.client.DequeueResultsQueue getDequeuer()
      Get dequeuer
      Returns:
      Dequeue queue
    • getEnqueueBufferSize

      public int getEnqueueBufferSize()
      Return the number of tuples in the enqueue buffer
      Returns:
      number of tuples in the enqueue buffer
    • readTable

      public List<Tuple> readTable(String tablePath, int rowLimit) throws StreamBaseException
      Return rows from a Query Table or Materialized Window. No guarantees are made regarding the order of rows returned.
      Parameters:
      tablePath - Path to the table. A fully qualified path is a dot-separated string optionally starting with a container name (when omitted, the container specified by this client's URI is used), followed by zero or more module reference names, and finally the table or window name. For example, default.ModuleRef1.DataTable is a valid fully qualified path, where default. may be omitted when this client is connected to the default container.
      rowLimit - Limit on number of rows to return, or -1 for all rows.
      Returns:
      A list of rows from the table.
      Throws:
      StreamBaseException - If the connection is not valid or the server cannot be contacted, or the table does not exist, or the predicate is faulty.
      Since:
      7.2 Initial version, 10.2 Executes a cluster wide-query on partitioned transasctional memory tables
    • readTable

      public List<Tuple> readTable(String tablePath, int rowLimit, String predicate) throws StreamBaseException
      Return rows from a Query Table or Materialized Window, applying an optional predicate to decide which rows are returned. No guarantees are made regarding the order of rows returned.
      Parameters:
      tablePath - Path to the table. A fully qualified path is a dot-separated string optionally starting with a container name (when omitted, the container specified by this client's URI is used), followed by zero or more module reference names, and finally the table or window name. For example, default.ModuleRef1.DataTable is a valid fully qualified path, where default. may be omitted when this client is connected to the default container.
      rowLimit - Limit on number of rows to return, or -1 for all rows.
      predicate - A predicate to apply to the rows to select those for retrieval. A null predicate returns all rows.
      Returns:
      A list of rows from the table.
      Throws:
      StreamBaseException - If the connection is not valid or the server cannot be contacted, or the table does not exist, or the predicate is faulty.
      Since:
      7.2 Initial version, 10.2 Executes a cluster wide-query on partitioned transasctional memory tables
    • addConnectionStatusCallback

      public void addConnectionStatusCallback(ConnectionStatusCallback callback)
      Track changes to the connection state. When a state change occurs, such as the client connecting or disconnecting, call the stateChanged method.
      Parameters:
      callback - a class that implements the ConnectionStatusCallback interface
    • removeConnectionStatusCallback

      public void removeConnectionStatusCallback(ConnectionStatusCallback callback)
      Stop tracking changes to the connection state.
      Parameters:
      callback - an instance of ConnectionStatusCallback
    • setConnectionStatus

      protected void setConnectionStatus(ConnectionStatus status, String additionalInfo)
      Set connection status
      Parameters:
      status - New status
      additionalInfo - Additional information
    • getConnectionStatus

      public ConnectionStatus getConnectionStatus()
      Return the current connection status of this StreamBaseClient.
      Returns:
      the current connection status (CREATED, CONNECTED, DISCONNECTED)
    • getConnectionError

      public String getConnectionError()
      If the StreamBaseClient disconnected due to an internal error, return a message corresponding to the error, otherwise return null.
      Returns:
      the error message that caused the client to disconnect, null if not applicable