This topic explains how the Aggregate operator works, and describes the actions you can take on each tab of the operator's Properties view.
The Aggregate operator computes aggregations over moving windows of tuple values. Each window is a view on a part of the input data. For example, you could sum the trade volumes of a stock over 30-second intervals, or calculate a moving average of a stock's price over each set of four successive trades.
The Aggregate operator accepts a single input stream. It maintains one or more windows according to policies that are defined in the operator's properties. You define window policies in one or more dimensions. A dimension specifies:
-
When a window closes: for example, when it contains a certain number of tuples, or when a certain number of seconds has elapsed.
-
How far each new window advances relative to the previous one.
-
The type of aggregation to perform, which is based on one of the following:
  -
  The number of tuples in the window
  -
  The time tuples arrive
  -
  A field in the input tuples
  -
  A predicate expression
When the conditions prescribed in the dimension's policies are true, the operator
performs one or more calculations on the tuples that currently occupy the Aggregate's
window, based on expressions that you specify. The calculations performed can include
aggregating functions, such as sum(Volume) or
avg(PricePerShare). You can also group tuples based on
a particular input field (for example, the value of a stock's Symbol field). The
result of the aggregation is released on the Aggregate operator's output port.
Note
Aggregate windows count tuples starting from zero.
The sections that follow describe the fields and settings in the Aggregate operator's Properties view. The examples at the end illustrate some Aggregate operations. For information about the available built-in aggregating functions, see Aggregate Functions Overview on the StreamBase Expression Language Functions page.
There is no substitute for hands-on experience, so we encourage you to run the installed Aggregate operator samples, which are described in the Operator Sample Group section of the Samples Guide. Also see the StreamBase demo applications by switching to the SB Demos Perspective in StreamBase Studio. Several of the demos use Aggregate operators.
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
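The naming rule above can be expressed as a single regular expression. The following is a minimal sketch, not StreamBase's own validator: the helper name `is_valid_component_name` is hypothetical, it assumes "alphabetic" means ASCII letters, and it does not check that the name is unique in the application.

```python
import re

# Hypothetical helper encoding the stated rule: letters, digits, and underscores
# only, with a letter or underscore first. Assumes ASCII letters; does NOT check
# uniqueness within the application.
_NAME_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def is_valid_component_name(name: str) -> bool:
    """Return True if name satisfies the character rules for component names."""
    return _NAME_PATTERN.fullmatch(name) is not None

print(is_valid_component_name("Aggregate_1"))   # True
print(is_valid_component_name("my-aggregate"))  # False (hyphen not allowed)
```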
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
The Dimensions tab allows you to add, edit, or remove dimensions from an Aggregate operator. When you click the button to define a new dimension, or select an existing dimension and then click the button, StreamBase Studio displays its Edit Dimension dialog.
The Dimensions tab shows the information set for each dimension in a simple text summary, as seen in this example:
As explained in the Introduction, a window can be opened or closed (or prevented from opening or closing) when the conditions specified in the dimension are true.
You can define more than one dimension to cover different run-time conditions. In that case, the Aggregate operator uses the first dimension whose rules can be applied. The selected dimension, and only that dimension, applies to all the Aggregate operator windows. Note that even a multi-dimension Aggregate contains a single set of one or more aggregating functions that are applied to one or more fields in the Aggregate's windows. That is, you cannot apply one type of aggregating function on one dimension, and a different type of aggregating function on another dimension.
In the Edit Dimension dialog, first choose the Type option, which specifies how the aggregation should be performed:
-
Tuple — Windows are managed based on the number of input tuples. For details, see Aggregate Operator: Tuple-Based Dimension Options.
-
Field — Windows are managed based on input tuple field values. For details, see Aggregate Operator: Field-Based Dimension Options.
-
Predicate — Windows are managed based on predicate expressions for opening, emitting from, and closing the window. For details, see Aggregate Operator: Predicate-Based Dimension Options.
-
Time — Windows are managed based on the passage of time. For details, see Aggregate Operator: Time-Based Dimension Options.
Important
StreamBase Systems recommends avoiding the time-based dimension for most applications. See the Caution on Aggregate Operator: Time-Based Dimension Options for an explanation and suggested alternatives.
Use the Aggregate Functions tab to specify one or more aggregate expressions that will be evaluated in each window. You can add, replace, or remove fields from the output.
For an Aggregate operator newly placed on the EventFlow canvas, the grid in this tab has a single entry with the following default values:
| Action | Field Name | Expression |
|---|---|---|
| Add | * | lastval(*) |
Note
In StreamBase releases before 7.0, the default expression function was firstval(*).
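To make the difference between the current default, lastval(*), and the older firstval(*) concrete, here is a minimal Python model that treats a window as a list of tuples, each a dict of field values. It is a sketch under that assumption only: the function names are hypothetical, and it does not model how the real functions treat null values.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]  # one tuple, modeled as field name -> value

def lastval_all(window: List[Row]) -> Row:
    """Model of lastval(*): every output field takes its value from the
    most recent tuple in the window."""
    return dict(window[-1])

def firstval_all(window: List[Row]) -> Row:
    """Model of firstval(*) (the pre-7.0 default): every output field takes
    its value from the oldest tuple in the window."""
    return dict(window[0])

window = [
    {"Symbol": "IBM", "Price": 100.0},
    {"Symbol": "IBM", "Price": 101.5},
]
print(lastval_all(window))   # {'Symbol': 'IBM', 'Price': 101.5}
print(firstval_all(window))  # {'Symbol': 'IBM', 'Price': 100.0}
```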
In the example below, the first expression calculates the average of prices in the window and multiplies it by two. The next two expressions capture the start and end time of the window (the argument indicates a time-based dimension).
The Aggregate Functions grid has the following editing features:
-
The grid is resizable. Grab the bottom row with the mouse to resize it within the Aggregate Functions tab to show fewer rows or to show more rows without scroll bars.
-
To add a grid row, use the green plus button, then select the type of action in the Action drop-down list.
-
Available actions for each grid row are: Add, Replace, and Remove.
-
When using the Remove action, you must specify a field name, but you cannot enter an expression.
-
When using the Add and Replace actions, you must specify a field name, and you must enter an expression in the Expression field for that row.
-
Expressions can contain aggregate functions and constants. Apart from field names used as arguments in aggregate function calls, any direct reference to an input stream field must be a grouped field, as described in Properties: Group Options Tab.
-
For a list of aggregate functions, see StreamBase Expression Language Functions.
The following table describes the buttons at the top of the Field grids.
| Name | Description |
|---|---|
| Add | Adds a row below the currently selected row, or to the end of the grid if none are selected. Click the arrow on the button's right to specify whether the row should be added above or below the currently selected row. When you add a row, the newly created row is highlighted. To start entering information, click in the cell you want to edit. (Some cells are not user-editable.) |
| Remove | Removes the currently selected row. Click the arrow on the button's right to remove all rows or all selected rows. |
| Move Up, Move Down | Move Up moves the selected row up by one row. Move Down moves the selected row down by one row. |
By default, aggregate windows are controlled by the dimensions you define. The Group Options tab allows you to further control windows by grouping on a column of input. Each group has its own independent windows based on one or more fields. For example, if you group on a field named ID, a separate window is created for each unique value of ID that the Aggregate operator receives. If you group on multiple fields, a separate window is created for each unique combination of values of all the fields.
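The effect of grouping can be sketched in a few lines of Python: each distinct combination of group field values gets its own independent window. This is a simplified model, not the operator's implementation; it assumes a tuple-based dimension that advances by one tuple and emits sum() each time a group's window is full, and the function and field names are hypothetical.

```python
from collections import defaultdict, deque
from typing import Any, Deque, Dict, Iterable, List, Sequence, Tuple

def grouped_window_sums(
    tuples: Iterable[Dict[str, Any]],
    group_fields: Sequence[str],
    value_field: str,
    size: int,
) -> List[Tuple[Tuple[Any, ...], Any]]:
    """Hypothetical model: one sliding window of `size` tuples (advance 1)
    per unique combination of group field values. Emits (group_key, sum)
    whenever a group's window is full."""
    # A bounded deque drops its oldest value on append once full,
    # which matches a window that advances by one tuple.
    windows: Dict[Tuple[Any, ...], Deque[Any]] = defaultdict(lambda: deque(maxlen=size))
    out = []
    for t in tuples:
        key = tuple(t[f] for f in group_fields)  # e.g. ("IBM",)
        window = windows[key]
        window.append(t[value_field])
        if len(window) == size:
            out.append((key, sum(window)))
    return out

trades = [
    {"Symbol": "IBM", "Volume": 100},
    {"Symbol": "MSFT", "Volume": 50},
    {"Symbol": "IBM", "Volume": 200},
    {"Symbol": "MSFT", "Volume": 70},
]
print(grouped_window_sums(trades, ["Symbol"], "Volume", 2))
# [(('IBM',), 300), (('MSFT',), 120)]
```

Because the IBM and MSFT tuples never share a window, each group fills and emits on its own schedule, even though the tuples are interleaved on the input stream.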
A group that you define here can also be referenced in an expression in the Aggregate Functions tab.
Click the button to add a row for each group you want to define. In each row:
-
In the Output Field Name column, enter the name you want to appear in the output stream of the operator for this field.
-
In the Expression column, enter the input field that you want to group on.
In the following example, the windows generated by the operator's dimensions will be further subdivided into groups by unique values of Symbol, and the resulting field in the output tuple is also named Symbol. You can suppress appending the Group By field to the output tuple by selecting the Omit Group By fields in output check box.
Use the Concurrency tab to specify separate threading for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
In general, null field values in an Aggregate operator window are not included in the
calculation. For example, if avg(price) is computed over ten tuples in an aggregate's
window and one of the price values is null, the average is calculated over the remaining
nine values. Null handling for the andall() and orall()
functions does not follow this general rule.
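The general null rule for an average can be modeled in Python by filtering out None before dividing. This is a sketch of the rule as described above, not StreamBase's avg() itself; the function name is hypothetical, and returning None when every value is null is an assumption made for this sketch.

```python
from typing import Optional, Sequence

def avg_ignoring_nulls(values: Sequence[Optional[float]]) -> Optional[float]:
    """Average only the non-null values, per the general rule above.
    Assumption for this sketch: an all-null window yields None (null)."""
    present = [v for v in values if v is not None]
    if not present:
        return None
    return sum(present) / len(present)

# Ten values in the window, one of them null: the average uses the other nine.
prices = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, None]
print(avg_ignoring_nulls(prices))  # 5.0
```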
Independent of the preceding paragraph, key fields with null values still constitute
a valid group for the purposes of grouping aggregate results. For example, suppose
a simple Aggregate operator averages prices for every three tuples with schema
{symbol, price}, and groups the results by symbol. If
three tuples arrive with price information but a null symbol field, the operator still reports an average of those three prices
for the null group.
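The example above can be modeled in Python, where None stands in for a null symbol and is used as an ordinary dictionary key. This is a minimal sketch, not the operator's implementation: it assumes "every three tuples" means non-overlapping windows (advance of 3) per group, and the function name is hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple

def avg_every_n_by_symbol(
    tuples: Iterable[Dict[str, Any]], n: int = 3
) -> List[Tuple[Optional[str], float]]:
    """Hypothetical model: average price over every n tuples per symbol,
    assuming non-overlapping windows. A None (null) symbol forms its own group."""
    pending: Dict[Optional[str], List[float]] = defaultdict(list)
    out = []
    for t in tuples:
        key = t["symbol"]  # None is a valid group key, like any other value
        pending[key].append(t["price"])
        if len(pending[key]) == n:
            prices = pending.pop(key)  # emit and start a fresh window
            out.append((key, sum(prices) / n))
    return out

ticks = [
    {"symbol": None, "price": 10.0},
    {"symbol": "IBM", "price": 100.0},
    {"symbol": None, "price": 20.0},
    {"symbol": None, "price": 30.0},
]
print(avg_every_n_by_symbol(ticks))  # [(None, 20.0)]
```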
For more information, see Using Nulls.
This example demonstrates the basic Aggregate operation by tracing the windowing action in a simple application with these characteristics:
-
Each tuple in the input stream contains a single integer.
-
The Aggregate operator has a single, tuple-based dimension. The dimension is set to advance each window by one tuple, and each window is set to close and emit after 3 tuples.
-
The sum() aggregating function calculates the total value of all the integers in each window.
The following diagrams show a succession of tuples arriving on an input stream. At each step, we depict the arrival of a tuple, the windows that are open at that time, the aggregate function performed on each window, and any output that occurs.
-
When the first tuple arrives on the input stream, a window is immediately opened. Notice a few things:
  -
  The new window contains the data for the first tuple, but it has "room" for three tuples: recall that the dimension sets the window size to 3.
  -
  The diagram shows the aggregation function that is performed on the window (even though here it is only a sum of one number). As more data is enqueued, the window keeps a running sum of tuple values.
  -
  Because the window is not full, no output occurs. Recall that the dimension is set to emit when the size is 3.
-
When a new tuple arrives, a second window is opened and the new tuple is loaded into both windows. This is in accordance with the dimension, which sets windows to advance by one tuple. If the advance had been set to 2, the second window would not open until the third tuple arrived (the next step shown).
Notice the positions of the two windows relative to the tuples in the input stream. For readability, the diagram represents each window separately. In fact, both windows operate on the same data. A more realistic representation might show a succession of overlapping windows moving over a single stream of data.
Also notice that the aggregate result changes for Window 1, and that there is still no output from the operator.
-
When the third tuple arrives, a new window is created and all the windows are updated. For the first time, we see a tuple released on the operator's output port. The output tuple contains the aggregate result from Window 1, which is now full. As soon as the output is released, the Aggregate operator closes the window.
-
When the fourth tuple arrives, a new window is started, as before. Notice that Window 1, which we said was closed in the previous step, is gone now. At the same time, the new tuple causes Window 2 to reach its size limit. This triggers the release of Window 2's aggregate result in an output tuple, and Window 2 is closed.
-
The last step shown is just like the preceding one: input causes a window to become full and release its aggregate result on the output port. And again, a new window opens one tuple ahead of the preceding window.
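The walk-through can be replayed in Python. This is a simplified model of a tuple-based dimension, not the operator's implementation: a new window opens every `advance` tuples, each arriving tuple is added to every open window, and a window emits its sum() and closes as soon as it holds `size` tuples. Because the original diagrams are not reproduced here, the input values 1 through 5 are hypothetical.

```python
from typing import Iterable, List

def tuple_window_sums(values: Iterable[int], size: int = 3, advance: int = 1) -> List[int]:
    """Model of the example: windows of `size` tuples, a new window opening
    every `advance` tuples, sum() emitted when a window fills."""
    open_windows: List[List[int]] = []
    outputs: List[int] = []
    for i, v in enumerate(values):
        if i % advance == 0:
            open_windows.append([])       # a new window opens on this tuple
        for window in open_windows:
            window.append(v)              # the same tuple lands in every open window
        still_open = []
        for window in open_windows:
            if len(window) == size:
                outputs.append(sum(window))  # emit the aggregate, then close
            else:
                still_open.append(window)
        open_windows = still_open
    return outputs

print(tuple_window_sums([1, 2, 3, 4, 5]))             # [6, 9, 12]
print(tuple_window_sums([1, 2, 3, 4, 5], advance=2))  # [6, 12]
```

With an advance of 1, every tuple from the third onward fills one window, so there is one output per input tuple; with an advance of 2, windows open only on every other tuple and produce half as many outputs.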
This example demonstrates a dimension and related Aggregate function that generate a sequence (a count) on the output stream.
In this example, notice that we have selected these two options:
-
Opening policy: Do not open window based on this dimension
-
Window size: Do not close window based on this dimension
That combination of settings creates a window that never closes (while StreamBase Server is running, of course). Also notice the setting of the Emission policy, which gives you the option of emitting output before the window closes. Here, the policy is set to Intermediate emission every 1. This means that, as every tuple arrives, one tuple is emitted with the sequence number.
Now, look at the Aggregate Functions tab in the operator's Properties view, which
is set to use the count() aggregating function:
As each tuple arrives in the Aggregate operator, it is counted. Because each input tuple
causes an output tuple to be emitted, each emitted tuple contains a value in the sequence_num field that is one greater than the value in the previous output tuple. To
understand how the count() function is used here, and
for further information on aggregate functions in general, see the Aggregate Functions
Overview section of the Expression Language Functions topic in the StreamBase References.
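The sequence-generating behavior can be modeled in Python as a single window that never closes, whose count() grows by one per arriving tuple, with an intermediate emission every `emit_every` tuples. This is a sketch of the behavior described above, not StreamBase code; the function name and the `emit_every` parameter are hypothetical, and only the sequence_num field name comes from the example.

```python
from typing import Any, Dict, Iterable, Iterator

def sequence_numbers(tuples: Iterable[Any], emit_every: int = 1) -> Iterator[Dict[str, int]]:
    """Model of a never-closing window with intermediate emission:
    count() increases by one per input tuple, and an output tuple carrying
    the current count is emitted every `emit_every` tuples."""
    count = 0  # running count() over the single, never-closing window
    for _ in tuples:
        count += 1
        if count % emit_every == 0:
            yield {"sequence_num": count}

print(list(sequence_numbers(["a", "b", "c"])))
# [{'sequence_num': 1}, {'sequence_num': 2}, {'sequence_num': 3}]
```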
Look for further examples of using the Aggregate operator in the operator sample applications included with StreamBase
Studio.
