Using the Apache Kudu Operators

Introduction

This topic describes how to use the Spotfire Streaming for Apache Kudu operators to interact with an Apache Kudu database, and explains how to configure the operators' Properties views.

The Apache Kudu connectivity solution is implemented as a suite of five global Java operators that allows a StreamBase application to connect to a Kudu database and access its data.

Apache Kudu is a distributed database management system designed to provide a combination of fast inserts/updates and efficient columnar scans.

In addition to four write operators dedicated to insert, delete, update and upsert operations, a Scanner operator allows read operations. The operation of all five operators is described in this document.

Placing Kudu Operators on the Canvas

The Kudu operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:

  • Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.

  • Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.

  • From the top-level menu, invoke Insert>Operator>Java.

From the Insert an Operator or Adapter dialog, select one of the following Kudu-related operators and double-click or press OK:

  • Apache Kudu Delete, which removes a row from the cluster.

  • Apache Kudu Insert, which adds a row to the cluster.

  • Apache Kudu Row Scanner, which retrieves rows from the cluster.

  • Apache Kudu Update, which modifies an existing row in the cluster.

  • Apache Kudu Upsert, which modifies an existing row in the cluster, or adds it if it did not exist.

Prerequisites

In order to run, the operators assume the following to be correctly configured:

  • At runtime, the operators expect the configured Kudu database on your network to be running and ready to accept connections at the configured URLs. Configuration of your Kudu operators is discussed in the next section.

Configuration

The Kudu operators can share a connection to the same database, provided they are configured to do so. Each operator lists the available clusters in a combo box (see Properties: Operator Properties Tab). The list's values are specified in a dedicated section of the application's Kudu.conf file. Here is an example of such a section, containing all supported settings (long lines wrap to the next, for clarity):

name = "Kudu.conf"
type = "com.tibco.ep.streambase.configuration.adapter"
version = "1.0.0"
configuration = {
        
// An adapter group type defines a collection of EventFlow adapter configurations, indexed by adapter type.
  AdapterGroup = {
        
// A collection of EventFlow adapter configurations, indexed by adapter type. This object 
// is required and must contain at least one configuration.
    adapters = {
        
// The root section for an EventFlow adapter configuration.
      kudu = {
        
// Section list. This array is optional and has no default value.
        sections = [
        
// A configuration for an EventFlow adapter named section.
          {
        
// Section name. The value does not have to be unique; that is, you can have multiple sections with
// the same name in the same array of sections. This property is required.
            name = "master-definition"
        
// Section for setting adapter properties. All values must be strings. This object is optional 
// and has no default value.
            settings = {
                hosts = "kuduserver1:7051;kuduserver2:7051"
                id = "Test Cluster"
                bossCount = "0"
                workerCount = "0"
                disableStatistics = "false"
            }
          }
        ]
      }
    }
  }
}
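
As an illustration of the hosts value format, the following minimal Python sketch splits such a string into host and port pairs. It assumes entries are separated by semicolons and written as host:port, as in the example above. The function name parse_hosts is hypothetical, and this is not the operator's own parsing code.

```python
def parse_hosts(hosts: str) -> list[tuple[str, int]]:
    """Split a 'host:port;host:port' string into (host, port) pairs.

    Assumption: the format matches the sample setting shown above.
    """
    pairs = []
    for entry in hosts.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        host, _, port = entry.rpartition(":")
        if not host:
            raise ValueError(f"missing port in {entry!r}")
        pairs.append((host, int(port)))
    return pairs


masters = parse_hosts("kuduserver1:7051;kuduserver2:7051")
```

A check like this can help catch a malformed hosts value before the application is started.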

A best practice is to define your clusters before placing operator instances on the canvas, so that the list is already available in the Properties view and the operators can be configured quickly.

Properties View Settings

This section describes the properties you can set for each of the Kudu operators, using the various tabs of the Properties view in StreamBase Studio.

Properties: General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Properties: Operator Properties Tab

This section describes the properties on the Operator Properties tab in the Properties view for the Kudu operators. Enter all text fields as string literals, not as expressions.

Common Properties

All five Kudu operators contain a common set of properties:

Kudu Configuration

The Edit button is a shortcut to the StreamBase Configuration File Editor, used for adapter configuration or for converting an existing application's adapter-configurations.xml file to HOCON format.

Kudu Master Definition

Specifies the name of the Kudu cluster to which to connect. The combo box contains a list of available clusters from which to choose, as defined in the adapter configuration file (see Configuration). This setting is required.

Table Name

Specifies the name of the database table on which to perform operations.

Log Level

Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Mapping Tab

The Mapping tab, present on all adapters, allows for mapping StreamBase field names to Kudu column names when the names are not identical.

All four Kudu operators that perform write operations (Insert, Update, Upsert, Delete) also share the following property:

Control Port

Specifies whether to add a control input port to the operator, through which commands can be sent (see Input Ports).

Query Options Tab

Configure the following parameters from the Query Options tab when performing the write operations:

Flush Mode

Whether to flush changes to the server automatically or manually {AUTO_FLUSH_SYNC (default) | AUTO_FLUSH_BACKGROUND | MANUAL_FLUSH}.

Flush Interval

Number of milliseconds between automatic flushes. This setting is ignored when Flush Mode is set to MANUAL_FLUSH.

External Consistency Mode

The consistency mode to use {CLIENT_PROPAGATED (default) | COMMIT_WAIT}.

Timeout

The operation timeout in milliseconds.

Synchronous

Whether to perform operations synchronously or asynchronously.

Ignore All Duplicate Rows

Ignore errors where all rows returned by a tablet server are of type AlreadyPresent.

Mutation Buffer Space

The number of operations that can be buffered.

Mutation Buffer Low Watermark

The low watermark for this session. The value must be between 0.0 and 1.0 and represents the fraction of the Mutation Buffer Space (specified above) above which Please Retry exceptions start to be raised at random, with increasing likelihood as the buffer approaches full.
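
To make the arithmetic concrete, here is a minimal Python sketch of how the watermark relates to the buffer size. The function names are hypothetical, and the sketch only computes the threshold described above; it does not model the random retry behavior or any Kudu internals.

```python
def watermark_threshold(buffer_space: int, low_watermark: float) -> int:
    """Return the number of buffered operations at which the low watermark sits."""
    if not 0.0 <= low_watermark <= 1.0:
        raise ValueError("low watermark must be between 0.0 and 1.0")
    return int(buffer_space * low_watermark)


def above_watermark(buffered: int, buffer_space: int, low_watermark: float) -> bool:
    """True once the buffered operation count exceeds the watermark threshold."""
    return buffered > watermark_threshold(buffer_space, low_watermark)


# With space for 1000 operations and a watermark of 0.5, retries can start
# to appear once more than 500 operations are buffered.
threshold = watermark_threshold(1000, 0.5)
```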

Pushback Delay

The number of milliseconds to wait between retry attempts after a Please Retry exception.

Operator-Specific Properties

In addition to the common properties listed above, some properties are operator-specific:

Row Scanner Operator Properties
Query Tab

The Query tab contains parameters that can be changed when performing row scan operations:

Row Count Limit

The maximum number of rows to return.

Batch Size

The return batch size in bytes.

Read Mode

The read mode to use. Possible values are READ_LATEST or READ_AT_SNAPSHOT. If the latter is used, a timestamp field named snapshotTimestamp must be specified on the input stream.

Scan Request Timeout

The request timeout in milliseconds.

Prefetching

Whether to prefetch results.

Cache Blocks

Whether to cache blocks.

Synchronous

Whether to use synchronous or asynchronous operation.

Replica Selection

Required consistency level. Possible values are LEADER_ONLY or CLOSEST_REPLICA.

Schemas Tab

Defines the schema of the result rows.

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Properties: Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Input Ports

By default each operator contains only one input port, which is used to receive commands describing an operation to be performed on the Kudu cluster. In all cases, any field found in the input schema that does not match what is described below is ignored and passed through directly in the result output tuples.

In addition, you can configure the Insert, Update, Upsert and Delete operators to have one more input port to receive Connect, Disconnect and Flush commands (Flush applies only in MANUAL_FLUSH flush mode).

Insert, Update, Upsert, Delete Operators

All four of these operators contain one default input port, which is expected to have only one field: a tuple field named row, which contains the fields to use when running the query. Any other fields present in the schema are ignored and passed through verbatim to the results output port. This can be used to correlate commands with their results, which can be especially useful in asynchronous operation.

Query Input Port

Field Name | Field Type | Description
row | tuple | REQUIRED. Contains the fields and values to insert into the query.

Command Input Port (optional)

Field Name | Field Type | Description
command | string | REQUIRED. One of:
  • Connect ― Connect to Kudu, start a session and open the table.

  • Disconnect ― Close the session.

  • Flush ― Causes the operator to flush pending operations to the server. Only valid when using MANUAL_FLUSH.
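
The rules for the command field can be expressed as a small pre-check on the sending side. The Python sketch below is illustrative only: the function name is hypothetical, exact-case matching of command names is an assumption, and the documentation does not say how the operator itself reacts to an invalid command.

```python
VALID_COMMANDS = ("Connect", "Disconnect", "Flush")


def is_valid_command(command: str, flush_mode: str) -> bool:
    """Check a command string against the rules listed above.

    Assumption: command names are matched exactly as written in the list.
    """
    if command not in VALID_COMMANDS:
        return False
    # Flush is only meaningful when the operator uses MANUAL_FLUSH.
    if command == "Flush" and flush_mode != "MANUAL_FLUSH":
        return False
    return True
```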

Row Scanner Operator

All fields on the sole input port for this operator are optional. An empty tuple sent to this port causes the operator to fetch all rows from the table.

Refer to the following to refine the results by specifying certain fields in the input tuple:

Field Name | Field Type | Description
lowerBound | tuple | If specified, the values in this tuple are used to set the (inclusive) lower range of keys to match.
upperBound | tuple | If specified, the values in this tuple are used to set the (exclusive) upper range of keys to match.
snapshotTimestamp | timestamp | If specified, sets the timestamp at which the scan must be executed. Requires that Read Mode be set to READ_AT_SNAPSHOT.
predicates | tuple | If specified, the values in this tuple describe one or more predicates to filter the results against.

The fields in this tuple are expected to match Kudu column field names, with a type of tuple. Each of those tuples is expected to have fields with names that are one or more of lt, le, eq, ge, gt or in. The field types must match the column's type.

For example, to specify a query that matches rows whose account column (of type LONG) has a value strictly between 100 and 200, the tuple would include the following values:

  • predicates (tuple)

    • account (tuple)

      • gt (long) set to the value 100

      • lt (long) set to 200
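
The nested structure above can be modeled with plain Python dictionaries. The sketch below shows the intended meaning of each comparison name by filtering rows locally. It assumes that multiple conditions are combined with a logical AND and that in takes a list of allowed values. The function name matches is hypothetical, and the code says nothing about how the operator or Kudu evaluates predicates.

```python
import operator

# Comparison names accepted in a predicate tuple, per the text above.
COMPARISONS = {
    "lt": operator.lt,
    "le": operator.le,
    "eq": operator.eq,
    "ge": operator.ge,
    "gt": operator.gt,
}


def matches(row: dict, predicates: dict) -> bool:
    """Return True if the row satisfies every condition on every column."""
    for column, conditions in predicates.items():
        value = row[column]
        for name, operand in conditions.items():
            if name == "in":
                if value not in operand:
                    return False
            elif not COMPARISONS[name](value, operand):
                return False
    return True


# The example from the text: account strictly between 100 and 200.
predicates = {"account": {"gt": 100, "lt": 200}}
rows = [{"account": 100}, {"account": 150}, {"account": 200}]
selected = [row for row in rows if matches(row, predicates)]
```

Here only the row with account 150 is selected, because gt and lt exclude both endpoints.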

All other fields found in the input schema are transparently passed through to the Results port.

Output Ports

Every operator contains two fixed output ports. The first is used to deliver the results of running operator commands and the second emits status tuples on interesting events, such as errors and connection events.

Results Port

Insert, Update, Upsert and Delete operators

The Results Port for these operators consists of one tuple field named response, which holds information about the query.

Any fields found on the input tuple are also passed transparently on output tuples.

Field Name | Field Type | Description
response | tuple | Reports on the query that was just executed. This tuple contains four fields:
  • elapseMillis (long): The time it took to run the query.

  • writeTimestamp (timestamp): The time the operation was executed.

  • tsUUID (string): Identifier of the tablet server that sent the response.

  • message (string): Only set if an error occurred during the execution of the query. The string will describe the error.
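
As a rough illustration of how a downstream consumer might interpret these tuples, the Python sketch below models a result tuple as a dictionary. The requestId field is a hypothetical pass-through field added on the input side for correlation, and the sketch assumes message is null (None) when no error occurred.

```python
def summarize(result: dict) -> str:
    """Describe one result tuple, modeled here as a plain dictionary."""
    response = result["response"]
    request_id = result.get("requestId")  # hypothetical pass-through field
    message = response.get("message")
    if message is not None:
        return f"request {request_id} failed: {message}"
    return f"request {request_id} succeeded in {response['elapseMillis']} ms"


ok = {"requestId": 1, "response": {"elapseMillis": 12, "message": None}}
failed = {"requestId": 2, "response": {"elapseMillis": 3, "message": "key not found"}}
```

Because pass-through fields are copied to the output unchanged, an identifier like this allows each result to be matched to the command that produced it, even when results arrive asynchronously.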

Row Scanner operator

The results port contains one tuple field called row that contains the fields specified on the operator's Schemas tab, along with any fields found on the input port. One tuple is emitted for every row found when running the query.

Status Port

The Status port emits tuples when errors or interesting events occur.

Field Name | Field Type | Description
message | string | Status or error message.
input | tuple | The input tuple related to this error, if applicable (otherwise, null).

Type Mappings

When exchanging tuples between StreamBase and Kudu, the fields and field types of these tuples are expected to match in the following ways:

  • The fields have the same names, or their mappings are specified in the Mapping tab of the operator properties.

  • Data types match or can be readily converted.

The table below lists all the translation types that the operator supports.

StreamBase Type | Kudu Type(s)
boolean | BOOL
string | STRING
int | INT8, INT16, INT32. When mapping a StreamBase int to a Kudu INT8 or INT16, some precision may be lost.
long | INT64
double | DOUBLE, FLOAT. When mapping a StreamBase double to a Kudu FLOAT, some precision may be lost.
timestamp | UNIXTIME_MICROS
tuple | Unsupported.
blob | BINARY
list | Unsupported.
function | Unsupported.
capture | Unsupported.
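
The two precision notes in the table can be illustrated in general terms. The Python sketch below checks whether an integer fits a signed 8-, 16-, or 32-bit range and shows how a double changes when it is stored as a 32-bit float. The function names are hypothetical. The sketch shows generic narrowing behavior only and does not reproduce the operator's actual conversion code.

```python
import struct


def fits_signed(value: int, bits: int) -> bool:
    """True if value is representable as a signed integer of the given width."""
    limit = 1 << (bits - 1)
    return -limit <= value < limit


def to_float32(value: float) -> float:
    """Round-trip a double through a 32-bit float, as a FLOAT column would store it."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


# 300 fits in INT16 but not in INT8, so it cannot be stored there unchanged.
int_fits_int8 = fits_signed(300, 8)
int_fits_int16 = fits_signed(300, 16)

# 0.1 has no exact 32-bit representation, so the stored value differs slightly.
stored = to_float32(0.1)
```

Checking values this way before writing can show whether a narrower Kudu column type is safe for the data being sent.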

Kudu Operator Sample

The StreamBase installation comes with a sample demonstrating the use of these operators. To load the sample in StreamBase Studio, select File>Import Samples and Community Content and search the Extending StreamBase section for an entry called Kudu Operator.