This topic describes how to use the TIBCO StreamBase® for Apache Cassandra operators to interact with an Apache Cassandra database cluster, and explains how to configure the operators' Properties views.
The Apache Cassandra connectivity solution is implemented as a suite of six global Java operators that allows a StreamBase application to connect to a Cassandra database cluster and access its data.
Apache Cassandra is a distributed database management system designed to handle large amounts of data across many commodity servers in a high availability, clustered environment.
In addition to the four operators dedicated to insert, delete, update and select operations, a more generic Query operator allows any Cassandra Query Language (CQL) query to be executed and a Control operator directly controls connection and disconnection operations on the cluster. The operation of all six operators is described in this document.
The Cassandra operators are members of the Java Operator group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:
- Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.
- Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.
- From the top-level menu, invoke the equivalent Insert menu command.
From the Insert an Operator or Adapter dialog, select one of the following Cassandra-related operators and double-click or press OK:
- Apache Cassandra Insert, which adds a row to the cluster.
- Apache Cassandra Delete, which removes a row from the cluster.
- Apache Cassandra Update, which modifies an existing row in the cluster.
- Apache Cassandra Select, which retrieves rows from the cluster.
- Apache Cassandra Query, which can be used to send arbitrary queries to the cluster.
- Apache Cassandra Control, which can be used to connect to the cluster, disconnect from it, or obtain the current connection state.
In order to run, the operators assume the following to be properly set up:
- The machine running your StreamBase application must have a copy of Apache Cassandra v3.1 or later installed.
- Your application must be configured to locate and load the required Cassandra libraries. The procedure for doing this is different when running from StreamBase Studio than when running from the command line.
  - To run from StreamBase Studio, locate your project's top-level node in the Package Explorer view. Right-click the project node and select Properties → Java Build Path. On the Libraries tab of the Properties dialog, click Add External JARs. This brings up a file browser; navigate to the lib directory of your Cassandra installation, select all JAR files, and click Open. Click OK to dismiss the Properties dialog.
  - To run from the command line, add the appropriate entries to your project's sbd.sbconf file. In the example below, long lines wrap to the next line for clarity. At minimum, your configuration file must contain <jar> elements like the ones shown here, edited to point to your Cassandra installation:

    <?xml version="1.0" encoding="UTF-8"?>
    <streambase-configuration xmlns:xi="http://www.w3.org/2001/XInclude"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="http://www.streambase.com/schemas/sbconf/">
      <java-vm>
        <param name="jvm-args" value="
          -XX:+UseG1GC
          -XX:MaxGCPauseMillis=500
        "/>
        <!-- Edit the following lines to reflect your Apache Cassandra
             installation directory. -->
        <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            apache-cassandra-3.7.0.jar"/>
        <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            cassandra-driver-core-3.0.1-shaded.jar"/>
        <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            guava-18.0.jar"/>
        <jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
            metrics-core-3.1.0.jar"/>
      </java-vm>
    </streambase-configuration>

- At runtime, the operators expect the configured Cassandra cluster to be running and ready to accept connections at the configured URLs. Configuration of your Cassandra operators is discussed in the next section.
The different Cassandra operators share a connection to the same cluster, provided they are configured to do so. Each operator lists the available clusters in a combo box (see Properties: Operator Properties Tab). The list's values are specified in a dedicated section of the application's sbd.sbconf file. Here is an example of such a section, containing all supported settings (long lines wrap to the next line, for clarity):
<?xml version="1.0" encoding="UTF-8"?>
<streambase-configuration xmlns:xi="http://www.w3.org/2001/XInclude"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://www.streambase.com/schemas/sbconf/">
<java-vm>
<param name="jvm-args" value="
-XX:+UseG1GC
-XX:MaxGCPauseMillis=500
"/>
<!-- Edit the following lines to reflect your Apache Cassandra
installation directory. -->
<jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
apache-cassandra-3.7.0.jar"/>
<jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
cassandra-driver-core-3.0.1-shaded.jar"/>
<jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
guava-18.0.jar"/>
<jar file="C:\Program Files\DataStax-DDC\apache-cassandra\lib\
metrics-core-3.1.0.jar"/>
</java-vm>
<adapter-configurations>
<adapter-configuration name="cassandra">
<section name="cluster-definition">
<setting name="id" val="Test Cluster"/>
<!-- Edit these values to reflect your own Cassandra cluster -->
<setting name="contact-points" val="qautility1"/>
<setting name="port" val="9042"/>
<!-- One of 'RoundRobinPolicy', 'DCAwareRoundRobinPolicy', 'TokenAwarePolicy', 'LatencyAwarePolicy', 'WhitelistPolicy'
                 DCAwareRoundRobinPolicy parameters:
local-dc (string)
used-hosts-per-remote-dc (string)
allow-remote-dc-for-local-consistency-level (boolean)
TokenAwarePolicy parameters:
shuffle-replicas (boolean)
WhitelistPolicy parameters:
whitelist-hosts (string of the form "host:port{;host:port...}")
LatencyAwarePolicy parameters:
with-exclusion-threshold (double)
with-minimum-measurements (int)
with-scale (long)
with-scale-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
with-retry-period (long)
with-retry-period-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
with-update-rate (long)
with-update-rate-units (one of 'milliseconds', 'seconds', 'minutes', 'hours', 'days')
-->
<!--
<setting name="load-balancing-policy" val="TokenAwarePolicy"/>
<setting name="shuffle-replicas" val="true"/>
-->
<!-- One of 'RoundRobinPolicy', 'DCAwareRoundRobinPolicy', parameters as above -->
<!--
<setting name="load-balancing-policy" val="DCAwareRoundRobinPolicy"/>
<setting name="local-dc" val="MyLocalDC"/>
-->
<!-- QueryOptions settings -->
<!-- One of ALL, ANY, EACH_QUORUM, LOCAL_ONE, LOCAL_QUORUM,
LOCAL_SERIAL, ONE, QUORUM, SERIAL, THREE, TWO -->
<!--
<setting name="consistency-level" val="ONE"/>
<setting name="serial-consistency-level" val="ONE"/>
<setting name="default-idempotence" val="false"/>
<setting name="fetch-size" val="5000"/>
<setting name="max-pending-refresh-node-list-requests" val="20"/>
<setting name="max-pending-refresh-node-requests" val="20"/>
<setting name="max-pending-refresh-schema-requests" val="20"/>
<setting name="prepare-on-all-hosts" val="true"/>
<setting name="refresh-node-interval-millis" val="1000"/>
<setting name="refresh-node-list-interval-millis" val="1000"/>
<setting name="refresh-schema-interval-millis" val="1000"/>
<setting name="reprepare-on-up" val="true"/>
-->
<!-- SSL. Passwords may be enciphered using sbcipher -->
<!--
<setting name="use-ssl" val="false"/>
<setting name="ssl-key-store" val=""/>
<setting name="ssl-key-store-pwd" val=""/>
<setting name="ssl-trust-store" val=""/>
<setting name="ssl-trust-store-pwd" val=""/>
-->
</section>
<!-- Other cluster definitions go here -->
</adapter-configuration>
</adapter-configurations>
</streambase-configuration>
A best practice is to define your clusters before placing operator instances on the canvas, so that the lists are already available in the Properties view and the operators can be configured right away.
This section describes the properties you can set for each of the six Cassandra operators, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Operator: A read-only field that shows the formal name of the operator.
Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this operator starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the operator instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
This section describes the properties on the Operator Properties tab in the Properties view for the Cassandra operators. Enter all text fields as string literals, not as expressions.
All Cassandra operators have a common set of properties:

- Cluster Definition: Specifies the name of the Cassandra cluster to which to connect. The combo box contains a list of available clusters from which to choose, as defined in the sbd.sbconf file (see Configuration). This setting is required. A minimal cluster definition is sketched after this list.
- Connect On Startup: Specifies whether to automatically attempt a connection to the cluster when the application starts.
- Reconnect Interval (in ms): Specifies the number of milliseconds to wait between reconnection attempts.
- Log Level: Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.
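For reference, here is a minimal sketch of a cluster definition, reduced to the settings left uncommented in the full configuration example shown earlier (id, contact-points, and port). The values shown are that example's placeholders; substitute your own cluster name, hosts, and port:

<adapter-configurations>
  <adapter-configuration name="cassandra">
    <section name="cluster-definition">
      <!-- The id value is what appears in the operator's Cluster Definition combo box -->
      <setting name="id" val="Test Cluster"/>
      <setting name="contact-points" val="qautility1"/>
      <setting name="port" val="9042"/>
    </section>
  </adapter-configuration>
</adapter-configurations>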
In addition to the common properties listed above, some operators also have properties of their own, described below for each operator:
Select operator:

- Keyspace: Specifies the keyspace on which to perform this query.
- Table Name: Specifies the table on which to perform this query.
- Order By: If set, adds an ORDER BY clause to the query with the given value.
- Order: Either ASC or DESC, which specifies the ordering direction of the ORDER BY clause.
- Limit: If non-zero, adds a LIMIT clause to the query.
- Allow Filtering: If set, adds ALLOW FILTERING to the query.
- Enable Status Port: Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
Update operator:

- Keyspace: Specifies the keyspace on which to perform this query.
- Table Name: Specifies the table on which to perform this query.
- Enable Status Port: Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
Insert operator:

- Keyspace: Specifies the keyspace on which to perform this query.
- Table Name: Specifies the table on which to perform this query.
- If Not Exists: If set, adds an IF NOT EXISTS clause to the query.
- Enable Status Port: Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
Delete operator:

- Keyspace: Specifies the keyspace on which to perform this query.
- Table Name: Specifies the table on which to perform this query.
- If Exists: If set, adds an IF EXISTS clause to the query.
- Enable Status Port: Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
Query operator:

- Enable Status Port: Specifies whether to add a second output port to the operator to convey status tuples (connection notices, errors, and so on). This setting is optional.
For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.
Each operator has exactly one input and one output port for emitting results, plus an optional Status port.
Note
The only exception to this is the Control operator, which has no results output port and hence no Edit Schema property tab.
Each operator's results output port has two fields: one named query, containing the original query that triggered the current output, and one tuple field named row, which contains one row of the result set obtained when executing the query. The schema for this row field is determined by the schema specified on the Edit Schema tab. The fields in this schema are matched with column names in the rows from the result set, and the values are mapped accordingly. Any StreamBase field in the schema not also present in the row is ignored; similarly, any column in the row that has no corresponding StreamBase field is ignored. Each StreamBase field that does have a corresponding Cassandra column must be of a data type that is compatible with the Cassandra column's data type (for example, Cassandra columns of type text should have a corresponding StreamBase field of type string). The list of compatible types between StreamBase and Cassandra is specified in Type Mappings.
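For illustration only (the keyspace myks and table users below are hypothetical, not part of the product or its samples), a Cassandra table and a matching row schema might look like this:

-- Hypothetical Cassandra table
CREATE TABLE myks.users (
  name text PRIMARY KEY,
  age  int,
  city text
);
-- A matching row schema on the Edit Schema tab could declare:
--   name (string), age (int), city (string)
-- An Edit Schema field with no matching column (for example, email) is ignored,
-- as is any users column with no corresponding StreamBase field.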
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
Each operator has only one input port, used to receive commands describing an operation to be performed on the Cassandra cluster (such as SELECT, INSERT, UPDATE, and so on). The schema for each operator is different because different operations require different parameters. In all cases, however, any fields found in the input schema that do not match those described below are ignored and passed through directly in the input field of result output tuples.
The Query operator's input schema contains a single required field:

Field Name | Field Type | Description
---|---|---
query | string | REQUIRED. Describes the full Cassandra query to execute. For example, CREATE KEYSPACE myks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}
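Any complete CQL statement can be supplied in the query field. As a sketch, using the hypothetical myks.users table from the previous example, input tuples might carry statements such as:

-- Hypothetical statements, one per input tuple
CREATE INDEX IF NOT EXISTS users_city_idx ON myks.users (city);
SELECT name, age FROM myks.users WHERE city = 'Boston';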
Each field in the input schema for the Select operator is used to fill out a query of the form:

SELECT <field-list-1> FROM <keyspace>.<table> WHERE <field-list-2> [ORDER BY <orderby-value> ASC|DESC] [LIMIT <limitval-value>] [ALLOW FILTERING]

In the form above, field-list-1 is a comma-separated list of the fields specified in the Edit Schema tab of the operator, and orderby-value, ASC|DESC, limitval-value, and ALLOW FILTERING are all specified according to their values in the operator's Properties tab. Finally, field-list-2 is filled using the contents of a tuple field named WHERE in the input tuple.
If a field of the WHERE tuple is of StreamBase type LIST instead of a simple type, its translation in the WHERE clause will be "column IN (value[,value...])" instead of "column = value".
The fields of the input tuple are used to fill out an INSERT query of the form:

INSERT INTO <keyspace>.<table> (<field-list>) VALUES (<values-list>) [USING [TTL ttl-value] [AND TIMESTAMP timestamp-value]] [IF NOT EXISTS]
Field Name | Field Type | Description
---|---|---
values | tuple | REQUIRED. Contains the fields and values to insert into the INSERT query. Each field's name is used to fill out the field-list in the query shown above, and each value is used to fill out the values-list.
ttl | int | If set, a USING TTL ttl-value clause is added to the query.
timestamp | timestamp | If set, a USING TIMESTAMP timestamp-value clause is added to the query.
Finally, if the operator's If Not Exists property is set, an IF NOT EXISTS clause is added to the query.
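For example (a sketch using the hypothetical myks.users table), an input tuple whose values tuple contains name = 'alice' and age = 30, with ttl set to 86400, might produce a query along these lines; the exact formatting may differ:

INSERT INTO myks.users (name, age) VALUES ('alice', 30) USING TTL 86400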
The fields of the input tuple are used to fill out a DELETE query of the form:

DELETE FROM <keyspace>.<table> WHERE column=value [, column=value] [IF [condition-list | EXISTS]]
Field Name | Field Type | Description
---|---|---
where | tuple | REQUIRED. Contains the fields and values to insert into the DELETE query. Each field's name is used to fill out the column names in the query shown above, and each value is used to fill out the values.
if | tuple | If non-null, an IF clause is added to the query, and this tuple's fields are used to fill out the condition-list as shown above.
Finally, if the operator's If Exists property is set, an IF EXISTS clause is added to the query. This setting is ignored if the if field is non-null in the input tuple.
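For example (a sketch using the hypothetical myks.users table), an input tuple whose where tuple contains name = 'alice', with a null if field and the If Exists property set, might produce a query along these lines; the exact formatting may differ:

DELETE FROM myks.users WHERE name = 'alice' IF EXISTS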
The fields of the input tuple are used to fill out an UPDATE query of the form:

UPDATE <keyspace>.<table> SET column=value [, column=value] WHERE column=value [, column=value] [IF condition-list]
Field Name | Field Type | Description
---|---|---
set | tuple | REQUIRED. This tuple field contains the fields used to fill out the SET clause of this UPDATE statement.
where | tuple | REQUIRED. This tuple field contains the fields used to fill out the WHERE clause of this UPDATE statement.
if | tuple | If non-null, an IF clause is added to the query, and this tuple's fields are used to fill out the condition-list as shown above.
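For example (a sketch using the hypothetical myks.users table), an input tuple whose set tuple contains age = 31, whose where tuple contains name = 'alice', and whose if tuple contains age = 30 might produce a query along these lines; the exact formatting may differ:

UPDATE myks.users SET age = 31 WHERE name = 'alice' IF age = 30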
Every operator has only one fixed output port, used to deliver the results of executing operator commands, plus one optional Status output port if the operator's Enable Status Port option is checked.
The Results Port consists of one field named input that mirrors the operator's input schema, plus one tuple field named row whose schema matches that specified in the Edit Schema tab of the operator.
If the ResultSet returned by executing a query contains one or more rows, those rows are emitted separately in successive tuples on the results output port. The query field contains the initial input that triggered this query, and the row field contains one row of the result set. In any case, one additional tuple is emitted with its row field set to null to indicate the results have been fully processed.
The Status Port, when enabled, is the same for each operator.
Field Name | Field Type | Description
---|---|---
status | string | Describes this event. Possible values include: Connected, Disconnected, Error.
info | list<string> | This is an all-purpose field used to convey additional information describing this event. For example, in the case of an Error event this will contain the text describing the error.
context | tuple | If this event was generated as a result of a command being sent to the operator, this field will contain the original command tuple.
When exchanging tuples between StreamBase and Cassandra, the fields and field types of these tuples are expected to match in the following ways:

- The fields have the same names, and
- Data types match or can be readily converted.
The table below lists all the type translations supported by the operator.
StreamBase Type | Cassandra Type(s)
---|---
boolean | boolean
string | text, ascii, inet, uuid, varchar
int | cint, smallint, tinyint. When mapping a StreamBase int to a Cassandra smallint or tinyint, some precision may be lost.
long | bigint, cint, smallint, tinyint, time, counter. When mapping a StreamBase long to a Cassandra cint, smallint, or tinyint, some precision may be lost.
double | double, float. When mapping a StreamBase double to a Cassandra float, some precision may be lost.
timestamp | date, timestamp
tuple | tuple
blob | blob
list | list, set, map. When mapping a StreamBase list to a Cassandra set, any duplicates in the list are removed (the last value of duplicate entries is used). When mapping a StreamBase list to a Cassandra map, the list elements are expected to be tuples with the first field representing an entry's key and the second field representing its value.
function | text, blob. The field is mapped to a JSON string representing its value if the Cassandra field is of type text, or serialized into a byte array if the field is of type blob.
capture | Not supported.
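As an illustrative sketch (the table and its columns below are hypothetical, not part of the product or its samples), a Cassandra table exercising several of these mappings and a compatible StreamBase schema might look like this:

-- Hypothetical Cassandra table
CREATE TABLE myks.events (
  id      uuid PRIMARY KEY,  -- pairs with a StreamBase string field
  active  boolean,           -- pairs with a StreamBase boolean field
  total   bigint,            -- pairs with a StreamBase long field
  score   float,             -- pairs with a StreamBase double field (precision may be lost)
  created timestamp,         -- pairs with a StreamBase timestamp field
  tags    set<text>,         -- pairs with a StreamBase list of string (duplicates removed)
  payload blob               -- pairs with a StreamBase blob field
);
-- A compatible StreamBase schema could declare:
--   id (string), active (boolean), total (long), score (double),
--   created (timestamp), tags (list of string), payload (blob)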
The StreamBase installation comes with a sample demonstrating the use of these operators. To load the sample in StreamBase Studio, open the Load StreamBase Sample dialog and look under the Extending StreamBase section for an entry called Cassandra Operator.