Using the Apache Cassandra Operators

Introduction

This topic describes how to use the Spotfire Streaming for Apache Cassandra operators to interact with an Apache Cassandra database cluster, and explains how to configure the operators' Properties views.

The Apache Cassandra connectivity solution is implemented as a suite of six global Java operators that allows a StreamBase application to connect to a Cassandra database cluster and access its data.

Apache Cassandra is a distributed database management system designed to handle large amounts of data across many commodity servers in a high availability, clustered environment.

In addition to the four operators dedicated to insert, delete, update and select operations, a more generic Query operator allows any Cassandra Query Language (CQL) query to be executed and a Control operator directly controls connection and disconnection operations on the cluster. The operation of all six operators is described in this document.

Placing Cassandra Operators on the Canvas

The Cassandra operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:

  • Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.

  • Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.

  • From the top-level menu, invoke Insert>Operator>Java.

From the Insert an Operator or Adapter dialog, select one of the following Cassandra-related operators and double-click or press OK:

  • Apache Cassandra Insert, which adds a row to the cluster.

  • Apache Cassandra Delete, which removes a row from the cluster.

  • Apache Cassandra Update, which modifies an existing row in the cluster.

  • Apache Cassandra Select, which retrieves rows from the cluster.

  • Apache Cassandra Query, which can be used to send arbitrary queries to the cluster.

  • Apache Cassandra Control, which can be used to connect to the cluster, disconnect from it or obtain the current connection state.

Prerequisites

The operators assume that the following are properly set up:

  • The machine running your StreamBase application must have a copy of Apache Cassandra v3.1 or later installed.

  • At runtime, the operators expect the configured Cassandra Cluster to be running and ready to accept connections at the configured URLs. Configuration of your Cassandra operators is discussed in the next section.

Configuration

The different Cassandra operators share a connection to the same cluster, provided they are configured to do so. Each operator lists the available clusters in a combo box (see Properties: Operator Properties Tab). The list's values are specified in a dedicated section of the application's Cassandra.conf file. Here is an example of such a section, containing all supported settings (long lines wrap to the next, for clarity):

name = "Cassandra.conf"
type = "com.tibco.ep.streambase.configuration.adapter"
version = "1.0.0"
configuration = {

    // An adapter group type defines a collection of EventFlow adapter configurations, indexed by adapter type.
    AdapterGroup = {

        // A collection of EventFlow adapter configurations, indexed by adapter type. This key is required and must contain at
        // least one configuration.
        adapters = {

            // The root section for an EventFlow adapter configuration.
            cassandra = {

                // Section list. This key is optional and has no default value.
                sections = [ 

                    // A configuration for an EventFlow adapter named section.
                    {

                        // Section name. The value does not have to be unique; that is, you can have multiple sections with the same name
                        // in the same array of sections. This key is required.
                        name = "cluster-definition"

                        // Section property bag. All values must be strings. This key is optional and has no default value.
                        settings = {

                            contact-points = "localhost"
                            id = "Test Cluster"
                            port = "9042"


                            //// One of 'RoundRobinPolicy', 'DCAwareRoundRobinPolicy', 'TokenAwarePolicy', 'LatencyAwarePolicy', 'WhitelistPolicy'
                            ////
                            //// DCAwareRoundRobinPolicy parameters:
                            ////     local-dc (string)
                            ////     used-hosts-per-remote-dc (string)
                            ////     allow-remote-dc-for-local-consistency-level (boolean)
                            ////
                            //// TokenAwarePolicy parameters:
                            ////     shuffle-replicas (boolean)
                            ////
                            //// WhitelistPolicy parameters:
                            ////     whitelist-hosts (string of the form "host:port{;host:port...}")
                            ////
                            //// LatencyAwarePolicy parameters:
                            ////     with-exclusion-threshold (double)
                            ////     with-minimum-measurements (int)
                            ////     with-scale (long)
                            ////     with-scale-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
                            ////     with-retry-period (long)
                            ////     with-retry-period-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
                            ////     with-update-rate (long)
                            ////     with-update-rate-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')

                            //load-balancing-policy = "TokenAwarePolicy"
                            //shuffle-replicas = "true"

                            //// QueryOptions settings
                            ////    
                            //// One of ALL, ANY, EACH_QUORUM, LOCAL_ONE, LOCAL_QUORUM, LOCAL_SERIAL, ONE, QUORUM, SERIAL, THREE, TWO

                            //consistency-level = "ONE"
                            //serial-consistency-level = "ONE"

                            //default-idempotence = "false"
                            //fetch-size = "5000"
                            //max-pending-refresh-node-list-requests = "20"
                            //max-pending-refresh-node-requests = "20"
                            //max-pending-refresh-schema-requests = "20"
                            //prepare-on-all-hosts = "true"
                            //refresh-node-interval-millis = "1000"
                            //refresh-node-list-interval-millis = "1000"
                            //refresh-schema-interval-millis = "1000"
                            //reprepare-on-up = "true"

                            //// Credentials

                            //username = "myusername"
                            //password = "mypassword"

                            //// SSL. Passwords may be enciphered.

                            //use-ssl = "false"
                            //ssl-key-store = ""
                            //ssl-key-store-pwd = ""
                            //ssl-trust-store = ""
                            //ssl-trust-store-pwd = ""
                        }
                    }
                ]
            }
        }
    }
}
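The whitelist-hosts setting mentioned in the configuration comments above takes a string of the form host:port{;host:port...}. As a rough illustration of that format only, the following Python sketch splits such a string into host and port pairs. The function name parse_whitelist_hosts and the sample addresses are hypothetical, and the sketch says nothing about how the operator itself parses the value.

```python
def parse_whitelist_hosts(value):
    """Split a "host:port;host:port" string into (host, port) pairs.

    Hypothetical helper for illustration; not part of the operator.
    """
    hosts = []
    for entry in value.split(";"):
        # Split on the last colon so the port is always the final part.
        host, _, port = entry.strip().rpartition(":")
        hosts.append((host, int(port)))
    return hosts


print(parse_whitelist_hosts("10.0.0.1:9042;10.0.0.2:9042"))
```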

A best practice is to define your clusters before placing operator instances on the canvas, so that the lists are already available in the Properties view and the operators can be configured right away.

Properties View Settings

This section describes the properties you can set for each of the six Cassandra operators, using the various tabs of the Properties view in StreamBase Studio.

Properties: General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Properties: Operator Properties Tab

This section describes the properties on the Operator Properties tab in the Properties view for the Cassandra operators. Enter all text fields as string literals, not as expressions.

Common Properties

All Cassandra operators have a common set of properties:

Cassandra Configuration

The Edit button is a shortcut to the StreamBase Configuration File Editor, used for adapter configuration or converting an existing application's adapter-configurations.xml file to HOCON format.

Cluster Definition

Specifies the name of the Cassandra Cluster to which to connect. The drop-down list contains a list of available clusters from which to choose, as defined in the HOCON configuration file (see Configuration). This setting is required.

Connect On Startup

Specifies whether to automatically attempt a connection to the cluster when the application starts.

Reconnect Interval (in ms)

Specifies the number of milliseconds to wait between reconnection attempts.

Log Level

Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Operator-Specific Properties

In addition to the common properties listed above, some operators also have some properties of their own:

Select Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

Order By

If set, this adds an ORDER BY clause to the query with the given value.

Order

Either ASC or DESC, which refers to the ordering direction of the ORDER BY clause.

Limit

If non-zero, this adds a LIMIT clause to the query.

Allow Filtering

If set, this adds ALLOW FILTERING to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Update Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Insert Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

If Not Exists

If set, adds an IF NOT EXISTS clause to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Delete Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

If Exists

If set, adds an IF EXISTS clause to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Query Operator Properties

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Control Operator Properties

This operator has no configurable properties other than those listed in the common section above.

Properties: Edit Schema Tab

For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.

Each operator has exactly one input and one output port for emitting results, plus an optional Status port.

Note

The only exception to this is the Control operator, which has no results output port and hence no Edit Schema property tab.

Each operator's results output port has two fields: one named query containing the original query that triggered the current output, and one tuple field named row which contains the one row in the result set obtained when running the query. The schema for this row field is determined by the schema specified on the Edit Schema tab. The fields in this schema are matched with column names in the rows from the result set and the values will be mapped accordingly. Any StreamBase field in the schema not also present in the row is ignored; similarly any column in the row that has no corresponding StreamBase field is ignored. Each StreamBase field that does have a corresponding Cassandra column must be of a data type that is compatible with the Cassandra column's data type (for example, Cassandra columns of type text should have a corresponding StreamBase field of type string). The list of compatible types between StreamBase and Cassandra is specified in Type Mappings.
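The name-based matching described above can be sketched in Python as follows. This is an illustration only, not the operator's implementation: the function name map_row_to_schema, the use of plain dictionaries for rows, and the choice to leave unmatched schema fields null are all assumptions made for the example.

```python
def map_row_to_schema(schema_fields, cassandra_row):
    """Project a Cassandra row onto a list of StreamBase field names.

    Hypothetical helper. Schema fields with no matching column are left
    as None (assumed here to stand for null); columns with no matching
    schema field are dropped.
    """
    return {name: cassandra_row.get(name) for name in schema_fields}


row = {"id": 7, "name": "alice", "extra": True}
print(map_row_to_schema(["id", "name", "missing"], row))
```

Here the extra column is dropped because the schema has no field for it, and the missing field stays null because the row has no such column.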

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Properties: Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Input Ports

Each operator has only one input port, used to receive commands describing an operation to be performed on the Cassandra cluster (such as SELECT, INSERT, UPDATE, and so on). The schema for each operator is different because different operations require different parameters.

In all cases, however, any fields found in the input schema that do not match those described below are ignored and passed through directly in the input field of result output tuples.

Query Operator

Field Name    Field Type    Description
query         string        REQUIRED. Describes the full Cassandra query to execute. For example, CREATE KEYSPACE myks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}

Select Operator

Each field in the input schema for the Select operator is used to fill out a query of the form:

SELECT <field-list-1> FROM <keyspace>.<table> WHERE <field-list-2> [ORDER BY <orderby-value> ASC|DESC] [LIMIT <limitval-value>] [ALLOW FILTERING]

In the form above, field-list-1 is a comma-separated list of the fields specified in the Edit Schema tab of the operator, and orderby-value, ASC|DESC, limitval-value and ALLOW FILTERING are all specified according to their values in the operator's Properties tab. Finally, field-list-2 is filled using the contents of a field named WHERE in the input tuple.

If a field of the WHERE tuple is of StreamBase type LIST instead of a simple type, its translation in the WHERE clause will be "column IN (value[,value...])" instead of "column = value".
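The translation described above can be sketched in Python. This is a simplified illustration under stated assumptions, not the operator's code: the function names, the dictionary standing in for the WHERE tuple, the literal quoting rules, and the keyspace, table and column names are all hypothetical. Conditions are joined with AND, which is what CQL expects in a WHERE clause.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (strings are single-quoted)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def where_clause(where):
    """Build the WHERE conditions from a dict standing in for the tuple."""
    terms = []
    for column, value in where.items():
        if isinstance(value, list):
            # A list-typed field becomes "column IN (value, value...)".
            items = ", ".join(cql_literal(v) for v in value)
            terms.append(f"{column} IN ({items})")
        else:
            terms.append(f"{column} = {cql_literal(value)}")
    return " AND ".join(terms)


def build_select(keyspace, table, fields, where, order_by=None,
                 order="ASC", limit=0, allow_filtering=False):
    """Assemble a SELECT statement from the schema fields and properties."""
    query = (f"SELECT {', '.join(fields)} FROM {keyspace}.{table} "
             f"WHERE {where_clause(where)}")
    if order_by:
        query += f" ORDER BY {order_by} {order}"
    if limit:
        query += f" LIMIT {limit}"
    if allow_filtering:
        query += " ALLOW FILTERING"
    return query


print(build_select("myks", "users", ["id", "name"],
                   {"region": ["eu", "us"]}, limit=10))
# prints: SELECT id, name FROM myks.users WHERE region IN ('eu', 'us') LIMIT 10
```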

Insert Operator

The fields of the input tuple will be used to fill out an INSERT query of the form:

INSERT INTO <keyspace>.<table> (<field-list>) VALUES (<values-list>) [IF NOT EXISTS] [USING [TTL ttl-value] [AND TIMESTAMP timestamp-value]]

Field Name    Field Type    Description
values        tuple         REQUIRED. Contains the fields and values to insert into the INSERT query. Each field's name is used to fill out the field-list in the query shown above, and each value is used to fill out values-list.
ttl           int           If set, a USING TTL ttl-value clause is added to the query.
timestamp     timestamp     If set, a USING TIMESTAMP timestamp-value clause is added to the query.

Finally, if the operator's If Not Exists property is set, an IF NOT EXISTS clause is added to the query.
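As a rough sketch of how such a statement could be assembled, the following Python builds an INSERT from a dictionary of values plus the optional TTL and timestamp. It is not the operator's implementation: the function names, the literal quoting, the sample keyspace and table, and the treatment of the timestamp as a plain integer are assumptions. The sketch places IF NOT EXISTS before USING, which is the order the CQL grammar requires.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (strings are single-quoted)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_insert(keyspace, table, values, ttl=None, timestamp=None,
                 if_not_exists=False):
    """Assemble an INSERT statement from a dict standing in for values."""
    columns = ", ".join(values)
    literals = ", ".join(cql_literal(v) for v in values.values())
    query = f"INSERT INTO {keyspace}.{table} ({columns}) VALUES ({literals})"
    if if_not_exists:
        query += " IF NOT EXISTS"
    options = []
    if ttl is not None:
        options.append(f"TTL {ttl}")
    if timestamp is not None:
        # Assumed to be an integer already in the units Cassandra expects.
        options.append(f"TIMESTAMP {timestamp}")
    if options:
        query += " USING " + " AND ".join(options)
    return query


print(build_insert("myks", "users", {"id": 1, "name": "O'Brien"}, ttl=3600))
# prints: INSERT INTO myks.users (id, name) VALUES (1, 'O''Brien') USING TTL 3600
```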

Delete Operator

The fields of the input tuple are used to fill out a DELETE query of the form:

DELETE FROM <keyspace>.<table> WHERE column=value [AND column=value ...] [IF [condition-list | EXISTS]]

Field Name    Field Type    Description
where         tuple         REQUIRED. Contains the fields and values to insert into the DELETE query. Each field's name is used to fill out the column names in the query shown above, and each value is used to fill out the values.
if            tuple         If non-null, an IF clause is added to the query and this tuple's fields are used to fill out the condition-list as shown above.

Finally, if the operator's If Exists property is set, an IF EXISTS clause is added to the query. This setting is ignored if the if field is non-null in the input tuple.
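The precedence between the if field and the If Exists property can be illustrated with a short Python sketch. This is a simplified model under stated assumptions rather than the operator's code: the function names, the dictionaries standing in for tuples, the literal quoting, and the sample names are hypothetical, and conditions are joined with AND as CQL requires.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (strings are single-quoted)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def conditions(fields):
    """Join column = value pairs with AND."""
    return " AND ".join(f"{c} = {cql_literal(v)}" for c, v in fields.items())


def build_delete(keyspace, table, where, if_conditions=None, if_exists=False):
    """Assemble a DELETE statement; a non-null if tuple wins over If Exists."""
    query = f"DELETE FROM {keyspace}.{table} WHERE {conditions(where)}"
    if if_conditions is not None:
        query += f" IF {conditions(if_conditions)}"
    elif if_exists:
        query += " IF EXISTS"
    return query


# The If Exists property is set, but the if tuple is non-null, so it is used.
print(build_delete("myks", "users", {"id": 1},
                   if_conditions={"name": "alice"}, if_exists=True))
# prints: DELETE FROM myks.users WHERE id = 1 IF name = 'alice'
```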

Update Operator

The fields of the input tuple are used to fill out an UPDATE query of the form:

UPDATE <keyspace>.<table> SET column=value [, column=value] WHERE column=value [AND column=value ...] [IF condition-list]

Field Name    Field Type    Description
set           tuple         REQUIRED. This tuple field contains fields that will be used in the SET clause for this UPDATE statement.
where         tuple         REQUIRED. This tuple field contains fields that will be used in the WHERE clause for this UPDATE statement.
if            tuple         If non-null, an IF clause is added to the query and this tuple's fields will be used to fill out the condition-list as shown above.

Control Operator

Field Name    Field Type    Description
command       string        REQUIRED. One of: CONNECT, DISCONNECT, GETCONNECTIONSTATUS.

Output Ports

Every operator has only one fixed output port, used to deliver the results of running operator commands, plus one optional Status output port if the operator's Enable Status Port option is checked.

Results Port

The Results Port consists of one field named input that mirrors the operator's input schema, plus one tuple field named row whose schema matches that specified in the Edit Schema tab of the operator.

If the ResultSet returned by running a query contains one or more rows, those are emitted separately in successive tuples on the results output port. The query field contains the initial input that triggered this query, and the row field contains one row in the result set. In any case, one additional tuple is emitted with its row field set to null to indicate that the results have been fully processed.
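Downstream logic can use the terminating tuple to detect the end of a result set. The following Python sketch models that idea with dictionaries standing in for result tuples; the function name collect_rows and the sample data are hypothetical, and only the row field described above is relied on.

```python
def collect_rows(result_tuples):
    """Gather rows until the terminating tuple (row is None) arrives."""
    rows = []
    for result in result_tuples:
        if result["row"] is None:
            # Null row: the result set has been fully delivered.
            break
        rows.append(result["row"])
    return rows


stream = [{"row": {"id": 1}}, {"row": {"id": 2}}, {"row": None}]
print(collect_rows(stream))
# prints: [{'id': 1}, {'id': 2}]
```

A query that returns no rows still produces the terminating tuple, so in this model the collected list is simply empty.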

Status Port

The Status Port, when enabled, is the same for each operator.

Field Name    Field Type    Description
status        string        Describes this event. Possible values include: Connected, Disconnected, Error.
info          list<string>  This is an all-purpose field used to convey additional information describing this event. For example, in the case of an Error event this will contain the text describing the error.
context       tuple         If this event was generated as a result of a command being sent to the operator, this field will contain the original command tuple.

Type Mappings

When exchanging tuples between StreamBase and Cassandra, the fields and field types of these tuples are expected to match in the following ways:

  • The fields have the same names, and

  • Data types match or can be readily converted.

The table below lists all the type translations supported by the operator.

StreamBase Type   Cassandra Type(s)
boolean           boolean
string            text, ascii, inet, uuid, varchar
int               cint, smallint, tinyint
                  When mapping a StreamBase int to a Cassandra smallint or tinyint, some precision may be lost.
long              bigint, cint, smallint, tinyint, time, counter
                  When mapping a StreamBase long to a Cassandra cint, smallint or tinyint, some precision may be lost.
double            double, float
                  When mapping a StreamBase double to a Cassandra float, some precision may be lost.
timestamp         date, timestamp
tuple             tuple
blob              blob
list              list, set, map
                  When mapping a StreamBase list to a Cassandra set, any duplicates in the list will be removed (and the last value of duplicate entries will be used).
                  When mapping a StreamBase list to a Cassandra map, the list elements are expected to be tuples with the first field representing an entry's key and the second field representing its value.
function          text, blob
                  The field will be mapped to a JSON string representing its value if the Cassandra field is of type text, or serialized into a byte array if the field is of type blob.
capture           Not supported.
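The list-to-set and list-to-map conversions described above can be modeled in Python as follows. This is an analogy using Python collections, not the operator's conversion code, and the function names are hypothetical. In particular, a repeated key in list_to_map keeps its last value simply because that is how Python dictionaries behave; the sketch makes no claim about what the operator does with repeated keys.

```python
def list_to_set(values):
    """Model a list-to-set mapping: duplicates collapse to one element."""
    return set(values)


def list_to_map(entries):
    """Model a list-to-map mapping from (key, value) pairs.

    Each entry stands in for a two-field tuple: first field is the key,
    second field is the value.
    """
    return {key: value for key, value in entries}


print(sorted(list_to_set([3, 1, 3, 2])))
# prints: [1, 2, 3]
print(list_to_map([("a", 1), ("b", 2)]))
# prints: {'a': 1, 'b': 2}
```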

Cassandra Operator Sample

The StreamBase installation comes with a sample demonstrating the use of this operator. To load the sample in StreamBase Studio, select File>Import Samples and Community Content and look under the Extending StreamBase section for an entry called Cassandra Operator.