Class Operator
- java.lang.Object
  - com.streambase.sb.operator.Operator
- All Implemented Interfaces: LocatedItem, Serializable
- Direct Known Subclasses: InputAdapter, OutputAdapter
public abstract class Operator extends Object implements Serializable, LocatedItem
Abstract base class for user code that is used as a Java Operator or an embedded Adapter in a StreamBase application. One instance is created for each Java Operator in a StreamBase application. StreamBase Studio may operate on several StreamBase applications at a time, so Operator subclass instances may be in different applications. Operator subclasses must have a public default constructor.
An Operator is notified of state changes through callbacks. The StreamBase runtime calls back an Operator when it changes the runtime state of the Operator. These callbacks include resume(), resumed(), suspend(), suspended(), and shutdown().
Operator provides "managed threads", which are threads that run concurrently with the application, but which can synchronize with its overall state changes. These threads are started, suspended, resumed, and shut down with the application. Managed threads are registered with an Operator using the method registerRunnable. This is particularly useful for input adapters, which typically have to respond to external events asynchronously with the application. Managed threads can call sendOutput at any time.
If an Operator registers one or more managed threads, and all of its managed threads exit their run() methods, then the Operator itself will shut down.
The StreamBase runtime blocks while it waits for an Operator's managed threads to respond to a state change. This can be problematic if a managed thread is blocked on some event. However, the StreamBase runtime can be configured to interrupt a thread when it needs to change its state. This is accomplished by setting the flag shouldInterrupt to true when registering the thread with registerRunnable.
An Operator's managed thread may fail to respond to a state change even after it has been interrupted. If the thread does not respond to the state change within a given time interval, it is considered to have failed and is shut down. This time interval is specified by the server configuration parameter operator-state-change-timeout-ms.
For sharing state among instances, Operators may use the services provided by the Operator.SharedObjectManager, accessible via getRuntimeEnvironment().
Since 7.2.12, if your operator simply passes through Tuples without creating new Tuple instances, ensure you override isPassthruOperator() for maximum performance.
Special Studio Considerations:
- Call setPortHints(int, int) in your constructor to set the initial number of ports when presented in an EventFlow editor.
- Call setDisplayName(String) in your constructor to set a user-friendly name for your Operator. This name is used in the Operator Name field in the Properties view, and as the details text in the Palette view's Details mode.
- Call setShortDisplayName(String) in your constructor to set a short user-friendly name for your Operator. This name is used as the name of the operator in the Palette view and above the operator in the EventFlow canvas.
- Override getPortCounts() if you change port counts based on user properties changing.
- Override getIconResource(com.streambase.sb.operator.Operator.IconKind) if you wish to be displayed using your own icons.
- Since 6.5, during typecheck(), subclasses should consider throwing Operator.PropertyTypecheckException instead of TypecheckException in order to allow Studio to decorate UI widgets that caused the typecheck error. See Operator.PropertyTypecheckException and getLocation(String) for details.
- See also UIHints and Parameterizable for additional control over property display in Studio.
- Since 7.3.10, override getSearchKeywords() to provide user assistance when users in Studio are filtering the list of available operators and adapters.
- Since 10.2.0, supportsArtifacts() and Operator.ArtifactProperties
Note: Serializations of instances of this class that are created (e.g., by using ObjectOutputStream) in one version of StreamBase in general will not be deserializable in any other version of StreamBase.
- See Also:
Parameterizable, Serialized Form
-
-
Nested Class Summary
static class Operator.ArtifactProperties
For operators supporting Artifacts, this class is used to communicate properties about them during development in Studio, and is reported to the operator immediately prior to typecheck() along with all other setters.
static interface Operator.ConfigurationAccessor
Provides access to configuration information for use by operators and adapters, available from the server's configuration file.
static class Operator.IconKind
An enumeration for the different kinds of icons that StreamBase Studio may request when displaying Operators and Adapters.
static class Operator.LogLevel
Included values are, in decreasing order of chattiness: Operator.LogLevel.ALL, Operator.LogLevel.TRACE, Operator.LogLevel.DEBUG, Operator.LogLevel.INFO, Operator.LogLevel.WARN, Operator.LogLevel.ERROR, Operator.LogLevel.OFF.
static class Operator.OperatorStates
The set of runtime states that an Operator can be in.
class Operator.OperatorThread
The thread that wraps an operator runnable.
class Operator.PropertyTypecheckException
A typecheck exception associated with an Operator property (or parameter) by name.
static interface Operator.RuntimeEnvironment
This interface is used to gain access to StreamBase Server information and facilities.
static interface Operator.SharedObject
A SharedObject is an object that can be shared between Operators within a Container.
static interface Operator.SharedObjectManager
The manager for SharedObjects within a container.
static class Operator.SuspendBehaviorStates
Suspend behavior defines how an Operator handles tuples when it is suspended, that is, when it is in the SUSPENDED state.
-
Field Summary
static int DEFAULT_STATE_CHANGE_TIMEOUT
Default value for the timeout for Operator state changes, in milliseconds: 10000.
static Operator.ConfigurationAccessor OP_CONFIG_MGR_NO_CONF
A configuration manager that provides no information.
-
Constructor Summary
protected Operator()
Constructs an operator.
-
Method Summary
boolean allowsConcurrency()
Override to indicate whether concurrency may be configured in Studio against this operator.
String evaluate(String st, Tuple input)
Evaluates the given string expression in the context of the running module. If an error occurs during the evaluation, the return value will be a string containing the error message.
OperatorArtifactManager getArtifactManager()
Returns an ArtifactManager instance for managing the operator's artifacts.
int[] getAsyncInputPorts()
Override to indicate to Studio which ports (0-based) might process data asynchronously; this provides different rendering on the EventFlow canvas to assist users in understanding their data flow.
CaptureTransformStrategy getCaptureStrategy()
Returns the capture transform strategy that this Operator will use.
String getContainerName()
Return the name of this operator's container.
Path getDataDirectory()
Get data directory.
Connection getDataSourceConnection(String datasource)
Retrieve a connection to a JDBC Data Source configured in HOCON configuration files.
String getDisplayDescription()
Provides a description for this type of Operator.
String getDisplayName()
Return the display name of this Operator.
Tuple getDynamicVariables()
Retrieves a read-only tuple describing the current value of all dynamic variables in the module this operator is contained in.
Schema getDynamicVariablesSchema()
Retrieves a Schema containing information about every dynamic variable available to the module this operator is contained in.
String getFullyQualifiedName()
Return the fully qualified name of this operator.
URL getIconResource(Operator.IconKind iconType)
Clients should override to support custom icons.
int getInputPortCount()
Return the number of input ports.
Schema getInputSchema(int port)
Returns the schema of an input port.
Location getLocation()
Return the Operator-wide location, useful for error reporting not associated with a particular property.
Location getLocation(String property)
Return a new location within this Operator, associated with the given property name.
Lock getLock()
Return the guard for module state.
protected org.slf4j.Logger getLogger()
Retrieves a Logger suitable for logging messages and exceptions for this Operator.
Serializable getManagedState(String key)
Get a value from transactional memory.
String getName()
Return the name of this Operator.
Schema getNamedSchema(String name)
Operator.ConfigurationAccessor getOperatorConfigurationAccessor()
Provides access to operator and adapter configuration information from the server's configuration file.
int getOutputPortCount()
Returns the number of output ports.
Schema getOutputSchema(int outputPort)
Return the output schema for the given output port (zero-based).
Parameterizable getParameters()
Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
PortCounts getPortCounts()
An optional method that subclasses can override to dynamically tell Studio the number of input and output ports.
Schema[] getProposedInputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects.
Schema[] getProposedOutputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects.
InputStream getResourceContents(String name)
Returns an open input stream on the contents of the named resource file.
File getResourceFile(String name)
Returns a File pointing to the regular file or directory if it exists.
boolean getReuseTuple()
Get the state of the input Tuple reuse flag.
Operator.RuntimeEnvironment getRuntimeEnvironment()
Return the Operator.RuntimeEnvironment for this StreamBase Server.
Schema getRuntimeInputSchema(int port)
Returns the schema of an input port at runtime.
Schema getRuntimeOutputSchema(int outputPort)
Return the output schema that should actually be used with sendOutput(int, Tuple) or sendOutputAsync(int, Tuple).
Schema getSchemaForCapture(String captureName, int depth)
Finds the schema for the given capture name in the context that this operator is running under.
String[] getSearchKeywords()
Deprecated. As of StreamBase version 10.0, keywords are configured through the operator manifest settings.
String getShortDisplayName()
Return the short display name of this Operator.
int getStateChangeTimeout()
StorageMethod getStorageMethod()
Get operator's state's storage method.
TableAccessor getTableAccessor(String name)
Get a TableAccessor for a table in the local module by name of that table.
TransactionIsolationLevel getTransactionIsolationLevel()
Get current transaction isolation level.
TupleCaptureTransformer getTupleCaptureTransformer(Schema s)
Get a TupleCaptureTransformer capable of translating tuples with the given schema to the equivalent schema with all the capture fields expanded out, and translating expanded tuples back into tuples with the given schema. This method may only be called at runtime; the exact schemas of any capture fields are not fully determined at typecheck time.
Schema getTypecheckInputSchema(int port)
Returns the schema of an input port that was set at application typecheck time.
Schema getTypecheckOutputSchema(int outputPort)
Return the output schema as set by typecheck.
boolean hasNotYetStarted()
Returns true if this Operator has not yet started running.
void init()
After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method.
boolean isConcurrentByDefault()
Indicate whether an operator should be in a concurrent region by default.
boolean isDroppingTuples()
Returns true if the Operator will drop any tuples it receives when it is suspended.
boolean isPassthruOperator()
Allows an operator to declare itself to be a "pass-thru" operator: one that calls its sendOutput method with the unmodified tuples received by its processTuple method.
boolean isProcessingTuples()
Returns true if the Operator will process any tuples it receives when it is suspended.
boolean isRunning()
Returns true if this Operator is currently running, false otherwise.
boolean isRuntime()
boolean isShutdown()
Returns true if this Operator is currently shut down, false otherwise.
boolean isSuspended()
Returns true if this Operator is currently suspended, false otherwise.
void partitionActive(String name)
Called when a partition transitions into Active state.
void partitionNotActive(String name)
Called when a partition transitions into any other state from Active state.
void postShutdown()
postShutdown is called by the StreamBase runtime just after shutting down this Operator.
abstract void processTuple(int inputPort, Tuple tuple)
This method will be called by the StreamBase server for each Tuple given to the Operator to process.
void registerRunnable(Runnable operatorRunnable)
Deprecated. As of StreamBase version 3.7, replaced by registerRunnable(Runnable, boolean).
void registerRunnable(Runnable operatorRunnable, boolean shouldInterrupt)
Register a Runnable object to be managed by this Operator.
void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt)
Register a Runnable object to be managed by this Operator.
void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt, boolean synchronizedShutdown)
Register a Runnable object to be managed by this Operator.
void remoteNodeActive(String remoteNodeName)
Called when a remote node has become active in the cluster.
void remoteNodeUnavailable(String remoteNodeName)
Called when a remote node has become unavailable.
void remoteQuorumLost(String availabilityZoneName, String reason)
Called after deactivating all partitions when loss of quorum is detected.
protected void requireInputPortCount(int numPorts)
Throws a PortMismatchException if the number of ports is not numPorts.
void resume()
resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed.
void resumed()
resumed() is called after all registered runnables of the operator have started or resumed.
Cancellable scheduleRunnable(Date when, Runnable runner)
Schedules a Runnable object to be run at some future point as part of the same parallel region as this Operator.
void sendErrorOutput(String message)
Send an error tuple from this operator's error port.
void sendErrorOutput(Throwable t)
Send an exception via the error output port.
void sendErrorOutput(Throwable t, int port, Tuple errorTuple)
Send an exception via the error output port.
void sendOutput(int port, Tuple tuple)
Enqueue a Tuple to be sent synchronously to downstream operators.
void sendOutput(int port, List<Tuple> tuples)
Enqueue a List of Tuples to be sent synchronously to downstream operators.
void sendOutputAsync(int port, Tuple tuple)
Enqueue a Tuple to be sent asynchronously to downstream operators.
void sendOutputAsync(int port, List<Tuple> tuples)
Enqueue a List of Tuples to be sent asynchronously to downstream operators.
void setCaptureStrategy(CaptureTransformStrategy cts)
Set the capture transform strategy to use for the inputs and outputs of this Operator.
protected void setDisplayDescription(String description)
Set a description for this type of Operator.
void setDisplayName(String dn)
The display name is the String that's shown in the Operator Name field of the Properties view, and in the detailed text for this operator in the Palette view's Details mode.
void setDynamicVariable(String name, Object value)
Sets the value of this module's dynamic variable.
static void setLogLevel(org.slf4j.Logger logger, Operator.LogLevel level)
Sets the log level of the given logger.
static void setLogLevel(org.slf4j.Logger logger, String level)
Sets the log level of the given logger.
boolean setManagedState(String key, Serializable value)
Store a value in transactional memory.
Schema setOutputSchema(int port, Schema outputSchema)
Sets the output schema for the given output port (port numbers are zero based).
void setParameters(Parameterizable params)
Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
void setPortHints(int numInputPorts, int numOutputPorts)
Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas.
protected void setReuseTuple(boolean reuse)
Allow/disallow the runtime to reuse tuples on operator input.
void setShortDisplayName(String sn)
The short display name is the String displayed by Studio as the name of this operator in the Palette view, and above the operator in the EventFlow canvas.
void setSuspendBehavior(int suspendBehavior)
Set the suspend behavior of this Operator.
boolean shouldRun()
Return whether or not the calling operator thread is enabled and should continue running.
void shutdown()
shutdown is called by the StreamBase runtime just prior to shutting down this Operator.
int size()
Return the "size" of this operator.
boolean supportsArtifacts()
Indicates whether or not the operator supports the use of artifacts. The default implementation of this method returns false.
boolean supportsTransactionalMemory()
Indicate whether an operator optionally supports transactional memory.
void suspend()
suspend() will be called when an operator suspends, before any registered runnables are suspended.
void suspended()
suspended() will be called after all registered runnables of the operator have suspended.
abstract void typecheck()
The typecheck method is called by Studio and the StreamBase server to ensure that all the parameters for this operator are correct.
-
-
-
Field Detail
-
OP_CONFIG_MGR_NO_CONF
public static final Operator.ConfigurationAccessor OP_CONFIG_MGR_NO_CONF
A configuration manager that provides no information
-
DEFAULT_STATE_CHANGE_TIMEOUT
public static final int DEFAULT_STATE_CHANGE_TIMEOUT
Default value for the timeout for Operator state changes, in milliseconds: 10000.
An Operator can receive state change events from the StreamBase runtime. In particular, when the runtime starts, suspends, resumes, or shuts down, it, in turn, applies the same state change to any Operators it holds.
When the server applies a state change to an Operator, it blocks until the Operator acknowledges that it has transitioned to the new state.
If an Operator fails to make such an acknowledgement, the StreamBase runtime might wait indefinitely. StateChangeTimeout limits the amount of time the server will wait for an Operator to respond to a requested state change.
If the Operator fails to respond within the timeout, the server will shut down the Operator and continue with its state change.
- See Also:
- Constant Field Values
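The wait-with-timeout behavior described for DEFAULT_STATE_CHANGE_TIMEOUT can be pictured with a small standard-library sketch. This is not StreamBase code: StateChangeWaiter, awaitAck, and the latch-based acknowledgement are hypothetical stand-ins chosen for illustration, and the runtime's actual mechanism may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the waiting side of a state change; not a StreamBase class.
final class StateChangeWaiter {
    // Same value as the documented default, in milliseconds.
    static final int DEFAULT_TIMEOUT_MS = 10000;

    // Returns true if the acknowledgement arrived within timeoutMs, false on timeout or interrupt.
    static boolean awaitAck(CountDownLatch ack, long timeoutMs) {
        try {
            return ack.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch prompt = new CountDownLatch(1);
        new Thread(prompt::countDown).start();        // acknowledges almost immediately
        System.out.println("prompt: " + awaitAck(prompt, DEFAULT_TIMEOUT_MS));

        CountDownLatch stuck = new CountDownLatch(1); // never acknowledged
        System.out.println("stuck: " + awaitAck(stuck, 50));
    }
}
```

In this sketch a false return corresponds to the case where the server gives up waiting; what the real runtime then does (shutting the Operator down and continuing) is described in the text above and is not modeled here.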
-
-
Constructor Detail
-
Operator
protected Operator()
Constructs an operator.
All Operator subclasses must have a public default constructor, otherwise StreamBase Studio and the StreamBase sbd process will not be able to instantiate the Operator subclass. Nothing substantial should be done in the constructor, except for setting port hints and setting the parameters object.
-
-
Method Detail
-
getIconResource
public URL getIconResource(Operator.IconKind iconType)
Clients should override to support custom icons. The default implementation returns null (meaning no icon) for any kind requested.
- Parameters:
iconType - the kind of icon being requested, as an Operator.IconKind enumeration value
- Returns:
- a URL pointing to an image resource corresponding to the type requested, or null
- See Also:
Operator.IconKind
-
getDisplayName
public final String getDisplayName()
Return the display name of this Operator. The display name is the String that's shown in the list of available Operators.
- Returns:
- The display name of the Operator.
-
getDisplayDescription
public final String getDisplayDescription()
Provides a description for this type of Operator. This is used by the user interface to assist the user in selecting an Operator during authoring. The expected usage is to return a three or four line description, without embedded new lines, describing the intent or technical details that help users confirm they are selecting the Operator or Adapter they need.
- Returns:
- The description for this type of Operator, never null
- Since:
- 7.6 initial release
-
setDisplayDescription
protected final void setDisplayDescription(String description)
Set a description for this type of Operator. This method is to be called only during the zero-argument constructor for this Operator.
- Parameters:
description - description for this type of Operator, never null
- Since:
- 7.6 initial release
- See Also:
getDisplayDescription()
-
getShortDisplayName
public final String getShortDisplayName()
Return the short display name of this Operator. This short display name is the String displayed by Studio in the palette, and used to name new instances of this Operator added to an EventFlow application.
By default, this returns the simple class name of this Operator.
- Returns:
- The short display name of the Operator.
- Since:
- 6.1.2 initial release, 6.5 uses this for new instances added to an EventFlow application
-
getName
public String getName()
Return the name of this Operator. Operators are named and can be managed by name. This may return null during typecheck and init phases.
- Returns:
- The name of the operator during runtime, or empty string during typecheck and init
-
getContainerName
public String getContainerName()
Return the name of this operator's container. This may return null during typecheck and init phases.
- Returns:
- The name of this operator's container. This may return an empty string during typecheck and init phases.
-
getFullyQualifiedName
public String getFullyQualifiedName()
Return the fully qualified name of this operator. This contains the container name, any module names and the name of the operator. This may return null during typecheck and init phases.
- Returns:
- The fully qualified name of this operator. This may return an empty string during typecheck and init phases.
-
setDisplayName
public final void setDisplayName(String dn)
The display name is the String that's shown in the Operator Name field of the Properties view, and in the detailed text for this operator in the Palette view's Details mode. If the subclass does not set a display name, the class name is used.
- Parameters:
dn - Display name
-
setShortDisplayName
public final void setShortDisplayName(String sn)
The short display name is the String displayed by Studio as the name of this operator in the Palette view, and above the operator in the EventFlow canvas. If the subclass does not set a short display name, the simple class name is used.
- Parameters:
sn - Short display name
- Since:
- 6.1.2 initial release
-
setPortHints
public void setPortHints(int numInputPorts, int numOutputPorts)
Used to tell StreamBase Studio how many ports to draw on the Java Operator when the operator is drawn in the IDE's canvas. The default is 1 input port, 0 output ports.
This method is only effective when called from the Java Operator's constructor. The typecheck method should call requireInputPortCount instead.
To specify the ports at runtime, requireInputPortCount and setOutputSchema should be called from the typecheck method.
To specify the ports dynamically within Studio, see getPortCounts.
- Parameters:
numInputPorts - number of input ports. Must be non-negative.
numOutputPorts - number of output ports. Must be non-negative.
- Throws:
IllegalArgumentException - if an argument is less than zero
- See Also:
requireInputPortCount(int), setOutputSchema(int, Schema)
-
typecheck
public abstract void typecheck() throws Operator.PropertyTypecheckException, TypecheckException
The typecheck method is called by Studio and the StreamBase server to ensure that all the parameters for this operator are correct. This method is also responsible for verifying input Schemas and setting output Schemas. This method must call setOutputSchema(int, Schema) if there are any output ports.
If the parameters are not correct, or the input port Schemas are not correct, an Operator.PropertyTypecheckException or TypecheckException should be thrown. The former should always be used if the exception is related to a particular parameter.
The method requireInputPortCount() should be used to verify that the required input ports are set.
If the Operator changes the number of input ports, this method must call requireInputPortCount.
- Throws:
Operator.PropertyTypecheckException - when a parameter value is unexpected.
TypecheckException - when parameters or input Schemas are not satisfied.
- See Also:
requireInputPortCount(int), setOutputSchema(int, Schema), Operator.PropertyTypecheckException
-
init
public void init() throws StreamBaseException
After the Operator has typechecked successfully, but before the application or any managed threads start, the StreamBase server will call the init method. Operators should override this method to perform custom initialization.
If this operator wishes to register threads, it should call registerRunnable(Runnable) from this method.
- Throws:
StreamBaseException - Prevents the application from starting.
-
hasNotYetStarted
public boolean hasNotYetStarted()
Returns true if this Operator has not yet started running. hasNotYetStarted() returns true if the Operator is in the runtime state of NOT_YET_STARTED, which is the initial runtime state of an Operator.
- Returns:
- true if not started, false otherwise
- See Also:
Operator.OperatorStates.INITIALIZED
-
isDroppingTuples
public boolean isDroppingTuples()
Returns true if the Operator will drop any tuples it receives when it is suspended.
- Returns:
- true if tuples are dropped when operator suspended, false otherwise
- See Also:
Operator.SuspendBehaviorStates.DROPPING_TUPLES
-
isProcessingTuples
public boolean isProcessingTuples()
Returns true if the Operator will process any tuples it receives when it is suspended.
- Returns:
- true if tuples are processed when suspended, false otherwise
- See Also:
Operator.SuspendBehaviorStates.PROCESSING_TUPLES
-
isRunning
public boolean isRunning()
Returns true if this Operator is currently running, false otherwise. isRunning() returns true if the Operator is in a runtime state of STARTED.
- Returns:
- true if operator running, false otherwise
- See Also:
Operator.OperatorStates.STARTED
-
isShutdown
public boolean isShutdown()
Returns true if this Operator is currently shut down, false otherwise. An Operator is shut down when it is in the runtime state of SHUTDOWN, which is the terminal state for Operators.
- Returns:
- true if operator shutdown, false otherwise
- See Also:
Operator.OperatorStates.SHUTDOWN
-
isSuspended
public boolean isSuspended()
Returns true if this Operator is currently suspended, false otherwise. An Operator is suspended when it is in the runtime state of SUSPENDED.
- Returns:
- true if operator suspended, false otherwise
- See Also:
Operator.OperatorStates.SUSPENDED
-
processTuple
public abstract void processTuple(int inputPort, Tuple tuple) throws StreamBaseException
This method will be called by the StreamBase server for each Tuple given to the Operator to process.
By default the Tuple passed in is safe for storing and mutating after the return of processTuple. The details of this Tuple's life cycle can be controlled via setReuseTuple(boolean).
- Parameters:
inputPort - the input port that the tuple is from (ports are zero based)
tuple - the tuple from the given input port
- Throws:
StreamBaseException - Terminates the application.
-
resume
public void resume() throws Exception
resume() is called when an operator starts or resumes execution, before any registered runnables are started or resumed. Note that if the application is shut down directly from a suspended state, this will not be called; instead shutdown() will be called. resume() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to resume() will be processed as error tuples, and trigger a shutdown of the operator.
- Throws:
Exception - Error processing callback
-
resumed
public void resumed() throws Exception
resumed() is called after all registered runnables of the operator have started or resumed. That is, once shouldRun() has unblocked and returned true in all registered runnables. resumed() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to resumed() will be processed as error tuples, and trigger a shutdown of the operator.
- Throws:
Exception - Error processing callback
-
size
public int size()
Return the "size" of this operator. This size value will be displayed in sbmonitor's "size" column.
Override this method to display the size of an important data structure in this Operator, for example the size of a queue. Returns 0 by default.
This method must return promptly and never block, as this is called from a system statistics monitoring thread.
- Returns:
- size of this operator.
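Because size() is called from a monitoring thread and must never block, one option is to keep a separate atomic counter rather than measuring the data structure on demand. The following is only a sketch under that assumption: QueueBackedComponent and its methods are hypothetical stand-ins, not StreamBase classes, and in a real Operator subclass the size() method shown here would be the override of Operator.size().

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an Operator subclass that buffers work in a queue.
final class QueueBackedComponent {
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    // Tracked separately because ConcurrentLinkedQueue.size() traverses the whole queue.
    private final AtomicInteger pendingCount = new AtomicInteger();

    void enqueue(String item) {
        pending.add(item);
        pendingCount.incrementAndGet();
    }

    // Returns the next item, or null if the queue is empty.
    String dequeue() {
        String item = pending.poll();
        if (item != null) {
            pendingCount.decrementAndGet();
        }
        return item;
    }

    // Constant-time and lock-free, so it is safe to call from a statistics thread.
    int size() {
        return pendingCount.get();
    }

    public static void main(String[] args) {
        QueueBackedComponent component = new QueueBackedComponent();
        component.enqueue("a");
        component.enqueue("b");
        component.dequeue();
        System.out.println("pending items: " + component.size());
    }
}
```

The counter can briefly lag the queue under concurrent access, which is usually acceptable for a monitoring figure.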
-
shouldRun
public final boolean shouldRun()
Return whether or not the calling operator thread is enabled and should continue running.
This method returns false if the operator has not yet started or has shut down. It blocks if the operator is suspended. It returns true if the operator is running.
This method links managed operator threads with the StreamBase runtime. Every registered runnable object must repeatedly call this during its entire lifetime in order to synchronize with the main StreamBase application.
An operator (and, thus, its containing application) is not considered started until all registered runnables have called this method. Likewise, an operator is not considered suspended until all registered runnables are blocked in this method. Finally, an operator is not considered shutdown until all registered runnables have exited their run() method (either on their own or in response to this method returning false).
- Returns:
- true if the operator is running, false otherwise.
- Throws:
UnsupportedOperationException
- If this was not called from an operator thread.
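The polling pattern described for shouldRun() can be sketched without the StreamBase runtime. RunGate below is a hypothetical stand-in that imitates only the documented contract (false before start and after shutdown, blocking while suspended, true while running); ManagedLoopDemo is likewise illustrative. In a real Operator, the loop body would live in a Runnable passed to registerRunnable and would call the Operator's own shouldRun().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in that imitates the documented shouldRun() contract; not a StreamBase class.
final class RunGate {
    enum State { NOT_YET_STARTED, STARTED, SUSPENDED, SHUTDOWN }

    private State state = State.NOT_YET_STARTED;

    synchronized void setState(State next) {
        state = next;
        notifyAll(); // wake any thread blocked in shouldRun()
    }

    // Returns false before start and after shutdown, blocks while suspended, returns true while running.
    synchronized boolean shouldRun() {
        while (state == State.SUSPENDED) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return state == State.STARTED;
    }
}

final class ManagedLoopDemo {
    // Starts a worker that checks the gate before each unit of work, then shuts the gate down.
    static int runUntilShutdown(int minIterations) {
        RunGate gate = new RunGate();
        AtomicInteger iterations = new AtomicInteger();
        gate.setState(RunGate.State.STARTED);

        Thread worker = new Thread(() -> {
            while (gate.shouldRun()) {
                iterations.incrementAndGet();    // one unit of work
                LockSupport.parkNanos(100_000L); // brief pause between units
            }
        });
        worker.start();

        while (iterations.get() < minIterations) {
            LockSupport.parkNanos(100_000L);
        }
        gate.setState(RunGate.State.SHUTDOWN);   // the worker's next shouldRun() call returns false
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("iterations before shutdown: " + runUntilShutdown(5));
    }
}
```

The key design point is that the loop condition is the only place the worker observes state changes, so a worker that blocks elsewhere for long periods would delay suspend and shutdown.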
-
shutdown
public void shutdown()
shutdown is called by the StreamBase runtime just prior to shutting down this Operator. An implementation of shutdown should include any behavior needed to shut down the Operator - freeing resources, etc. shutdown() is a callback that is called by the StreamBase runtime.
-
postShutdown
public void postShutdown()
postShutdown is called by the StreamBase runtime just after shutting down this Operator. postShutdown() is a callback that is called by the StreamBase runtime.
-
suspend
public void suspend() throws Exception
suspend() will be called when an operator suspends, before any registered runnables are suspended. suspend() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to suspend() will be processed as error tuples.
- Throws:
Exception - Error processing callback
-
setSuspendBehavior
public void setSuspendBehavior(int suspendBehavior)
Set the suspend behavior of this Operator. The Operator can either process or drop tuples when suspended. setSuspendBehavior() might be called in the Operator's constructor or in init().
- Parameters:
suspendBehavior - The suspend behavior to set, either PROCESSING_TUPLES or DROPPING_TUPLES.
- See Also:
Operator.SuspendBehaviorStates.PROCESSING_TUPLES, Operator.SuspendBehaviorStates.DROPPING_TUPLES
-
suspended
public void suspended() throws Exception
suspended() will be called after all registered runnables of the operator have suspended. That is, once all registered runnables have called and are blocked in shouldRun(). suspended() is a callback that is called by the StreamBase runtime. Exceptions thrown during calls to suspended() will be processed as error tuples.
- Throws:
Exception - Error processing callback
-
remoteNodeActive
public void remoteNodeActive(String remoteNodeName)
Called when a remote node has become active in the cluster.
- Parameters:
remoteNodeName
- the remote node name
- Since:
- 10.0 initial release
-
remoteNodeUnavailable
public void remoteNodeUnavailable(String remoteNodeName)
Called when a remote node has become unavailable.
- Parameters:
remoteNodeName
- the remote node name
- Since:
- 10.0 initial release
-
remoteQuorumLost
public void remoteQuorumLost(String availabilityZoneName, String reason)
Called after deactivating all partitions when loss of quorum is detected.
- Parameters:
availabilityZoneName
- name of the availability zone that owns the associated quorum
reason
- string containing a reason why loss was detected.
- Since:
- 10.4
-
partitionActive
public void partitionActive(String name)
Called when a partition transitions into Active state.
In the Active state, the partition is running on the active node for the partition.
- Parameters:
name
- name of the partition that becomes active
- Since:
- 10.4
-
partitionNotActive
public void partitionNotActive(String name)
Called when a partition transitions into any other state from Active state.
- Parameters:
name
- name of the partition that becomes not Active
- Since:
- 10.4
-
getParameters
public Parameterizable getParameters()
Returns the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
- Returns:
- The Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
-
setParameters
public void setParameters(Parameterizable params)
Sets the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
- Parameters:
params
- the Java Bean that holds the user-configurable parameters visible in the StreamBase Studio Properties View of this operator.
-
getResourceContents
public InputStream getResourceContents(String name) throws ResourceNotFoundException, StreamBaseException
Returns an open input stream on the contents of the named resource file. The client is responsible for closing the stream when finished.
Resource names must be valid Java resource names: strings consisting of a sequence of substrings delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).
For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.
Call this, or getResourceFile(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.
- Parameters:
name
- resource name
- Returns:
- an input stream containing the contents of the resource
- Throws:
ResourceNotFoundException
- resource cannot be found
StreamBaseException
- resource found, but cannot be opened
- Since:
- 6.4 initial release, 10.0 added support for fully qualified resource names
- See Also:
ClassLoader.getResource(String)
,getResourceFile(String)
,getDataDirectory()
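Because the caller owns the returned stream, a try-with-resources block is a natural fit. The sketch below uses only the JDK and takes any `InputStream`, so it does not call getResourceContents itself. `ResourceReadSketch` and `readAndClose` are hypothetical names, and the UTF-8 encoding is an assumption about the resource.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Sketch: consume a caller-owned stream and guarantee that it is closed. */
final class ResourceReadSketch {

    /** Reads the whole stream as UTF-8 text and closes it, even if reading fails. */
    static String readAndClose(InputStream contents) {
        try (InputStream in = contents) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // Rethrown unchecked to keep the sketch short; a real operator would
            // more likely report the failure from typecheck.
            throw new UncheckedIOException(e);
        }
    }
}
```

In an operator, the argument would typically be the stream returned for a name such as data.csv.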
-
getResourceFile
public File getResourceFile(String name) throws ResourceNotFoundException, StreamBaseException
Returns a File pointing to the regular file or directory if it exists. If this returns a regular file, getResourceContents(String) is guaranteed not to fail.
Resource names must be valid Java resource names: strings consisting of a sequence of substrings delimited by slashes (/). The default project resource directories are src/main/resources and src/test/resources (test phase only).
For example, a resource file stored in src/main/resources/data.csv is accessed using data.csv or /data.csv, while a resource file stored in src/main/resources/com/examples/datafiles/data.csv is accessed using com/examples/datafiles/data.csv or /com/examples/datafiles/data.csv.
Call this, or getResourceContents(String), during typecheck (as opposed to waiting for init() or similar runtime methods) in order to be able to surface to the authoring environment any failures locating or reading the resource.
- Parameters:
name
- resource name
- Returns:
- a File pointing to the named resource, or null if one cannot be associated with the named resource (for example, if storage is not available on a file system for this resource)
- Throws:
ResourceNotFoundException
- resource cannot be found
StreamBaseException
- resource found, but cannot be opened
- Since:
- 6.4 initial release, 10.0 added support for fully qualified resource names
- See Also:
ClassLoader.getResource(String)
,getResourceContents(String)
,getDataDirectory()
-
registerRunnable
public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt, boolean synchronizedShutdown) throws StreamBaseException
Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.
The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.
This method should be called from the operator's init() method.
- Parameters:
runnableName
- The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
operatorRunnable
- The Runnable to register.
shouldInterrupt
- Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
synchronizedShutdown
- Whether to wait for this thread to exit before postShutdown is called.
- Throws:
StreamBaseException
- If it is too late in the Operator's life cycle to register a Runnable.
- Since:
- 6.6.14 initial release
-
registerRunnable
public final void registerRunnable(String runnableName, Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.
The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.
This method should be called from the operator's init() method.
- Parameters:
runnableName
- The name of this runnable (may be null). Names are reported on the statistics stream, and displayed by sbmonitor and sbmanager.
operatorRunnable
- The Runnable to register.
shouldInterrupt
- Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
- Throws:
StreamBaseException
- If it is too late in the Operator's life cycle to register a Runnable.
- Since:
- 6.5 initial release
-
registerRunnable
public final void registerRunnable(Runnable operatorRunnable, boolean shouldInterrupt) throws StreamBaseException
Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.
The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, stopped, suspended, and resumed along with the rest of the application.
This method should be called from the operator's init() method.
- Parameters:
operatorRunnable
- The Runnable to register.
shouldInterrupt
- Whether this thread should be interrupted when the operator undergoes a state change. Typically true if the thread blocks on some event, input for example.
- Throws:
StreamBaseException
- If it is too late in the Operator's life cycle to register a Runnable.
-
registerRunnable
public final void registerRunnable(Runnable operatorRunnable) throws StreamBaseException
Deprecated. As of StreamBase version 3.7, replaced by registerRunnable(Runnable, boolean)
Register a Runnable object to be managed by this Operator. This Runnable will be started in a Thread when the Operator starts, suspended and resumed with the Operator, and stopped with the Operator.
The body of this Runnable must hook into StreamBase's thread management by repeatedly calling the shouldRun() method during its entire lifetime. This allows managed threads to be started, shut down, suspended, and resumed along with the rest of the application.
This method should be called from the adapter's init() method.
- Parameters:
operatorRunnable
- The Runnable to register.
- Throws:
StreamBaseException
- If it is too late in the Operator's life cycle to register a Runnable.
-
scheduleRunnable
public Cancellable scheduleRunnable(Date when, Runnable runner)
Schedules a Runnable object to be run at some future point as part of the same parallel region as this Operator. If parallelism is not necessary, prefer this method over allocating a new thread with registerRunnable when the operator needs to schedule events.
- Parameters:
when
- Time to run the runner. If it is in the past, it is run immediately.
runner
- The Runnable to schedule
- Returns:
- Handle to task for cancellation, if the task hasn't already started running. This handle is thread-safe, but attempting to cancel an already started task has no effect.
- Since:
- 7.4 initial release
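As a rough analogy only, the "run at a given Date, or immediately if that Date has passed" behavior can be modeled with the JDK's `ScheduledExecutorService`. This is not how StreamBase implements scheduleRunnable, and an executor does not model the operator's parallel region. `ScheduleSketch`, `delayMillis`, and `scheduleAt` are hypothetical names, and the `ScheduledFuture` merely plays the role of the cancellation handle.

```java
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for one-shot scheduling at an absolute time. */
final class ScheduleSketch {

    /** Delay until 'when', clamped at zero so that past times run immediately. */
    static long delayMillis(Date when, long nowMillis) {
        return Math.max(0L, when.getTime() - nowMillis);
    }

    /** Schedules the task once; cancelling the returned future before it starts prevents the run. */
    static ScheduledFuture<?> scheduleAt(ScheduledExecutorService executor, Date when, Runnable task) {
        long delay = delayMillis(when, System.currentTimeMillis());
        return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
    }
}
```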
-
requireInputPortCount
protected void requireInputPortCount(int numPorts) throws PortMismatchException
Throws a PortMismatchException if the number of input ports is not numPorts. The StreamBase Studio IDE will recognize this and draw the operator box with the appropriate number of input ports.
This method should be called by the typecheck method.
- Parameters:
numPorts
- the required number of input ports
- Throws:
PortMismatchException
- When the number of input ports is incorrect.
-
getPortCounts
public PortCounts getPortCounts() throws TypecheckException
An optional method that subclasses can override to dynamically tell Studio the number of input and output ports. Clients should expect this method to be called after all setters, but prior to typecheck. This method will not be called if Studio encountered any exception while calling each setter with the property values just entered by the user.
Note: This is for Studio purposes only and should not be used to set any fields that you may want to use in other parts of the Operator.
- Returns:
- PortCounts record type
- Throws:
TypecheckException
- Type check exception
-
getAsyncInputPorts
public int[] getAsyncInputPorts()
Override to indicate to Studio which ports (0-based) might process data asynchronously. This provides different rendering on the EventFlow canvas to help users understand their data flow.
- Returns:
- the zero based indices of the asynchronous ports; null by default
- Since:
- 7.6 initial release
-
getInputPortCount
public final int getInputPortCount()
Return the number of input ports.
- Returns:
- number of input ports, or 0 if not in an application.
-
getOutputPortCount
public final int getOutputPortCount()
Returns the number of output ports. The number of output ports is set by the setOutputSchema(int, Schema) method.
- Returns:
- Number of output ports
- See Also:
setOutputSchema(int, Schema)
-
sendOutput
public void sendOutput(int port, Tuple tuple) throws StreamBaseException
Enqueue a Tuple to be sent synchronously to downstream operators.
All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.
Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent by the two methods can be received downstream in any order.
A note about reusing/caching the sent tuple:
If the given tuple is a tuple that this operator received from a processTuple() call, then the tuple can be reused if and only if getReuseTuple() is FALSE.
If the given tuple is created by this operator then it can be reused when the call to sendOutput() returns. StreamBase is done with the tuple once control returns to this operator.
sendOutput(int, Tuple) uses this call's transaction, if one is active; otherwise a new transaction is started to send the Tuple to downstream operators.
- Parameters:
port
- The output port the Tuple is enqueued upon (ports are zero based)
tuple
- The Tuple to enqueue
- Throws:
StreamBaseException
- if the port is invalid or the tuple argument doesn't match the schema of the output port.
- Since:
- 10.0 transactional behavior
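The tuple reuse rule stated above reduces to a small decision function. The sketch below only restates that rule in code. `TupleReuseRule` and `mayReuseAfterSend` are hypothetical names, and the two booleans stand in for "the tuple came from processTuple()" and the value of getReuseTuple().

```java
/** Restates the documented reuse rule for a tuple passed to sendOutput(). */
final class TupleReuseRule {

    /**
     * @param receivedViaProcessTuple true if the tuple was handed to this operator by processTuple()
     * @param reuseTupleEnabled       the value getReuseTuple() would report
     * @return true if the operator may keep using the tuple after sendOutput() returns
     */
    static boolean mayReuseAfterSend(boolean receivedViaProcessTuple, boolean reuseTupleEnabled) {
        if (receivedViaProcessTuple) {
            // Input tuples are reusable by the operator only when the runtime is not reusing them.
            return !reuseTupleEnabled;
        }
        // Tuples created by the operator are free again once sendOutput() returns.
        return true;
    }
}
```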
-
sendOutput
public void sendOutput(int port, List<Tuple> tuples) throws StreamBaseException
Enqueue a List of Tuples to be sent synchronously to downstream operators.
All calls to sendOutput(int, Tuple) are well ordered with respect to other calls to sendOutput(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.
Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent by the two methods can be received downstream in any order.
A note about reusing/caching the sent tuples:
If the tuples in the given list are tuples that this operator received from processTuple() calls, then these tuples can be reused if and only if getReuseTuple() is FALSE.
If the given list of tuples is created by this operator, then the tuples in this list can be reused when the call to sendOutput() returns. StreamBase is done with the tuples once control returns to this operator.
sendOutput(int, List) uses this call's transaction, if one is active; otherwise a new transaction is started to send the Tuples to downstream operators.
- Parameters:
port
- The output port the Tuples are enqueued upon (ports are zero based)
tuples
- The List of Tuple objects to enqueue
- Throws:
StreamBaseException
- if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
- Since:
- 10.0 transactional behavior
-
sendOutputAsync
public void sendOutputAsync(int port, Tuple tuple) throws StreamBaseException
Enqueue a Tuple to be sent asynchronously to downstream operators.
All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.
Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent by the two methods can be received downstream in any order.
Note: The Tuple will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.
The tuple can be reused once sendOutputAsync returns.
A new transaction is always started when the Tuple is read from the asynchronous queue for processing.
- Parameters:
port
- The output port the Tuple is enqueued upon (ports are zero based)
tuple
- A tuple to enqueue asynchronously
- Throws:
StreamBaseException
- if the port is invalid or the tuple argument doesn't match the schema of the output port.
- Since:
- 6.4.3 initial release, 10.0 transactional behavior
-
sendOutputAsync
public void sendOutputAsync(int port, List<Tuple> tuples) throws StreamBaseException
Enqueue a List of Tuples to be sent asynchronously to downstream operators.
All calls to sendOutputAsync(int, Tuple) are well ordered with respect to other calls to sendOutputAsync(int, Tuple), meaning that downstream logic will receive the tuples in the order they are sent.
Calls to sendOutputAsync(int, Tuple) and sendOutput(int, List) are not well ordered with respect to each other, meaning that tuples sent by the two methods can be received downstream in any order.
Note: The Tuples will be queued into the application some time in the future. The asynchronous nature of this call could cause subtle race conditions in applications. The performance differential (if any) between sendOutputAsync and sendOutput is highly dependent upon the application and hardware. Benchmarking is the only way to ensure the fastest possible implementation.
The tuples (and the List<Tuple>) can be reused once sendOutputAsync returns.
A new transaction is always started when the Tuples are read from the asynchronous queue for processing.
- Parameters:
port
- The output port the Tuples are enqueued upon (ports are zero based)
tuples
- A List<Tuple> to enqueue asynchronously
- Throws:
StreamBaseException
- if the port is invalid or any tuple in the tuples argument doesn't match the schema of the output port.
- Since:
- 6.4.3 initial release, 10.0 transactional behavior
-
getInputSchema
public final Schema getInputSchema(int port)
Returns the schema of an input port. At typecheck time or before, behaves like getTypecheckInputSchema(int). After typecheck time, behaves like getRuntimeInputSchema(int).
- Parameters:
port
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
-
getRuntimeInputSchema
public final Schema getRuntimeInputSchema(int port)
Returns the schema of an input port at runtime. This schema will never contain capture fields, as they will have been transformed according to the CaptureTransformStrategy set by setCaptureStrategy(CaptureTransformStrategy). This will be the actual schema of tuples passed to processTuple(int, Tuple).
- Parameters:
port
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
- Since:
- 7.2.6 initial release
-
getTypecheckInputSchema
public final Schema getTypecheckInputSchema(int port)
Returns the schema of an input port that was set at application typecheck time. This schema may contain capture fields, and may not represent the actual schema of tuples passed into the processTuple(int, Tuple) method. For the actual schema of those tuples, see getRuntimeInputSchema(int).
- Parameters:
port
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
- Since:
- 7.2.6 initial release
-
getOutputSchema
public final Schema getOutputSchema(int outputPort)
Return the output schema for the given output port (zero-based). If called before the operator is initialized, this method will act as getTypecheckOutputSchema(int). Otherwise, if called from init() or later, it will act as getRuntimeOutputSchema(int).
- Parameters:
outputPort
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
-
evaluate
public String evaluate(String st, Tuple input) throws StreamBaseException
Evaluates the given string expression in the context of the running module. If an error occurs during the evaluation, the return value will be a string containing the error message.
- Parameters:
st
- the expression to be evaluated
input
- the input tuple- Returns:
- the evaluated expression as a string or an error message if evaluation failed.
- Throws:
StreamBaseException
- Could not perform evaluation
- Since:
- 7.6 initial release
-
setDynamicVariable
public void setDynamicVariable(String name, Object value) throws StreamBaseException
Sets the value of this module's dynamic variable. Valid only at runtime.
- Parameters:
name
- name of the dynamic variable to be set
value
- the new value (may be null)
- Throws:
StreamBaseException
- if an error occurs
- Since:
- 7.6 initial release
-
getNamedSchema
public Schema getNamedSchema(String name)
- Parameters:
name
- the name of the named schema
- Returns:
- a Schema object representing this named schema or null if there is no named schema in this module with name
- Since:
- 7.6 initial release
-
getDynamicVariablesSchema
public Schema getDynamicVariablesSchema()
Retrieves a Schema containing information about every dynamic variable available to the module this operator is contained in. Never null: an empty schema is returned to indicate that there are no visible dynamic variables.
- Returns:
- the schema of the containing module's dynamic variables. Note that certain internal dynamic variables (such as those used by sequence operators) are not included.
- Since:
- 7.6 initial release
-
getDynamicVariables
public Tuple getDynamicVariables() throws TupleException
Retrieves a read-only tuple describing the current value of all dynamic variables in the module this operator is contained in. Never null: an empty tuple is returned to indicate that there are no visible dynamic variables.
- Returns:
- a read-only tuple containing all dynamic variables in this module.
- Throws:
TupleException
- Error creating return tuple
- Since:
- 7.6 initial release
-
setManagedState
public final boolean setManagedState(String key, Serializable value)
Store a value in transactional memory.
This method can only be called when an operator is configured to use transactional memory for its state. There must be an active transaction when this method is called.
- Parameters:
key
- Key value. Must be cluster unique if state is replicated.
value
- Value to store
- Returns:
- true if successful, false otherwise
- Throws:
UnsupportedOperationException
- Operator state not in transactional memory or no active transaction
- Since:
- 10.0 initial release
-
getManagedState
public final Serializable getManagedState(String key)
Get a value from transactional memory.
There must be an active transaction when this method is called.
- Parameters:
key
- Key value
- Returns:
- Stored value or null if no value associated with key
- Throws:
UnsupportedOperationException
- Operator state not in transactional memory or no active transaction
- Since:
- 10.0 initial release
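The calling contract of setManagedState and getManagedState (an active transaction is required, and a missing key yields null) can be modeled with a small in-memory stand-in. The sketch below is not transactional memory and uses no StreamBase classes. `ManagedStateSketch` and its `begin`/`commit` methods are hypothetical and exist only to mark when a transaction is considered active.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in that mimics the documented get/set contract; not real transactional memory. */
final class ManagedStateSketch {
    private final Map<String, Serializable> store = new HashMap<>();
    private boolean transactionActive;

    void begin()  { transactionActive = true; }   // hypothetical: marks a transaction as active
    void commit() { transactionActive = false; }  // hypothetical: ends the active transaction

    /** Mirrors setManagedState: stores the value and reports success. */
    boolean set(String key, Serializable value) {
        requireTransaction();
        store.put(key, value);
        return true;
    }

    /** Mirrors getManagedState: returns the stored value, or null if the key is unknown. */
    Serializable get(String key) {
        requireTransaction();
        return store.get(key);
    }

    private void requireTransaction() {
        if (!transactionActive) {
            throw new UnsupportedOperationException("no active transaction");
        }
    }
}
```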
-
getStorageMethod
public final StorageMethod getStorageMethod()
Get the storage method of this operator's state.
May return null if information is not available, e.g. during typecheck.
- Returns:
- data storage used for this operator
- Since:
- 10.2 initial release
-
getTransactionIsolationLevel
public final TransactionIsolationLevel getTransactionIsolationLevel()
Get current transaction isolation level.
May return null if information is not available, e.g. during typecheck.
- Returns:
- The current transaction isolation level
- Since:
- 10.2 initial release
-
isConcurrentByDefault
public boolean isConcurrentByDefault()
Indicate whether an operator should be in a concurrent region by default.
The default value is false.
The value of this method is used at design-time to indicate whether an operator should be put into a concurrent region when added to an EventFlow.
- Returns:
- true if this operator will be in a concurrent region by default, false otherwise.
- Since:
- 10.0 initial release
-
supportsTransactionalMemory
public boolean supportsTransactionalMemory()
Indicate whether an operator optionally supports transactional memory.
Override this method to return true to indicate that an operator can optionally use transactional memory as storage. The default value is false.
Calls to setManagedState(java.lang.String, java.io.Serializable) and getManagedState(java.lang.String) are valid only if this method returns true and getStorageMethod() returns transactional memory.
- Returns:
- true if this operator optionally supports transactional memory, false otherwise.
- Since:
- 10.2 initial release
-
getDataDirectory
public final Path getDataDirectory() throws IOException
Get data directory. The data directory can be used as required to store data files. Files stored in this directory can only be accessed relative to the returned directory path, i.e. this directory is not on the classpath of the fragment. The data directory can be configured using the com.tibco.ep.streambase.configuration.sbengine configuration type.
    name = "data-area-configuration"
    version = "1.0.0"
    type = "com.tibco.ep.streambase.configuration.sbengine"
    configuration = {
        StreamBaseEngine = {
            streamBase = {
                dataAreaPath = "v1-data-area"
            }
        }
    }
- Returns:
- Path to a writable data directory. The return value can never be null.
- Throws:
IOException
- Could not create the data directory
- Since:
- 10.2 initial release
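Since files in the data directory are reachable only relative to the returned Path, resolving names against that Path is the natural access pattern. The sketch below uses only java.nio.file and accepts any directory Path, so it does not call getDataDirectory() itself. `DataDirectorySketch`, `writeUnder`, and `readUnder` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: read and write files addressed relative to a data directory. */
final class DataDirectorySketch {

    /** Resolves a relative name under the data directory, rejecting names that escape it. */
    static Path resolveUnder(Path dataDirectory, String relativeName) {
        Path base = dataDirectory.normalize();
        Path target = base.resolve(relativeName).normalize();
        if (relativeName.isEmpty() || !target.startsWith(base) || target.equals(base)) {
            throw new IllegalArgumentException("not a file under the data directory: " + relativeName);
        }
        return target;
    }

    /** Writes text to the named file, creating intermediate directories as needed. */
    static Path writeUnder(Path dataDirectory, String relativeName, String text) {
        try {
            Path target = resolveUnder(dataDirectory, relativeName);
            Files.createDirectories(target.getParent());
            return Files.writeString(target, text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the named file back as text. */
    static String readUnder(Path dataDirectory, String relativeName) {
        try {
            return Files.readString(resolveUnder(dataDirectory, relativeName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```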
-
getRuntimeOutputSchema
public final Schema getRuntimeOutputSchema(int outputPort)
Return the output schema that should actually be used with sendOutput(int, Tuple) or sendOutputAsync(int, Tuple). Note that this schema will sometimes differ from the output schema set at typecheck time, because of the transformation determined by setCaptureStrategy(CaptureTransformStrategy). It will never contain capture fields. This method may not be called until after typecheck is finished; doing so will cause an IllegalStateException.
- Parameters:
outputPort
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
- Since:
- 7.2.6 initial release
-
getTypecheckOutputSchema
public final Schema getTypecheckOutputSchema(int outputPort)
Return the output schema as set by typecheck. Note that if this schema contains any capture fields, it may not be the same as the correct schema for tuples sent to sendOutput(int, Tuple) at runtime.
- Parameters:
outputPort
- the port to return the schema for (ports are zero based)
- Returns:
- Schema for the port, null if not running in an application
- Throws:
IndexOutOfBoundsException
- if port not in range
- Since:
- 7.2.6 initial release
-
getCaptureStrategy
public final CaptureTransformStrategy getCaptureStrategy()
Returns the capture transform strategy that this Operator will use.
- Returns:
- the CaptureTransformStrategy to use
- Since:
- 7.2.6 initial release
-
setCaptureStrategy
public final void setCaptureStrategy(CaptureTransformStrategy cts)
Set the capture transform strategy to use for the inputs and outputs of this Operator. Valid values for cts include CaptureTransformStrategy.FLATTEN and CaptureTransformStrategy.NEST. The default is CaptureTransformStrategy.FLATTEN.
- Parameters:
cts
- the CaptureTransformStrategy to use.
- Since:
- 7.2.6 initial release
-
setOutputSchema
public final Schema setOutputSchema(int port, Schema outputSchema) throws TypecheckException
Sets the output schema for the given output port (port numbers are zero based). This method should be called from the typecheck method of a Java Operator or Adapter.
Note that the actual Schema object used to create tuples at runtime (for use by sendOutput(int, Tuple) or sendOutputAsync(int, Tuple)) should be retrieved using getRuntimeOutputSchema(int), as that will take into account any capture field transformations determined by getCaptureStrategy().
- Parameters:
port
- the port to set the given schema to (ports are zero based)
outputSchema
- the schema to set the given port to
- Returns:
- returns the typecheck version of the output schema
- Throws:
TypecheckException
- Typecheck error
- Since:
- 7.0 returns Schema; 7.2 the returned schema should not be the Schema used to create output tuples. For the Schema to use to create output tuples, call getRuntimeOutputSchema(int) in the init() method or later.
- See Also:
getOutputPortCount()
-
sendErrorOutput
public void sendErrorOutput(String message)
Send an error tuple from this operator's error port.
This method will also record the message in the debug log.
- Parameters:
message
- A description of the error, must not be null
- Throws:
IllegalArgumentException
- if message is null
- Since:
- 5.1 initial release
-
sendErrorOutput
public void sendErrorOutput(Throwable t)
Send an exception via the error output port.
This method will also record the message in the debug log.
- Parameters:
t
- the exception to report
- Throws:
IllegalArgumentException
- if exception is null
- Since:
- 7.0 initial release
-
sendErrorOutput
public void sendErrorOutput(Throwable t, int port, Tuple errorTuple)
Send an exception via the error output port.
This method will also record the message in the debug log.
- Parameters:
t
- the exception to report
port
- the input/output port that the error is related to
errorTuple
- a tuple that caused or is related to the error
- Throws:
IllegalArgumentException
- if exception is null
- Since:
- 7.0 initial release
-
getTableAccessor
public TableAccessor getTableAccessor(String name) throws StreamBaseException
Get a TableAccessor for a table in the local module by the name of that table.
Warning: This interface is provisional, and will likely change in upcoming versions of StreamBase.
- Parameters:
name
- the table name
- Returns:
- a table accessor
- Throws:
StreamBaseException
- Error accessing TableAccessor
- Since:
- 7.2 initial release
-
getLogger
protected org.slf4j.Logger getLogger()
Retrieves a Logger suitable for logging messages and exceptions for this Operator. Equivalent to LoggerFactory.getLogger(Class) with argument this.getClass().
- Returns:
- a Logger instance for this Operator
- Since:
- 6.0 initial release
-
setReuseTuple
protected void setReuseTuple(boolean reuse)
Allow/disallow the runtime to reuse tuples on operator input. The default is to disallow tuple reuse. Reusing operator input tuples will create fewer objects, which may increase performance of the operator. To enable, call setReuseTuple(true) from the derived constructor.
Note that if you have called setReuseTuple(true) and plan to store the input Tuple as state in your operator, you must copy the Tuple before you store it.
- Parameters:
reuse
- allow/disallow tuple reuse
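The need to copy before storing comes from aliasing: when the runtime reuses one input object, every stored reference ends up showing the latest values. The sketch below demonstrates the effect with a hypothetical mutable `MutableRow` class standing in for a reused input Tuple. No StreamBase types are involved, and `ReuseCopySketch` and `collect` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates why a reused input object must be copied before it is kept as state. */
final class ReuseCopySketch {

    /** Hypothetical mutable row standing in for an input Tuple that the runtime may reuse. */
    static final class MutableRow {
        int value;
        MutableRow(int value) { this.value = value; }
        MutableRow copy() { return new MutableRow(value); }
    }

    /** Passes each input through one reused row and keeps either the reference or a copy. */
    static List<Integer> collect(int[] inputs, boolean copyBeforeStoring) {
        MutableRow reused = new MutableRow(0);
        List<MutableRow> kept = new ArrayList<>();
        for (int input : inputs) {
            reused.value = input;  // the "runtime" overwrites the same object for each input
            kept.add(copyBeforeStoring ? reused.copy() : reused);
        }
        List<Integer> values = new ArrayList<>();
        for (MutableRow row : kept) {
            values.add(row.value);
        }
        return values;
    }
}
```

With inputs 1, 2, 3, storing copies preserves 1, 2, 3, while storing the reference leaves three entries that all read 3.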
-
getReuseTuple
public boolean getReuseTuple()
Get the state of the input Tuple reuse flag.
- Returns:
- true if input Tuple reuse is allowed, false otherwise
-
getStateChangeTimeout
public int getStateChangeTimeout()
- Returns:
- the Operator state change timeout value, in milliseconds
- See Also:
DEFAULT_STATE_CHANGE_TIMEOUT
-
getLocation
public Location getLocation()
Return the Operator-wide location, useful for error reporting not associated with a particular property. If an error is related to a property, use getLocation(String) instead.
- Specified by:
getLocation in interface LocatedItem
- Returns:
- the location for error reporting from this operator.
-
getLocation
public Location getLocation(String property)
Return a new location within this Operator, associated with the given property name. Equivalent to getLocation() for StreamSQL-sourced Operators. Using this as an argument to TypecheckException constructors will cause Studio to display an error indicator alongside the UI widget corresponding to the passed-in property.
- Parameters:
property
- expected to be the bean name of an Operator property
- Returns:
- the location for error reporting from this operator
- Since:
- 6.5 initial release
-
setLogLevel
public static void setLogLevel(org.slf4j.Logger logger, String level) throws StreamBaseException
Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used this call will throw an exception and the log level will remain unchanged.
- Parameters:
logger
- the Logger object on which to set the level.
level
- the level at which to set the Logger instance.
- Throws:
StreamBaseException
- the log level could not be set, most likely because the current logging implementation is not Logback.
- Since:
- 6.4.4 initial release
-
setLogLevel
public static void setLogLevel(org.slf4j.Logger logger, Operator.LogLevel level) throws StreamBaseException
Sets the log level of the given logger. Note that this will only succeed if the default logging implementation (Logback) is used; if another logging back-end is used, this call will throw an exception and the log level will remain unchanged.
- Parameters:
logger
- the Logger
object on which to set the level.
level
- the Operator.LogLevel
at which to set the Logger instance.
- Throws:
StreamBaseException
- if the log level could not be set, most likely because the current logging implementation is not Logback.
- Since:
- 6.4.6 initial release
-
getRuntimeEnvironment
public Operator.RuntimeEnvironment getRuntimeEnvironment()
Return the Operator.RuntimeEnvironment
for this StreamBase Server. NOTE: Not valid during typecheck.
- Returns:
- current RuntimeEnvironment, or null during typecheck
- Since:
- 7.0 initial release
-
isRuntime
public boolean isRuntime()
- Returns:
- true if called during runtime (after the server has started), false otherwise
- Since:
- 7.6 initial release
-
getOperatorConfigurationAccessor
public Operator.ConfigurationAccessor getOperatorConfigurationAccessor()
Provides access to operator and adapter configuration information from the server's configuration file. This call is intended to be made during typecheck or later.
- Returns:
- an
Operator.ConfigurationAccessor
that provides access to that configuration information
- Since:
- 7.5 initial release
-
getDataSourceConnection
public Connection getDataSourceConnection(String datasource) throws StreamBaseException
Retrieve a connection to a JDBC Data Source configured in HOCON configuration files. The given datasource
is the identifier of a jdbcDataSources
entry in the configuration's JDBCDataSourceGroup
. This method may return an existing connection, or create a new connection if necessary.
This method always returns null during typecheck.
- Returns:
- a JDBC connection, or null on failure or during typecheck
- Throws:
StreamBaseException
- if the HOCON configuration does not contain the specified datasource, or if there is an error trying to create a connection.
- Since:
- 10.6.3
-
getSchemaForCapture
public Schema getSchemaForCapture(String captureName, int depth)
Finds the schema for the given capture name in the context that this operator is running under. For a capture field that is not nested inside any other capture field, use a depth value of 0. To expand the capture fields in a schema returned by this function with a depth of d, call this function with a depth of d+1. For example, if an output stream of this module has schema (f int, c @A)
, then you might call getSchemaForCapture("A", 0)
to get (g int, c @B)
as the schema bound to A. Then you might call getSchemaForCapture("B", 1)
to get (h int)
as the schema bound to B.
Note that the "nesting depth" here has nothing to do with capture fields found in lists or tuples; the only containment that affects the nesting depth is a capture within another capture.
- Parameters:
captureName
- the name of the bound capture type
depth
- the depth of nesting at which to look up the schema of this capture binding.
- Returns:
- the schema associated with the bound capture type.
- Since:
- 7.2 initial release
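The depth rule for getSchemaForCapture (look up at depth d, then expand the result at depth d+1) can be sketched as a small recursion. Everything below is a hypothetical model rather than the StreamBase API: schemas are plain strings, `CaptureDepthSketch` and its binding table are assumptions, and the nested-parentheses form of the expanded schema is only an illustrative notation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical model of depth-indexed capture bindings; NOT the StreamBase API. */
final class CaptureDepthSketch {
    private static final Pattern CAPTURE = Pattern.compile("@(\\w+)");

    // Maps "captureName#depth" to the schema text bound at that nesting depth.
    private final Map<String, String> bindings = new HashMap<>();

    void bind(String captureName, int depth, String schema) {
        bindings.put(captureName + "#" + depth, schema);
    }

    /** Stand-in for getSchemaForCapture(captureName, depth); null if unbound. */
    String schemaForCapture(String captureName, int depth) {
        return bindings.get(captureName + "#" + depth);
    }

    /** Replaces each @Name found at the given depth, expanding the result at depth + 1. */
    String expand(String schema, int depth) {
        Matcher matcher = CAPTURE.matcher(schema);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String bound = schemaForCapture(matcher.group(1), depth);
            // Unbound captures are left untouched in this sketch.
            String replacement = (bound == null) ? matcher.group() : expand(bound, depth + 1);
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    /** Builds the bindings used in the example from the description above. */
    static CaptureDepthSketch docExample() {
        CaptureDepthSketch sketch = new CaptureDepthSketch();
        sketch.bind("A", 0, "(g int, c @B)");
        sketch.bind("B", 1, "(h int)");
        return sketch;
    }

    public static void main(String[] args) {
        // prints (f int, c (g int, c (h int)))
        System.out.println(docExample().expand("(f int, c @A)", 0));
    }
}
```

The point of the sketch is only the depth bookkeeping: a capture found inside a schema obtained at depth d is always looked up at depth d+1.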
-
getTupleCaptureTransformer
public TupleCaptureTransformer getTupleCaptureTransformer(Schema s) throws StreamBaseException
Get a TupleCaptureTransformer capable of translating tuples with the given schema to the equivalent schema with all the capture fields expanded out, and translating expanded tuples back into tuples with the given schema. This method may only be called at runtime; the exact schemas of any capture fields are not fully determined at typecheck time.
- Parameters:
s
- the schema with capture fields
- Returns:
- a TupleCaptureTransformer able to do the given transformations
- Throws:
StreamBaseException
- if this method is called at the wrong time, or if the transformations requested are not possible. This may occur, for example, if there is a name conflict between a captured field and a field already in the schema.
- Since:
- 7.2 initial release
-
isPassthruOperator
public boolean isPassthruOperator()
Allows an operator to declare itself to be a "pass-thru" operator, that is, one that calls its sendOutput method with the unmodified tuples received by its processTuple method. Certain runtime optimizations are possible for pass-thru operators. In addition, the "older API" message is suppressed for such operators. The StreamBase log adapter is an example of a pass-thru operator, as it logs, but leaves unmodified, the tuples that pass through it.
- Returns:
- true if the operator is pass-thru and false otherwise
- Since:
- 7.2.12 initial release
-
getSearchKeywords
@Deprecated public String[] getSearchKeywords()
Deprecated. As of StreamBase version 10.0, keywords are configured through the operator manifest settings.
Override to declare strings that assist the user in filtering for this operator when in Studio. Without search keywords, operators are found only by their short display name.
NOTE: Since StreamBase 10.0, this method is not used, and has been replaced by Manifest properties. For search keywords, set the "KeyWords" manifest attribute. Please see the documentation for details.
- Returns:
- the default implementation returns no search keywords
- Since:
- 7.3.10 initial release
-
allowsConcurrency
public boolean allowsConcurrency()
Override to indicate whether concurrency may be configured in Studio against this operator. Note that this applies to all instances, and is not intended to be interpreted on a per-instance basis. The default value when not overridden is true
. This has no effect on the actual runtime environment for operator instances.
- Returns:
- whether this operator allows Studio to set any concurrency options
- Since:
- 7.4.0 initial release
-
getProposedInputSchemas
public Schema[] getProposedInputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
- Parameters:
mainName
- the name the user has requested as the prefix for the top-level schema; this name should be used throughout any sub-field schemas
- Returns:
- an array of Schemas to offer for input ports
- Since:
- 7.4.0 initial release
-
getProposedOutputSchemas
public Schema[] getProposedOutputSchemas(String mainName)
Override to provide schemas that Studio will offer to users to import into their projects. Neither property setting nor typecheck has occurred when Studio invokes this method.
- Parameters:
mainName
- the name the user has requested as the prefix for the top-level schema; this name should be used throughout any sub-field schemas
- Returns:
- an array of Schemas to offer for output ports
- Since:
- 7.4.0 initial release
-
supportsArtifacts
public boolean supportsArtifacts()
Indicates whether or not the operator supports the use of artifacts. The default implementation of this method returns false. Override this method to return true if the operator uses artifacts.
- Returns:
- true if the operator supports the use of artifacts, false otherwise
-
getArtifactManager
public OperatorArtifactManager getArtifactManager()
Returns an ArtifactManager instance for managing the operator's artifacts.
- Returns:
- an ArtifactManager instance
-
getLock
public Lock getLock()
Return the guard for module state.
- Returns:
- the Lock instance
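A common way to use a guard of this kind is the lock/try/finally idiom. The sketch below assumes the returned Lock is a `java.util.concurrent.locks.Lock`, which the description above does not state, and it substitutes a `ReentrantLock` for the object an Operator would obtain from getLock(). `GuardedStateSketch` is a hypothetical class used only for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical holder of shared state protected by a Lock; NOT part of the StreamBase API. */
final class GuardedStateSketch {
    private final Lock guard;   // in an Operator this might come from getLock() (assumption)
    private long processed;

    GuardedStateSketch(Lock guard) { this.guard = guard; }

    /** Updates the shared counter while holding the guard. */
    void recordOne() {
        guard.lock();
        try {
            processed++;
        } finally {
            guard.unlock();     // always release, even if the update throws
        }
    }

    /** Reads the shared counter while holding the guard. */
    long processed() {
        guard.lock();
        try {
            return processed;
        } finally {
            guard.unlock();
        }
    }

    /** Runs several threads against one guarded counter and returns the final count. */
    static long runWorkers(int workers, int updatesPerWorker) {
        GuardedStateSketch state = new GuardedStateSketch(new ReentrantLock());
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < updatesPerWorker; j++) {
                    state.recordOne();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return state.processed();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4, 1000)); // prints 4000
    }
}
```

Whether operator code is expected to acquire this lock directly, and in which callbacks, is not specified here, so treat the pattern as a general illustration of guarded access.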
-
-