This sample demonstrates how to configure and call custom Java aggregate functions in an EventFlow module.
This sample's files are:
The sample EventFlow module file, placed in the package
sbengine type configuration file, placed in
src/main/configurations, defines two aliases for the same aggregate function we want to call. Custom aggregate functions in StreamBase always extend the StreamBase Client API's
AggregateWindow class, and always call the
type=aggregate, the methodName property is optional; if used, it must be
Java files containing our example custom classes, placed in
src/main/java/com.streambase.sample. Notice that the package name for these Java files is not the same as the one for the EventFlow module; the two are not required to be the same or related.
An EventFlow JUnit test file, configured and ready to run, placed in
src/test/java/com.tibco.ep.sample. Run this test by selecting the file, right-clicking, and choosing the run command from the context menu.
The Maven object model file for this project, placed at the root of the project folder.
The nth.sbapp EventFlow module has one input stream
that divides into two independent Aggregate operators.
- Aggregate Operator PlainAgg
This case illustrates a simple, standard use of the Aggregate operator,
PlainAgg. This operator specifies a single aggregate function,
sum(), from the StreamBase expression language. The operator is configured with a single dimension of type
Tuple with a window size of 3. The result of this design is a single tuple emitted for every three double values sent into IncomingDbl. The output tuple contains the sum of the three input values.
The Aggregate operator in the top stream specifies a single aggregate function, sum(inputval).
- Aggregate Operator CalcNth
This Aggregate operator calls the same
sum(inputval) standard function, and then goes on to call a custom aggregate function class
com.streambase.sample.Nth in three ways. The
Nth class extends the StreamBase-provided
AggregateWindow class, as described in the Javadoc for that class.
The Nth class performs a very simple task: it emits the current input value. Thus, the difference between the top and bottom output streams is that the top one emits only the sum of the tuples sent in, while the bottom stream does that and also emits each value input into the aggregate window.
The bottom Aggregate operator specifies the following five aggregate functions:
Uses the StreamBase expression language
Uses the expression language's
firstval() function to return the first value in the aggregate window.
The sbengine.conf configuration defines an alias,
enth, for this project's custom aggregate class. This row calls the first alias instead of using
The sbengine.conf configuration defines a second alias,
oneline, for the same aggregate class. This row calls the second alias.
calljava('com.streambase.sample.Nth', 4, inputval)
Calls this project's custom aggregate class, but asks for input value 4, which is outside the aggregate window size of 3. This condition is expected to return null; it does so only because its second argument is 4, not because we used
To keep the example simple, the custom function does not validate its input at runtime.
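The behavior described above, returning the nth value of the window and null for a position beyond the window, can be sketched in plain Java. This is a standalone stand-in, not the sample's actual Nth source: it does not extend the StreamBase AggregateWindow class, and the class name NthSketch and the method names init, accumulate, and calculate are hypothetical, chosen only to suggest a reset/collect/compute lifecycle. Unlike the sample's function, this sketch includes an explicit bounds check so that it can run on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Nth-style aggregate. It does NOT use the
// StreamBase Client API; all names here are assumptions for illustration.
final class NthSketch {
    private final List<Double> window = new ArrayList<>();
    private int requested;

    // Reset state at the start of each aggregate window.
    void init() {
        window.clear();
        requested = 0;
    }

    // Called once per input value: remember the requested 1-based position
    // and append the value to the window contents.
    void accumulate(int position, double value) {
        requested = position;
        window.add(value);
    }

    // Called when the window closes: return the requested value, or null
    // when the position falls outside the values collected so far.
    Double calculate() {
        if (requested < 1 || requested > window.size()) {
            return null;
        }
        return window.get(requested - 1);
    }
}
```

With three values in the window, asking for position 2 returns the second value, while asking for position 4 returns null, which mirrors the bogus case described above.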
In StreamBase Studio, import this sample with the following steps:
From the top-level menu, click> .
Type custom agg to narrow the list of options.
Select the Custom Java aggregate function sample from the Extending StreamBase category.
StreamBase Studio creates a project for the sample.
In the Project Explorer view, open this sample's folder.
Keep an eye on the bottom right status bar of the Studio window. Make sure any
Rebuild project messages finish before you proceed.
Double-click to open the
nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.
Click the Run button. This opens the SB Test/Debug perspective and starts the module.
Wait for the Waiting for fragment to initialize message to clear.
Enter data for the top stream:
In the Manual Input view, select the
10 and press three times.
Observe the tuples in the Output Streams view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. On the
OutWithCustom stream, expect an output tuple like the following:
SumEveryThree=30.0, first=10.0, second=10.0, third=10.0, bogus=null
Send in more input values, either the same value three times or three different numbers, sending each value separately.
When done, press F9 or click the Terminate EventFlow Fragment button.
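The expected output in the steps above can be reproduced outside Studio with a small plain-Java simulation of a tuple-based window of size 3. This is only a sketch under stated assumptions: the class TupleWindowSketch and its methods are hypothetical, it does not use any StreamBase API, and it assumes that the first, second, and third fields hold positions 1, 2, and 3 of the window, with bogus asking for position 4.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the bottom stream: one output line per
// three input values. Not StreamBase code; names are assumptions.
final class TupleWindowSketch {
    static final int WINDOW_SIZE = 3;

    // Return the 1-based nth value in the window, or null if out of range.
    static Double nth(List<Double> window, int n) {
        return (n >= 1 && n <= window.size()) ? window.get(n - 1) : null;
    }

    // Feed the inputs through the window and collect one formatted
    // output line each time the window fills, then start a new window.
    static List<String> run(double... inputs) {
        List<String> out = new ArrayList<>();
        List<Double> window = new ArrayList<>();
        for (double value : inputs) {
            window.add(value);
            if (window.size() == WINDOW_SIZE) {
                double sum = 0.0;
                for (double w : window) {
                    sum += w;
                }
                out.add("SumEveryThree=" + sum
                        + ", first=" + nth(window, 1)
                        + ", second=" + nth(window, 2)
                        + ", third=" + nth(window, 3)
                        + ", bogus=" + nth(window, 4));
                window.clear();
            }
        }
        return out;
    }
}
```

Calling TupleWindowSketch.run(10.0, 10.0, 10.0) yields a single line matching the tuple shown in the steps, and calling it with only two values yields no output, which corresponds to Studio emitting nothing until the third input arrives.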
When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.
Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.
Using the workspace copy of the sample avoids permission problems. The default workspace location for this sample is:
See Default Installation Directories for the default location of studio-workspace on your system.