Creating Custom Java Operators

This topic explains how to use the StreamBase API to create your own custom Java operators for use in StreamBase applications.

Note

StreamBase Studio ships with new projects configured by default to build with Java 6. If you have a requirement to use Java 5, configure the Studio project that contains your custom code using the steps in Using Java 5 for Custom Extensions.

Introduction

This topic assumes that you used the StreamBase Java Operator wizard to generate the initial starter code for your operator, following the development process recommended in Developing Java Operators.

To understand how Java operators are used, we also recommend that you run the Java Operator sample, view the source for StringCase.java, and see the sample's documentation. Examples in this topic are based on the Java Operator sample. To load this sample into StreamBase Studio, use File > Load StreamBase Sample and select javaoperator from the [Extending StreamBase] category.

The following sections describe the methods available in the Java Operator API and the steps for defining a custom operator, and walk through the creation of a sample Java operator. For additional details, refer to the Operator package's Javadoc and the description of the Java Operator's Properties view in the Authoring Guide.

Methods for Studio Presentation

Use the following methods of the Operator class to manage how an instance of your operator behaves on the EventFlow Editor canvas:

  • Call Operator.setPortHints(int, int) in your operator's constructor to set the initial number of ports when your operator is first placed on the canvas.

  • Call Operator.setDisplayName(String) in your constructor to set a user-friendly name for your operator. This name is used in the Operator Name field in the Properties view, and as the details text in the Palette view's Details mode.

  • Call Operator.setShortDisplayName(String) in your constructor to set a short name for your operator. This name is used as the operator's name in the Insert Java Operators dialog and shows above the operator on the EventFlow canvas.

  • Override Operator.getPortCounts() if your operator changes its number of ports based on property values set by the end user.

  • Override Operator.getIconResource(Operator.IconKind) if you want your operator to appear on the canvas with a custom icon.

  • During Operator.typecheck(), throw Operator.PropertyTypecheckException instead of TypecheckException to allow Studio to decorate your operator icon with an overlay to indicate its warning or error state. See the PropertyTypecheckException class and the Operator.getLocation() method for details.

  • See the UIHints class and Parameterizable interface for information on additional control over the display of properties in Studio's Properties view.

For the latest version of this list, see the Special Studio Considerations section of the Javadoc for the Operator class.

Java Operator Properties

The StreamBase Java Client library provides the following packages for writing custom operators:

  • com.streambase.sb.operator

  • com.streambase.sb.operator.parameter

Like any other StreamBase operator, Java Operators may have properties that you can customize in StreamBase Studio. You must modify the initial properties generated by the StreamBase Java Operator wizard.

Each Java Operator class has an Object that implements the com.streambase.sb.operator.Parameterizable interface. This Parameterizable Object has getter/setter accessor methods that are used in the StreamBase Studio Properties view. The Parameterizable object is actually a JavaBean as defined in the java.beans package. StreamBase finds the Parameterizable Object by calling the Operator.getParameters method on an instance of the Java Operator class.

The Java Operator class can either implement the Parameterizable interface directly or delegate to another object that implements it. The default Operator constructor assumes the former, that is, that the operator class itself implements Parameterizable. For the delegating case, another Operator constructor takes a Parameterizable argument, and the Operator.setParameters method can also be used to set the Operator's Parameterizable object. In either case, all Java Operators must have a public default constructor.

Properties are declared by implementing get and set accessor methods. Here is a code fragment of an Operator class with some properties:

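import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;

public class Example extends Operator implements Parameterizable {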
    private int         startCount = 1;
    private boolean     clever = true;
    private String []   stockSymbols;
    private String      name;

    public void setStartCount(int i) {startCount = i;}
    public int getStartCount() {return startCount;}
    public void setClever(boolean b) {clever = b;}
    public boolean isClever() {return clever;}
    public void setStockSymbols(String [] ss) {stockSymbols = ss;}
    public String [] getStockSymbols() {return stockSymbols;}
    public void setName(String s) {name=s;}
    public String getName() {return name;}
    public void typecheck() throws TypecheckException {}
    public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
    }
}

The example Operator class above declares four properties:

  • startCount

  • clever

  • stockSymbols

  • name

These properties will appear in StreamBase Studio's Properties view for an instance of this operator. Notice that the read accessor method for a boolean property can be named getPropertyName, isPropertyName, or hasPropertyName.
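Because the Parameterizable object is a JavaBean, the get/set naming rules above are the standard java.beans ones, and you can see which properties a set of accessors produces by asking the JDK's own java.beans.Introspector. The following is a minimal sketch under that assumption: PropertyListing and ExampleParams are hypothetical names, ExampleParams copies the Example accessors without extending Operator so that no StreamBase library is needed, and the sketch shows JavaBeans introspection in general rather than the exact code path StreamBase uses. Note that the JDK Introspector itself recognizes only the get and is prefixes for read methods.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

public class PropertyListing {

    // Hypothetical plain JavaBean with the same accessors as the Example class,
    // but without the StreamBase superclass, so it compiles with the JDK alone.
    public static class ExampleParams {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;
        private String name;

        public void setStartCount(int i) { startCount = i; }
        public int getStartCount() { return startCount; }
        public void setClever(boolean b) { clever = b; }
        public boolean isClever() { return clever; }  // "is" prefix for a boolean getter
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setName(String s) { name = s; }
        public String getName() { return name; }
    }

    // Returns the property names the JDK Introspector derives from the accessor
    // signatures. Object.class is the stop class, so the inherited getClass()
    // does not appear as a "class" property.
    public static Set<String> propertyNames(Class<?> beanClass) {
        try {
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                names.add(pd.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints [clever, name, startCount, stockSymbols]
        System.out.println(propertyNames(ExampleParams.class));
    }
}
```

The property names come from the method names, not the field names: getStartCount and setStartCount together yield a property called startCount.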

Property Types

StreamBase currently supports the following property types:

  • int

  • long

  • double

  • boolean

  • String

  • String[]

  • Schema

  • Enum

  • ResourceFile

The only way to create an Enum property is to declare it as a String property in the Java Operator class and define the enumeration values in a BeanInfo class, as in the example below.

All but the last two are simple property types that StreamBase derives automatically from the signatures of the getter/setter methods in the Parameterizable class. They can also be declared explicitly in a BeanInfo class with SBPropertyDescriptor objects, as in the ExampleBeanInfo class below.

Enum properties are string properties that can only take on a specified set of values. ResourceFile properties are similar to Enum properties, but their value must be the name of a resource. In StreamBase Studio, this is enforced by displaying a drop-down list of files available from your project's folder.

Because both Enum and ResourceFile properties are implemented as Strings, their getter methods must return, and their setter methods must accept, objects of type String. StreamBase therefore cannot tell them apart from ordinary String properties by looking at the method signatures. To use these property types, the Parameterizable class must have an accompanying BeanInfo class that returns the SBPropertyDescriptor subclasses EnumPropertyDescriptor or ResourceFilePropertyDescriptor in its list of PropertyDescriptors.

The BeanInfo class is described in the java.beans.BeanInfo API documentation. It can be used by the Parameterizable object to control what properties are exposed; add additional metadata about properties, such as which properties are optional; and access special types of properties that can't be automatically derived via reflection. If a BeanInfo class is present, only the properties explicitly declared in this class will be exposed by StreamBase.

The easiest way to make a BeanInfo class is to name it by appending BeanInfo to the name of the Parameterizable class. Thus, if the Parameterizable class is called OpParams, the corresponding BeanInfo class would be named OpParamsBeanInfo, reside in the same package as OpParams, and extend java.beans.SimpleBeanInfo.
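This naming convention is the standard java.beans lookup rule, so its effect can be tried with the JDK alone, assuming StreamBase relies on that standard lookup as the convention suggests. In the sketch below, BeanInfoLookup, OpParams, and OpParamsBeanInfo are hypothetical classes; the BeanInfo uses plain java.beans.PropertyDescriptor instead of StreamBase's SBPropertyDescriptor so that no StreamBase library is needed. The classes are nested public static classes only so the example fits in one file; for nested classes the lookup uses the binary class name, so the BeanInfo must be nested in the same outer class.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;
import java.util.ArrayList;
import java.util.List;

public class BeanInfoLookup {

    // Hypothetical Parameterizable-style class with three accessor pairs.
    public static class OpParams {
        private int startCount = 1;
        private String name = "";
        private String internalKey = "";

        public int getStartCount() { return startCount; }
        public void setStartCount(int i) { startCount = i; }
        public String getName() { return name; }
        public void setName(String s) { name = s; }
        public String getInternalKey() { return internalKey; }
        public void setInternalKey(String s) { internalKey = s; }
    }

    // Found by naming convention: bean class name + "BeanInfo".
    // Only the properties listed here are exposed; internalKey is hidden.
    public static class OpParamsBeanInfo extends SimpleBeanInfo {
        @Override
        public PropertyDescriptor[] getPropertyDescriptors() {
            try {
                return new PropertyDescriptor[] {
                    new PropertyDescriptor("startCount", OpParams.class),
                    new PropertyDescriptor("name", OpParams.class)
                };
            } catch (IntrospectionException e) {
                // Thrown if an accessor pair is missing or misnamed.
                throw new IllegalStateException(e);
            }
        }
    }

    // Asks the JDK Introspector which properties OpParams exposes.
    public static List<String> exposedProperties() {
        try {
            BeanInfo info = Introspector.getBeanInfo(OpParams.class, Object.class);
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                names.add(pd.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exposedProperties());
    }
}
```

Even though OpParams has getInternalKey and setInternalKey, the result contains only startCount and name, which matches the rule that a BeanInfo class exposes only the properties it declares.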

The following example BeanInfo class changes the Name property to an enum and makes the Clever property optional. Because StockSymbols is not listed in it, StockSymbols is not exposed:

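import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;

import com.streambase.sb.operator.parameter.EnumPropertyDescriptor;
import com.streambase.sb.operator.parameter.SBPropertyDescriptor;

public class ExampleBeanInfo extends SimpleBeanInfo {
    public PropertyDescriptor [] getPropertyDescriptors() {
        try {
            SBPropertyDescriptor [] p = {
                new SBPropertyDescriptor("StartCount", Example.class),
                // optional() marks Clever as an optional property
                new SBPropertyDescriptor("Clever", Example.class).optional(),
                // Name becomes an enum restricted to these three values
                new EnumPropertyDescriptor("Name", Example.class,
                                           new String [] {
                                               "Tom", "Dick", "Harry"
                                           })
            };
            return p;
        }
        catch(Exception e) {
            System.out.println("Exception: " + e);
            e.printStackTrace();
        }
        return null;
    }
}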

Note that the optional() method shown in this example marks the property as optional. Because it returns the descriptor, the call can be chained directly inside the array initializer.

Note

To mask a password, use the mask property of SBPropertyDescriptor: if the property is of type String and mask is set, the Properties view displays asterisks instead of the characters typed.

Managing the Number of Input and Output Ports

Java Operators can also change their number of input and output ports. If this is done by setting property values, the property's setter method should call setPortHints with the new number of ports. Output ports are automatically configured by calling setOutputSchema. Operators may ensure the correct number of input ports in their typecheck method by calling the requireInputPortCount method.

Be sure to see the Special Studio Considerations section of the Javadoc for the Operator class in the StreamBase Java Client Library.

For example:

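import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;

/**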
 * This operator has two integer parameters, many input and many output ports
 */
public class RoundRobinOperator extends Operator implements Parameterizable {
    private int         _numOutputPorts = 1;
    private int         _nextOutputPort = 0;
    private int         _numInputPorts = 1;

    public RoundRobinOperator() {
        setPortHints(_numInputPorts, _numOutputPorts);
    }

    public int getInputPortNumber() {return _numInputPorts;}

    public void setInputPortNumber(int i) {
        _numInputPorts = i;
        setPortHints(_numInputPorts, _numOutputPorts);
    }
    public int getOutputPortNumber() {return _numOutputPorts;}

    public void setOutputPortNumber(int i) {
        _numOutputPorts = i;
        setPortHints(_numInputPorts, _numOutputPorts);
    }

    public void init() throws StreamBaseException {
        System.out.println("RoundRobinOperator.init");
    }

    // report the configured number of output ports, not a hard-coded 1
    public PortCounts getPortCounts() {
        return new PortCounts(_numInputPorts, _numOutputPorts);
    }
    public void typecheck() throws TypecheckException {
        System.out.println("RoundRobinOperator.typecheck");
        Schema s = null;
        StringBuffer errorMessage = new StringBuffer();

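        if(getInputPortCount() <= 0) {
            throw new TypecheckException("Invalid Number of Input Ports: " 
                                         + getInputPortCount() 
                                         + " must have at least 1");
        }

        // processTuple computes (_nextOutputPort + 1) % _numOutputPorts,
        // so at least one output port is required
        if(_numOutputPorts <= 0) {
            throw new TypecheckException("Invalid Number of Output Ports: "
                                         + _numOutputPorts
                                         + " must have at least 1");
        }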

        System.out.println("RoundRobinOperator.typecheck: requiring inputs: " + _numInputPorts);
        requireInputPortCount(_numInputPorts);

        for(int i = 0; i < getInputPortCount(); ++i) {
            Schema inputSchema = getInputSchema(i);

            if(s == null) {
                s = inputSchema;
            } else if(!s.sameFieldTypes(inputSchema)) {
                errorMessage.append("Invalid schema on port " + i + "\n");
            }
        }
        if(errorMessage.length() > 0)
            throw new TypecheckException(errorMessage.toString());

        System.out.println("RoundRobinOperator.typecheck: input schemas ok");

        System.out.println("input port num " + getInputPortNumber()
                           + " output port num " + getOutputPortNumber());

        s = getInputSchema(0);

        for(int i=0; i < getOutputPortNumber(); ++i) {
            setOutputSchema(i, s);
        }

        System.out.println("RoundRobinOperator.typecheck: done");
    }

    public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
        sendOutput(_nextOutputPort, t);
        _nextOutputPort = (_nextOutputPort + 1) % _numOutputPorts;
    }
}
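The port-selection logic in processTuple does not depend on StreamBase, so it can be isolated and checked on its own. The following plain-Java sketch (RoundRobinSketch, nextPort, and route are hypothetical names) reproduces the same modulo counter and the same at-least-one-output-port guard.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

    private final int numOutputPorts;
    private int nextOutputPort = 0;

    public RoundRobinSketch(int numOutputPorts) {
        // Same guard as in typecheck: the modulo below needs at least one port.
        if (numOutputPorts < 1) {
            throw new IllegalArgumentException(
                "need at least 1 output port, got " + numOutputPorts);
        }
        this.numOutputPorts = numOutputPorts;
    }

    // Returns the port for the next tuple, then advances the counter,
    // wrapping back to port 0 after the last port.
    public int nextPort() {
        int port = nextOutputPort;
        nextOutputPort = (nextOutputPort + 1) % numOutputPorts;
        return port;
    }

    // Records which port each of tupleCount consecutive tuples would go to.
    public static List<Integer> route(int numOutputPorts, int tupleCount) {
        RoundRobinSketch rr = new RoundRobinSketch(numOutputPorts);
        List<Integer> ports = new ArrayList<>();
        for (int i = 0; i < tupleCount; i++) {
            ports.add(rr.nextPort());
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(route(3, 7)); // [0, 1, 2, 0, 1, 2, 0]
    }
}
```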

The Java Operator Life Cycle

The Java Operator class has the following life cycle. Note that this section describes the life cycle within the StreamBase Server (sbd) process:

  1. Constructor

    All Java Operators must have a public default constructor. The constructor is called when the Operator instance is created, but before the Operator is connected to the StreamBase application. We recommend that you set the default input and output port counts in the constructor with the setPortHints(inPortCount, outPortCount) method. The default is 1 input port, 0 output ports.

  2. typecheck

    The typecheck method is called after the Operator instance is connected in the StreamBase application, allowing the Operator to validate its properties. The Operator class can ensure the correct number of input ports by calling the requireInputPortCount(portCount) method, and configure its output ports by calling the setOutputSchema(portNum, schema) method for each output port. If requireInputPortCount is passed a different number of ports than the Operator currently has, a PortMismatchException (a subclass of TypecheckException) is thrown.

    Call the getResourceContents method during typecheck, instead of waiting until the operator starts running. This lets StreamBase Studio report during authoring whether it was able to find the resource, rather than leaving sbd to fail silently at run time.

  3. init

    If typecheck succeeds, the init method is called before the StreamBase application is started. Your Operator class is not required to define an init method unless it needs to initialize a resource, for example a JDBC connection pool if your operator makes JDBC calls.

  4. processTuple

    The processTuple method is called when a tuple is available for processing.

  5. shutdown

    The shutdown method is called when the StreamBase Server is in the process of shutting down.
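The order of these callbacks can be illustrated without the StreamBase library. The sketch below is a hypothetical, simplified driver (RecordingOperator and run are not part of the StreamBase API) that calls the five steps in the order listed above and records each call. It does not model sbd itself; in particular, skipping shutdown after a failed typecheck is an assumption of the sketch, not documented sbd behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class LifeCycleSketch {

    // Hypothetical stand-in for a Java operator that records each call it receives.
    public static class RecordingOperator {
        public final List<String> calls = new ArrayList<>();
        private final boolean failTypecheck;

        public RecordingOperator(boolean failTypecheck) {
            this.failTypecheck = failTypecheck;
            calls.add("constructor");
        }

        public void typecheck() {
            calls.add("typecheck");
            if (failTypecheck) {
                throw new IllegalStateException("invalid property values");
            }
        }

        public void init() { calls.add("init"); }
        public void processTuple(String tuple) { calls.add("processTuple " + tuple); }
        public void shutdown() { calls.add("shutdown"); }
    }

    // Simplified driver: init runs only after a successful typecheck,
    // then each tuple is processed, then the operator is shut down.
    public static List<String> run(boolean failTypecheck, String... tuples) {
        RecordingOperator op = new RecordingOperator(failTypecheck);
        try {
            op.typecheck();
        } catch (IllegalStateException e) {
            return op.calls;  // typecheck failed: init is never called
        }
        op.init();
        for (String t : tuples) {
            op.processTuple(t);
        }
        op.shutdown();
        return op.calls;
    }

    public static void main(String[] args) {
        // [constructor, typecheck, init, processTuple a, processTuple b, shutdown]
        System.out.println(run(false, "a", "b"));
        // [constructor, typecheck]
        System.out.println(run(true, "a"));
    }
}
```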

Java Operators and Life Cycle Events

A Java operator runs in the same process as the StreamBase application that contains it. One advantage of this is that when the StreamBase application starts or stops, any Java operators or embedded adapters in it start and stop along with it. More generally, the application and these components undergo life cycle changes in a synchronized fashion.

Java operators undergo life cycle changes in a prescribed order. When a StreamBase application is displayed in StreamBase Studio, data typically flows through it from left to right. In this section, left to right means the direction in which data flows through the application, and right to left means the opposite direction: Java operators that undergo a life cycle change in right to left order do so in the reverse of the order in which they see data.

Life cycle changes that a StreamBase application undergoes include starting, pausing, resuming and shutting down. During starting and resuming, operators are processed in right to left order. This means that operators start in an order that is opposite to that in which they will receive data. Conversely, operators are paused and stopped in left to right order, or in the same order as they will receive data.
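For a simple linear chain of operators, this rule reduces to reversing a list. In the hypothetical sketch below, operators are given in data-flow order; real applications can branch and merge, which the sketch ignores. One consequence is visible in the output: in a linear chain, every operator downstream of a given operator has already started by the time that operator starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LifeCycleOrderSketch {

    public enum Event { START, PAUSE, RESUME, SHUTDOWN }

    // dataFlowOrder lists operators left to right, the direction data flows.
    // Start and resume visit them right to left; pause and shutdown visit
    // them left to right.
    public static List<String> visitOrder(List<String> dataFlowOrder, Event event) {
        List<String> result = new ArrayList<>(dataFlowOrder);
        if (event == Event.START || event == Event.RESUME) {
            Collections.reverse(result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> chain = Arrays.asList("A", "B", "C");  // A feeds B, B feeds C
        System.out.println(visitOrder(chain, Event.START));     // [C, B, A]
        System.out.println(visitOrder(chain, Event.SHUTDOWN));  // [A, B, C]
    }
}
```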

Managing Java Operators

Java operators start, pause, resume, and shut down along with the StreamBase application that contains them. It is also possible to suspend and resume Java operators independently of their StreamBase application.

This is accomplished with the same commands that are used to suspend and resume a StreamBase application as a whole. The command sbadmin suspend can be used to suspend an application; similarly, sbadmin resume is used to resume a StreamBase application.

To suspend or resume individual operators, append a list of Java operator names to the sbadmin suspend or sbadmin resume command. If one or more operator names are appended to sbadmin suspend, the StreamBase application as a whole is not suspended; only the named Java operators are suspended.

Note that an individual Java operator can be suspended or resumed only while the StreamBase application itself is running, because a Java operator can be running only if the application that contains it is also running. It is therefore not meaningful to suspend or resume an operator whose containing application is not running.

Getting the Status of Java Operators

The sbc status command returns status information about the StreamBase Server.

The sbc status --operators command returns the status of any Java operators contained by the server's application. Note that sbc status --operators and sbc status are disjoint commands: sbc status returns information about the server only, not about Java operators contained in the server. Similarly, sbc status --operators returns information about contained Java operators only, and not any information about the server itself.

In this context, the status of a Java operator consists only of its current state. For example, if a Java operator has been started and is currently running, its state will be STARTED. If a Java operator has yet to be started, its state will be NONE. If a Java operator has been suspended, its state will be either SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES. Lastly, if an operator has been shut down its state will be SHUTDOWN.

Starting a Java Operator Independently of its StreamBase Application

By default, a Java operator starts along with the StreamBase application that contains it. In Studio, in the General tab for Java operators, there is a check box labeled Start with application. By default, this box is selected, meaning that the Java operator will start with the application. Clearing this check box causes the operator to be left in the NONE state when the application starts.

A Java operator that does not start with its application stays in the NONE state until it is explicitly started with the sbadmin resume operatorName command. Such an operator does not start even if the application as a whole is resumed. For example, if the application as a whole is suspended and then resumed, an operator that did not start with its application remains in the NONE state.

Processing of Tuples During the Suspension of an Operator

If a Java operator is suspended separately from the application that contains it, tuples might still arrive at the suspended operator. You can configure the Java operator to handle these tuples in two different ways:

  • A suspended Java operator can choose to drop tuples that are delivered to it.

  • The operator can choose to process these tuples.

These two possibilities are represented by static constants on the class com.streambase.sb.operator.Operator, SUSPENDED_AND_DROPPING_TUPLES and SUSPENDED_AND_PROCESSING_TUPLES, respectively. A Java operator is configured to either drop or process tuples by calling the setSuspendBehavior method on its instance. setSuspendBehavior takes an int argument, whose value must be either SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES.
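The difference between the two behaviors can be modeled in a few lines of plain Java. This is a hypothetical sketch, not the StreamBase implementation: SuspendBehaviorSketch and its SuspendBehavior enum stand in for the real Operator constants and setSuspendBehavior call, and tuples are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

public class SuspendBehaviorSketch {

    // Hypothetical stand-ins for the two behaviors; the real constants are
    // defined on com.streambase.sb.operator.Operator.
    public enum SuspendBehavior { DROP_TUPLES, PROCESS_TUPLES }

    private final SuspendBehavior behavior;
    private boolean suspended = false;
    private final List<String> processed = new ArrayList<>();
    private int dropped = 0;

    public SuspendBehaviorSketch(SuspendBehavior behavior) {
        this.behavior = behavior;
    }

    public void suspend() { suspended = true; }
    public void resume() { suspended = false; }

    // Called for every tuple that reaches the operator.
    public void deliver(String tuple) {
        if (suspended && behavior == SuspendBehavior.DROP_TUPLES) {
            dropped++;             // discarded while suspended
            return;
        }
        processed.add(tuple);      // running, or suspended but still processing
    }

    public List<String> processed() { return processed; }
    public int dropped() { return dropped; }

    public static void main(String[] args) {
        for (SuspendBehavior b : SuspendBehavior.values()) {
            SuspendBehaviorSketch op = new SuspendBehaviorSketch(b);
            op.deliver("t1");
            op.suspend();
            op.deliver("t2");      // arrives while the operator is suspended
            op.resume();
            op.deliver("t3");
            System.out.println(b + ": processed=" + op.processed() + " dropped=" + op.dropped());
        }
    }
}
```

With DROP_TUPLES the tuple t2 is lost; with PROCESS_TUPLES all three tuples are handled.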

Stepping Through a Sample Java Operator

This section steps you through the creation of a sample Java Operator class, StringCase.java. The example is deliberately simple, so that you can focus on the API without reading unrelated code. The Java Operator is used in the StringCase.sbapp application to convert string fields to lowercase or uppercase. The files that make up the sample, including the StringCase.java file discussed here, are installed with StreamBase in the sample/javaoperator directory.

Before beginning this exercise, open StreamBase Studio and load the javaoperator sample, as described in Java Operator Sample. This creates the sample_javaoperator project in your Package Explorer. In this project, double-click StringCase.java to open it in the Java Editor.

Stepping through StringCase.java

In an editor, we started by adding these import statements in StringCase.java:

import com.streambase.sb.DataType;
import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;

import java.util.Arrays;

Next, we declared the class, which extends the StreamBase Operator class and implements Parameterizable. For example:

public class StringCase extends Operator implements Parameterizable {

Next, we defined two constants for the values of an enum property, conversionType, and the field that holds the property's value. The metadata for the enum property is defined in a BeanInfo class, StringCaseBeanInfo. For example:

//
// Constants for enum property conversionType
//
public static final String UPPER = "Uppercase";
public static final String LOWER = "Lowercase";

/** Enum property, metadata defined in StringCaseBeanInfo.java */
private String conversionType = LOWER;

Note

The source for StringCaseBeanInfo is discussed in Stepping through StringCaseBeanInfo.java. You can find it in the sample/javaoperator directory of the StreamBase installation.

Next, we declared fields for the Java Operator's input and output schemas, and an array for the fields in the input schema. For example:

private Schema inSchema;        // input to this operator
private Schema outSchema;       // output from this operator
private Schema.Field[] fields;  // individual fields in the schema

Then we use the setPortHints method to specify a fixed number of input and output ports:

public StringCase() {
    // this operator has one input and one output
    setPortHints(1, 1);
}

Note

If the Java Operator takes a variable number of output ports, expose the port counts as properties through the Parameterizable interface and configure the ports in the typecheck method. Within typecheck, call setOutputSchema for each output port, so that StreamBase Studio knows how many square black dots to draw for the output ports. If you do not call setOutputSchema for each port, the Java Operator on the StreamBase Studio canvas will not display the correct number of black dots.

Next, we set up the typecheck method, which requires exactly one input port, throws a TypecheckException if the input schema has no string field, and declares that the schema of the output is the same as the input:

public void typecheck() throws TypecheckException {

    // require exactly one input port
    requireInputPortCount(1);

    // the input must contain at least one string field
    inSchema = getInputSchema(0);
    fields = inSchema.getFields();
    int stringCount = 0;
    for(int i=0; i < fields.length; ++i) {
        DataType dt = fields[i].getDataType();
        if(DataType.STRING.equals(dt)) {
            ++stringCount;
        }
    }
    if (stringCount == 0) {
        throw new TypecheckException(
                   "At least one string field is required");
    }
    // the output schema is the same as the input
    setOutputSchema(0, inSchema);
    outSchema = inSchema;
}

Finally, we defined the work that the custom Java operator performs, in the processTuple method. For example:

public void processTuple(int inputPort, Tuple t)
    throws StreamBaseException {
    Tuple out = outSchema.createTuple();

    for(int i=0; i < fields.length; ++i) {
        Schema.Field f = fields[i];
        DataType dt = f.getDataType();

        if(DataType.STRING.equals(dt)) {
            String str = t.getString(f);
            if(LOWER.equals(conversionType)) {
                out.setString(fields[i], str.toLowerCase());
            } else if(UPPER.equals(conversionType)) {
                out.setString(fields[i], str.toUpperCase());
            }
        } else {
            out.setField(fields[i], t.getField(fields[i]));
        }
    }
    sendOutput(0, out);
}

public String getConversionType() {return conversionType;}

public void setConversionType(String s) {
    conversionType = s;
}
}
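To experiment with the conversion rules without a StreamBase installation, the logic of typecheck and processTuple can be restated over plain Java maps. This is a hypothetical sketch, not part of the sample: a Map from field name to value stands in for a Tuple, a Map from field name to Java type stands in for a Schema, and field types are taken from those Java types rather than from DataType. Unlike the sample, it rejects an unrecognized conversionType explicitly and passes null values through unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class StringCaseSketch {

    // Same enum values as the StringCase operator.
    public static final String UPPER = "Uppercase";
    public static final String LOWER = "Lowercase";

    // Mirrors the typecheck rule: at least one field must be a String.
    // A "schema" here is simply field name -> Java type.
    public static boolean hasStringField(Map<String, Class<?>> schema) {
        return schema.containsValue(String.class);
    }

    // Mirrors processTuple: convert String values, copy everything else as-is.
    public static Map<String, Object> convert(Map<String, Object> tuple, String conversionType) {
        if (!UPPER.equals(conversionType) && !LOWER.equals(conversionType)) {
            throw new IllegalArgumentException("unknown conversion type: " + conversionType);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : tuple.entrySet()) {
            Object value = e.getValue();
            if (value instanceof String) {
                String s = (String) value;
                out.put(e.getKey(), UPPER.equals(conversionType)
                        ? s.toUpperCase(Locale.ROOT)
                        : s.toLowerCase(Locale.ROOT));
            } else {
                out.put(e.getKey(), value);  // non-string (or null) values pass through
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> in = new LinkedHashMap<>();
        in.put("symbol", "IbM");
        in.put("price", 12.5);
        System.out.println(convert(in, UPPER));  // {symbol=IBM, price=12.5}
    }
}
```

Locale.ROOT is used here so that the result does not depend on the JVM's default locale; the sample's toUpperCase() and toLowerCase() calls use the default locale.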

Stepping through StringCaseBeanInfo.java

StringCaseBeanInfo.java is the BeanInfo class for the StringCase operator. StringCaseBeanInfo defines the metadata for the properties that will appear in the StreamBase Studio Properties view for this Java operator. The StringCase operator has only one property, conversionType, which is an enum whose values are defined in the StringCase class.

Now, open StringCaseBeanInfo.java and examine it. First we import these classes:

import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;

import com.streambase.sb.operator.parameter.EnumPropertyDescriptor;
import com.streambase.sb.operator.parameter.SBPropertyDescriptor;

Then we define the BeanInfo class as follows:

public class StringCaseBeanInfo extends SimpleBeanInfo {
    public PropertyDescriptor [] getPropertyDescriptors() {
        try {
            SBPropertyDescriptor [] p = {
                new EnumPropertyDescriptor("ConversionType", StringCase.class,
                                           new String [] {
                                               StringCase.UPPER, StringCase.LOWER
                                           })
            };
            return p;
        }
        catch(Exception e) {
            System.out.println("Exception: " + e);
            e.printStackTrace();
        }

        return null; // should never happen
    }
}