Custom Java Aggregate Function Sample

Introduction

This sample demonstrates how to configure and call custom Java aggregate functions in an EventFlow module.

This sample's files are:

nth.sbapp

The sample EventFlow module file, placed in the package src/main/eventflow/com.example.sample.

sbengine.conf

The sbengine type configuration file, placed in src/main/configurations, defines two aliases for the same aggregate function we want to call. Custom aggregate functions in StreamBase always extend the StreamBase Client API's AggregateWindow class, and always call the accumulate method. When type=aggregate, the methodName property is optional; if used, it must be methodName="accumulate".

Nth.java

The Java file containing our example custom class, placed in src/main/java/com.streambase.sample. Notice that the package name for this Java file is not the same as the package name for the EventFlow module; the two are not required to be the same or related.

TestCase.java

An EventFlow JUnit test file, configured and ready to run, placed in src/test/java/com.tibco.ep.sample. Run this test by selecting this file, right-clicking, and from the context menu, selecting Run As>EventFlow Fragment Unit Test.

pom.xml

The Maven project object model file for this project, placed at the root of the project folder.

Description

The nth.sbapp EventFlow module has one input stream that feeds two independent Aggregate operators.

Aggregate Operator PlainAgg

This case illustrates a simple, standard use of the Aggregate operator. PlainAgg specifies a single aggregate function, sum(), from the StreamBase expression language. The operator is configured with a single dimension of type Tuple with a window size of 3. As a result, it emits one tuple for every three double values sent into IncomingDbl, and that output tuple contains the sum of the three input values.

The Aggregate operator in the top stream specifies a single aggregate function: sum(inputval)
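
The windowing behavior of PlainAgg can be pictured with a small plain-Java model. This is only a sketch and does not use any StreamBase API: the class name TumblingSum and its methods are hypothetical, and it assumes the window closes and starts over after every third value, which matches the one-output-per-three-inputs behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of a tuple-based window that emits a sum when it closes. */
class TumblingSum {
    private final int windowSize;
    private int count = 0;
    private double sum = 0.0;

    TumblingSum(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Adds one value; returns the window sum when the window closes, otherwise null. */
    Double add(double value) {
        sum += value;
        count++;
        if (count < windowSize) {
            return null; // window still open, nothing emitted yet
        }
        double result = sum;
        sum = 0.0;   // start a fresh window
        count = 0;
        return result;
    }

    /** Feeds all inputs through one aggregator and collects every emitted sum. */
    static List<Double> run(int windowSize, double... inputs) {
        TumblingSum agg = new TumblingSum(windowSize);
        List<Double> out = new ArrayList<>();
        for (double v : inputs) {
            Double emitted = agg.add(v);
            if (emitted != null) {
                out.add(emitted);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Five inputs with window size 3: only the first three complete a window.
        System.out.println(run(3, 10, 10, 10, 1, 2)); // prints [30.0]
    }
}
```

The last two values in the usage line stay in an open window, so nothing is emitted for them until a third value arrives.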

Aggregate Operator CalcNth

This Aggregate operator calls the same standard sum(inputval) function, and then calls a custom aggregate function class, com.streambase.sample.Nth, in three ways. The Nth class extends the StreamBase-provided AggregateWindow class, as described in the Javadoc for that class.

The Nth class performs a very simple task: it returns the input value at a requested position in the aggregate window. Thus, the difference between the top and bottom output streams is that the top one emits only the sum of the values sent in, while the bottom stream emits that sum and also the individual values that entered the aggregate window.
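
The following plain-Java sketch illustrates the kind of per-window state such a function might keep. It is not the sample's Nth.java and does not extend AggregateWindow, so that it can run without the StreamBase libraries. The class name NthSketch and the helper nthOf are hypothetical; accumulate is the method name given in this document, while init and calculate are assumed names for the per-window reset and result steps.

```java
/** Hypothetical stand-in for an "nth value in the window" aggregate; no StreamBase classes used. */
class NthSketch {
    private int seen;        // how many values this window has received so far
    private Double captured; // the n-th value, or null if the window never reached n

    /** Resets state at the start of each window (assumed lifecycle step). */
    void init() {
        seen = 0;
        captured = null;
    }

    /** Called once per input value; remembers the value when its position equals n. */
    void accumulate(int n, double value) {
        seen++;
        if (seen == n) {
            captured = value;
        }
    }

    /** Returns the captured value when the window closes (assumed lifecycle step). */
    Double calculate() {
        return captured;
    }

    /** Convenience helper: runs one complete window and returns its result. */
    static Double nthOf(int n, double... window) {
        NthSketch agg = new NthSketch();
        agg.init();
        for (double v : window) {
            agg.accumulate(n, v);
        }
        return agg.calculate();
    }

    public static void main(String[] args) {
        System.out.println(nthOf(2, 10, 20, 30)); // prints 20.0
        System.out.println(nthOf(4, 10, 20, 30)); // prints null
    }
}
```

Like the sample's function, this sketch does no validation of n: a position beyond the window simply never matches, so the result stays null.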

The bottom Aggregate operator specifies the following five aggregate functions:

sum(inputval)

Uses the StreamBase expression language sum() function.

firstval(inputval)

Uses the expression language's firstval() function to return the first value in the aggregate window.

enth(2, inputval)

This project's sbengine.conf configuration defines an alias, enth, for this project's custom aggregate class. This row calls the first alias instead of using calljava().

oneline(3, inputval)

This project's sbengine.conf configuration defines a second alias, oneline, for the same aggregate class. This row calls the second alias.

calljava('com.streambase.sample.Nth', 4, inputval)

Calls this project's custom aggregate class directly, but asks for the fourth input value, which is outside the aggregate window size of 3. This call is expected to return null; it does so only because its second argument is 4, not because we used calljava().

To keep the example simple, the custom function does not validate its input at runtime.
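
The way these five expressions map onto one output tuple can be modeled in plain Java. This is a sketch only, with no StreamBase dependency: the class CalcNthRow and its methods are hypothetical, the field names are taken from the expected output shown later in this document, and firstval() is modeled here as position 1 of the same helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the output tuple produced by the five aggregate expressions. */
class CalcNthRow {

    /** Returns the n-th (1-based) value of the window, or null if n falls outside it. */
    static Double nth(double[] window, int n) {
        if (n < 1 || n > window.length) {
            return null;
        }
        return window[n - 1];
    }

    /** Builds one output row for a closed window, keeping the field order of the sample. */
    static Map<String, Double> row(double... window) {
        double sum = 0.0;
        for (double v : window) {
            sum += v;
        }
        Map<String, Double> out = new LinkedHashMap<>();
        out.put("SumEveryThree", sum);     // sum(inputval)
        out.put("first", nth(window, 1));  // firstval(inputval)
        out.put("second", nth(window, 2)); // enth(2, inputval), first alias
        out.put("third", nth(window, 3));  // oneline(3, inputval), second alias
        out.put("bogus", nth(window, 4));  // calljava(..., 4, inputval), position outside the window
        return out;
    }

    public static void main(String[] args) {
        System.out.println(row(10, 10, 10));
        // prints {SumEveryThree=30.0, first=10.0, second=10.0, third=10.0, bogus=null}
    }
}
```

The second, third, and bogus fields all go through the same helper, which mirrors the point made above: the null comes from the requested position, not from the way the function is invoked.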

Importing This Sample into StreamBase Studio

In StreamBase Studio, import this sample with the following steps:

  • From the top-level menu, click File>Import Samples and Community Content.

  • Enter custom agg to narrow the list of options.

  • Select the Custom Java aggregate function sample from the Extending StreamBase category.

  • Click Import Now.

StreamBase Studio creates a project for the sample.

Running This Sample in StreamBase Studio

  1. In the Project Explorer view, open this sample's folder.

    Keep an eye on the bottom right status bar of the Studio window. Make sure any Updating, Downloading, Building, or Rebuild project messages finish before you proceed.

  2. Open the src/main/eventflow/packageName folder.

  3. Double-click to open the nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  4. Click the Run button. This opens the SB Test/Debug perspective and starts the module.

  5. Wait for the Waiting for fragment to initialize message to clear.

  6. Enter data for the top stream:

    1. In the Manual Input view, select the IncomingDbl input stream.

    2. In the inputval field, enter 10 and press Send Data three times.

    3. Observe the tuples in the Output Streams view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe on the OutPlain stream is:

      SumEveryThree=30.0

      On the OutWithCustom stream, expect an output tuple like the following:

      SumEveryThree=30.0, first=10.0, second=10.0, third=10.0, bogus=null
      
  7. Send in more input values, either the same value three times or three different numbers, pressing Send Data after each one.

  8. When done, press F9 or click the Terminate EventFlow Fragment button.

Sample Location

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.

Important

Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.

Using the workspace copy of the sample avoids permission problems. The default workspace location for this sample is:

studio-workspace/sample_custom-java-aggregate

See Default Installation Directories for the default location of studio-workspace on your system.