Class Operator

java.lang.Object
com.streambase.sb.operator.Operator
All Implemented Interfaces:
com.streambase.sb.util.LocatedItem, Serializable
Direct Known Subclasses:
InputAdapter, OutputAdapter

public abstract class Operator extends Object implements Serializable, com.streambase.sb.util.LocatedItem
Abstract base class for user code that is used as a Java Operator or an embedded Adapter in a StreamBase application. One instance is created for each Java Operator in a StreamBase application. StreamBase Studio may operate on several StreamBase applications at a time, so Operator subclass instances may be in different applications.

Operator subclasses must have a public default constructor.

An Operator is notified of state changes through callbacks. The StreamBase runtime 'calls back' an Operator when it changes the runtime state of the Operator. These callbacks include resume(), resumed(), suspend(), suspended(), and shutdown().

Operator provides "managed threads", which are threads that run concurrently with the application, but which can synchronize with its overall state changes. These threads are started, suspended, resumed, and shut down with the application. Managed threads are registered with an Operator using the method registerRunnable. This is particularly useful for input adapters, which typically have to respond to external events asynchronously with the application. Managed threads can call sendOutput at any time.

If an Operator registers one or more managed threads, and all of its managed threads exit their run() methods, then the Operator itself will shut down.

The StreamBase runtime blocks while it waits for an Operator's managed threads to respond to a state change. This can be problematic if a managed thread is blocked on some event. However, the StreamBase runtime can be configured to interrupt a thread when it needs to change its state. This is accomplished by setting the flag shouldInterrupt to true when registering the thread with registerRunnable.

It may be that an Operator's managed thread does not respond to a state change even after it has been interrupted. If the Operator's thread does not respond to the state within a given time interval then it is considered to be in failure and it is shut down. This time interval is specified by the server configuration parameter operator-state-change-timeout-ms.
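
The effect of interrupting a blocked thread, as described above, can be illustrated with plain java.lang.Thread. This is a minimal sketch that does not use the StreamBase API: the class InterruptibleWorkerSketch, its events queue, and the direct call to worker.interrupt() are hypothetical stand-ins for a managed thread and for what the runtime is described as doing when shouldInterrupt is true.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-alone sketch; none of these names come from StreamBase.
final class InterruptibleWorkerSketch {

    // Starts a thread that blocks waiting for an external event, interrupts it,
    // and reports whether the thread exited within joinTimeoutMs milliseconds.
    static boolean exitsWhenInterrupted(long joinTimeoutMs) {
        BlockingQueue<String> events = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    events.take(); // blocks: no event ever arrives in this sketch
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave run() so a state change can proceed.
                Thread.currentThread().interrupt();
            }
        }, "blocked-worker-sketch");
        worker.start();
        worker.interrupt(); // stands in for the runtime interrupting the managed thread
        try {
            worker.join(joinTimeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

A thread that swallows the InterruptedException and goes back to blocking would not exit here, which corresponds to the case above where a managed thread fails to respond even after being interrupted.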

For sharing state amongst instances, Operators may use the services provided by the Operator.SharedObjectManager, accessible via getRuntimeEnvironment().

Since 7.2.12, if your operator simply passes through Tuples without creating new Tuple instances, ensure you override isPassthruOperator() for maximum performance.

Special Studio Considerations:

Note: Serializations of instances of this class that are created (e.g., by using ObjectOutputStream) in one version of StreamBase in general will not be deserializable in any other version of StreamBase.

  • Field Details

    • OP_CONFIG_MGR_NO_CONF

      public static final Operator.ConfigurationAccessor OP_CONFIG_MGR_NO_CONF
      A configuration manager that provides no information
    • DEFAULT_STATE_CHANGE_TIMEOUT

      public static final int DEFAULT_STATE_CHANGE_TIMEOUT
      Default value for the timeout for Operator state changes, in milliseconds: 10000

      An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.

      When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.

      If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.

      If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.
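
The wait-with-timeout behavior described above can be sketched with a java.util.concurrent.CountDownLatch. This is only an illustration under stated assumptions: the class StateChangeTimeoutSketch and the method awaitAcknowledgement are hypothetical, and the latch stands in for whatever mechanism the server actually uses to receive an Operator's acknowledgement.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; it models the documented timeout, not the server's code.
final class StateChangeTimeoutSketch {

    // Same value as the documented default, in milliseconds.
    static final int DEFAULT_STATE_CHANGE_TIMEOUT = 10000;

    // Waits for the acknowledgement latch to reach zero.
    // Returns true if the acknowledgement arrived in time, false if the wait timed out.
    static boolean awaitAcknowledgement(CountDownLatch acknowledged, long timeoutMs) {
        try {
            return acknowledged.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A false result corresponds to the case above in which the server stops waiting, shuts down the Operator, and continues with its own state change.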

  • Constructor Details

    • Operator

      protected Operator()
      Constructs an operator.

      All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.

  • Method Details

    • setLogLevel

      public static void setLogLevel(org.slf4j.Logger logger, String level) throws StreamBaseException
      Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used this call will throw an exception and the log level will remain unchanged.
      Parameters:
      logger - the Logger object on which to set the level.
      level - the level at which to set the Logger instance.
      Throws:
      StreamBaseException - the log level could not be set, most likely because the current logging implementation is not Logback.
      Since:
      6.4.4 initial release
    • setLogLevel

      public static void setLogLevel(org.slf4j.Logger logger, Operator.LogLevel level) throws StreamBaseException
      Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used this call will throw an exception and the log level will remain unchanged.
      Parameters:
      logger - the Logger object on which to set the level.
      level - the Operator.LogLevel at which to set the Logger instance.
      Throws:
      StreamBaseException - the log level could not be set, most likely because the current logging implementation is not Logback.
      Since:
      6.4.6 initial release
    • getIconResource

      public URL getIconResource(Operator.IconKind iconType)
      Clients should override to support custom icons. The default implementation returns null (meaning no icon) for any kind requested.
      Parameters:
      iconType - the kind of icon being requested, as an Operator.IconKind enumeration value
      Returns:
      a URL pointing to an image resource corresponding to the type requested, or null
    • getDisplayName

      public final String getDisplayName()
      Return the display name of this Operator. The display name is the String that's shown in the list of available Operators.
      Returns:
      The display name of the Operator.
    • setDisplayName

      public final void setDisplayName(String dn)
      Sets the display name. The display name is the String that's shown in the Operator Name field of the Properties view, and in the detailed text for this operator in the Palette view's Details mode. If the subclass doesn't set it, the class name is used.
      Parameters:
      dn - Display name
    • getDisplayDescription

      public final String getDisplayDescription()
      Provides a description for this type of Operator. This is used by the user interface to assist the user in selecting an Operator during authoring. The expected usage is to return a three or four line description, without embedded new lines, describing the intent, or technical details that can assist a user in ensuring they are selecting the Operator or Adapter they need.
      Returns:
      The description for this type of Operator, never null
      Since:
      7.6 initial release
    • setDisplayDescription

      protected final void setDisplayDescription(String description)
      Set a description for this type of Operator. This method is to be called only during the zero-argument constructor for this Operator.
      Parameters:
      description - description for this type of Operator, never null
      Since:
      7.6 initial release
    • getShortDisplayName

      public final String getShortDisplayName()
      Return the short display name of this Operator. This short display name is the String displayed by Studio in the palette, and used to name new instances of this Operator added to an EventFlow application.

      By default, this returns the simple class name of this Operator.

      Returns:
      The short display name of the Operator.
      Since:
      6.1.2 initial release, 6.5 uses this for new instances added to an EventFlow application
    • setShortDisplayName

      public final void setShortDisplayName(String sn)
      Sets the short display name. The short display name is the String displayed by Studio as the name of this operator in the Palette view, and above the operator in the EventFlow canvas. If the subclass doesn't set it, the simple class name is used.
      Parameters:
      sn - Short display name
      Since:
      6.1.2 initial release
    • getName

      public String getName()
      Return the name of this Operator. Operators are named and can be managed by name. This may return null during typecheck and init phases.
      Returns:
      The name of the operator during runtime, or empty string during typecheck and init
    • getContainerName

      public String getContainerName()
      Return the name of this operator's container. This may return null during typecheck and init phases.
      Returns:
      The name of this operator's container. This may return an empty string during typecheck and init phases.
    • getFullyQualifiedName

      public String getFullyQualifiedName()
      Return the fully qualified name of this operator. This contains the container name, any module names and the name of the operator. This may return null during typecheck and init phases.
      Returns:
      The fully qualified name of this operator. This may return an empty string during typecheck and init phases.
    • setPortHints

      public void setPortHints(int numInputPorts, int numOutputPorts)
      Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas. The default is 1 input port, 0 output ports.

      This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.

      To specify the ports at runtime requireInputPortCount and setOutputSchema should be called from the typecheck method.

      To specify the ports dynamically within Studio, see getPortCounts.

      Parameters:
      numInputPorts - number of input ports. Must be non-negative.
      numOutputPorts - number of output ports. Must be non-negative.
      Throws:
      IllegalArgumentException - if an argument is less than zero
    • typecheck

      public abstract void typecheck() throws Operator.PropertyTypecheckException, TypecheckException
      The typecheck method is called by Studio and the StreamBase server to ensure that all the parameters for this operator are correct. This method is also responsible for verifying input Schemas and setting output Schemas. This method must call setOutputSchema(int, Schema) if there are any output ports.

      If the parameters are not correct, or the input port Schemas are not correct, an Operator.PropertyTypecheckException or TypecheckException should be thrown. The former should always be used if the exception is related to a particular parameter.

      The method requireInputPortCount() should be used to verify that the required input ports are set.

      If the Operator changes the number of input ports, this method must call requireInputPortCount.

      Throws:
      Operator.PropertyTypecheckException - when a parameter value is unexpected.
      TypecheckException - when parameters or input Schemas are not satisfied.
    • init

      public void init() throws StreamBaseException
      After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method. Operators should override this method to perform custom initialization.

      If this operator wishes to register threads, it should call registerRunnable(Runnable) from this method.

      Throws:
      StreamBaseException - Prevents the application from starting.
    • hasNotYetStarted

      public boolean hasNotYetStarted()
      Returns true if this Operator has not yet started running. hasNotYetStarted() returns true if the Operator is in the runtime state of NOT_YET_STARTED, which is the initial runtime state of an Operator.
      Returns:
      true if not started, false otherwise
    • isDroppingTuples

      public boolean isDroppingTuples()
      Returns true if the Operator will drop any tuples it receives when it is suspended.
      Returns:
      true if tuples are dropped when operator suspended, false otherwise
    • isProcessingTuples

      public boolean isProcessingTuples()
      Returns true if the Operator will process any tuples it receives when it is suspended.
      Returns:
      true if tuples are processed when suspended, false otherwise
    • isRunning

      public boolean isRunning()
      Returns true if this Operator is currently running, false otherwise. isRunning() returns true if the Operator is in a runtime state of STARTED.
      Returns:
      true if operator running, false otherwise
    • isShutdown

      public boolean isShutdown()
      Returns true if this Operator is currently shut down, false otherwise. An Operator is shut down when it is in the runtime state of SHUTDOWN, which is the terminal state for Operators.
      Returns:
      true if operator shutdown, false otherwise
    • isSuspended

      public boolean isSuspended()
      Returns true if this Operator is currently suspended, false otherwise. An Operator is suspended when it is in the runtime state of SUSPENDED.
      Returns:
      true if operator suspended, false otherwise
    • processTuple

      public abstract void processTuple(int inputPort, Tuple tuple) throws StreamBaseException
      This method will be called by the StreamBase server for each Tuple given to the Operator to process.

      By default the Tuple passed in is safe for storing and mutating after the return of processTuple. The details of this Tuple's life cycle can be controlled via setReuseTuple(boolean).

      Parameters:
      inputPort - the input port that the tuple is from (ports are zero based)
      tuple - the tuple from the given input port
      Throws:
      StreamBaseException - Terminates the application.
    • resume

      public void resume() throws Exception
      resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed. Note that if the application is shut down directly from a suspended state, this will not be called; instead shutdown() will be called.

      resume() is a callback that is called by the StreamBase runtime.

      Exceptions thrown during calls to resume() will be processed as error tuples, and trigger a shutdown of the operator.

      Throws:
      Exception - Error processing callback
    • resumed

      public void resumed() throws Exception
      resumed() is called after all registered runnables of the operator have started or resumed. That is, once shouldRun() has unblocked and returned true in all registered runnables.

      resumed() is a callback that is called by the StreamBase runtime.

      Exceptions thrown during calls to resumed() will be processed as error tuples, and trigger a shutdown of the operator.

      Throws:
      Exception - Error processing callback
    • size

      public int size()
      Return the "size" of this operator. This size value will be displayed in sbmonitor's "size" column.

      Override this method to display the size of an important data structure in this Operator, for example the size of a queue. Returns 0 by default.

      This method must return promptly and never block, as this is called from a system statistics monitoring thread.

      Returns:
      size of this operator.
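
One way to meet the "return promptly and never block" requirement above is to keep a separate atomic counter next to the data structure, so that size() is a single atomic read. The following is a minimal sketch under that assumption; QueueSizeSketch, enqueue, and dequeue are hypothetical names, and the class does not extend the real Operator.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch of a non-blocking size() for a queue-backed operator.
final class QueueSizeSketch {

    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    // Tracked separately because ConcurrentLinkedQueue.size() traverses the whole queue.
    private final AtomicInteger pendingCount = new AtomicInteger();

    void enqueue(String item) {
        pending.add(item);
        pendingCount.incrementAndGet();
    }

    String dequeue() {
        String item = pending.poll();
        if (item != null) {
            pendingCount.decrementAndGet();
        }
        return item;
    }

    // Same shape as the documented size(): one atomic read, no locks, no traversal.
    int size() {
        return pendingCount.get();
    }
}
```

The counter can briefly lag the queue under concurrent access, which is usually acceptable for a monitoring value.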
    • shouldRun

      public final boolean shouldRun()
      Returns whether the calling operator thread is enabled and should continue running.

      This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.

      This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.

      An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shut down until all registered runnables have exited their run() method (either on their own or in response to this method returning false).

      Returns:
      true if the operator is running, false otherwise.
      Throws:
      UnsupportedOperationException - If this was not called from an operator thread.
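
The contract described above (false before start and after shutdown, blocking while suspended, true while running) can be modeled with a small monitor-based gate. This is a sketch for illustration only: RunGateSketch, setState, and runLoop are hypothetical, the state names are borrowed from the runtime states mentioned elsewhere in this class, and returning false on interruption is an assumption of the sketch rather than documented behavior.

```java
// Hypothetical stand-alone model of the shouldRun() contract; not the StreamBase implementation.
final class RunGateSketch {

    enum State { NOT_YET_STARTED, STARTED, SUSPENDED, SHUTDOWN }

    private State state = State.NOT_YET_STARTED;

    // Changes the state and wakes any thread blocked in shouldRun().
    synchronized void setState(State newState) {
        state = newState;
        notifyAll();
    }

    // Blocks while suspended; returns true only while started.
    synchronized boolean shouldRun() {
        while (state == State.SUSPENDED) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // assumption of this sketch: treat interruption as "stop"
            }
        }
        return state == State.STARTED;
    }

    // Typical body of a registered runnable: loop on shouldRun() for the thread's whole lifetime.
    // Here the loop shuts the gate itself after stopAfter iterations so that it terminates.
    static int runLoop(RunGateSketch gate, int stopAfter) {
        int iterations = 0;
        while (gate.shouldRun()) {
            iterations++; // one unit of work per iteration
            if (iterations == stopAfter) {
                gate.setState(State.SHUTDOWN);
            }
        }
        return iterations;
    }
}
```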
    • shutdown

      public void shutdown()
      shutdown is called by the StreamBase runtime just prior to shutting down this Operator. An implementation of shutdown should include any behavior needed to shut down the Operator - freeing resources, etc.

      shutdown() is a callback that is called by the StreamBase runtime.

    • postShutdown

      public void postShutdown()
      postShutdown is called by the StreamBase runtime just after shutting down this Operator.

      postShutdown() is a callback that is called by the StreamBase runtime.

    • suspend

      public void suspend() throws Exception
      suspend() will be called when an operator suspends, before any registered runnables are suspended.

      suspend() is a callback that is called by the StreamBase runtime.

      Exceptions thrown during calls to suspend() will be processed as error tuples.

      Throws:
      Exception - Error processing callback
    • setSuspendBehavior

      public void setSuspendBehavior(int suspendBehavior)
      Set the suspend behavior of this Operator. The Operator can either process or drop tuples when suspended. setSuspendBehavior() might be called in the Operator's constructor or in init().
      Parameters:
      suspendBehavior - The suspend behavior to set, either PROCESSING_TUPLES or DROPPING_TUPLES.
    • suspended

      public void suspended() throws Exception
      suspended() will be called after all registered runnables of the operator have suspended. That is, once all registered runnables have called and are blocked in shouldRun().

      suspended() is a callback that is called by the StreamBase runtime.

      Exceptions thrown during calls to suspended() will be processed as error tuples.

      Throws:
      Exception - Error processing callback
    • remoteNodeActive

      public void remoteNodeActive(String remoteNodeName)
      Called when a remote node has become active in the cluster.
      Parameters:
      remoteNodeName - the remote node name
      Since:
      10.0 initial release
    • remoteNodeUnavailable

      public void remoteNodeUnavailable(String remoteNodeName)
      Called when a remote node has become unavailable.
      Parameters:
      remoteNodeName - the remote node name
      Since:
      10.0 initial release
    • remoteQuorumLost

      public void remoteQuorumLost(String availabilityZoneName, String reason)
      Called after deactivating all partitions when loss of quorum is detected.
      Parameters:
      availabilityZoneName - name of the availability zone that owns the associated quorum
      reason - string containing a reason why loss was detected.
      Since:
      10.4
    • partitionActive

      public void partitionActive(String name)

      Called when a partition transitions into Active state.

      In the Active state, the partition is running on the active node for the partition.

      Parameters:
      name - name of the partition that becomes active
      Since:
      10.4
    • partitionNotActive

      public void partitionNotActive(String name)

      Called when a partition transitions into any other state from Active state.

      Parameters:
      name - name of the partition that becomes not Active
      Since:
      10.4
    • getParameters

      public Parameterizable getParameters()
      Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
      Returns:
      The Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
    • setParameters

      public void setParameters(Parameterizable params)
      Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
      Parameters:
      params - the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
    • getResourceContents

      public InputStream getResourceContents(String name) throws ResourceNotFoundException, StreamBaseException
      Returns an open input stream on the contents of the named resource file. The client is responsible for closing the stream when finished.

      Each resource name must be a valid Java resource name: a string consisting of a sequence of substrings delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).

      For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.

      Call this, or getResourceFile(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.
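
Because the caller is responsible for closing the returned stream, a try-with-resources block is a natural fit. The sketch below is stand-alone and makes no StreamBase calls: ResourceStreamSketch and readAndClose are hypothetical, and any InputStream (for example one obtained from getResourceContents) could be passed in. It assumes Java 9 or later for InputStream.readAllBytes().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing how a caller might consume and close a resource stream.
final class ResourceStreamSketch {

    // Reads the whole stream as UTF-8 text and closes it, even if reading fails.
    static String readAndClose(InputStream in) {
        try (InputStream stream = in) {
            return new String(stream.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```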

      Parameters:
      name - resource name
      Returns:
      an input stream containing the contents of the resource
      Throws:
      ResourceNotFoundException - resource cannot be found
      StreamBaseException - resource found, but cannot be opened
      Since:
      6.4 initial release, 10.0 added support for fully qualified resource names
    • getResourceFile

      public File getResourceFile(String name) throws ResourceNotFoundException, StreamBaseException
      Returns a File pointing to the regular file or directory if it exists. If this returns a regular file, getResourceContents(String) is guaranteed not to fail.

      Each resource name must be a valid Java resource name: a string consisting of a sequence of substrings delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).

      For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.

      Call this, or getResourceContents(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.
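
The examples above say that a name with and without a leading slash refers to the same resource. A caller that needs to compare or cache resource names might normalize them first. This is a minimal sketch assuming a single optional leading slash is the only difference between the two spellings; ResourceNameSketch and its methods are hypothetical and are not part of the Operator API.

```java
// Hypothetical helper; it only manipulates strings and does not touch the file system.
final class ResourceNameSketch {

    // Strips one leading slash so that "data.csv" and "/data.csv" compare equal.
    static String normalize(String name) {
        return name.startsWith("/") ? name.substring(1) : name;
    }

    // True if both names denote the same resource under the assumption stated above.
    static boolean sameResource(String a, String b) {
        return normalize(a).equals(normalize(b));
    }
}
```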

      Parameters:
      name - resource name
      Returns:
      a File pointing to the named resource, or null if one cannot be associated with the named resource (for example, if storage is not available on a file system for this resource)
      Throws:
      ResourceNotFoundException - resource cannot be found
      StreamBaseException - resource found, but cannot be opened
      Since:
      6.4 initial release, 10.0 added support for fully qualified resource names
    • registerRunnable

      public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt, boolean synchronizedShutdown) throws StreamBaseException
      Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

      The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

      This method should be called from the operator's init() method.

      Parameters:
      runnableName - The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
      operatorRunnable - The Runnable to register.
      shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
      synchronizedShutdown - Whether to wait for this thread to exit before postShutdown is called.
      Throws:
      StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
      Since:
      6.6.14 initial release
    • registerRunnable

      public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
      Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

      The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

      This method should be called from the operator's init() method.

      Parameters:
      runnableName - The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
      operatorRunnable - The Runnable to register.
      shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
      Throws:
      StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
      Since:
      6.5 initial release
    • registerRunnable

      public final void registerRunnable(Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
      Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

      The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.

      This method should be called from the operator's init() method.

      Parameters:
      operatorRunnable - The Runnable to register.
      shouldInterrupt - Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
      Throws:
      StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
    • registerRunnable

      @Deprecated public final void registerRunnable(Runnable operatorRunnable) throws StreamBaseException
      Deprecated.
      As of StreamBase version 3.7, replaced by registerRunnable(Runnable, boolean)
      Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.

      The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, shut down, suspended, and resumed along with the rest of the application.

      This method should be called from the adapter's init() method.

      Parameters:
      operatorRunnable - The Runnable to register.
      Throws:
      StreamBaseException - If it is too late in the Operator's life cycle to register a Runnable.
    • scheduleRunnable

      public Cancellable scheduleRunnable(Date when, Runnable runner)
      Schedules a Runnable object to be run at some future point as part of the same parallel region as this Operator. Prefer using this to implement scheduling events by the operator rather than allocating a new thread using registerRunnable, if the parallelism is not necessary.
      Parameters:
      when - Time to run the runner. If it is in the past, it is run immediately
      runner - The Runnable to schedule
      Returns:
      Handle to task for cancellation, if the task hasn't already started running. This handle is thread-safe, but attempting to cancel an already started task has no effect.
      Since:
      7.4 initial release
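
The scheduling contract described above (run at a given Date, run immediately if that time is in the past, return a handle that can cancel a task that has not started) resembles what java.util.concurrent.ScheduledExecutorService offers. The sketch below uses that standard class as an analogy only: ScheduleSketch, delayMillis, and scheduleAt are hypothetical, and the returned ScheduledFuture merely plays the role of the Cancellable handle.

```java
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone analogy; scheduleRunnable itself runs tasks in the operator's parallel region.
final class ScheduleSketch {

    // Delay from now until when, clamped to zero so a time in the past means "run immediately".
    static long delayMillis(Date when, Date now) {
        return Math.max(0L, when.getTime() - now.getTime());
    }

    // Schedules runner for the requested time and returns a handle that can cancel it.
    static ScheduledFuture<?> scheduleAt(ScheduledExecutorService executor, Date when, Runnable runner) {
        return executor.schedule(runner, delayMillis(when, new Date()), TimeUnit.MILLISECONDS);
    }
}
```

Calling cancel(false) on the returned future before the delay elapses prevents the task from running.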
    • requireInputPortCount

      protected void requireInputPortCount(int numPorts) throws com.streambase.sb.operator.PortMismatchException
      Throws a PortMismatchException if the number of ports is not numPorts. The StreamBase Studio IDE will recognize this and draw the operator box with the appropriate number of input ports.

      This method should be called by the typecheck method.

      Parameters:
      numPorts - the number of ports
      Throws:
      com.streambase.sb.operator.PortMismatchException - When the number of input ports is incorrect.
    • getPortCounts

      public PortCounts getPortCounts() throws TypecheckException
      An optional method that subclasses can override to dynamically tell Studio the number of input and output ports.

      Clients should expect this method to be called after all setters, but prior to typecheck. This method will not be called if Studio encountered any exception while calling each setter with the property values just entered by the user.

      Note: This is for Studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.

      Returns:
      PortCounts record type
      Throws:
      TypecheckException - Type check exception
    • getAsyncInputPorts

      public int[] getAsyncInputPorts()
      Override to indicate to Studio which ports (0-based) might process data asynchronously. This provides different rendering on the EventFlow canvas to assist users in understanding their data flow.
      Returns:
      the zero based indices of the asynchronous ports; null by default
      Since:
      7.6 initial release
    • getInputPortCount

      public final int getInputPortCount()
      Return the number of input ports.
      Returns:
      the number of input ports, or 0 if the Operator is not in an application.
    • getOutputPortCount

      public final int getOutputPortCount()
      Returns the number of output ports. The number of output ports is set by the setOutputSchema(int, Schema) method.
      Returns:
      Number of output ports
    • newTupleDataInitializer

      public ITupleDataInitializer newTupleDataInitializer(int inputPort, int outputPort) throws StreamBaseException
      Create a new ITupleDataInitializer based on the specified input and output ports.

      An ITupleDataInitializer provides an efficient way to create an output tuple from an input tuple. It copies the data from the input tuple into the output tuple. The input tuple is cleared in the process.

      Parameters:
      inputPort - The input port
      outputPort - The output port
      Returns:
      A new tuple data initializer
      Throws:
      StreamBaseException - The tuple data initializer could not be created
      Since:
      11.1.0
    • sendOutput

      public void sendOutput(int port, Tuple tuple) throws StreamBaseException
      Enqueue a Tuple to be sent synchronously to downstream operators.

      All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.

      Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent through the two methods can be received downstream in any order.

      A note about reusing/caching the sent tuple.

      If the given tuple is a tuple that this operator received from a processTuple() call, then the tuple can be reused if and only if getReuseTuple() is FALSE.

      If the given tuple is created by this operator then it can be reused when the call to sendOutput() returns. StreamBase is done with the tuple once control returns to this operator.

      sendOutput(int, Tuple) uses this call's transaction, if one is active, otherwise a new transaction is started to send the Tuple to downstream operators.

      Parameters:
      port - The output port the Tuple is enqueued upon (ports are zero based)
      tuple - The Tuple to enqueue
      Throws:
      StreamBaseException - if there is an error internally sending the output.
      IllegalArgumentException - if the port is invalid or the tuple doesn't match the schema of the output port.
      Since:
      10.0 transactional behavior
    • sendOutput

      public void sendOutput(int port, List<Tuple> tuples) throws StreamBaseException
      Enqueue a List of Tuples to be sent synchronously to downstream operators.

      All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.

      Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent through the two methods can be received downstream in any order.

      A note about reusing/caching the sent tuples.

      If the tuples in the given list are tuples that this operator received from processTuple() calls, then these tuples can be reused if and only if getReuseTuple() is FALSE.

      If the given list of tuples is created by this operator, then the tuples in this list can be reused when the call to sendOutput() returns. StreamBase is done with the tuples once control returns to this operator.

      sendOutput(int, List) uses this call's transaction, if one is active; otherwise a new transaction is started to send the Tuples to downstream operators.

      Parameters:
      port - The output port the Tuples are enqueued upon (ports are zero based)
      tuples - The List of Tuple objects to enqueue
      Throws:
      StreamBaseException - if there is an error internally sending the output.
      IllegalArgumentException - if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
      Since:
      10.0 transactional behavior
    • sendOutputAsync

      public void sendOutputAsync(int port, Tuple tuple) throws StreamBaseException
      Enqueue a Tuple to be sent asynchronously to downstream operators.

      All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.

      Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent through the two methods can be received downstream in any order.

      Note: The Tuple will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.

      The tuple can be reused once sendOutputAsync returns.

      A new transaction is always started when the Tuple is read from the asynchronous queue for processing.

      Parameters:
      port - The output port the Tuple is enqueued upon (ports are zero based)
      tuple - A tuple to enqueue asynchronously
      Throws:
      StreamBaseException - if there is an error internally sending the output.
      IllegalArgumentException - if the port is invalid or the tuple doesn't match the schema of the output port.
      Since:
      6.4.3 initial release, 10.0 transactional behavior
    • sendOutputAsync

      public void sendOutputAsync(int port, List<Tuple> tuples) throws StreamBaseException
      Enqueue a List of Tuples to be sent asynchronously to downstream operators.

      All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.

      Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent through the two methods can be received downstream in any order.

      Note: The Tuples will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.

      The tuples (and the List<Tuple>) can be reused once sendOutputAsync returns.

      A new transaction is always started when the Tuple is read from the asynchronous queue for processing.

      Parameters:
      port - The output port the Tuples are enqueued upon (ports are zero based)
      tuples - A List<Tuple> to enqueue asynchronously
      Throws:
      StreamBaseException - if there is an error internally sending the output.
      IllegalArgumentException - if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
      Since:
      6.4.3 initial release, 10.0 transactional behavior
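
      The ordering rules for the four send methods can be pictured with a toy model. The sketch below is not StreamBase code: ToySender, sendSync, sendAsync and drain are hypothetical names, and modelling the asynchronous path as a FIFO queue drained at a later point is an assumption made only for illustration. It shows that each path keeps its own order while the interleaving between the two paths is not preserved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: these names are hypothetical and are not part of the StreamBase API.
class ToySender {
    private final List<String> downstream = new ArrayList<>();   // what downstream logic receives, in order
    private final Queue<String> asyncQueue = new ArrayDeque<>();  // FIFO buffer for asynchronous sends

    // Models a synchronous send: delivered before the call returns.
    void sendSync(String tuple) {
        downstream.add(tuple);
    }

    // Models an asynchronous send: only queued here, delivered at some later time.
    void sendAsync(String tuple) {
        asyncQueue.add(tuple);
    }

    // Models the runtime reading the asynchronous queue at a later point.
    void drain() {
        while (!asyncQueue.isEmpty()) {
            downstream.add(asyncQueue.remove());
        }
    }

    List<String> received() {
        return new ArrayList<>(downstream);
    }

    // Interleaves the two kinds of send and reports what "downstream" saw.
    static List<String> demo() {
        ToySender sender = new ToySender();
        sender.sendAsync("a1");
        sender.sendSync("s1");
        sender.sendAsync("a2");
        sender.sendSync("s2");
        sender.drain();
        return sender.received();
    }
}
```

      In this model the call order a1, s1, a2, s2 arrives as s1, s2, a1, a2. That is only one possible interleaving; the point is that s1 precedes s2 and a1 precedes a2, while nothing is promised about the order between the two groups.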
    • getInputSchema

      public final Schema getInputSchema(int port)
      Returns the schema of an input port. At typecheck time or before, behaves like getTypecheckInputSchema(int). After typecheck time, behaves like getRuntimeInputSchema(int).
      Parameters:
      port - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
    • getRuntimeInputSchema

      public final Schema getRuntimeInputSchema(int port)
      Returns the schema of an input port at runtime. This schema will never contain capture fields, as they will have been transformed according to the CaptureTransformStrategy set by setCaptureStrategy(CaptureTransformStrategy). This will be the actual schema of tuples passed to processTuple(int, Tuple).
      Parameters:
      port - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
      Since:
      7.2.6 initial release
    • getTypecheckInputSchema

      public final Schema getTypecheckInputSchema(int port)
      Returns the schema of an input port that was set at application typecheck time. This schema may contain capture fields, and may not represent the actual schema of tuples passed into the processTuple(int, Tuple) method. For the actual schema of those tuples, see getRuntimeInputSchema(int).
      Parameters:
      port - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
      Since:
      7.2.6 initial release
    • getOutputSchema

      public final Schema getOutputSchema(int outputPort)
      Return the output schema for the given output port (zero-based). If called before the operator is initialized, this method acts as getTypecheckOutputSchema(int). If called from init() or later, it acts as getRuntimeOutputSchema(int).
      Parameters:
      outputPort - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
    • evaluate

      public String evaluate(String st, Tuple input) throws StreamBaseException
      Evaluates the given string expression in the context of the running module. If an error occurs during the evaluation, the return value will be a string containing the error message.
      Parameters:
      st - the expression to be evaluated
      input - the input tuple
      Returns:
      the evaluated expression as a string or an error message if evaluation failed.
      Throws:
      StreamBaseException - Could not perform evaluation
      Since:
      7.6 initial release
    • setDynamicVariable

      public void setDynamicVariable(String name, Object value) throws StreamBaseException
      Sets the value of this module's dynamic variable. Valid only at runtime.
      Parameters:
      name - name of the dynamic variable to be set
      value - the new value (may be null)
      Throws:
      StreamBaseException - if an error occurs
      Since:
      7.6 initial release
    • getNamedSchema

      public Schema getNamedSchema(String name)
      Parameters:
      name - the name of the named schema
      Returns:
      a Schema object representing this named schema or null if there is no named schema in this module with name
      Since:
      7.6 initial release
    • getDynamicVariablesSchema

      public Schema getDynamicVariablesSchema()
      Retrieves a Schema containing information about every dynamic variable available to the module this operator is contained in. Never null: an empty schema is returned to indicate that there are no visible dynamic variables.
      Returns:
      the schema of the containing module's dynamic variables. Note that certain internal dynamic variables (such as those used by sequence operators) are not included.
      Since:
      7.6 initial release
    • getDynamicVariables

      public Tuple getDynamicVariables() throws TupleException
      Retrieves a read-only tuple describing the current value of all dynamic variables in the module this operator is contained in. Never null: an empty tuple is returned to indicate that there are no visible dynamic variables.
      Returns:
      a read-only tuple containing all dynamic variables in this module.
      Throws:
      TupleException - Error creating return tuple
      Since:
      7.6 initial release
    • setManagedState

      public final boolean setManagedState(String key, Serializable value)
      Store a value in transactional memory

      This method can only be called when an operator is configured to use transactional memory for its state. There must be an active transaction when this method is called.

      Parameters:
      key - Key value. Must be cluster unique if state is replicated.
      value - Value to store
      Returns:
      true if successful, false otherwise
      Throws:
      UnsupportedOperationException - Operator state not in transactional memory or no active transaction
      Since:
      10.0 initial release
    • getManagedState

      public final Serializable getManagedState(String key)
      Get a value from transactional memory.

      There must be an active transaction when this method is called

      Parameters:
      key - Key value
      Returns:
      Stored value or null if no value associated with key
      Throws:
      UnsupportedOperationException - Operator state not in transactional memory or no active transaction
      Since:
      10.0 initial release
    • getStorageMethod

      public final StorageMethod getStorageMethod()
      Get operator's state's storage method.

      May return null if information is not available, e.g. during typecheck.

      Returns:
      data storage used for this operator
      Since:
      10.2 initial release
    • getTransactionIsolationLevel

      public final TransactionIsolationLevel getTransactionIsolationLevel()
      Get current transaction isolation level.

      May return null if information is not available, e.g. during typecheck.

      Returns:
      The current transaction isolation level
      Since:
      10.2 initial release
    • isConcurrentByDefault

      public boolean isConcurrentByDefault()
      Indicate whether an operator should be in a concurrent region by default

      The default value is false.

      The value of this method is used at design-time to indicate whether an operator should be put into a concurrent region when added to an EventFlow.

      Returns:
      true if this operator will be in a concurrent region by default, false otherwise.
      Since:
      10.0 initial release
    • supportsTransactionalMemory

      public boolean supportsTransactionalMemory()
      Indicate whether an operator optionally supports transactional memory

      Override this method to return true to indicate that an operator can optionally use transactional memory as storage. The default value is false.

      Calls to setManagedState(java.lang.String, java.io.Serializable) and getManagedState(java.lang.String) are valid only if this method returns true and getStorageMethod() returns transactional memory.

      Returns:
      true if this operator optionally supports transactional memory, false otherwise.
      Since:
      10.2 initial release
    • getDataDirectory

      public final Path getDataDirectory() throws IOException
      Get data directory.

      The data directory can be used as required to store data files. Files stored in this directory can only be accessed relative to the returned directory path, i.e. this directory is not on the classpath of the fragment.

      The data directory can be configured using the com.tibco.ep.streambase.configuration.sbengine configuration type.

       
       name = "data-area-configuration"
       version = "1.0.0"
       type = "com.tibco.ep.streambase.configuration.sbengine"
       configuration =
       {
           StreamBaseEngine =
           {
               streamBase =
               {
                   dataAreaPath = "v1-data-area"
               }
           }
       }
       
       

      Any files created in this directory are the responsibility of the operator that created them. Any clean up must be explicitly done by the operator.

      The data directory is global to an executing fragment, so all operators running in a fragment can write files into this directory. It is recommended that sub-directories be used to isolate files.

      WARNING: It is illegal to call this method during typecheck

      Returns:
      Path to a writable data directory. The return value can never be null.
      Throws:
      IOException - Could not create the data directory
      Since:
      10.2 initial release
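
      The recommendations above (isolate files in sub-directories, clean up explicitly) can be sketched with the standard java.nio.file API. In this sketch OperatorDataFiles, writeIsolated, cleanUp and roundTrip are hypothetical helper names, and the dataDir parameter stands in for the Path returned by getDataDirectory(); the demonstration uses a temporary directory instead so that it runs outside StreamBase.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; dataDir stands in for the Path returned by getDataDirectory().
class OperatorDataFiles {
    // Creates (if needed) a sub-directory private to one operator and writes a file into it.
    static Path writeIsolated(Path dataDir, String operatorName, String fileName, String content)
            throws IOException {
        Path subDir = Files.createDirectories(dataDir.resolve(operatorName));
        Path file = subDir.resolve(fileName);
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    // Explicit clean-up: deletes the operator's sub-directory and everything under it.
    static void cleanUp(Path dataDir, String operatorName) throws IOException {
        Path subDir = dataDir.resolve(operatorName);
        if (!Files.exists(subDir)) {
            return;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(subDir)) {
            // Reverse order so that files are deleted before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : deepestFirst) {
            Files.delete(p);
        }
    }

    // Demonstration against a temporary directory (an assumption; not the real data area).
    static boolean roundTrip() {
        try {
            Path dataDir = Files.createTempDirectory("sb-data-area");
            Path file = writeIsolated(dataDir, "MyOperator", "state.txt", "hello");
            boolean written = Files.exists(file)
                    && file.getParent().equals(dataDir.resolve("MyOperator"));
            cleanUp(dataDir, "MyOperator");
            boolean removed = !Files.exists(dataDir.resolve("MyOperator"));
            Files.delete(dataDir);
            return written && removed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      Using the operator name as the sub-directory name is one possible convention; any scheme that keeps operators in the same fragment from overwriting each other's files serves the same purpose.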
    • getRuntimeOutputSchema

      public final Schema getRuntimeOutputSchema(int outputPort)
      Return the output schema that should actually be used to create tuples for sendOutput(int, Tuple) or sendOutputAsync(int, Tuple). Note that this schema may differ from the output schema set at typecheck time, because of the transformation determined by setCaptureStrategy(CaptureTransformStrategy). It will never contain capture fields.

      This method may not be called until after typecheck is finished; calling it earlier causes an IllegalStateException.

      Parameters:
      outputPort - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
      Since:
      7.2.6 initial release
    • getTypecheckOutputSchema

      public final Schema getTypecheckOutputSchema(int outputPort)
      Return the output schema as set by typecheck. Note that if this schema contains any capture fields, it may not be the same as the correct schema for tuples sent to sendOutput(int, Tuple) at runtime.
      Parameters:
      outputPort - the port to return the schema for (ports are zero based)
      Returns:
      Schema for the port, null if not running in an application
      Throws:
      IndexOutOfBoundsException - if port not in range
      Since:
      7.2.6 initial release
    • getCaptureStrategy

      public final CaptureTransformStrategy getCaptureStrategy()
      Returns the capture transform strategy that this Operator will use.
      Returns:
      the CaptureTransformStrategy to use
      Since:
      7.2.6 initial release
    • setCaptureStrategy

      public final void setCaptureStrategy(CaptureTransformStrategy cts)
      Set the capture transform strategy to use for the inputs and outputs of this Operator. Valid values for cts include CaptureTransformStrategy.FLATTEN and CaptureTransformStrategy.NEST. The default is CaptureTransformStrategy.FLATTEN.
      Parameters:
      cts - the CaptureTransformStrategy to use.
      Since:
      7.2.6 initial release
    • setOutputSchema

      public final Schema setOutputSchema(int port, Schema outputSchema) throws TypecheckException
      Sets the output schema for the given output port (port #'s are zero based). This method should be called from the typecheck method of a Java Operator or Adapter.

      Note that the actual Schema object to create tuples at runtime (for use by sendOutput(int, Tuple) or sendOutputAsync(int, Tuple)) should be retrieved using getRuntimeOutputSchema(int), as that will take into account any capture field transformations determined by getCaptureStrategy().

      Parameters:
      port - the port to set the given schema to (ports are zero based)
      outputSchema - the schema to set the given port to
      Returns:
      returns the typecheck version of the output schema
      Throws:
      TypecheckException - Typecheck error
      Since:
      7.0 returns Schema; 7.2 the returned schema should not be the Schema used to create output tuples. For the Schema to use to create output tuples, call getRuntimeOutputSchema(int) in the init() method or later.
    • sendErrorOutput

      public void sendErrorOutput(String message)
      Send an error tuple from this operator's error port.

      This method will also record the message in the debug log.

      Parameters:
      message - A description of the error, must not be null
      Throws:
      IllegalArgumentException - if message is null
      Since:
      5.1 initial release
    • sendErrorOutput

      public void sendErrorOutput(Throwable t)
      Send an exception via the error output port

      This method will also record the message in the debug log.

      Parameters:
      t - the exception to report
      Throws:
      IllegalArgumentException - if exception is null
      Since:
      7.0 initial release
    • sendErrorOutput

      public void sendErrorOutput(Throwable t, int port, Tuple errorTuple)
      Send an exception via the error output port

      This method will also record the message in the debug log.

      Parameters:
      t - the exception to report
      port - the input/output port that the error is related to
      errorTuple - a tuple that caused or is related to the error
      Throws:
      IllegalArgumentException - if exception is null
      Since:
      7.0 initial release
    • getTableAccessor

      public TableAccessor getTableAccessor(String name) throws StreamBaseException
      Get a TableAccessor for a table in the local module by name of that table.

      Warning: This interface is provisional, and will likely change in upcoming versions of StreamBase.

      Parameters:
      name - the table name
      Returns:
      a table accessor
      Throws:
      StreamBaseException - Error accessing TableAccessor
      Since:
      7.2 initial release
    • getLogger

      public org.slf4j.Logger getLogger()
      Retrieves a Logger suitable for logging messages and exceptions for this Operator. Equivalent to LoggerFactory.getLogger(Class) with arguments this.getClass().
      Returns:
      a Logger instance for this Operator
      Since:
      6.0 initial release
    • getReuseTuple

      public boolean getReuseTuple()
      Get the state of the input Tuple reuse flag.
      Returns:
      true if input Tuple reuse is allowed, false if it is disallowed
    • setReuseTuple

      protected void setReuseTuple(boolean reuse)
      Allow/disallow the runtime to reuse tuples on operator input. The default is to disallow tuple reuse. Reusing operator input tuples creates fewer objects, which may increase the performance of the operator. To enable reuse, call setReuseTuple(true) from the derived class's constructor.

      Note: if you have called setReuseTuple(true) and you plan to store the input Tuple as state in your operator, you must copy the Tuple before you store it.

      Parameters:
      reuse - allow/disallow tuple reuse
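
      The reason for the copy requirement is ordinary object aliasing, which the following toy model illustrates. It is not StreamBase code: ReusedRecord and ReuseDemo are hypothetical stand-ins, and the assumption that a single object is overwritten for every input is a simplification of whatever recycling the runtime actually performs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for an input tuple that the "runtime" recycles; not a StreamBase type.
class ReusedRecord {
    int value;

    ReusedRecord(int value) {
        this.value = value;
    }

    ReusedRecord copy() {
        return new ReusedRecord(value);
    }
}

class ReuseDemo {
    // Keeps references to the incoming record: every stored entry aliases the same object.
    static List<Integer> storeWithoutCopy(int[] inputs) {
        ReusedRecord reused = new ReusedRecord(0);   // the single object being recycled
        List<ReusedRecord> state = new ArrayList<>();
        for (int v : inputs) {
            reused.value = v;     // the same object is overwritten for each input
            state.add(reused);    // bug: stores a reference, not a copy
        }
        return values(state);
    }

    // Copies the incoming record before storing it, so later inputs cannot change stored state.
    static List<Integer> storeWithCopy(int[] inputs) {
        ReusedRecord reused = new ReusedRecord(0);
        List<ReusedRecord> state = new ArrayList<>();
        for (int v : inputs) {
            reused.value = v;
            state.add(reused.copy());
        }
        return values(state);
    }

    private static List<Integer> values(List<ReusedRecord> records) {
        List<Integer> out = new ArrayList<>();
        for (ReusedRecord r : records) {
            out.add(r.value);
        }
        return out;
    }
}
```

      For the inputs 1, 2, 3 the version without a copy ends up holding 3, 3, 3, while the copying version holds 1, 2, 3.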
    • getStateChangeTimeout

      public int getStateChangeTimeout()
      Returns:
      the Operator state change timeout value, in milliseconds
    • getLocation

      public com.streambase.sb.util.Location getLocation()
      Return the Operator-wide location, useful for error reporting not associated with a particular property. If an error is related to a property, use getLocation(String) instead.
      Specified by:
      getLocation in interface com.streambase.sb.util.LocatedItem
      Returns:
      the location for error reporting from this operator.
    • getLocation

      public com.streambase.sb.util.Location getLocation(String property)
      Return a new location within this Operator, associated with the given property name. Equivalent to getLocation() for StreamSQL-sourced Operators. Using this as an argument to TypecheckException constructors will cause Studio to display the error alongside the UI widget corresponding to the passed-in property.
      Parameters:
      property - expected to be the bean name of an Operator property
      Returns:
      the location for error reporting from this operator
      Since:
      6.5 initial release
    • getRuntimeEnvironment

      public Operator.RuntimeEnvironment getRuntimeEnvironment()
      Return the Operator.RuntimeEnvironment for this StreamBase Server. NOTE: Not valid during typecheck
      Returns:
      current RuntimeEnvironment, or null during typecheck
      Since:
      7.0 initial release
    • isRuntime

      public boolean isRuntime()
      Returns:
      true if called during runtime (after the server has started), false otherwise
      Since:
      7.6 initial release
    • getOperatorConfigurationAccessor

      public Operator.ConfigurationAccessor getOperatorConfigurationAccessor()
      Provides access to operator and adapter configuration information from the server's configuration file. This call is intended to be made during typecheck or later.
      Returns:
      an Operator.ConfigurationAccessor that provides access to this configuration information
      Since:
      7.5 initial release
    • getDataSourceConnection

      public Connection getDataSourceConnection(String datasource) throws StreamBaseException
      Retrieve a connection to a JDBC Data Source configured in HOCON configuration files. The given datasource is the identifier of a jdbcDataSources entry in the configuration's JDBCDataSourceGroup.

      This method may return an existing connection, or create a new connection if necessary.

      This method always returns null during typecheck.

      Parameters:
      datasource - data source identifier
      Returns:
      a JDBC connection, or null on failure or during typecheck
      Throws:
      StreamBaseException - if HOCON config does not contain the specified datasource, or if there's an error trying to create a connection.
      Since:
      10.6.3
    • getSchemaForCapture

      public Schema getSchemaForCapture(String captureName, int depth)
      Finds the schema for the given capture name in the context that this operator is running under. For a capture field that was not nested inside any other capture field, use a depth value of 0. To expand the capture fields in a schema returned by this function with a depth of d, call this function with a depth of d+1. For example, if an output stream of this module has schema (f int, c @A), then you might call getSchemaForCapture("A", 0) to get (g int, c @B) as the schema bound to A. Then you might call getSchemaForCapture("B", 1) to get (h int) as the schema bound to B.

      Note that the "nesting depth" here has nothing to do with capture fields found in lists or tuples -- the only containment that affects the nesting depth is a capture within another capture.

      Parameters:
      captureName - the name of the bound capture type
      depth - the depth of nesting at which to look up the schema of this capture binding.
      Returns:
      the schema associated with the bound capture type.
      Since:
      7.2 initial release
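
      The depth rule can be traced with a toy lookup table built from the example above. This is not StreamBase code: CaptureBindings and its methods are hypothetical, schemas are represented as plain strings, and schemaForCapture only plays the role of getSchemaForCapture in this model. The sketch shows that a capture found inside a schema returned at depth d is looked up at depth d + 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model; CaptureBindings and its methods are hypothetical, not StreamBase API.
class CaptureBindings {
    // Key is "name@depth"; value is the textual schema bound to that capture.
    private final Map<String, String> bound = new HashMap<>();

    void bind(String captureName, int depth, String schema) {
        bound.put(captureName + "@" + depth, schema);
    }

    // Plays the role of getSchemaForCapture(captureName, depth) in this model.
    String schemaForCapture(String captureName, int depth) {
        return bound.get(captureName + "@" + depth);
    }

    // Follows nested captures, increasing the depth by one at each level of nesting.
    List<String> expandChain(String firstCapture) {
        List<String> chain = new ArrayList<>();
        String name = firstCapture;
        int depth = 0;
        while (name != null) {
            String schema = schemaForCapture(name, depth);
            if (schema == null) {
                break;
            }
            chain.add(schema);
            name = nestedCaptureName(schema);
            depth++;
        }
        return chain;
    }

    // Extracts the capture name after '@' in a string such as "(g int, c @B)", or null if none.
    static String nestedCaptureName(String schema) {
        int at = schema.indexOf('@');
        if (at < 0) {
            return null;
        }
        int end = at + 1;
        while (end < schema.length() && Character.isLetterOrDigit(schema.charAt(end))) {
            end++;
        }
        return schema.substring(at + 1, end);
    }

    // Reproduces the example: A is bound at depth 0, B (nested inside A) at depth 1.
    static List<String> demo() {
        CaptureBindings bindings = new CaptureBindings();
        bindings.bind("A", 0, "(g int, c @B)");
        bindings.bind("B", 1, "(h int)");
        return bindings.expandChain("A");
    }
}
```

      Here the chain for A is (g int, c @B) followed by (h int), matching the two calls described in the example.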
    • getTupleCaptureTransformer

      public TupleCaptureTransformer getTupleCaptureTransformer(Schema s) throws StreamBaseException
      Get a TupleCaptureTransformer capable of translating tuples with the given schema to the equivalent schema with all the capture fields expanded out, and translating expanded tuples back into tuples with the given schema

      This method may only be called at runtime; the exact schemas of any capture fields are not fully determined at typecheck time.

      Parameters:
      s - the schema with capture fields
      Returns:
      a TupleCaptureTransformer able to do the given transformations
      Throws:
      StreamBaseException - if this method is called at the wrong time, or if the transformations requested are not possible. This may occur, for example, if there is a name conflict between a captured field and a field already in the schema.
      Since:
      7.2 initial release
    • isPassthruOperator

      public boolean isPassthruOperator()
      Allows an operator to declare itself to be a "pass-thru" operator -- one that calls its sendOutput method with the unmodified tuples received by its processTuple method. Certain runtime optimizations are possible for pass-thru operators. In addition, the "older API" message is suppressed for such operators. The StreamBase log adapter is an example of a pass-thru operator, as it logs, but leaves unmodified, the tuples that pass through it.
      Returns:
      true if the operator is pass-thru and false otherwise
      Since:
      7.2.12 initial release
    • getSearchKeywords

      @Deprecated public String[] getSearchKeywords()
      Deprecated.
      As of StreamBase version 10.0, keywords are configured through the operator manifest settings.

      Override to declare strings that assist the user in filtering for this operator when in Studio. Without search keywords, operators are found only by their short display name.

      NOTE: Since StreamBase 10.0, this method is not used, and has been replaced by Manifest properties. For search key-words, set the "KeyWords" manifest attribute. Please see the documentation for details.

      Returns:
      the default implementation returns no search keywords
      Since:
      7.3.10 initial release
    • allowsConcurrency

      public boolean allowsConcurrency()
      Override to indicate whether concurrency may be configured in Studio against this operator. Note that this applies to all instances, and is not intended to be interpreted on a per-instance basis. The default value when not overridden is true.

      This has no effect on the actual runtime environment for operator instances.

      Returns:
      whether this operator allows Studio to set any concurrency options
      Since:
      7.4.0 initial release
    • getProposedInputSchemas

      public Schema[] getProposedInputSchemas(String mainName)
      Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
      Parameters:
      mainName - the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemas
      Returns:
      an array of Schemas to offer for input ports
      Since:
      7.4.0 initial release
    • getProposedOutputSchemas

      public Schema[] getProposedOutputSchemas(String mainName)
      Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
      Parameters:
      mainName - the name the user has requested be the prefix for the top level schema, and this name should be used throughout any sub-field schemas
      Returns:
      an array of Schemas to offer for output ports
      Since:
      7.4.0 initial release
    • supportsArtifacts

      public boolean supportsArtifacts()
      Indicates whether or not the operator supports the use of artifacts

      The default implementation of this method returns false. Override this method to return true if the operator uses artifacts.

      Returns:
      true if the operator supports the use of artifacts, false otherwise
    • getArtifactManager

      public OperatorArtifactManager getArtifactManager()
      Returns an ArtifactManager instance for managing the operator's artifacts.
      Returns:
      an ArtifactManager instance
    • getLock

      public Lock getLock()
      Return the guard for module state.
      Returns:
      the Lock instance