Using the Merge Operator

Introduction

  The Merge operator compares tuples on two or more input streams, sorted according to the values in a specified input field. It then combines the data, emitting one ordered output stream. All input streams must have compatible schemas (with the same fields, data types and sizes, not necessarily in the same order). Tuples on the output stream also have the same schema as the input streams. Incoming tuples are sorted in increasing order. Optionally, tuples can also be grouped by one or more input fields. Any groups that you define are ordered independently when they are merged.

The Merge operator stores arriving tuples in a buffer for each input port. It emits buffered tuples when a new tuple's value in the merge field is greater than or equal to the value of the oldest tuple in the other port's buffer. If the group option is selected, the tuples must also evaluate to the same group.
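The buffering rule can be sketched in a few lines of Python. This is only a toy model inferred from the description in this topic, not the operator's actual implementation: the class name MergeSketch, its method, and the zero-based port numbers are hypothetical, and the sketch assumes that values on each port never decrease.

```python
class MergeSketch:
    """Toy model of the buffering rule described above (not the product's code)."""

    def __init__(self, ports=2):
        # One buffer per input port; entries are (merge value, arrival number, tuple).
        self.buffers = [[] for _ in range(ports)]
        # Largest merge value seen so far on each port (None until a tuple arrives).
        self.high = [None] * ports
        self.arrivals = 0

    def enqueue(self, port, value, item):
        """Buffer one tuple, then return whatever can now be emitted, in order."""
        self.arrivals += 1
        self.high[port] = value if self.high[port] is None else max(self.high[port], value)
        self.buffers[port].append((value, self.arrivals, item))
        if None in self.high:
            return []  # some port has produced nothing yet, so nothing is safe to emit
        limit = min(self.high)  # no port is expected to deliver a smaller value later
        ready = []
        for buf in self.buffers:
            ready += [entry for entry in buf if entry[0] <= limit]
            buf[:] = [entry for entry in buf if entry[0] > limit]
        # Order by merge value, breaking ties by arrival order.
        return [item for _, _, item in sorted(ready, key=lambda e: (e[0], e[1]))]


merge = MergeSketch()
print(merge.enqueue(1, 10, "a"))  # [] because port 0 has not seen any data yet
print(merge.enqueue(0, 12, "b"))  # ['a'] because 12 >= 10, so "a" is safe to emit
```

In this model a tuple is held until every port has seen a value at least as large, which is why "b" stays buffered after the second call.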

Events are assumed to be ordered on the merge field. However, if a tuple arrives whose order-by value decreases, it is prioritized. Such a tuple has an order-by field value that is less than the largest order-by value previously seen on that input stream. It is treated as if its order-by value were the highest value previously seen on that input stream. This ensures that it is output as soon as possible while still preserving the order of that input stream. The values of the fields in the tuple are not actually changed.
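One way to picture this treatment of out-of-order tuples is as a running maximum over the order-by values seen on a single input stream. The sketch below is illustrative only; the function name and the sample values are hypothetical.

```python
from itertools import accumulate


def effective_order_values(values):
    """Return the value each tuple is ordered by: the running maximum on its stream."""
    return list(accumulate(values, max))


arrived = [10, 12, 11, 15]              # the third tuple arrives out of order
print(effective_order_values(arrived))  # [10, 12, 12, 15]
```

The tuple that arrived with 11 is ordered as if it carried 12, while its stored field value stays 11.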

The Merge operator is order sensitive. For example, if you want to combine the trades coming from the New York Stock Exchange and the Philadelphia Stock Exchange, you could use either a Union operator or a Merge operator. If you want to make sure that the combined data maintains its order by trade date, use the Merge operator. If order is not important, use the Union operator.
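As a rough offline analogy in Python, concatenating two lists behaves like a union (one possible arrival order, with no ordering guarantee), while heapq.merge keeps the combined result ordered by trade date. The trade data and variable names below are made up for illustration, and the analogy ignores the streaming and buffering aspects of the real operators.

```python
from heapq import merge
from itertools import chain

# Hypothetical trades as (trade date, symbol); each list is already date-ordered.
nyse = [("2024-01-02", "IBM"), ("2024-01-04", "GE")]
phlx = [("2024-01-03", "MSFT"), ("2024-01-05", "ORCL")]

union_like = list(chain(nyse, phlx))                              # no ordering guarantee
merge_like = list(merge(nyse, phlx, key=lambda trade: trade[0]))  # ordered by trade date

print([date for date, _ in union_like])  # ['2024-01-02', '2024-01-04', '2024-01-03', '2024-01-05']
print([date for date, _ in merge_like])  # ['2024-01-02', '2024-01-03', '2024-01-04', '2024-01-05']
```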

Tip

Use the Merge operator in cases where you expect all incoming streams to receive data. This is important because the merge operation waits for data on all input ports, in order to guarantee that the output is correctly merged. For example, if one of your streams has a very low data rate, the other streams might buffer more than you intended. In this case, consider adding a Heartbeat operator upstream from the Merge operator. The Heartbeat operator can detect when tuples do not arrive at a specified interval on a less active stream and emit its own tuple, causing the Merge operator to flush buffered data accumulated from more active streams.
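The effect of a quiet stream, and of a heartbeat tuple on that stream, can be illustrated with a small Python model. Everything here is a hypothetical sketch: make_merge, the zero-based port numbers, and the labels are invented for illustration, and the heartbeat is simply modeled as an ordinary tuple enqueued on the quiet port.

```python
def make_merge(ports=2):
    """Build a toy merge with per-port buffers; returns (enqueue, buffered) helpers."""
    buffers = [[] for _ in range(ports)]  # per-port buffers of (value, label)
    high = [None] * ports                 # largest merge value seen per port

    def enqueue(port, value, label):
        high[port] = value if high[port] is None else max(high[port], value)
        buffers[port].append((value, label))
        if None in high:
            return []                     # a port with no data blocks all output
        limit = min(high)
        ready = [e for buf in buffers for e in buf if e[0] <= limit]
        for buf in buffers:
            buf[:] = [e for e in buf if e[0] > limit]
        return [name for _, name in sorted(ready, key=lambda e: e[0])]

    def buffered():
        return sum(len(buf) for buf in buffers)

    return enqueue, buffered


enqueue, buffered = make_merge()
for second in (1, 2, 3, 4):
    enqueue(0, second, f"trade@{second}")  # the busy stream; nothing can be emitted
print(buffered())                          # 4 tuples are waiting on the quiet stream
print(enqueue(1, 4, "heartbeat@4"))
# ['trade@1', 'trade@2', 'trade@3', 'trade@4', 'heartbeat@4']
```

In this sketch the heartbeat tuple is emitted along with the flushed data, so a downstream consumer of such a model would have to recognize and handle it.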

This topic describes the actions you can take on each tab of the Merge operator's Properties View, and illustrates how tuples are merged.

Properties: General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Storage method: Specify settings for this operator with the same in heap and in transactional memory options described for the Annotations tab of the EventFlow Editor. The default setting is Inherit from containing module.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
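The character rules for the Name field can be checked with a regular expression. The following Python sketch assumes that "alphabetic" means the ASCII letters A-Z and a-z; the function name is hypothetical, and the check does not cover the requirement that the name be unique within the module.

```python
import re

# First character: a letter or underscore; the rest: letters, digits, or underscores.
NAME_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_component_name(name):
    """Return True if the name satisfies the character rules for the Name field."""
    return NAME_PATTERN.fullmatch(name) is not None


print(is_valid_component_name("Merge_Trades1"))  # True
print(is_valid_component_name("merge-trades"))   # False (hyphen)
print(is_valid_component_name("1Merge"))         # False (starts with a digit)
```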

Properties: Merge Settings Tab

Use the Merge Settings tab to specify:

  • Number of Input Ports (streams to merge; default is 2)

  • Field by which to merge tuples - An input field name. This field determines the order in which tuples on the input streams will be merged. It should contain values that generally increase monotonically.

See Example 1: Simple Merge in this topic to trace a merge operation.

Properties: Group Options Tab

The Group Options tab allows you to specify tuple groupings based on one or more fields in the input streams. Add a row for each group and identify its input field in the Expression field. See Example 2: Merge with Grouping in this topic to trace a merge operation that uses the group option.
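Grouping can be pictured as keeping a separate set of per-port buffers for each group value, so that each group is merged independently. The Python sketch below is a hypothetical model, not the operator's implementation: the class name, zero-based port numbers, symbols, and timestamps are all invented for illustration.

```python
from collections import defaultdict


class GroupedMergeSketch:
    """Toy model: every group value gets its own per-port buffers and high marks."""

    def __init__(self, ports=2):
        self.buffers = defaultdict(lambda: [[] for _ in range(ports)])
        self.high = defaultdict(lambda: [None] * ports)

    def enqueue(self, port, group, value, label):
        buffers, high = self.buffers[group], self.high[group]
        high[port] = value if high[port] is None else max(high[port], value)
        buffers[port].append((value, label))
        if None in high:
            return []  # this group has no data yet on some port
        limit = min(high)
        ready = [e for buf in buffers for e in buf if e[0] <= limit]
        for buf in buffers:
            buf[:] = [e for e in buf if e[0] > limit]
        return [name for _, name in sorted(ready, key=lambda e: e[0])]


merge = GroupedMergeSketch()
print(merge.enqueue(1, "IBM", 1, "IBM@1"))  # []
print(merge.enqueue(0, "GE", 5, "GE@5"))    # [] because only the IBM group has data on port 1
print(merge.enqueue(0, "IBM", 9, "IBM@9"))  # ['IBM@1']
```

To group on more than one field in this model, pass a tuple of field values as the group argument.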

Properties: Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Example 1: Simple Merge

Consider an application that performs a simple Merge operation where the input schema contains a stock symbol field (string data type) and a timestamp field. The operation merges on the timestamp field. That is, we want the output stream to merge the two input streams with the tuples sorted by time. Keeping in mind the earlier introduction to the Merge operation, let's trace some data flowing through the application.

Each step of this example shows a new tuple being enqueued to the Merge operator, and any output tuples that are released. The contents of the two operator buffers are shown after each tuple arrives and after any tuples are released.

  1. As we begin the trace, a tuple has already been enqueued on the second stream and is stored in the Merge buffer. Now, another tuple enters on the second input stream. No output occurs because both inputs are on the same stream.

  2. A tuple arrives on the first stream, causing the Merge operator to evaluate the new tuple against each buffered tuple from the other stream. Because both existing tuples have a lower value based on their timestamps, Merge releases them on its output stream. The new tuple is retained in the buffer.

  3. Another tuple arrives on the first stream. Again, only one buffer contains data, so no output occurs.

  4. A tuple arrives on the second stream. Notice that its value is between those of the existing values on the other stream. The result is that two tuples are emitted: the new one and the existing one of lower value from the other port's buffer.

  5. A tuple arrives on the first stream, with no output. Note that its value is the same as that of the existing tuple.

  6. A new tuple on the second stream has the same value as both tuples stored in the first port's buffer. In this case, Merge releases all three tuples of equal value to the output stream.
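The six steps above can be replayed with a small Python model. The timestamps and labels are hypothetical, chosen only so that they satisfy the relationships described in each step, and the release rule is an interpretation of this topic's description rather than the operator's actual code. Ports are numbered from zero, so port 0 is the first stream and port 1 is the second.

```python
def trace(events, ports=2):
    """Replay (port, timestamp, label) events; return the labels emitted at each step."""
    buffers = [[] for _ in range(ports)]  # per-port buffers of (timestamp, label)
    high = [None] * ports                 # largest timestamp seen per port
    emitted_per_step = []
    for port, timestamp, label in events:
        high[port] = timestamp if high[port] is None else max(high[port], timestamp)
        buffers[port].append((timestamp, label))
        emitted = []
        if None not in high:
            limit = min(high)
            ready = [e for buf in buffers for e in buf if e[0] <= limit]
            for buf in buffers:
                buf[:] = [e for e in buf if e[0] > limit]
            emitted = [name for _, name in sorted(ready, key=lambda e: e[0])]
        emitted_per_step.append(emitted)
    return emitted_per_step


events = [
    (1, 1, "A"),  # already buffered on the second stream before step 1
    (1, 2, "B"),  # step 1: same stream, no output
    (0, 5, "C"),  # step 2: releases A and B
    (0, 9, "D"),  # step 3: no output
    (1, 7, "E"),  # step 4: releases C and E
    (0, 9, "F"),  # step 5: no output
    (1, 9, "G"),  # step 6: releases D, F, and G
]
for step in trace(events):
    print(step)
```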

Example 2: Merge with Grouping

In this example we will trace the effects of the same input as in Example 1: Simple Merge, explaining how grouping changes the way Merge works. The schema and the input are the same, and we will still merge on the timestamp. But we will also group on the symbol field. That is, we want tuples to be organized by stock symbol, and sorted by timestamp within each group.

  1. As in Example 1, we begin with two tuples from the second input stream. As before, there are no tuples in the first port's buffer, so no output has occurred.

  2. When the next tuple arrives on the first stream, the same condition applies as in Example 1: the two tuples on the second port have earlier timestamps. However, no merge occurs this time, because now we want to group tuples by symbol. None of the tuples buffered on the second port matches the symbol of the incoming tuple, so the new tuple is buffered while the Merge operator waits for more input.

  3. A new tuple on the first stream meets the merge-by-group conditions: the two tuples in the second port's buffer have smaller timestamp values, and their symbol fields match the incoming tuple's symbol field. Therefore, the Merge operator releases on its output stream a sorted group of tuples with that symbol from the second port's buffer.

  4. Another tuple arrives on the second stream. Compare the result with the same step in Example 1: even though the first buffer contains a tuple with a lower value, no output occurs this time, because the symbols are different.

  5. A tuple arrives on the first stream. An existing tuple in the other port's buffer has the same symbol and a lower time value, so a merge occurs: the existing tuple is released.

  6. The last tuple, arriving on the first stream, causes no merge because the second port's buffer previously released all its tuples. Recall that in Example 1, the second port's buffer still contained a tuple with a matching symbol at this point, and a merge occurred as a result.