Using the Apache Cassandra Operators

Introduction

This topic describes how to use the TIBCO StreamBase® for Apache Cassandra operators to interact with an Apache Cassandra database cluster, and explains how to configure the operators' Properties views.

The Apache Cassandra connectivity solution is implemented as a suite of six global Java operators that allow a StreamBase application to connect to a Cassandra database cluster and access its data.

Apache Cassandra is a distributed database management system designed to handle large amounts of data across many commodity servers in a high availability, clustered environment.

In addition to the four operators dedicated to insert, delete, update and select operations, a more generic Query operator allows any Cassandra Query Language (CQL) query to be executed and a Control operator directly controls connection and disconnection operations on the cluster. The operation of all six operators is described in this document.

Placing Cassandra Operators on the Canvas

The Cassandra operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:

  • Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.

  • Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.

  • From the top-level menu, invoke Insert > Operator > Java.

From the Insert an Operator or Adapter dialog, select one of the following Cassandra-related operators and double-click or press OK:

  • Apache Cassandra Insert, which adds a row to the cluster.

  • Apache Cassandra Delete, which removes a row from the cluster.

  • Apache Cassandra Update, which modifies an existing row in the cluster.

  • Apache Cassandra Select, which retrieves rows from the cluster.

  • Apache Cassandra Query, which can be used to send arbitrary queries to the cluster.

  • Apache Cassandra Control, which can be used to connect to the cluster, disconnect from it or obtain the current connection state.

Prerequisites

In order to run, the operators assume the following to be properly set up:

  • The machine running your StreamBase application must have a copy of Apache Cassandra v3.1 or later installed.

  • Your application must be configured to locate and load the required Cassandra libraries. The procedure for doing this is different when running from StreamBase Studio than when running from the command line.

    • To run from StreamBase Studio, locate your project's top-level node in the Package Explorer view. Right-click the project node and select Build Path > Configure Build Path. On the Libraries tab of the Properties dialog, click the Add External JARs button. This brings up a file browser; navigate to the lib directory of your Cassandra installation, select all JAR files and click OK. Click OK to dismiss the Properties dialog.

    • To run from the command line, add the appropriate entries to your project's sbd.sbconf file. In the example below, long lines are wrapped for readability. At minimum, your configuration file must contain jar elements like those shown below, edited to point to your Cassandra installation:

      <?xml version="1.0" encoding="UTF-8"?>
      <streambase-configuration xmlns:xi="http://www.w3.org/2001/XInclude"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="http://www.streambase.com/schemas/sbconf/">
        <java-vm>
         <param name="jvm-args" value="
          -XX:+UseG1GC
          -XX:MaxGCPauseMillis=500
          "/>
          <!-- Edit the following lines to reflect your Apache Cassandra 
               installation directory. -->
          <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            apache-cassandra-3.7.0.jar"/>
          <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            cassandra-driver-core-3.0.1-shaded.jar"/>
          <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            guava-18.0.jar"/>
          <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            metrics-core-3.1.0.jar"/>
        </java-vm>
      </streambase-configuration>

  • At runtime, the operators expect the configured Cassandra Cluster to be running and ready to accept connections at the configured URLs. Configuration of your Cassandra operators is discussed in the next section.

Configuration

The different Cassandra operators share a connection to the same cluster, provided they are configured to do so. Each operator lists the available clusters in a combo box (see Properties: Operator Properties Tab). The list's values are specified in a dedicated section of the application's sbd.sbconf file. Here is an example of such a section, containing all supported settings (long lines are wrapped for readability):

<?xml version="1.0" encoding="UTF-8"?>

<streambase-configuration xmlns:xi="http://www.w3.org/2001/XInclude"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:noNamespaceSchemaLocation="http://www.streambase.com/schemas/sbconf/">

  <java-vm>
   <param name="jvm-args" value="
    -XX:+UseG1GC
    -XX:MaxGCPauseMillis=500
    "/>
    <!-- Edit the following lines to reflect your Apache Cassandra 
         installation directory. -->
    <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
      apache-cassandra-3.7.0.jar"/>
    <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
      cassandra-driver-core-3.0.1-shaded.jar"/>
    <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
      guava-18.0.jar"/>
    <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
      metrics-core-3.1.0.jar"/>
  </java-vm>

  <adapter-configurations>
    <adapter-configuration name="cassandra">
      <section name="cluster-definition">
        <setting name="id" val="Test Cluster"/>

          <!-- Edit these values to reflect your own Cassandra cluster -->
          <setting name="contact-points" val="qautility1"/>
          <setting name="port" val="9042"/>

          <!-- One of 'RoundRobinPolicy', 'DCAwareRoundRobinPolicy',
               'TokenAwarePolicy', 'LatencyAwarePolicy', 'WhitelistPolicy'

               DCAwareRoundRobinPolicy parameters:
                   local-dc (string)
                   used-hosts-per-remote-dc (string)
                   allow-remote-dc-for-local-consistency-level (boolean)

               TokenAwarePolicy parameters:
                   shuffle-replicas (boolean)

               WhitelistPolicy parameters:
                   whitelist-hosts (string of the form "host:port{;host:port...}")

               LatencyAwarePolicy parameters:
                   with-exclusion-threshold (double)
                   with-minimum-measurements (int)
                   with-scale (long)
                   with-scale-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
                   with-retry-period (long)
                   with-retry-period-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
                   with-update-rate (long)
                   with-update-rate-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
          -->
          <!--
          <setting name="load-balancing-policy" val="TokenAwarePolicy"/>
          <setting name="shuffle-replicas" val="true"/>
          -->

          <!-- One of 'RoundRobinPolicy', 'DCAwareRoundRobinPolicy', parameters as above -->
          <!--
          <setting name="load-balancing-policy" val="DCAwareRoundRobinPolicy"/>
          <setting name="local-dc" val="MyLocalDC"/>
          -->
                
          <!-- QueryOptions settings -->

          <!-- One of ALL, ANY, EACH_QUORUM, LOCAL_ONE, LOCAL_QUORUM, 
            LOCAL_SERIAL, ONE, QUORUM, SERIAL, THREE, TWO -->
          <!--
          <setting name="consistency-level" val="ONE"/>
          <setting name="serial-consistency-level" val="ONE"/>

          <setting name="default-idempotence" val="false"/>
          <setting name="fetch-size" val="5000"/>
          <setting name="max-pending-refresh-node-list-requests" val="20"/>
          <setting name="max-pending-refresh-node-requests" val="20"/>
          <setting name="max-pending-refresh-schema-requests" val="20"/>
          <setting name="prepare-on-all-hosts" val="true"/>
          <setting name="refresh-node-interval-millis" val="1000"/>
          <setting name="refresh-node-list-interval-millis" val="1000"/>
          <setting name="refresh-schema-interval-millis" val="1000"/>
          <setting name="reprepare-on-up" val="true"/>
          -->

          <!-- SSL. Passwords may be enciphered using sbcipher -->
          <!--
          <setting name="use-ssl" val="false"/>
          <setting name="ssl-key-store" val=""/>
          <setting name="ssl-key-store-pwd" val=""/>
          <setting name="ssl-trust-store" val=""/>
          <setting name="ssl-trust-store-pwd" val=""/>
          -->

      </section>

      <!-- Other cluster definitions go here -->

    </adapter-configuration>
  </adapter-configurations>
</streambase-configuration>                 

A best practice is to define your clusters before placing operator instances on the canvas, so that the lists are already available in the Properties view and the operators can be configured right away.
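
As a quick sanity check before opening Studio, the cluster names in a configuration file can be listed with a few lines of Python. The following is a minimal sketch, not part of the product: it assumes that the id setting of each cluster-definition section is what appears in the operators' combo box, and the second cluster in the sample (Second Cluster, cassandra2.example.com) is a hypothetical entry added for illustration.

```python
import xml.etree.ElementTree as ET


def cluster_ids(sbconf_text):
    """Return the id of every cluster-definition section under the cassandra adapter configuration."""
    root = ET.fromstring(sbconf_text)
    ids = []
    for adapter in root.iter("adapter-configuration"):
        if adapter.get("name") != "cassandra":
            continue
        for section in adapter.iter("section"):
            if section.get("name") != "cluster-definition":
                continue
            for setting in section.iter("setting"):
                if setting.get("name") == "id":
                    ids.append(setting.get("val"))
    return ids


# Trimmed-down configuration; the second section is hypothetical.
SAMPLE = """<streambase-configuration>
  <adapter-configurations>
    <adapter-configuration name="cassandra">
      <section name="cluster-definition">
        <setting name="id" val="Test Cluster"/>
        <setting name="contact-points" val="qautility1"/>
        <setting name="port" val="9042"/>
      </section>
      <section name="cluster-definition">
        <setting name="id" val="Second Cluster"/>
        <setting name="contact-points" val="cassandra2.example.com"/>
        <setting name="port" val="9042"/>
      </section>
    </adapter-configuration>
  </adapter-configurations>
</streambase-configuration>"""

if __name__ == "__main__":
    print(cluster_ids(SAMPLE))
```

To check a real file, read its contents from disk and pass them to cluster_ids in place of SAMPLE.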

Properties View Settings

This section describes the properties you can set for each of the six Cassandra operators, using the various tabs of the Properties view in StreamBase Studio.

Properties: General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this operator starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the operator instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Properties: Operator Properties Tab

This section describes the properties on the Operator Properties tab in the Properties view for the Cassandra operators. Enter all text fields as string literals, not as expressions.

Common Properties

All Cassandra operators have a common set of properties:

Cluster Definition

Specifies the name of the Cassandra cluster to which to connect. The combo box contains a list of available clusters from which to choose, as defined in the sbd.sbconf file (see Configuration). This setting is required.

Connect On Startup

Specifies whether to automatically attempt a connection to the cluster when the application starts.

Reconnect Interval (in ms)

Specifies the number of milliseconds to wait between reconnection attempts.

Log Level

Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.

Operator-Specific Properties

In addition to the common properties listed above, some operators also have some properties of their own:

Select Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

Order By

If set, this adds an ORDER BY clause to the query with the given value.

Order

Either ASC or DESC, which refers to the ordering direction of the ORDER BY clause.

Limit

If non-zero, this adds a LIMIT clause to the query.

Allow Filtering

If set, this adds ALLOW FILTERING to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Update Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Insert Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

If Not Exists

If set, adds an IF NOT EXISTS clause to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Delete Operator Properties

Keyspace

Specifies the keyspace on which to perform this query.

Table Name

Specifies the table on which to perform this query.

If Exists

If set, adds an IF EXISTS clause to the query.

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Query Operator Properties

Enable Status Port

Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.

Control Operator Properties

This operator has no configurable properties other than those listed in the common section above.

Properties: Edit Schema Tab

For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.

Each operator has exactly one input and one output port for emitting results, plus an optional Status port.

Note

The only exception to this is the Control operator, which has no results output port and hence no Edit Schema property tab.

Each operator's results output port has two fields: one named query containing the original query that triggered the current output, and one tuple field named row containing one row of the result set obtained by executing the query. The schema for this row field is determined by the schema specified on the Edit Schema tab. The fields in this schema are matched by name with the columns of each result-set row, and the values are mapped accordingly. Any StreamBase field in the schema not also present in the row is ignored; similarly, any column in the row that has no corresponding StreamBase field is ignored. Each StreamBase field that does have a corresponding Cassandra column must be of a data type that is compatible with the Cassandra column's data type (for example, Cassandra columns of type text should have a corresponding StreamBase field of type string). The list of compatible types between StreamBase and Cassandra is specified in Type Mappings.
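
The name-matching rule above can be pictured with a small Python sketch. This is an illustration only, not the operator's implementation: it models a result-set row as a dictionary, assumes exact (case-sensitive) name matching, and assumes that a schema field with no matching column is left null. The column and field names are hypothetical.

```python
def map_row(schema_fields, cassandra_row):
    """Build the row tuple from a result-set row, matching columns to schema fields by name."""
    # Columns without a matching schema field are dropped; schema fields
    # without a matching column are left as None (assumed behavior).
    return {name: cassandra_row.get(name) for name in schema_fields}


if __name__ == "__main__":
    schema = ["id", "name", "email"]                  # fields from the Edit Schema tab
    row = {"id": 1, "name": "Ada", "age": 36}         # columns returned by Cassandra
    print(map_row(schema, row))
```

In this example the age column has no StreamBase counterpart and is dropped, while the email field has no Cassandra counterpart and stays empty.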

Properties: Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Input Ports

Each operator has only one input port, used to receive commands describing an operation to be performed on the Cassandra cluster (such as SELECT, INSERT, UPDATE, and so on). The schema for each operator is different because different operations require different parameters.

In all cases, however, any fields found in the input schema that do not match those described below are ignored and passed through directly in the input field of result output tuples.

Query Operator

Field Name   Field Type   Description
query        string       REQUIRED. Describes the full Cassandra query to execute. For example, CREATE KEYSPACE myks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}

Select Operator

Each field in the input schema for the Select operator is used to fill out a query of the form:

SELECT <field-list-1> FROM <keyspace>.<table> WHERE <field-list-2> [ORDER BY <orderby-value> ASC|DESC] [LIMIT <limitval-value>] [ALLOW FILTERING]

In the query form above, field-list-1 is a comma-separated list of the fields specified on the Edit Schema tab of the operator, while orderby-value, ASC|DESC, limitval-value, and ALLOW FILTERING are set according to the Order By, Order, Limit, and Allow Filtering properties on the Operator Properties tab. Finally, field-list-2 is filled using the contents of a field named WHERE in the input tuple.

If a field of the WHERE tuple is of StreamBase type LIST instead of a simple type, its translation in the WHERE clause will be "column IN (value[,value...])" instead of "column = value".
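
The translation described above can be sketched in Python as follows. This is a hedged illustration of the resulting query text, not the operator's own code: the literal quoting rules, the use of AND to join multiple conditions, and the table and column names (users, id, name, active) are assumptions made for the example.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (quoting rules are an assumption)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_select(keyspace, table, columns, where,
                 order_by=None, order="ASC", limit=0, allow_filtering=False):
    """Assemble a SELECT string in the shape described above."""
    conditions = []
    for column, value in where.items():
        if isinstance(value, list):
            # A list-valued field becomes "column IN (value,value...)".
            rendered = ",".join(cql_literal(v) for v in value)
            conditions.append(f"{column} IN ({rendered})")
        else:
            conditions.append(f"{column} = {cql_literal(value)}")
    query = f"SELECT {', '.join(columns)} FROM {keyspace}.{table}"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    if order_by:
        query += f" ORDER BY {order_by} {order}"
    if limit:  # a zero limit adds no LIMIT clause
        query += f" LIMIT {limit}"
    if allow_filtering:
        query += " ALLOW FILTERING"
    return query


if __name__ == "__main__":
    print(build_select("myks", "users", ["id", "name"],
                       {"id": [1, 2, 3], "active": True}, limit=10))
```

Here the list-valued id field produces an IN condition, while the simple active field produces an equality condition.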

Insert Operator

The fields of the input tuple will be used to fill out an INSERT query of the form:

INSERT INTO <keyspace>.<table> (<field-list>) VALUES (<values-list>) [IF NOT EXISTS] [USING [TTL ttl-value] [AND TIMESTAMP timestamp-value]]

Field Name   Field Type   Description
values       tuple        REQUIRED. Contains the fields and values to insert into the INSERT query. Each field's name is used to fill out the field-list in the query shown above, and each value is used to fill out values-list.
ttl          int          If set, a USING TTL ttl-value clause is added to the query.
timestamp    timestamp    If set, a USING TIMESTAMP timestamp-value clause is added to the query.

Finally, if the operator's If Not Exists property is set, an IF NOT EXISTS clause is added to the query.
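
As an illustration of how the input fields and the If Not Exists property combine, here is a minimal Python sketch that assembles the query text. It is not the operator's implementation. CQL expects IF NOT EXISTS before the USING clause, so the sketch emits the clauses in that order; the literal quoting rules, the plain integer used for the timestamp, and the table and column names are assumptions.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (quoting rules are an assumption)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_insert(keyspace, table, values, ttl=None, timestamp=None,
                 if_not_exists=False):
    """Assemble an INSERT string from a values tuple, modeled as a dict."""
    columns = ", ".join(values)                               # field-list
    literals = ", ".join(cql_literal(v) for v in values.values())  # values-list
    query = f"INSERT INTO {keyspace}.{table} ({columns}) VALUES ({literals})"
    if if_not_exists:
        query += " IF NOT EXISTS"
    options = []
    if ttl is not None:
        options.append(f"TTL {ttl}")
    if timestamp is not None:
        options.append(f"TIMESTAMP {timestamp}")
    if options:
        query += " USING " + " AND ".join(options)
    return query


if __name__ == "__main__":
    print(build_insert("myks", "users", {"id": 1, "name": "Ada"},
                       ttl=3600, if_not_exists=True))
```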

Delete Operator

The fields of the input tuple are used to fill out a DELETE query of the form:

DELETE FROM <keyspace>.<table> WHERE column=value [AND column=value] [IF condition-list | IF EXISTS]

Field Name   Field Type   Description
where        tuple        REQUIRED. Contains the fields and values to insert into the DELETE query. Each field's name is used to fill out the column names in the query shown above, and each value is used to fill out the values.
if           tuple        If non-null, an IF clause is added to the query and this tuple's fields are used to fill out the condition-list as shown above.

Finally, if the operator's If Exists property is set, an IF EXISTS clause is added to the query. This setting is ignored if the if field is non-null in the input tuple.
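
The precedence between the if field and the If Exists property can be shown with a short Python sketch. It is an illustration under stated assumptions rather than the operator's code: tuples are modeled as dictionaries, conditions are joined with AND as CQL requires, and the quoting rules and names are hypothetical.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal (quoting rules are an assumption)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def conditions(fields):
    """Join column = value pairs with AND."""
    return " AND ".join(f"{c} = {cql_literal(v)}" for c, v in fields.items())


def build_delete(keyspace, table, where, if_conditions=None, if_exists=False):
    """Assemble a DELETE string; a non-empty if tuple overrides If Exists."""
    query = f"DELETE FROM {keyspace}.{table} WHERE {conditions(where)}"
    if if_conditions:
        query += " IF " + conditions(if_conditions)
    elif if_exists:
        query += " IF EXISTS"
    return query


if __name__ == "__main__":
    # If Exists is set on the operator, but the input tuple carries an if field.
    print(build_delete("myks", "users", {"id": 1}, {"name": "Ada"}, if_exists=True))
```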

Update Operator

The fields of the input tuple are used to fill out an UPDATE query of the form:

UPDATE <keyspace>.<table> SET column=value [, column=value] WHERE column=value [AND column=value] [IF condition-list]

Field Name   Field Type   Description
set          tuple        REQUIRED. This tuple field contains fields that will be used in the SET clause for this UPDATE statement.
where        tuple        REQUIRED. This tuple field contains fields that will be used in the WHERE clause for this UPDATE statement.
if           tuple        If non-null, an IF clause is added to the query and this tuple's fields are used to fill out the condition-list as shown above.

Control Operator

Field Name   Field Type   Description
command      string       REQUIRED. One of: CONNECT, DISCONNECT, GETCONNECTIONSTATUS.

Output Ports

Every operator has only one fixed output port, used to deliver the results of executing operator commands, plus one optional Status output port if the operator's Enable Status Port option is checked.

Results Port

The Results Port consists of one field named input that mirrors the operator's input schema, plus one tuple field named row whose schema matches that specified on the Edit Schema tab of the operator.

If the ResultSet returned by executing a query contains one or more rows, those are emitted separately in successive tuples on the results output port. The query field contains the initial input that triggered this query, and the row field contains one row in the result set. In any case, one additional tuple is emitted with its row field set to null to indicate that the results have been fully processed.
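
Downstream logic can use the null row as an end-of-results marker. The following Python sketch shows one way to regroup the emitted tuples into complete result sets. It is a hypothetical consumer, not part of the operators: tuples are modeled as dictionaries with a row key, and results from different queries are assumed not to be interleaved.

```python
def collect_result_sets(tuples):
    """Group results-port tuples into one list of rows per completed query."""
    completed = []
    current = []
    for t in tuples:
        if t["row"] is None:
            # Terminator tuple: the rows gathered so far form one full result set.
            completed.append(current)
            current = []
        else:
            current.append(t["row"])
    return completed


if __name__ == "__main__":
    stream = [
        {"row": {"id": 1}},
        {"row": {"id": 2}},
        {"row": None},   # end of the first query's results
        {"row": None},   # a second query that returned no rows
    ]
    print(collect_result_sets(stream))
```

A query that returns no rows still produces the terminator tuple, so it shows up as an empty list.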

Status Port

The Status Port, when enabled, is the same for each operator.

Field Name   Field Type     Description
status       string         Describes this event. Possible values include: Connected, Disconnected, Error.
info         list<string>   This is an all-purpose field used to convey additional information describing this event. For example, in the case of an Error event this will contain the text describing the error.
context      tuple          If this event was generated as a result of a command being sent to the operator, this field will contain the original command tuple.

Type Mappings

When exchanging tuples between StreamBase and Cassandra, the fields and field types of these tuples are expected to match in the following ways:

  • The fields have the same names, and

  • Data types match or can be readily converted.

The table below lists all the type translations supported by the operator.

StreamBase Type    Cassandra Type(s)
boolean            boolean
string             text, ascii, inet, uuid, varchar
int                int, smallint, tinyint
                   When mapping a StreamBase int to a Cassandra smallint or tinyint, some precision may be lost.
long               bigint, int, smallint, tinyint, time, counter
                   When mapping a StreamBase long to a Cassandra int, smallint, or tinyint, some precision may be lost.
double             double, float
                   When mapping a StreamBase double to a Cassandra float, some precision may be lost.
timestamp          date, timestamp
tuple              tuple
blob               blob
list               list, set, map
                   When mapping a StreamBase list to a Cassandra set, any duplicates in the list will be removed (and the last value of duplicate entries will be used).
                   When mapping a StreamBase list to a Cassandra map, the list elements are expected to be tuples with the first field representing an entry's key and the second field representing its value.
function           text, blob
                   The field is mapped to a JSON string representing its value if the Cassandra field is of type text, or serialized into a byte array if the field is of type blob.
capture            Not supported.
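
The list and integer conversions noted in the table can be illustrated with a short Python sketch. It only mimics the documented effects and is not the operator's conversion code: StreamBase lists and tuples are modeled with Python lists and 2-tuples, and letting a later map entry overwrite an earlier one with the same key is an assumption. The integer ranges are the standard ranges of the Cassandra tinyint, smallint, int, and bigint types.

```python
# Signed ranges of the Cassandra integer types.
CASSANDRA_INT_RANGES = {
    "tinyint": (-2**7, 2**7 - 1),
    "smallint": (-2**15, 2**15 - 1),
    "int": (-2**31, 2**31 - 1),
    "bigint": (-2**63, 2**63 - 1),
}


def fits(value, cassandra_type):
    """Return True if value can be stored in the given integer type without loss."""
    low, high = CASSANDRA_INT_RANGES[cassandra_type]
    return low <= value <= high


def list_to_set(items):
    """Map a list to a set: duplicates collapse into a single element."""
    return set(items)


def list_to_map(entries):
    """Map a list of (key, value) tuples to a map; a later key overwrites an earlier one (assumed)."""
    result = {}
    for key, value in entries:
        result[key] = value
    return result


if __name__ == "__main__":
    print(fits(40000, "smallint"))
    print(list_to_set(["a", "b", "a"]))
    print(list_to_map([("k1", 1), ("k2", 2)]))
```

A check like fits can be applied before sending a tuple to catch values that would not survive a narrowing conversion.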

Cassandra Operator Sample

The StreamBase installation comes with a sample demonstrating the use of this operator. To load the sample in StreamBase Studio, select File > Load StreamBase Sample and look under the Extending StreamBase section for an entry called Cassandra Operator.