Contents
This topic describes how to use the Spotfire Streaming for Apache Kudu operators to interact with an Apache Kudu database, and explains how to configure the operators' Properties views.
The Apache Kudu connectivity solution is implemented as a suite of five global Java operators that allows a StreamBase application to connect to a Kudu database and access its data.
Apache Kudu is a distributed database management system designed to provide a combination of fast inserts/updates and efficient columnar scans.
In addition to four write operators dedicated to insert, delete, update and upsert operations, a Scanner operator allows read operations. The operation of all five operators is described in this document.
The Kudu operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:
-
Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.
-
Click in the canvas where you want to place the operator, and invoke the keyboard shortcut
O V
-
From the top-level menu, invoke
> > .
From the Insert an Operator or Adapter dialog, select one of the following Kudu-related operators and double-click or press :
-
Apache Kudu Delete, which removes a row from the cluster.
-
Apache Kudu Insert, which adds a row to the cluster.
-
Apache Kudu Row Scanner, which retrieves rows from the cluster.
-
Apache Kudu Update, which modifies an existing row in the cluster.
-
Apache Kudu Upsert, which modifies an existing row in the cluster, or adds it if it did not exist.
In order to run, the operators assume the following to be correctly configured:
-
At runtime, the operators expect the configured Kudu database on your network to be running and ready to accept connections at the configured URLs. Configuration of your Kudu operators is discussed in the next section.
The different Kudu operators share a connection to the same database, provided they
are configured to do so. Each operator lists the clusters available in the a combo
box (see Properties: Operator Properties Tab).
The list's values are specified in a dedicated section of the application's
Kudu.conf
file. Here is an example of such a section,
containing all supported settings (long lines wrap to the next, for clarity):
name = "Kudu.conf" type = "com.tibco.ep.streambase.configuration.adapter" version = "1.0.0" configuration = { // An adapter group type defines a collection of EventFlow adapter configurations, indexed by adapter type. AdapterGroup = { // A collection of EventFlow adapter configurations, indexed by adapter type. This object // is required and must contain at least one configuration. adapters = { // The root section for an EventFlow adapter configuration. kudu = { // Section list. This array is optional and has no default value. sections = [ // A configuration for an EventFlow adapter named section. { // Section name. The value does not have to be unique; that is, you can have multiple sections with // the same name in the same array of sections. This property is required. name = "master-definition" // Section for setting adapter properties. All values must be strings. This object is optional // and has no default value. settings = { hosts = "kuduserver1:7051;kuduserver2:7051" id = "Test Cluster" bossCount = "0" workerCount = "0" disableStatistics = "false" } } ] } } } }
A best practice is to define your clusters before placing operator instances on the canvas, so that the list is already available in the Properties view and the operators can be configured quickly.
This section describes the properties you can set for the each of the Kudu operators, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.
Operator: A read-only field that shows the formal name of the operator.
Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.
Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
This section describes the properties on the Operator Properties tab in the Properties view for the Kudu operators. Enter all text fields as string literals, not as expressions.
All five Kudu operators contain a common set of properties:
- Kudu Configuration
-
The Editor, used for adapter configurationor converting an existing application's
button is a shortcut to the StreamBase Configuration Fileadapter-configurations.xml
file to HOCON format. - Kudu Master Definition
-
Specifies the name of the Kudu cluster to which to connect. The combo box contains a list of available clusters from which to choose, as defined in the
adapter-configurations
(see Configuration). This setting is required. - Table Name
-
Specifies the name of the database table on which to perform operations.
- Log Level
-
Use this to set the operator to produce more or less verbose console output, independent of the
STREAMBASE_LOG_LEVEL
global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. - Mapping Tab
-
The Mapping tab, present on all adapters, allows for mapping StreamBase field names to Kudu column names when the names are not identical.
All four Kudu operators that perform write operations (Insert, Update, Upsert, Delete) contain a common set of properties:
- Control Port
-
Specifies whether to add a control port to the operator.
- Table Name
-
Specifies the name of the database table on which to perform operations.
- Control Port
-
Specifies whether to add an input port to the operator to send commands.
- Log Level
-
Use this to set the operator to produce more or less verbose console output, independent of the
STREAMBASE_LOG_LEVEL
global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. - Query Options Tab
-
Configure the following parameters from the Query Options tab when performing the write operations:
- Flush Mode
-
Whether to automatically or manually flush changes to the server {AUTO_FLUSH_SYNC (default) | AUTO_FLUSH BACKGROUND | MANUAL_FLUSH}.
- Flush Interval
-
Number of milliseconds between automatic flushes. This setting is ignored when an automatic Flush Mode is selected above.
- External Consistency Mode
-
The consistency mode to use {CLIENT_PROPAGATED (default) | COMMIT_WAIT}.
- Timeout
-
The operation timeout in milliseconds.
- Synchronous
-
Whether to perform operations synchronously or asynchronously.
- Ignore All Duplicate Rows
-
Ignore errors where all rows returned by a tablet server are of type
AlreadyPresent
. - Mutation Buffer Space
-
The number of operations that can be buffered.
- Mutation Buffer Low Watermark
-
The low watermark for this session. The value is expected to be between 0.0 and 1.0 and represents the percentage of the Mutation Buffer Space (specified above) above which "Please Retry" exceptions will start to get randomly sent, with a greater likelihood as the buffer gets closer to full.
- Pushback Delay
-
The number of milliseconds to wait between retry attempts after a
Please Retry
exception.
In addition to the common properties listed above, some properties are operator-specific:
- Query Tab
-
The Query property tab contains parameters than can be changed when performing the row scan operations:
- Row Count Limit
-
The maximum number of row to return.
- Batch Size
-
The return batch size in bytes.
- Read Mode
-
The read mode to use. Possible values are READ_LATEST or READ_AT_SNAPSHOT. If the latter is used, a timestamp field named snapshotTimestamp must be specified on the input stream.
- Scan Request Timeout
-
The request timeout in milliseconds.
- Prefetching
-
Whether to prefetch results.
- Cache Blocks
-
Whether to cache blocks.
- Synchronous
-
Whether to use synchronous or asynchronous operation.
- Replica Selection
-
Required consistency level. Possible values are LEADER_ONLY or CLOSEST_REPLICA
- Schemas Tab
-
Defines the schema of the result rows.
Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
By default each operator contains only one input port, which is used to receive commands describing an operation to be performed on the Kudu cluster. In all cases, any field found in the input schema that does not match what is described below is ignored and passed through directly in the result output tuples.
In addition, you can configure the Insert, Update, Upsert and Delete operators to
have one more input port to receive Connect
,
Disconnnect
and Flush
commands (in MANUAL_FLUSH flush mode).
All four of these operators contain one default input port, which is expected to
have only field: a tuple field named row
, which
contains the fields to use when running the query. Any other fields present in the
schema are ignored and passed through verbatim to the results output port. This can
be used to correlate commands with their results, which can be especially useful in
asynchronous operation.
Field Name | Field Type | Description |
---|---|---|
row | tuple | REQUIRED. Contains the fields and values to insert into the query. |
All fields on the sole input port for this operator are optional. An empty tuple sent to this port causes the operator to fetch all rows from the table.
Refer to the following to refine the results by specifying certain field in the input tuple:
Field Name | Field Type | Description |
---|---|---|
lowerBound | tuple | If specified, the values in this tuple are used to set the (inclusive) lower range of keys to match. |
upperBound | tuple | If specified, the values in this tuple are used to set the (exclusive) upper range of keys to match. |
snapshotTimestamp | timestamp | If specified, sets the timestamp the scan must be executed at. Requires that the ReadMode be set to READ_AT_SNAPSHOT. |
predicates | tuple |
If specified, the values in this tuple describe one or more predicate to
filter the results against.
The fields in this tuple are expected to match Kudu column field names,
with a type of tuple. Each of those tuples are expected to have fields
with names that are one or more of
For example, to specify a query that will match rows that include an
|
All other fields found in the input schema are transparently passed through to the Results port.
Every operator contains two fixed output ports. The first is used to deliver the results of running operator commands and the second emits status tuples on interesting events, such as errors and connection events.
The Results Port for these operators consists of one tuple field named response, which holds information about the query.
Any fields found on the input tuple are also passed transparently on output tuples.
Field Name | Field Type | Description |
---|---|---|
response | tuple |
Reports on the query that was just executed. This tuple contains four
fields:
|
When exchanging tuples between StreamBase and Kudu, the fields and field types of these tuples are expected to match in the following ways:
-
The fields have the same names, or their mappings are specified in the Mapping tab of the operator properties.
-
Data types match or can be readily converted.
The table below lists all the translation types that the operator supports.
StreamBase Type | Kudu Type(s) |
---|---|
boolean | BOOL |
string | STRING |
int |
INT8, INT16, INT32
When mapping a StreamBase int to a Kudu INT8 or INT16, some precision may be lost. |
long | INT64 |
double |
DOUBLE, FLOAT
When mapping a StreamBase double to a Kudu FLOAT, some precision may be lost. |
timestamp | UNIXTIME_MICROS |
tuple | Unsupported. |
blob | BINARY |
list |
Unsupported. |
function |
Unsupported. |
capture | Unsupported. |
The StreamBase installation comes with a sample demonstrating the use of this operator. To load the sample in StreamBase Studio, select Extending StreamBase section for an entry called Kudu Operator.
> and search the