This topic describes how to use the Adapter for Apache Pulsar to connect to an Apache Pulsar broker and publish messages as well as subscribe to topics and consume messages.
The Apache Pulsar connectivity solution is implemented as a pair of global Java operators that allow a Spotfire Streaming application to connect to an Apache Pulsar broker and exchange messages with it.
The Pulsar operators are members of the Java Adapter group in the Palette view in StreamBase Studio. Select the operators from the Insert an Operator or Adapter dialog. Invoke the dialog with one of the following methods:
-
Drag the Adapters, Java Operators token from the Operators and Adapters drawer of the Palette view to the canvas.
-
Click in the canvas where you want to place the operator, and invoke the keyboard shortcut O V.
-
From the top-level menu, invoke
> > .
In the Insert an Operator or Adapter dialog, type Pulsar in the filter text box to narrow the choices in the list below it, then select one of the following operators
and double-click or press :
-
Pulsar Consumer, which is used to subscribe to Pulsar topics, receive messages published to these topics and emit them as tuples to the application's downstream logic.
-
Pulsar Producer, which is used to send Pulsar messages to the configured cluster.
Separate instances of the Pulsar operators can be made to share a connection to the
broker. Each operator lists the connection configurations available in its
Connection Definition adapter property (see Properties: Operator Properties Tab). The
list is generated from the entries specified in a dedicated section of the
application's adapter configuration file. The following example is very similar to
the Pulsar.conf file used in the Pulsar sample:
    name = "Pulsar.conf"
    type = "com.tibco.ep.streambase.configuration.adapter"
    version = "1.0.0"
    configuration = {
      // An adapter group type defines a collection of EventFlow adapter configurations,
      // indexed by adapter type.
      AdapterGroup = {
        // A collection of EventFlow adapter configurations, indexed by adapter type.
        // This key is required and must contain at least one configuration.
        adapters = {
          // The root section for an adapter configuration.
          pulsar = {
            // Section list. This key is optional and has no default value.
            sections = [
              // A configuration for an adapter named section.
              {
                // Section name. The value does not have to be unique;
                // that is, you can have multiple sections with the same name
                // in the same array of sections. This key is required.
                name = "broker-connection"
                // Section property bag. All values must be strings.
                // This key is optional and has no default value.
                settings = {
                  broker-uri = "pulsar://localhost:6650"
                  // This value may be anything you like --
                  // it's the value that will appear in each operator's
                  // Connection Definition drop-down to designate this entry.
                  id = "Test Broker 1"
                  // One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth
                  // (i.e. TLSAuth+TLSEncryption)
                  authentication-type = "None"
                  // HTTPBasic auth credentials. Passwords may be enciphered
                  // using the "epadmin encrypt" family of commands.
                  username = "myusername"
                  password = "mypassword"
                  // TLS auth and encryption settings. Passwords may be enciphered.
                  //use-keystore-tls = "false"
                  //tls-keystore-path = ""
                  //tls-keystore-password = ""
                  //tls-truststore-path = ""
                  //tls-truststore-password = ""
                  //enable-hostname-verification = "false"
                  //allow-tls-insecure-connection = "false"
                }
              }
              // A configuration for another adapter named section.
              {
                // Section name. The value does not have to be unique;
                // that is, you can have multiple sections with the same name
                // in the same array of sections. This key is required.
                name = "broker-connection"
                // Section property bag. All values must be strings.
                // This key is optional and has no default value.
                settings = {
                  broker-uri = "pulsar://example.com:6650"
                  // This value may be anything you like --
                  // it's the value that will appear in each operator's
                  // Connection Definition drop-down to designate this entry.
                  id = "Test Broker 2"
                  // One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth
                  authentication-type = "None"
                  // HTTPBasic auth credentials.
                  // Passwords may be enciphered using the "epadmin encrypt" family of commands.
                  username = "myusername"
                  password = "mypassword"
                  // TLS auth and encryption settings. Passwords may be enciphered.
                  //use-keystore-tls = "false"
                  //tls-keystore-path = ""
                  //tls-keystore-password = ""
                  //tls-truststore-path = ""
                  //tls-truststore-password = ""
                  //enable-hostname-verification = "false"
                  //allow-tls-insecure-connection = "false"
                }
              }
            ]
          }
        }
      }
    }
In each section named broker-connection, the value of the id setting identifies
that particular section (in other words, that particular connection definition) and
is listed in the Connection Definition drop-down on the Operator Properties tab of
each operator's Properties view. Multiple operators can share a single connection by
selecting the same value for this Connection Definition setting.
A best practice is to define your connections before placing operator instances on the canvas, so that the list is already available in the Properties view and the operators can be configured right away.
This section describes the settings of the adapter configuration block found in the HOCON file.
If a value is not present, the default is used. Settings listed without a default are required unless otherwise noted.
Property | Type | Default | Description |
---|---|---|---|
id | string | | ALWAYS REQUIRED. The ID that appears in the operator's Connection Definition drop-down. |
broker-uri | string | | ALWAYS REQUIRED. The URI of the broker to which to connect. Typically takes the form pulsar://<hostname>:<port> or pulsar+ssl://<hostname>:<port>. |
authentication-type | string | None | The type of authentication and encryption to use. One of: None, HTTPBasic, TLSAuth, TLSEncryption, TLSBoth (that is, TLSAuth plus TLSEncryption). |
username | string | | The user name to specify when authentication-type is HTTPBasic. Ignored otherwise. |
password | string | | The password to specify when authentication-type is HTTPBasic. Ignored otherwise. May be enciphered using the epadmin encrypt family of commands. |
use-keystore-tls | boolean | false | Whether to set up TLS using a keystore. When true, tls-truststore-path and tls-truststore-password must also be set. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. |
tls-truststore-path | string | | Path to the TLS truststore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. |
tls-truststore-password | string | | Password to the TLS truststore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. May be enciphered using the epadmin encrypt family of commands. |
tls-keystore-path | string | | Path to the TLS keystore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. |
tls-keystore-password | string | | Password to the TLS keystore. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. May be enciphered using the epadmin encrypt family of commands. |
enable-hostname-verification | boolean | false | Whether to enable hostname verification during TLS authentication. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. |
allow-tls-insecure-connection | boolean | false | Whether to allow insecure TLS connections. Ignored unless authentication-type is TLSAuth, TLSEncryption, or TLSBoth. |
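As an illustration of the settings in the table above, the following sketch shows two broker-connection sections as they might appear in the sections array: one using HTTPBasic credentials and one using TLSBoth with keystore-based TLS. The host names, id values, file paths, and passwords are placeholders rather than values from the Pulsar sample, and port 6651 is assumed here because it is Pulsar's conventional TLS port. Which TLS settings a given broker actually needs depends on how that broker is configured.

```
sections = [
  // Hypothetical entry using HTTP basic authentication.
  {
    name = "broker-connection"
    settings = {
      id = "Basic Auth Broker"
      broker-uri = "pulsar://broker.example.com:6650"
      authentication-type = "HTTPBasic"
      username = "myusername"
      // May be replaced by a value enciphered with the "epadmin encrypt" commands.
      password = "mypassword"
    }
  }
  // Hypothetical entry using TLS for both authentication and encryption.
  {
    name = "broker-connection"
    settings = {
      id = "TLS Broker"
      broker-uri = "pulsar+ssl://broker.example.com:6651"
      authentication-type = "TLSBoth"
      // With use-keystore-tls set to true, the truststore path and password
      // must also be set.
      use-keystore-tls = "true"
      tls-truststore-path = "/path/to/truststore.jks"
      tls-truststore-password = "changeit"
      tls-keystore-path = "/path/to/keystore.jks"
      tls-keystore-password = "changeit"
      enable-hostname-verification = "true"
      allow-tls-insecure-connection = "false"
    }
  }
]
```

All values are written as strings, including the boolean ones, because the section property bag accepts only string values.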
In addition to providing the above configuration file for the adapter, the
Streaming engine itself must be configured through an engine.conf file, placed in
the project's src/main/configurations directory and containing at least the
following jvmArgs setting:
    name = "pulsar-adapter"
    version = 1.0.0
    type = "com.tibco.ep.streambase.configuration.sbengine"

    //
    // Streaming engine configuration for Pulsar adapter.
    //
    configuration = {
      StreamBaseEngine = {
        // This --add-opens directive is needed when running under Java 17 or later
        // due to Apache Pulsar's use of reflection.
        jvmArgs = [
          "--add-opens=java.base/sun.net=ALL-UNNAMED"
        ]
      }
    }
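If a project already has an engine configuration with its own JVM arguments, the --add-opens directive would presumably be added to the existing jvmArgs array rather than placed in a second engine configuration. The following is a minimal sketch under that assumption; the configuration name my-engine and the -Xmx2g heap setting are hypothetical and stand in for whatever the project already defines.

```
name = "my-engine"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"

configuration = {
  StreamBaseEngine = {
    jvmArgs = [
      // Illustrative pre-existing setting; not required by the Pulsar adapter.
      "-Xmx2g"
      // Needed under Java 17 or later due to Apache Pulsar's use of reflection.
      "--add-opens=java.base/sun.net=ALL-UNNAMED"
    ]
  }
}
```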
This section describes the properties you can set for a Pulsar operator, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.
Operator: A read-only field that shows the formal name of the operator.
Class name: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this operator starts.
Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
This section describes the properties on the Operator Properties tab in the Properties view for Pulsar Adapter operators. Enter all text fields as string literals, not as expressions.
All the Pulsar Adapter operators share a common set of properties:
- Connection Definition
-
Specifies the ID of the configuration section describing the connection parameters used by this operator. The Connection Definition drop-down list of each operator will contain a list of available definitions from which to choose, as specified in the adapter configuration file (see Adapter Configuration). This setting is required.
- Log Level
-
Use this to set the operator to produce more or less verbose console output, independent of the STREAMBASE_LOG_LEVEL global setting. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
In addition to the common properties described above, the Pulsar Consumer operator also has the following properties:
- Receiver Queue Size
-
Integer value. The maximum number of messages the receiver's queue can contain. Leave empty to use Pulsar's default value.
In addition to the common properties described above, the Pulsar Producer operator also has the following properties:
- Producer Access Mode
-
Enum value. The access mode to use when publishing messages to the Pulsar broker. One of Shared, Exclusive, ExclusiveWithFencing, WaitForExclusive.
- Compression Type
-
Enum value. The compression type to use on messages when publishing. One of NONE, LZ4, ZLIB, ZSTD.
- Max Pending Messages
-
Integer value. The maximum number of messages that may remain unacknowledged by the broker. Leave empty to use Pulsar's default value.
- Send Timeout (in ms)
-
Integer value. The maximum number of milliseconds to allow a message to go unacknowledged by the broker. Leave empty to use Pulsar's default value.
- Enable Batching
-
Boolean value. When checked, batched mode will be enabled.
- Max Messages
-
Integer value. The maximum number of messages per batch. Leave empty to use Pulsar's default value.
- Max Bytes
-
Integer value. The maximum number of bytes per batch. Leave empty to use Pulsar's default value.
- Max Publish Delay (in ms)
-
Integer value. Maximum number of milliseconds to wait before publishing a batch. Leave empty to use Pulsar's default value.
This section describes the properties on the Schema Management tab in the Properties view for Pulsar Adapter operators.
The properties on the Schema Management tab are specific to the Consumer operator:
- Pulsar Schema Type
-
Enum value. The schema to use when reading the Pulsar message payload. One of ByteArray, String, JSON.
- Pulsar Message Payload Schema
-
Schema value. Describes in full the schema to use to read the Pulsar message payload. Ignored unless Pulsar Schema Type is set to JSON.
Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
The Consumer operator has one input port, used to receive commands describing an operation to be performed (such as Subscribe or Unsubscribe). The schema for this input port is as follows:
Field Name | Field Type | Description |
---|---|---|
command | string | REQUIRED. Specifies the command to execute, such as Subscribe or Unsubscribe. Case is ignored. |
topic | string | Specifies the topic to which to subscribe. |
topics | string | Specifies multiple topics to which to subscribe, separated by commas. Ignored if topic is specified. |
topicRegEx | string | Specifies a regular expression describing the topics to which to subscribe. Ignored if topic or topics is specified. |
subscriptionType | string | Specifies the type of subscription to use. Must be one of Exclusive, Shared, Failover, Key_Shared. Case is ignored. If left unspecified, the default of Exclusive is used. |
The Producer operator has one input port, used to receive tuples describing a message to be sent to the Pulsar broker. The schema for this input port is as follows:
Field Name | Field Type | Description |
---|---|---|
topic | string | REQUIRED. The name of the topic to which to publish this message. |
key | string | The key to use when publishing this message. |
sequenceID | string | This can be used to set the sequence ID for this message when publishing, if desired. |
message | Context-dependent | The actual payload of the message to be sent to the Pulsar broker for publishing. The field's type is expected to match the value of the operator's Pulsar Schema Type property (see Schema Management Tab). |
Each operator has a Status output port used to issue various informational and error messages:
Field Name | Field Type | Description |
---|---|---|
Status | string | Describes this event (Success, Error, Connection Error). |
Time | timestamp | The time at which this status tuple was generated. |
Info | list<tuple> | Contains any additional information pertaining to this event. Each tuple has two fields, Name and Value, that describe one informational entry. |
In addition to the Status port described above, the Consumer operator also presents an output port used to relay Pulsar messages as they are received from a subscribed topic. The schema for this port is as follows:
Field Name | Field Type | Description |
---|---|---|
topic | string | The name of the topic from which this message was received. |
key | string | The key of this message. |
message | Context-dependent | The actual payload of the message received from the Pulsar topic. The field's type is determined by the value of the operator's Pulsar Schema Type property (see Schema Management Tab). |
The Spotfire Streaming installation comes with a sample demonstrating the use of this adapter. To load the sample in Streaming Studio, select > and look under the Messaging Adapters category for an entry called Pulsar Adapter.