Creating Custom C++ Functions

The StreamBase C++ API supports custom C++ functions that you can call directly in StreamBase expressions by using the callcpp() function or using a function alias. There are two forms of the callcpp() function, one for calling simple functions, the other for calling functions used in a StreamBase aggregate context.

Custom C++ Function Samples

The StreamBase installation includes the source files for two custom C++ function samples:

Sample                      Described in
custom-simple-function      Custom C++ Simple Function Sample
custom-aggregate-function   Custom C++ Aggregate Function Sample

These custom functions were built by extending the sb::PluginFunction and sb::PluginAggregate classes, respectively, of the StreamBase C++ Client library. These classes are described in the C++ API Documentation.

Steps for Creating Custom C++ Functions

The following sections explain how to create custom C++ functions for your StreamBase applications, by extending the StreamBase C++ Custom Function API.

Notes:

  • A StreamBase custom C++ function is mapped to a class.

  • Multiple classes (multiple StreamBase custom C++ functions that you write) can be added to a single DLL on Windows, or to a single shared library on UNIX.

Creating Custom Simple Functions

This section outlines the steps for writing a custom C++ simple function.

  1. As illustrated in the simple function sample, first include two header files:

    #include "StreamBase.hpp"
    #include "PluginFunction.hpp"

    The two referenced header files are delivered in streambase-install-dir/include. Make sure your compiler can locate these files, as described below in Building Custom C++ Functions on UNIX and Building Custom C++ Functions on Windows.

  2. Next, add using directives for the following namespaces:

    using namespace std;
    using namespace sb;

  3. Define the name for your function's class and have it inherit from PluginFunction in the StreamBase C++ Client API. For example:

    class MySimpleFunction : public PluginFunction

  4. Simple function classes must implement two virtual methods:

    • virtual void typecheck(const Schema &arg_types)

      This method should validate the number and types of the arguments that your function accepts. StreamBase Studio calls typecheck while you author the application, and StreamBase Server calls it when the application starts.

    • virtual void eval(Tuple &retval, ConstTuple &args)

      The eval method is called when the function is executed in a StreamBase application.

  5. Within the simple function class, you must declare the class to StreamBase:

    STREAMBASE_DECLARE_PLUGIN_FUNCTION(MySimpleFunction);

    This declaration provides some infrastructure that aids in registering the simple function class for use by StreamBase.

  6. Finally, outside of the simple function class definition, you must define the class and its public name in the StreamBase application:

    STREAMBASE_DEFINE_PLUGIN_FUNCTION(MySimpleFunction, "MyApplicationCalculation");

    This definition provides a means to register your simple function and provides the mapping between the simple function class name and the name you want to use to invoke this function in your StreamBase application.
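The typecheck and eval contract from step 4 can be sketched without the StreamBase headers. The following is a minimal, self-contained model and not the StreamBase API: ArgType, MySimpleFunctionSketch, and the plain doubles used in place of Schema, Tuple, and ConstTuple are all stand-ins invented for illustration. A real plug-in class would derive from PluginFunction and carry the two registration macros from steps 5 and 6.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the argument types a Schema would describe.
// This is NOT sb::DataType; it only lets the sketch compile on its own.
enum class ArgType { Int, Double, String };

// Models the two-phase contract of a simple function:
// typecheck runs first, eval runs once per invocation afterwards.
class MySimpleFunctionSketch {
public:
    // Plays the role of typecheck(const Schema &arg_types):
    // validate the number and types of the arguments up front.
    void typecheck(const std::vector<ArgType> &arg_types) {
        if (arg_types.size() != 2)
            throw std::invalid_argument("expected exactly two arguments");
        for (ArgType t : arg_types) {
            if (t != ArgType::Double)
                throw std::invalid_argument("both arguments must be doubles");
        }
        typechecked_ = true;
    }

    // Plays the role of eval(Tuple &retval, ConstTuple &args):
    // compute the result for one invocation and store it in retval.
    void eval(double &retval, const std::vector<double> &args) const {
        if (!typechecked_)
            throw std::logic_error("eval called before typecheck");
        if (args.size() != 2)
            throw std::logic_error("argument count differs from typecheck");
        retval = args[0] * args[1];
    }

private:
    bool typechecked_ = false;  // set once typecheck has accepted the arguments
};
```

In this model, a caller first passes the argument types to typecheck and only then calls eval with matching values; a rejected argument list surfaces as an exception before any value is computed.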

Creating Custom Aggregate Functions

This section outlines the steps for writing a custom C++ aggregate function.

  1. As for simple functions, include two header files:

    #include "StreamBase.hpp"
    #include "PluginAggregate.hpp"

  2. Add using directives for two namespaces:

    using namespace std;
    using namespace sb;

  3. Then define the class name and have it inherit from PluginAggregate (instead of PluginFunction):

    class MyAggregateFunction : public PluginAggregate

  4. An aggregate function requires four methods:

    • virtual void typecheck(const Schema &arg_types)

      Validates the argument types.

    • virtual void initialize()

      Clears any window state that the aggregate function keeps.

    • virtual void increment(ConstTuple &args)

      Adds values to the window state.

    • virtual void calculate(Tuple &retval)

      Calculates the value of the aggregate over the values currently in the window.

  5. Within the aggregate function class, you must declare the class to StreamBase:

    STREAMBASE_DECLARE_PLUGIN_AGGREGATE(MyAggregateFunction);

  6. Finally, outside of the aggregate function class definition, you must define the class and the name by which it is known in the StreamBase application:

    STREAMBASE_DEFINE_PLUGIN_AGGREGATE(MyAggregateFunction, "MyApplicationAggregate");
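The initialize, increment, and calculate life cycle from step 4 can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not the StreamBase API: RunningMeanSketch is a hypothetical class, plain doubles stand in for the ConstTuple and Tuple parameters, and returning 0.0 for an empty window is a choice made for this example. A real aggregate would derive from PluginAggregate, implement typecheck as well, and use the registration macros from steps 5 and 6.

```cpp
#include <cstddef>

// Hypothetical model of an aggregate that computes the mean of a window.
// It mirrors the method roles of an aggregate function without using
// any StreamBase headers.
class RunningMeanSketch {
public:
    // Role of initialize(): clear any state kept for the window.
    void initialize() {
        sum_ = 0.0;
        count_ = 0;
    }

    // Role of increment(ConstTuple &args): fold one more value
    // into the window state.
    void increment(double value) {
        sum_ += value;
        ++count_;
    }

    // Role of calculate(Tuple &retval): produce the aggregate over the
    // values currently in the window. An empty window yields 0.0 here,
    // which is an assumption of this sketch.
    double calculate() const {
        if (count_ == 0)
            return 0.0;
        return sum_ / static_cast<double>(count_);
    }

private:
    double sum_ = 0.0;       // running total of the window's values
    std::size_t count_ = 0;  // number of values seen since initialize()
};
```

Keeping only a sum and a count means increment does constant work per value, and calling initialize again makes the same object reusable for the next window.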

Typechecking Arguments Passed to Functions

When you write custom functions by extending the StreamBase C++ API classes, your function object may use internal fields to record the data types with which it was typechecked. Note that functions can be polymorphic (take different argument data types), and they can also take a variable number of arguments.

To support a variable number of arguments, call arg_types.size() in your typecheck method to find out how many arguments were passed.

If your function is polymorphic, then it is probably necessary to record the type information provided at typecheck time. This recorded information can be used in the implementation of the eval() method, to vary behavior based on the types of the arguments. The object is guaranteed to have been typechecked before eval() is called, and to receive arguments to eval() that match the types passed to typecheck().

The following code example returns the sum of its integer or double arguments as a double. If there are no arguments, it returns 0 using the setDouble method. The typecheck method verifies the data types of the inputs, and the eval method inspects each argument's type to choose the matching accessor.

class IntDoubleSumFunction : public PluginFunction {
  private:
    unsigned int _arg_count;
  public:
    virtual void typecheck(const Schema &argSchema)
    {
        _arg_count = argSchema.getNumFields();
        for (unsigned int i = 0; i < _arg_count; ++i)
            requireType(argSchema, i, DataType::INT, DataType::DOUBLE);
        setReturnType(DataType::DOUBLE);
    }
    virtual void eval(Tuple &retval, ConstTuple &args)
    {
        if (_arg_count == 0)
            retval.setDouble(0, 0);
        else {
            double ret = 0.0;
            for (unsigned int i = 0; i < _arg_count; ++i) {
                DataType dt = args.getSchema().getField(i).getType();

                if(DataType::DOUBLE == dt) {
                    ret += args.getDouble(i);
                } else if(DataType::INT == dt) {
                    ret += args.getInt(i);
                }
            }
            retval.setDouble(0, ret);
        }
    }
    STREAMBASE_DECLARE_PLUGIN_FUNCTION(IntDoubleSumFunction);
};
STREAMBASE_DEFINE_PLUGIN_FUNCTION(IntDoubleSumFunction, "int_double_sum");
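As a variation on the example above, a polymorphic function can record the argument types at typecheck time and consult that record in eval instead of querying the schema on every call. The following is a self-contained sketch of that idea, not StreamBase code: ArgType, ArgValue, and RecordedTypeSumSketch are hypothetical stand-ins for DataType, ConstTuple fields, and a PluginFunction subclass.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-ins; these are NOT the sb::DataType or tuple types.
enum class ArgType { Int, Double, String };

// One argument slot; only the member matching the recorded type is read.
struct ArgValue {
    int int_value = 0;
    double double_value = 0.0;
};

class RecordedTypeSumSketch {
public:
    // Role of typecheck: accept any number of int or double arguments
    // and remember their types for later use in eval.
    void typecheck(const std::vector<ArgType> &arg_types) {
        for (ArgType t : arg_types) {
            if (t != ArgType::Int && t != ArgType::Double)
                throw std::invalid_argument("only int and double are accepted");
        }
        recorded_types_ = arg_types;
    }

    // Role of eval: use the recorded types to pick the right accessor.
    double eval(const std::vector<ArgValue> &args) const {
        if (args.size() != recorded_types_.size())
            throw std::logic_error("argument count differs from typecheck");
        double sum = 0.0;
        for (std::size_t i = 0; i < args.size(); ++i) {
            if (recorded_types_[i] == ArgType::Int)
                sum += args[i].int_value;
            else
                sum += args[i].double_value;
        }
        return sum;
    }

private:
    std::vector<ArgType> recorded_types_;  // filled in by typecheck
};
```

Because the types are validated once and stored, eval can assume every recorded entry is either Int or Double, which matches the guarantee that typecheck runs before eval.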

Building Custom C++ Functions on UNIX

On a supported UNIX machine where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler to use when compiling your program. For example:

CXX=`sb-config --cxx`
$CXX MyFunction.cpp `sb-config --cflags` -c -o MyFunction.o
$CXX MyFunction.o `sb-config --libs` -o MyFunction

Substitute the name of your function for MyFunction.

Use the Makefiles in the samples as a guide to setting up your projects.

StreamBase requires G++ 3.4 through G++ 4.2 for building C++ clients on UNIX. StreamBase does not support building custom functions with G++ 4.3, which is the default compiler on newer Linux distributions. On such distributions, install GCC and G++ 4.2, and set the CC and CXX environment variables before building StreamBase C++ code, including StreamBase samples that include C++ code. For example:

export CC=gcc-4.2
export CXX=g++-4.2

Building Custom C++ Functions on Windows

To build your custom C++ function on Windows, you must configure Microsoft Visual C++ as described in Configuring Visual C++.

StreamBase supports only 64-bit custom C++ native-code functions on Windows. Support for building 32-bit C++ functions was removed in an earlier release because of library changes that accompanied that release.

If you are migrating an application from an earlier StreamBase release, any existing C++ custom functions that you were compiling to 32-bit DLLs in the earlier release must be reconfigured to work with current releases. You have the following options to migrate these functions:

  • Port your existing 32-bit C++ functions to 64-bit functions.

  • Rewrite your 32-bit C++ functions as Java functions.

  • For algorithms that you wrote in C++ for performance reasons, you can rewrite those functions in pure C, and write a JNI bridge layer also in C, then implement those functions as StreamBase Java functions that call into the JNI layer.

Configuration Steps

  • Before running your application in StreamBase Studio:

    Register the plug-in function name in a StreamBase Server configuration file. If a configuration file does not already exist in your project, you can click New > Server Configuration to create one.

    Edit the <custom-functions> section in your configuration file, adding a <custom-function> element for each C++ custom function that you built. For example:

    <custom-functions>
        <custom-function name="func1" type="simple" >
          <args>
            <arg type="int" />   
            <arg type="string" />    
          </args>
          <return type="string" />
        </custom-function>
    </custom-functions>

  • When you deploy your application:

    Edit the StreamBase Server configuration file that will be used to run your application. In addition to declaring the function names in the <custom-functions> section, identify in the <global> section the location of the DLL or .so files that implement your functions. For example:

    <streambase-configuration>
      <global>
        <plugin directory="${STREAMBASE_HOME}/plugins" />
      </global>
      ...
    </streambase-configuration>

    This example assumes that the environment variable STREAMBASE_HOME is set for the environment in which the server will run. If it is not set in the server's environment, use a full absolute path to the plugins directory. StreamBase Server automatically loads all the files in the specified directory.

  • Finally, identify your customized configuration file when you start the server. For example:

    sbd -f MyAppsConfig.sbconf MyApp.sbapp

See the StreamBase Server Configuration XML topic for details on editing your server configuration file.
