Contents
This topic explains how to use the StreamBase API to create your own custom Java operators for use in StreamBase applications.
Note
StreamBase Studio ships with new projects configured by default to build with Java 6. If you have a requirement to use Java 5, configure the Studio project that contains your custom code using the steps in Using Java 5 for Custom Extensions.
This topic assumes that you used the StreamBase Java Operator wizard to generate the initial starter code for your operator, following the development process recommended in Developing Java Operators.
To understand how Java operators are used, we also recommend that you run the Java
Operator sample, view the source for StringCase.java,
and see the sample's
documentation. Examples in this topic are based on the Java Operator sample. To
load this sample into StreamBase Studio, use →
and select javaoperator from the [Extending
StreamBase] category.
The following sections describe the methods available in the Java Operator API, the steps for defining a custom operator, and walks through the creation of a sample Java operator. For additional details, refer to the Operator package's Javadoc and the description of the Java Operator's Properties view in the Authoring Guide.
Use the following methods of the Operator class to manage how an instance of your operator behaves on the EventFlow Editor canvas:
-
Call
Operator.setPortHints(int, int)in your operator's constructor to set the initial number of ports when your operator is first placed on the canvas. -
Call
Operator.setDisplayName(String)in your constructor to set a user-friendly name for your operator. This name is used in the Operator Name field in the Properties view, and as the details text in the Palette view's Details mode. -
Call
Operator.setShortDisplayName(String)in your constructor to set a short name for your operator. This name is used as the operator's name in the Insert Java Operators dialog and shows above the operator on the EventFlow canvas. -
Override
Operator.getPortCounts()if you change the number of ports based on end-user setting properties. -
Override
Operator.getIconResource(Operator.IconKind)if you want your operator to appear on the canvas with a custom icon. -
During
Operator.typecheck(), throwOperator.PropertyTypecheckExceptioninstead ofTypecheckExceptionto allow Studio to decorate your operator icon with an overlay to indicate its warning or error state. See thePropertyTypecheckExceptionclass and theOperator.getLocation()method for details. -
See the
UIHintsclass andParameterizableinterface for information on additional control over the display of properties in Studio's Properties view.
For the latest version of this list, see the Special Studio Considerations section of the Javadoc for the Operator class.
The StreamBase Java Client library for custom operators extends the following packages in the library:
-
com.streambase.sb.operator -
com.streambase.sb.operator.parameter
Like any other StreamBase operator, Java Operators may have properties that you can customize in StreamBase Studio. You must modify the initial properties generated by the StreamBase Java Operator wizard.
Each Java Operator class has an Object that implements
the com.streambase.sb.operator.Parameterizable
interface. This Parameterizable Object has
getter/setter accessor methods that are used in the StreamBase Studio Properties
view. The Parameterizable object is actually a JavaBean
as defined in the java.beans package. StreamBase finds
the Parameterizable Object by calling the Operator.getParameters method on an instance of the Java Operator
class.
The Java Operator class can either implement the Parameterizable interface directly, or delegate to another
object that implements the interface. The default Operator constructor assumes that this is the case. Another
Operator constructor that has a Parameterizable argument also exists. There is also an
Operator.setParameters method that can be used to set
the Operator's Parameterizable Object. In any case,
all Java Operators must have a public
default constructor.
Properties are declared by implementing get and set accessor methods. Here is a code
fragment of an Operator class with some properties:
public class Example extends Operator implements Parameterizable {
private int startCount = 1;
private boolean clever = true;
private String [] stockSymbols;
private String name;
public void setStartCount(int i) {startCount = i;}
public int getStartCount() {return startCount;}
public void setClever(boolean b) {clever = b;}
public boolean isClever() {return clever;}
public void setStockSymbols(String [] ss) {stockSymbols = ss;}
public String [] getStockSymbols() {return stockSymbols;}
public void setName(String s) {name=s;}
public String getName() {return name;}
public void typecheck() throws TypecheckException {}
public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
}
}
The example Operator class above declares four
properties:
-
startCount
-
clever
-
stockSymbols
-
name
These properties will appear in StreamBase Studio's Properties view for an instance
of this operator. Notice that the reader accessor method for a boolean property can
be named either getPropertyName, isPropertyName, or hasPropertyName.
StreamBase currently supports the following property types:
-
int -
long -
double -
boolean -
String -
String[] -
Schema -
Enum -
ResourceFile
The only way to create an Enum property is to declare
it as a String property in the Java Operator class
and define the enumeration values in a BeanInfo
class, as in the example below.
All but the last two are simple property types that StreamBase derives
automatically by looking at the signatures of the getter/setter methods in the
Parameterizable class. These can be specified in the
BeanInfo class with SBPropertyDescriptor objects, as in the example above.
Enum properties are string properties that can only take on a specified set of values. ResourceFile properties are similar to Enum properties, but their value must be the name of a resource. In StreamBase Studio, this is enforced by displaying a drop-down list of files available from your project's folder.
Because both Enum and ResourceFile properties are implemented as Strings, their getter
and setter methods must return and expect objects of type String, respectively.
Thus, these property types cannot be automatically derived by StreamBase. To use
these properties, an adapter must have an accompanying BeanInfo class that returns the SBPropertyDescriptor subclasses EnumPropertyDescriptor or ResourceFilePropertyDescriptor in its list of PropertyDescriptors.
The BeanInfo class is described in the java.beans.BeanInfo API documentation. It can be used by the
Parameterizable object to control what
properties are exposed; add additional metadata about properties, such as which
properties are optional; and access special types of properties that can't be
automatically derived via reflection. If a BeanInfo
class is present, only the properties explicitly declared in this class will be
exposed by StreamBase.
The easiest way to make a BeanInfo class is to create a class that you name by
appending BeanInfo to the Parameterizable class name. Thus if the Parameterizable class is
called OpParams, the corresponding BeanInfo would be OpParamsBeanInfo (in the same package as OpParams) and extending
java.beans.SimpleBeanInfo.
The following example BeanInfo class changes the
Name property to an enum and makes the Clever property
optional:
public class ExampleBeanInfo extends SimpleBeanInfo {
public PropertyDescriptor [] getPropertyDescriptors() {
try {
SBPropertyDescriptor [] p = {
new SBPropertyDescriptor("StartCount", Example.class),
new SBPropertyDescriptor("Clever", Example.class).optional(),
new EnumPropertyDescriptor("Name", Example.class,
new String [] {
"Tom", "Dick", "Harry"
})
};
return p;
}
catch(Exception e) {
System.out.println("Exception: " + e);
e.printStackTrace();
}
return null;
}
}
Note that the optional() method shown in this
example simply marks the descriptor as optional.
Java Operators can also change their number of input and output ports. If this is
done by setting property values, the property's setter method should call
setPortHints with the new number of ports. Output
ports are automatically configured by calling setOutputSchema. Operators may ensure the correct number of
input ports in their typecheck method by calling
the requireInputPortCount method.
Be sure to see the Special Studio Considerations section of the Javadoc for the Operator class in the StreamBase Java Client Library.
For example:
/**
* This operator has two integer parameters, many input and many output ports
*/
public class RoundRobinOperator extends Operator implements Parameterizable {
private int _numOutputPorts = 1;
private int _nextOutputPort = 0;
private int _numInputPorts = 1;
public RoundRobinOperator() {
setPortHints(_numInputPorts, _numOutputPorts);
}
public int getInputPortNumber() {return _numInputPorts;}
public void setInputPortNumber(int i) {
_numInputPorts = i;
setPortHints(_numInputPorts, _numOutputPorts);
}
public int getOutputPortNumber() {return _numOutputPorts;}
public void setOutputPortNumber(int i) {
_numOutputPorts = i;
setPortHints(_numInputPorts, _numOutputPorts);
}
public void init() throws StreamBaseException {
System.out.println("RoundRobinOperator.init");
}
public PortCounts getPortCounts() {
return new PortCounts(_numInputPorts,1);
}
public void typecheck() throws TypecheckException {
System.out.println("RoundRobinOperator.typecheck");
Schema s = null;
StringBuffer errorMessage = new StringBuffer();
if(getInputPortCount() <= 0) {
throw new TypecheckException("Invalid Number of Input Ports: "
+ getInputPortCount()
+ " must have at least 1");
}
System.out.println("RoundRobinOperator.typecheck: requiring inputs: " + _numInputPorts);
requireInputPortCount(_numInputPorts);
for(int i = 0; i < getInputPortCount(); ++i) {
Schema inputSchema = getInputSchema(i);
if(s == null) {
s = inputSchema;
} else if(!s.sameFieldTypes(inputSchema)) {
errorMessage.append("Invalid schema on port " + i + "\n");
}
}
if(errorMessage.length() > 0)
throw new TypecheckException(errorMessage.toString());
System.out.println("RoundRobinOperator.typecheck: input schemas ok");
System.out.println("input port num " + getInputPortNumber()
+ " output port num " + getOutputPortNumber());
s = getInputSchema(0);
for(int i=0; i < getOutputPortNumber(); ++i) {
setOutputSchema(i, s);
}
System.out.println("RoundRobinOperator.typecheck: done");
}
public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
sendOutput(_nextOutputPort, t);
_nextOutputPort = (_nextOutputPort + 1) % _numOutputPorts;
}
}
The Java Operator class has the following life cycle. Note that this section describes the life cycle within the StreamBase Server (sbd) process:
-
ConstructorAll Java Operators must have a public default constructor. The
Constructoris called when theOperatorinstance is created, but before theOperatoris connected to the StreamBase application. We recommended that you set the default Input port and Output port count in theConstructorwith thesetPortHints(inPortCount, outPortCount)method. The default is 1 input port, 0 output ports. -
typecheckThe
typecheckmethod is called after theOperatorinstance is connected in the StreamBase application, allowing theOperatorto validate its properties. TheOperatorclass may change the number of input or output ports by calling theerequireInputPortCount(portCount)method or thesetOutputSchema(schema, portNum)method. If theverifyInputPortCountmethod is passed a different number of ports than the Operator currently has, aPortMismatchException(subtype ofTypeCheckException) is thrown.Call the
getResourceContentsmethod during typecheck, instead of waiting until start or run to call it. This is to ensure that StreamBase Studio can indicate to the user whether it was able to find the resource during authoring, and avoid waiting until sbd fails silently. -
initIf
typechecksucceeds, theinitmethod is called before the StreamBase application is started. Note that your Operator class is not required to define theinitmethod, unless (for example) you need to perform initialization of a resource such as a JDBC pool, if your operator is making JDBC calls. -
processTupleThe
processTuplemethod is called when a tuple is available for processing. -
shutdownThe
shutdownmethod is called when the StreamBase Server is in the process of shutting down.
A Java operator runs in the same process as the StreamBase application that contains it. One advantage of this is that when the StreamBase application starts or stops, any Java operators or embedded adapters in it start and stop along with it. More generally, the application and these components undergo life cycle changes in a synchronized fashion.
Java operators undergo life cycle changes in a prescribed order. Typically, when a StreamBase application is represented in StreamBase Studio, data flows through the application from left to right. Here, the order in which operators undergo life cycle events is described as being either “left to right”, or “right to left”, where “left to right” means in the same direction as data flows through the app. “Right to left”, on the other hand, means that Java operators will undergo some life cycle change in the opposite order as they will see data.
Life cycle changes that a StreamBase application undergoes include starting, pausing, resuming and shutting down. During starting and resuming, operators are processed in right to left order. This means that operators start in an order that is opposite to that in which they will receive data. Conversely, operators are paused and stopped in left to right order, or in the same order as they will receive data.
Java operators start, pause, resume and shutdown along with the StreamBase application that contains them. It is also possible to suspend and resume Java operators independently of their StreamBase application.
This is accomplished with the same commands that are used to suspend and resume a StreamBase application as a whole. The command sbadmin suspend can be used to suspend an application; similarly, sbadmin resume is used to resume a StreamBase application.
To suspend or resume individual operators, you can append a list of Java operators to the sbadmin suspend or sbadmin resume command. If one or more strings are appended to sbadmin suspend, then the StreamBase application as a whole will not be suspended, rather the individual Java operators named by the appended strings will be suspended.
Note that an individual Java operator can be suspended or resumed only if the StreamBase application itself is running. An individual Java operator can be running only if the application that contains it is also running. Therefore, it is not meaningful to suspend or resume an operator that is not currently running if the application that contains it is not running.
The sbc status command returns status information about the StreamBase Server.
The sbc status --operators command returns the status of any Java operators contained by the server's application. Note that sbc status --operators and sbc status are disjoint commands: sbc status returns information about the server only, not about Java operators contained in the server. Similarly, sbc status --operators returns information about contained Java operators only, and not any information about the server itself.
In this context, the status of a Java operator consists only of its current state.
For example, if a Java operator has been started and is currently running, its
state will be STARTED. If a Java operator has yet
to be started, its state will be NONE. If a Java
operator has been suspended, its state will be either SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES. Lastly, if an operator has
been shut down its state will be SHUTDOWN.
By default, a Java operator starts along with the StreamBase application that
contains it. In Studio, in the General tab for Java operators, there is a check box
labeled Start with application. By default, this box
is selected, meaning that the Java operator will start with the application.
Clearing this check box causes the operator to be left in the NONE state when the application starts.
A Java operator that does not start with its application will stay in the
NONE state until it is explicitly started with the
sbadmin resume operatorName command. Such an
operator will not start even if the application as a whole is resumed. So for
example, the application as a whole may suspend and then resume; this will have no
effect on an operator that has not started with its application.
If a Java operator is suspended separately from the application that contains it, tuples might still arrive at the suspended operator. You can configure the Java operator to handle these tuples in two different ways:
-
A suspended Java operator can choose to drop tuples that are delivered to it.
-
The operator can choose to process these tuples.
These two possibilities are represented by static Strings on the class com.streambase.sb.operator.Operator, SUSPENDED_AND_DROPPING_TUPLES and SUSPENDED_AND_PROCESSING_TUPLES, respectively. A Java operator
is configured to either drop or process tuples by calling the method setSuspendBehavior on its instance. setSuspendBehavior takes an int argument, the value of which must be either
SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES.
This section steps you through the creation of a sample Java Operator class,
StringCase.java. This example is kept deliberately simple, to allow you to focus on
the API without having to read other lines of code. The Java Operator is used in a
StringCase.sbapp application to simply map string input
to lower-case or upper-case. The files that comprise the sample, including the
StringCase.java file discussed here, are installed with
StreamBase in the sample/javaoperator directory.
Before beginning this exercise, open StreamBase Studio and load the javaoperator
sample, as described in Java
Operator Sample. This creates the sample_javaoperator project in your Package Explorer. In this
project, double-click StringCase.java to open it in the
Java Editor.
In an editor, we started by adding these import statements in StringCase.java:
import com.streambase.sb.DataType; import com.streambase.sb.Schema; import com.streambase.sb.StreamBaseException; import com.streambase.sb.Tuple; import com.streambase.sb.operator.Operator; import com.streambase.sb.operator.Parameterizable; import com.streambase.sb.operator.TypecheckException; import java.util.Arrays;
Next, we declared the class by extending the StreamBase Operator class. For example:
public class StringCase extends Operator implements Parameterizable {
In the constructor, we defined two constants for an enum property, conversionType. In this example, we defined the metadata for the
enum property in a Java bean, the StringCaseBeanInfo class. For example:
// // Constants for enum property conversionType // public static String UPPER = "Uppercase"; public static String LOWER = "Lowercase"; /** Enum property, metadata defined in StringCaseBeanInfo.java */ private String conversionType = LOWER;
Note
The source for StringCaseBeanInfo is Stepping through
StringCaseBeanInfo.java You can find it in the sample/javaoperator directory in the StreamBase installation.
Next we declare the Java Operator's input and output, and an array for fields in the schema. For example:
private Schema inSchema; // input to this operator private Schema outSchema; // output from this schema private Schema.Field[] fields; // individual fields in the schema
Then we use the setPortHints method to specify a
fixed number of input and output ports:
public StringCase() {
// this operator has one input and one output
setPortHints(1, 1);
}
Note
If the Java Operator will take a variable number of output ports, use the
Parameterizable interface and the typecheck method. There must be a parameter for the number of
input and output ports. Within typecheck, you
must specify a setOutputSchema for each port, so
that StreamBase Studio knows how many square black dots to draw for the output
ports. If you do not use setOutputSchema for each
port, the Java Operator on the StreamBase Studio canvas will not display the
correct number of black dots.
Next, we set up the typecheck method for the class,
added exception handling, and declared that the schema of the output is the same as
for the input:
public void typecheck() throws TypecheckException {
// require exactly one input port
requireInputPortCount(1);
// the input must contain at least one string field
inSchema = getInputSchema(0);
fields = inSchema.getFields();
int stringCount = 0;
for(int i=0; i < fields.length; ++i) {
DataType dt = fields[i].getDataType();
if(DataType.STRING.equals(dt)) {
++stringCount;
}
}
if (stringCount==0)
throw new TypecheckException(
"At least one string field is required");
// the output schema is the same as the input
setOutputSchema(0, inSchema);
outSchema = inSchema;
}
Finally we defined the work that the custom Java operator will perform, by using
the processTuple method. For example:
public void processTuple(int inputPort, Tuple t)
throws StreamBaseException {
Tuple out = outSchema.createTuple();
for(int i=0; i < fields.length; ++i) {
Schema.Field f = fields[i];
DataType dt = f.getDataType();
String fname = f.getName();
if(DataType.STRING.equals(dt)) {
String str = t.getString(f);
if(LOWER.equals(conversionType)) {
out.setString(fields[i], str.toLowerCase());
} else if(UPPER.equals(conversionType)) {
out.setString(fields[i], str.toUpperCase());
}
} else {
out.setField(fields[i], t.getField(fields[i]));
}
}
sendOutput(0, out);
}
public String getConversionType() {return conversionType;}
public void setConversionType(String s) {
conversionType = s;
}
}
StringCaseBeanInfo.java is the BeanInfo class for the StringCase operator. StringCaseBeanInfo defines the metadata for the properties that
will appear in the StreamBase Studio Properties view for this Java operator. The
StringCase operator has only one property, conversionType, which is an enum whose values are defined in the
StringCase class.
Now, open StringCaseBeanInfo.java and examine it.
First we import these classes:
import java.beans.PropertyDescriptor; import java.beans.SimpleBeanInfo; import com.streambase.sb.operator.parameter.EnumPropertyDescriptor; import com.streambase.sb.operator.parameter.SBPropertyDescriptor;
And define the bean as follows:
public class StringCaseBeanInfo extends SimpleBeanInfo {
public PropertyDescriptor [] getPropertyDescriptors() {
try {
SBPropertyDescriptor [] p = {
new EnumPropertyDescriptor("ConversionType", StringCase.class,
new String [] {
StringCase.UPPER, StringCase.LOWER
})
};
return p;
}
catch(Exception e) {
System.out.println("Exception: " + e);
e.printStackTrace();
}
return null; // should never happen
}
}
