Apache Pulsar Adapter

This topic describes how to use the Adapter for Apache Pulsar to connect to an Apache Pulsar broker, publish messages, subscribe to topics, and consume messages.

Introduction

The Apache Pulsar connectivity solution is implemented as a pair of global Java operators that allow a Spotfire Streaming application to connect to an Apache Pulsar broker and exchange messages with it.

Placing Pulsar Operators on the Canvas

The Pulsar operators are members of the Java Adapter group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:

  • Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.

  • Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.

  • From the top-level menu, invoke Insert>Operator>Java.

From the Insert an Operator or Adapter dialog, type Pulsar in the filter text box to narrow the choices in the list below it, then select one of the following operators and double-click it or press OK:

  • Pulsar Consumer, which is used to subscribe to Pulsar topics, receive messages published to these topics and emit them as tuples to the application's downstream logic.

  • Pulsar Producer, which is used to send Pulsar messages to the configured cluster.

Adapter Configuration

Separate instances of the Pulsar operators can be made to share a connection to the broker. Each operator lists the connection configurations available in its Connection Definition adapter property (see Properties: Operator Properties Tab). The list is generated from the entries specified in a dedicated section of the application's adapter configuration file. The following example is very similar to the Pulsar.conf file used in the Pulsar sample:

name = "Pulsar.conf"
type = "com.tibco.ep.streambase.configuration.adapter"
version = "1.0.0"
configuration = {

    // An adapter group type defines a collection of EventFlow adapter configurations, 
    // indexed by adapter type.
    AdapterGroup = {

        // A collection of EventFlow adapter configurations, indexed by adapter type. 
        // This key is required and must contain at least one configuration.
        adapters = {

            // The root section for an adapter configuration.
            pulsar = {

                // Section list. This key is optional and has no default value.
                sections = [ 

                    // A configuration for an adapter named section.
                    {

                        // Section name. The value does not have to be unique; 
                        // that is, you can have multiple sections with the same name
                        // in the same array of sections. This key is required.
                        name = "broker-connection"

                        // Section property bag. All values must be strings. 
                        // This key is optional and has no default value.
                        settings = {

                            broker-uri = "pulsar://localhost:6650"

                            // This value may be anything you like -- 
                            // it's the value that will appear in each operator's
                            // Connection Definition drop-down to designate this entry.
                            id = "Test Broker 1"
                            
                            // One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth 
                            // (i.e. TLSAuth+TLSEncryption)
                            authentication-type = "None"

                            // HTTPBasic auth credentials. Passwords may be enciphered 
                            // using the "epadmin encrypt" family of commands.

                            username = "myusername"
                            password = "mypassword"

                            // TLS auth and encryption settings. Passwords may be enciphered.

                            //use-keystore-tls = "false"
                            //tls-keystore-path = ""
                            //tls-keystore-password = ""
                            //tls-truststore-path = ""
                            //tls-truststore-password = ""
                            //enable-hostname-verification = "false"
                            //allow-tls-insecure-connection = "false"
                        }
                    }
                    // A configuration for another adapter named section.
                    {

                        // Section name. The value does not have to be unique; 
                        // that is, you can have multiple sections with the same name
                        // in the same array of sections. This key is required.
                        name = "broker-connection"

                        // Section property bag. All values must be strings. 
                        // This key is optional and has no default value.
                        settings = {

                            broker-uri = "pulsar://example.com:6650"

                            // This value may be anything you like -- 
                            // it's the value that will appear in each operator's
                            // Connection Definition drop-down to designate this entry.
                            id = "Test Broker 2"
                            
                            // One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth
                            authentication-type = "None"

                            // HTTPBasic auth credentials. 
                            // Passwords may be enciphered using the "epadmin encrypt" family of commands.

                            username = "myusername"
                            password = "mypassword"

                            // TLS auth and encryption settings. Passwords may be enciphered.

                            //use-keystore-tls = "false"
                            //tls-keystore-path = ""
                            //tls-keystore-password = ""
                            //tls-truststore-path = ""
                            //tls-truststore-password = ""
                            //enable-hostname-verification = "false"
                            //allow-tls-insecure-connection = "false"
                        }
                    }
                ]
            }
        }
    }
}

In each section named broker-connection, the value of the id setting identifies that particular section (that is, that particular connection definition) and is listed in the Connection Definition drop-down on the Operator Properties tab of each operator's Properties view. Multiple operators can share a single connection by selecting the same Connection Definition value.
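To picture how the id setting lets several operators share one connection, here is a minimal Java sketch. It is not the adapter's implementation: ConnectionRegistry, Section, Connection, and acquire are hypothetical names, and Connection is only a placeholder for a live Pulsar client. The registry indexes each section named broker-connection by its id and returns the same Connection object to every caller that asks for that id.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: operators that name the same id share one connection.
final class ConnectionRegistry {

    /** One element of the "sections" array: a section name plus its settings bag. */
    static final class Section {
        final String name;
        final Map<String, String> settings;

        Section(String name, Map<String, String> settings) {
            this.name = name;
            this.settings = settings;
        }
    }

    /** Placeholder for a live broker connection (not a Pulsar or adapter class). */
    static final class Connection {
        final String id;
        final String brokerUri;

        Connection(String id, String brokerUri) {
            this.id = id;
            this.brokerUri = brokerUri;
        }
    }

    private final Map<String, Map<String, String>> definitionsById = new HashMap<>();
    private final Map<String, Connection> openConnections = new HashMap<>();

    ConnectionRegistry(List<Section> sections) {
        for (Section section : sections) {
            // Only sections named "broker-connection" describe connections.
            // In this sketch a later section with a duplicate id replaces an earlier one.
            if ("broker-connection".equals(section.name)) {
                definitionsById.put(section.settings.get("id"), section.settings);
            }
        }
    }

    /** Returns the connection for this id, creating it on first use and reusing it afterwards. */
    synchronized Connection acquire(String id) {
        Map<String, String> settings = definitionsById.get(id);
        if (settings == null) {
            throw new IllegalArgumentException("No broker-connection section with id: " + id);
        }
        return openConnections.computeIfAbsent(
                id, key -> new Connection(key, settings.get("broker-uri")));
    }
}
```

With the two sections from the example file, two operators that both select Test Broker 1 would receive the same Connection instance, while an operator selecting Test Broker 2 would receive a different one.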

A best practice is to define your connections before placing operator instances on the canvas, so that the list is already available in the Properties view and the operators can be configured right away.

Adapter Configuration File Options

This section describes the settings that can appear in the settings block of each broker-connection section of the adapter configuration (HOCON) file.

If a value is not present, the default is used. Those values listed without a default are required unless otherwise noted.

Each entry below gives the setting name, its type, and its default (if any), followed by a description. As the comments in the example file note, all values in the settings block are written as strings, so boolean settings take the string "true" or "false".

  • id (string, no default): ALWAYS REQUIRED. The ID that appears in each operator's Connection Definition drop-down list.

  • broker-uri (string, no default): ALWAYS REQUIRED. The URI of the broker to connect to. Typically of the form pulsar://<hostname>:<port> or pulsar+ssl://<hostname>:<port>.

  • authentication-type (string, default None): The type of authentication and/or encryption to use. One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth (that is, TLSAuth plus TLSEncryption).

  • username (string, no default): The user name to supply when authentication-type is HTTPBasic. Ignored otherwise.

  • password (string, no default): The password to supply when authentication-type is HTTPBasic. Ignored otherwise. May be enciphered using the epadmin encrypt family of commands.

  • use-keystore-tls (boolean, default false): Whether to set up TLS using a keystore. When true, tls-truststore-path and tls-truststore-password must also be set. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth.

  • tls-truststore-path (string, no default): Path to the TLS truststore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth.

  • tls-truststore-password (string, no default): Password for the TLS truststore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. May be enciphered using the epadmin encrypt family of commands.

  • tls-keystore-path (string, no default): Path to the TLS keystore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth.

  • tls-keystore-password (string, no default): Password for the TLS keystore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. May be enciphered using the epadmin encrypt family of commands.

  • enable-hostname-verification (boolean, default false): Whether to enable hostname verification during TLS authentication. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth.

  • allow-tls-insecure-connection (boolean, default false): Whether to allow insecure TLS connections. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth.
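The rules listed above amount to a validation pass over one settings block. The following Java sketch is illustrative only: BrokerConnectionConfig and resolve are hypothetical names, the settings are assumed to arrive as a Map<String, String> (matching the rule that all values are strings), and comparing authentication-type values with exact case is an assumption. It applies the None default, requires id and broker-uri, ignores use-keystore-tls for non-TLS authentication types, and enforces the truststore requirement when use-keystore-tls is "true".

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: applies the documented defaults and requirements to one settings block.
final class BrokerConnectionConfig {
    static final Set<String> AUTH_TYPES =
            Set.of("None", "HTTPBasic", "TLSAuth", "TLSEncryption", "TLSBoth");
    static final Set<String> TLS_TYPES = Set.of("TLSAuth", "TLSEncryption", "TLSBoth");

    final String id;
    final String brokerUri;
    final String authenticationType;
    final boolean useKeystoreTls;

    private BrokerConnectionConfig(String id, String brokerUri,
                                   String authenticationType, boolean useKeystoreTls) {
        this.id = id;
        this.brokerUri = brokerUri;
        this.authenticationType = authenticationType;
        this.useKeystoreTls = useKeystoreTls;
    }

    static BrokerConnectionConfig resolve(Map<String, String> settings) {
        String id = require(settings, "id");
        String brokerUri = require(settings, "broker-uri");
        String auth = settings.getOrDefault("authentication-type", "None");
        if (!AUTH_TYPES.contains(auth)) {
            throw new IllegalArgumentException("Unknown authentication-type: " + auth);
        }
        // TLS settings are ignored unless a TLS authentication type is selected.
        boolean useKeystore = TLS_TYPES.contains(auth)
                && Boolean.parseBoolean(settings.getOrDefault("use-keystore-tls", "false"));
        if (useKeystore) {
            // use-keystore-tls = "true" also requires the truststore path and password.
            require(settings, "tls-truststore-path");
            require(settings, "tls-truststore-password");
        }
        return new BrokerConnectionConfig(id, brokerUri, auth, useKeystore);
    }

    private static String require(Map<String, String> settings, String key) {
        String value = settings.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value;
    }
}
```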

Streaming Engine Configuration

In addition to the adapter configuration file described above, the Streaming engine itself must be configured with an engine.conf file. Place this file in the project's src/main/configurations directory; it must contain at least the following jvmArgs setting:

name = "pulsar-adapter"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"

//
// Streaming engine configuration for Pulsar adapter.
//
configuration =
{
    StreamBaseEngine =
    {
        // This --add-opens directive is needed when running under Java 17 or later 
        // due to Apache Pulsar's use of reflection.
        jvmArgs =
        [
            "--add-opens=java.base/sun.net=ALL-UNNAMED"                            
        ]
    }
}
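To confirm at runtime that this jvmArgs entry took effect, a check along the lines of the following Java sketch can help. AddOpensCheck and its methods are hypothetical; the sketch uses only the standard java.lang.Module and Runtime.version() APIs, follows this page in treating Java 17 as the threshold, and assumes the calling code is loaded from the class path (that is, into an unnamed module), which is what the ALL-UNNAMED target covers.

```java
// Hypothetical startup diagnostic for the --add-opens directive shown above.
final class AddOpensCheck {

    /** This page states the directive is needed on Java 17 or later. */
    static boolean addOpensRequired(int javaFeatureVersion) {
        return javaFeatureVersion >= 17;
    }

    /** True if java.base opens package sun.net to the module this class was loaded into. */
    static boolean sunNetOpenedToThisModule() {
        Module javaBase = Object.class.getModule();
        return javaBase.isOpen("sun.net", AddOpensCheck.class.getModule());
    }

    /** Returns a warning message, or null when no problem is detected. */
    static String warning() {
        int feature = Runtime.version().feature();
        if (addOpensRequired(feature) && !sunNetOpenedToThisModule()) {
            return "Java " + feature + " detected but sun.net is not opened; add "
                    + "--add-opens=java.base/sun.net=ALL-UNNAMED to jvmArgs in engine.conf";
        }
        return null;
    }
}
```

warning() returns null when the package is already opened or when the JVM is older than Java 17, and a suggested fix otherwise.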

Properties View Settings

This section describes the properties you can set for a Pulsar operator, using the various tabs of the Properties view in StreamBase Studio.

Properties: General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Properties: Operator Properties Tab

This section describes the properties on the Operator Properties tab in the Properties view for Pulsar Adapter operators. Enter all text fields as string literals, not as expressions.

Common Properties

All the Pulsar Adapter operators share a common set of properties:

Connection Definition

Specifies the ID of the configuration section describing the connection parameters used by this operator. The Connection Definition drop-down list of each operator will contain a list of available definitions from which to choose, as specified in the adapter configuration file (see Adapter Configuration). This setting is required.

Log Level

Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
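The values above form an ordered scale. Assuming the Log Level property follows the usual threshold convention (a message is printed only if it is no more verbose than the configured level, and OFF prints nothing), its behavior can be sketched in Java as follows. OperatorLogLevel, allows, and the case-insensitive parse are hypothetical, not adapter API.

```java
import java.util.Locale;

// Hypothetical sketch of threshold semantics for the Log Level values listed above.
enum OperatorLogLevel {
    // Declared in increasing order of verbosity, so ordinal() ranks verbosity.
    OFF, ERROR, WARN, INFO, DEBUG, TRACE;

    /** True if a message logged at messageLevel would be printed under this setting. */
    boolean allows(OperatorLogLevel messageLevel) {
        // OFF prints nothing; otherwise print anything no more verbose than the setting.
        return this != OFF && messageLevel != OFF && messageLevel.ordinal() <= ordinal();
    }

    /** Parses a property value such as "debug" or "WARN" (case-insensitivity is assumed). */
    static OperatorLogLevel parse(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```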

Consumer Properties

In addition to the common properties described above, the Pulsar Consumer operator also has the following properties:

Receiver Queue Size

Integer value. The maximum number of messages the receiver's queue can contain. Leave empty to use Pulsar's default value.

Producer Properties

In addition to the common properties described above, the Pulsar Producer operator also has the following properties:

Producer Access Mode

Enum value. The access mode to use when publishing messages to the Pulsar broker. One of Shared, Exclusive, ExclusiveWithFencing, WaitForExclusive.

Compression Type

Enum value. The compression type to use on messages when publishing. One of NONE, LZ4, ZLIB, ZSTD.

Max Pending Messages

Integer value. The maximum number of messages that may remain unacknowledged by the broker. Leave empty to use Pulsar's default value.

Send Timeout (in ms)

Integer value. The maximum number of milliseconds to allow a message to go unacknowledged by the broker. Leave empty to use Pulsar's default value.

Enable Batching

Boolean value. When checked, message batching is enabled.

Max Messages

Integer value. The maximum number of messages per batch. Leave empty to use Pulsar's default value.

Max Bytes

Integer value. The maximum number of bytes per batch. Leave empty to use Pulsar's default value.

Max Publish Delay (in ms)

Integer value. Maximum number of milliseconds to wait before publishing a batch. Leave empty to use Pulsar's default value.
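The following Java sketch shows one way to interpret these fields, in particular the "leave empty to use Pulsar's default value" convention, which it models with OptionalInt. ProducerSettings, optionalInt, and overrides are hypothetical names, and rejecting negative values is an assumption. In the Apache Pulsar Java client these settings plausibly correspond to ProducerBuilder methods such as accessMode, compressionType, maxPendingMessages, sendTimeout, enableBatching, batchingMaxMessages, batchingMaxBytes, and batchingMaxPublishDelay, but how the adapter actually applies them is not documented here, so the sketch only reports the overrides it would make.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch: interprets the Producer's Operator Properties values.
final class ProducerSettings {
    // Values accepted by Producer Access Mode and Compression Type.
    enum AccessMode { Shared, Exclusive, ExclusiveWithFencing, WaitForExclusive }
    enum Compression { NONE, LZ4, ZLIB, ZSTD }

    /** An empty field means "use Pulsar's default", modeled as an empty OptionalInt. */
    static OptionalInt optionalInt(String field) {
        if (field == null || field.trim().isEmpty()) {
            return OptionalInt.empty();
        }
        int value = Integer.parseInt(field.trim());
        if (value < 0) {
            throw new IllegalArgumentException("Negative value not allowed: " + value);
        }
        return OptionalInt.of(value);
    }

    /** Lists the overrides this sketch would apply; empty fields are left to Pulsar. */
    static List<String> overrides(String maxPending, String sendTimeoutMs, boolean batching,
                                  String batchMaxMessages, String batchMaxBytes,
                                  String batchMaxDelayMs) {
        List<String> out = new ArrayList<>();
        optionalInt(maxPending).ifPresent(v -> out.add("maxPendingMessages=" + v));
        optionalInt(sendTimeoutMs).ifPresent(v -> out.add("sendTimeoutMs=" + v));
        out.add("batching=" + batching);
        if (batching) {
            // Design choice of this sketch: batch limits are only considered when batching is on.
            optionalInt(batchMaxMessages).ifPresent(v -> out.add("batchMaxMessages=" + v));
            optionalInt(batchMaxBytes).ifPresent(v -> out.add("batchMaxBytes=" + v));
            optionalInt(batchMaxDelayMs).ifPresent(v -> out.add("batchMaxPublishDelayMs=" + v));
        }
        return out;
    }
}
```

For instance, overrides("", "30000", false, "100", "", "") yields [sendTimeoutMs=30000, batching=false]: the empty Max Pending Messages field is left to Pulsar, and the Max Messages value is skipped because batching is off.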

Schema Management Tab

This section describes the properties on the Schema Management tab in the Properties view for Pulsar Adapter operators.

Consumer Properties

The Consumer operator has the following properties on the Schema Management tab:

Pulsar Schema Type

Enum value. The schema to use when reading the Pulsar message payload. One of ByteArray, String, JSON.

Pulsar Message Payload Schema

Schema value. Describes in full the schema to use to read the Pulsar message payload. Ignored unless Pulsar Schema Type is set to JSON.

Producer Properties

The Producer operator has the following property on the Schema Management tab:

Pulsar Schema Type

Enum value. The schema to use when constructing the Pulsar message payload. One of ByteArray, String, JSON.

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Properties: Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Input Ports

Consumer Operator

The Consumer operator has one input port, used to receive commands describing an operation to be performed (such as Subscribe or Unsubscribe). The schema for this input port is as follows:

  • command (string): REQUIRED. Specifies the command to execute. Case is ignored. Available commands are:

      • Subscribe: Instructs the Consumer to subscribe to the specified topic(s). When this command is specified, one of the topic, topics, or topicRegEx fields must also be specified.

      • Unsubscribe: Instructs the Consumer to unsubscribe from its current topic(s).

  • topic (string): Specifies the topic to which to subscribe.

  • topics (string): Specifies multiple comma-separated topics to which to subscribe. Ignored if topic is specified.

  • topicRegEx (string): Specifies a regular expression describing the topics to which to subscribe. Ignored if topic or topics is specified.

  • subscriptionType (string): Specifies the type of subscription to use. Must be one of Exclusive, Shared, Failover, Key_Shared. Case is ignored. If left unspecified, Exclusive is used.
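The field precedence for a Subscribe command (topic, then topics, then topicRegEx) and the subscriptionType default can be expressed as the following Java sketch. SubscribeCommand and parse are hypothetical names, not part of the adapter, and trimming spaces around comma-separated topic names is an assumption the field descriptions do not state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: how the Subscribe command fields could be interpreted.
final class SubscribeCommand {
    enum SubscriptionType { Exclusive, Shared, Failover, Key_Shared }

    final List<String> topics;   // explicit topic names (from topic or topics)
    final Pattern topicPattern;  // non-null only when topicRegEx was used
    final SubscriptionType type;

    private SubscribeCommand(List<String> topics, Pattern topicPattern, SubscriptionType type) {
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.type = type;
    }

    static SubscribeCommand parse(String command, String topic, String topics,
                                  String topicRegEx, String subscriptionType) {
        if (command == null || !command.trim().equalsIgnoreCase("subscribe")) {
            throw new IllegalArgumentException("Not a Subscribe command: " + command);
        }
        SubscriptionType type = parseType(subscriptionType);
        // Documented precedence: topic, then topics, then topicRegEx.
        if (present(topic)) {
            return new SubscribeCommand(List.of(topic.trim()), null, type);
        }
        if (present(topics)) {
            List<String> names = new ArrayList<>();
            for (String name : topics.split(",")) {
                if (present(name)) {
                    names.add(name.trim()); // trimming spaces is an assumption
                }
            }
            return new SubscribeCommand(names, null, type);
        }
        if (present(topicRegEx)) {
            return new SubscribeCommand(List.of(), Pattern.compile(topicRegEx), type);
        }
        throw new IllegalArgumentException("Subscribe needs topic, topics, or topicRegEx");
    }

    /** Case-insensitive match against the documented values; Exclusive when unspecified. */
    static SubscriptionType parseType(String value) {
        if (!present(value)) {
            return SubscriptionType.Exclusive;
        }
        for (SubscriptionType t : SubscriptionType.values()) {
            if (t.name().equalsIgnoreCase(value.trim())) {
                return t;
            }
        }
        throw new IllegalArgumentException("Unknown subscriptionType: " + value);
    }

    private static boolean present(String s) {
        return s != null && !s.trim().isEmpty();
    }
}
```

For example, a tuple with command = "SUBSCRIBE", topic = "t1", and topics = "t2,t3" resolves to the single topic t1 with an Exclusive subscription, because topic takes precedence and subscriptionType was left empty.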

Producer Operator

The Producer operator has one input port, used to receive tuples describing a message to be sent to the Pulsar broker. The schema for this input port is as follows:

  • topic (string): REQUIRED. The name of the topic to which to publish this message.

  • key (string): The key to use when publishing this message.

  • sequenceID (string): Optionally sets the sequence ID for this message when publishing.

  • message (context-dependent): The actual payload of the message to be sent to the Pulsar broker for publishing. The field's type is expected to agree with the value of the operator's Pulsar Schema Type property (see Schema Management Tab):

      • ByteArray schema type: The message field is expected to be of type BLOB.

      • String schema type: The message field is expected to be of type STRING.

      • JSON schema type: The message field is expected to be of type TUPLE, with the tuple's fields matching, in name and type, those of the JSON schema used by this topic. The tuple's schema may comprise only a subset of the JSON schema, in which case only the fields present are set.
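The type rules above can be checked before a tuple reaches the Producer. In the following Java sketch, PayloadFieldCheck and its methods are hypothetical, and field types are represented as plain strings purely for illustration. checkFieldType enforces the schema-type-to-field-type mapping; checkJsonSubset enforces the JSON rule that every tuple field must match a JSON schema field in name and type, and returns the JSON fields that will simply be left unset.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: checks a Producer's message field against Pulsar Schema Type.
final class PayloadFieldCheck {
    /** Expected message field type for each Pulsar Schema Type value. */
    static final Map<String, String> EXPECTED_FIELD_TYPE =
            Map.of("ByteArray", "blob", "String", "string", "JSON", "tuple");

    static void checkFieldType(String pulsarSchemaType, String messageFieldType) {
        String expected = EXPECTED_FIELD_TYPE.get(pulsarSchemaType);
        if (expected == null) {
            throw new IllegalArgumentException("Unknown Pulsar Schema Type: " + pulsarSchemaType);
        }
        if (!expected.equalsIgnoreCase(messageFieldType)) {
            throw new IllegalArgumentException(pulsarSchemaType + " requires a "
                    + expected + " message field, got " + messageFieldType);
        }
    }

    /**
     * JSON case: each tuple field must exist in the topic's JSON schema with the same type.
     * The tuple may cover only a subset; returns the JSON fields that will stay unset.
     */
    static Set<String> checkJsonSubset(Map<String, String> jsonSchema,
                                       Map<String, String> tupleFields) {
        for (Map.Entry<String, String> field : tupleFields.entrySet()) {
            String jsonType = jsonSchema.get(field.getKey());
            if (jsonType == null) {
                throw new IllegalArgumentException("Not in JSON schema: " + field.getKey());
            }
            if (!jsonType.equals(field.getValue())) {
                throw new IllegalArgumentException("Type mismatch for " + field.getKey());
            }
        }
        Set<String> unset = new TreeSet<>(jsonSchema.keySet());
        unset.removeAll(tupleFields.keySet());
        return unset;
    }
}
```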

Output Ports

Each operator has a Status output port used to issue various informational and error messages:

  • Status (string): Describes this event (Success, Error, Connection Error).

  • Time (timestamp): The time at which this status tuple was generated.

  • Info (list<tuple>): Contains any additional information pertaining to this event. Each tuple has two fields, Name and Value, which together describe one informational entry.
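Downstream code often needs to read the Info entries by name. The following Java sketch is hypothetical: StatusInfo and Entry mirror the Status port fields described above, the Value field is assumed to be a string (its type is not given here), and keeping the last value for a repeated Name is a choice of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: reading a Status tuple in downstream Java code.
final class StatusInfo {
    /** Mirrors one element of the Info list: a Name/Value pair (Value assumed to be a string). */
    static final class Entry {
        final String name;
        final String value;

        Entry(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Flattens Info into a map in arrival order; a repeated Name keeps its last Value. */
    static Map<String, String> toMap(List<Entry> info) {
        Map<String, String> map = new LinkedHashMap<>();
        for (Entry entry : info) {
            map.put(entry.name, entry.value);
        }
        return map;
    }

    /** Treats the two documented error statuses as failures. */
    static boolean isFailure(String status) {
        return "Error".equals(status) || "Connection Error".equals(status);
    }
}
```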

Consumer Operator Results Port

In addition to the Status port described above, the Consumer operator also presents an output port used to relay Pulsar messages as they are received from a subscribed topic. The schema for this port is as follows:

  • topic (string): The name of the topic from which this message was received.

  • key (string): The key of this message.

  • message (context-dependent): The actual payload of the message received from the Pulsar topic. The field's type is determined by the value of the operator's Pulsar Schema Type property (see Schema Management Tab):

      • ByteArray schema type: The message field is of type BLOB.

      • String schema type: The message field is of type STRING.

      • JSON schema type: The message field is of type TUPLE, with the tuple's fields matching those of the schema specified in the Consumer's Pulsar Message Payload Schema property (see Schema Management Tab).

Pulsar Adapter Sample

The Spotfire Streaming installation comes with a sample demonstrating the use of this adapter. To load the sample in Streaming Studio, select File>Import Samples and Community Content and look under the Messaging Adapters category for an entry called Pulsar Adapter.