RabbitMQ Producer Adapter

Introduction

The TIBCO StreamBase® Adapter for RabbitMQ Producer allows the system to send data to a RabbitMQ broker. The connection to the broker can be shared across multiple RabbitMQ adapters and each adapter will create its own channel on the connection. A producer can create an exchange and queue or use an existing exchange or queue.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this adapter starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the adapter instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Property Type Description
RabbitMQ Server drop-down list A list of RabbitMQ server connections that have been specified in the sbconf file. See the SBConf File Configuration section of this document for information relating to the RabbitMQ connection options.
Create Channel On Connect check box When enabled a channel with the properties given at design time is created.
Qos integer If a positive integer value is supplied it is used as the channels quality of service value. A value of 0 means unlimited.
Qos Global check box When enabled the Qos setting is applied to the entire channel rather than each consumer.
Message Serializer Class string The class that implements the IRabbitMQSerializer and is instantiated to convert message bytes to the resulting message schema.

Note

See the section on custom serialization in this document.

Enable Control Port check box When enabled a control port allows commands to be sent to the operator to perform actions during runtime.
Enable Status Port check box When enabled a status port becomes available which will emit status tuples for various events of this operator.
Log Level INFO Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Exchange Tab

Property Type Description
Name string The name of the exchange to connect to or create when Createchannel On Connect is enabled. If this value is blank the default exchange is used. This value cannot be blank if declare exchange is enabled.
Declare Exchange check box When enabled the system tries to declare this exchange on the server. This option is only valid when Createchannel On Connect is enabled. If enabled the Name property must be specified. If any other adapter is also set to declare an exchange and has the same name, the declare options must be identical or a runtime error will occur on startup. It is suggested that only on adapter set the declare flag and other adapters simply use the exchange name.
Type drop-down list The type of exchange to create. This option is only valid when Createchannel On Connect is enabled. See the RabbitMQ documentation for descriptions of the exchange types.
Use Defaults check box When checked the exchange is created as a non-autodelete, non-durable exchange with no extra arguments.
Is Durable check box When enabled the exchange is set to durable when created.
Is Auto Delete check box When enabled the exchange is set to deleted when not used.
Exchange Arguments key value pair The arguments to use when creating an exchange.

Queue Tab

Property Type Description
Name string The name of the queue to connect to or create if Createchannel On Connect is enabled. If this value is blank and declare queue is enabled a temporary queue is created. The name of the queue can be received from the status port when action is queue.
Declare Queue check box When enabled the system tries to declare this queue on the server. This option is only valid when Createchannel On Connect is enabled. If any other adapter is also set to declare a queue and has the same name, the declare options must be identical or a runtime error occurs on startup. It is suggested that only on adapter set the declare flag and other adapters simply use the queue name.
Is Durable check box When enabled the exchange is set to durable when created.
Is Auto Delete check box When enabled the exchange is set to deleted when not used.
Is Exclusive check box When enabled the queue is set to exclusive.
Queue Arguments key value pair The arguments to use when creating an queue.

Routing Tab

Property Type Description
Routing Keys key value The routing keys to use which are used differently depending on the type of exchange that is currently connected. See the RabbitMQ documentation for information about routing keys.

Message Tab

Property Type Description
Properties drop-down list A selection of property types that can be set for messages. If the type is set to custom you can set up the type with any format you require using the other properties on this tab. The property types are defined as follows:
  • Default - RabbitMQ default message properties

  • Basic - Content-type "application/octet-stream", Delivery Mode non-persistent, priority zero

  • Minimal Basic - Empty basic properties, with no fields set

  • Minimal Persistent Basic - Empty basic properties, with only Delivery Mode set to persistent

  • Persistent Basic - Content-type "application/octet-stream", Delivery Mode persistent, priority zero

  • Persistent Text Plain - Content-type "text/plain", Delivery Mode persistent, priority zero

  • Text Plain - Content-type "text/plain", Delivery Mode non-persistent, priority zero

  • Custom - User defined

Content Encoding string The content encoding to use when publishing the message. Only valid when Properties set to Custom.
Content Type string The content type to use when publishing the message. Only valid when Properties set to Custom.
Delivery Mode option The delivery mode to use when publishing the message. Only valid when Properties set to Custom.
Priority string The priority to use when publishing the message, must be a valid integer greater than 0. Only valid when Properties set to Custom.

SBConf File Configuration

Description

The RabbitMQ adapter configuration section starts with an <adapter-configurations> element containing one <adapter-configuration name="RabbitMQServers"> element that contains one or more <section name="RabbitMQServer"> elements.

Each <section name="RabbitMQServer"> configuration must contain a <setting name="id" val="RabbitMQ XYZ"/> and <setting name="Host" val="nnnn" /> settings fields. All other setting elements are optional.

This example configuration shows a basic configuration. You can have as many configurations as your application requires, but each must have a unique ID.

Example

    <adapter-configurations>
        <adapter-configuration name="RabbitMQServers">
            <section name="RabbitMQServer">
                <setting name="id" val="RabbitMQ"/>
                <setting name="Username" val=""/>
                <setting name="Password" val=""/>
                <setting name="Host" val="127.0.0.1"/>
                <setting name="ConnectOnStartup" val="true"/>
                <!-- <setting name="VirtualHost" val=""/> -->
                <!-- <setting name="Port" val=""/> -->
                <!-- <setting name="AutomaticRecoveryEnabled" val=""/> -->
                <!-- <setting name="NetworkRecoveryInterval" val=""/> -->
                <!-- <setting name="ConnectionTimeout" val=""/> -->
                <!-- <setting name="HandshakeTimeout" val=""/> -->
                <!-- <setting name="ShutdownTimeout" val=""/> -->
                <!-- <setting name="RequestedHeartbeat" val=""/> -->
                <!-- <setting name="TopologyRecoveryEnabled" val=""/> -->
                
                <!-- SSL Settings -->                     
                <!-- <setting name="sslEnabled" val="false"/> -->
                <!-- <setting name="sslHostnameVerification" val="false"/> -->
                <!-- <setting name="sslKeyStorePassword" val=""/> -->
                <!-- <setting name="sslKeyStoreType" val="PKCS12"/> -->
                <!-- <setting name="sslKeyStoreFile" val=""/> -->
                <!-- <setting name="sslKeyStoreManagerType" val="SunX509"/> -->
                <!-- <setting name="sslTrustStorePassword" val=""/> -->
                <!-- <setting name="sslTrustStoreType" val="JKS"/> -->
                <!-- <setting name="sslTrustStoreFile" val=""/> -->
                <!-- <setting name="sslTrustStoreManagerType" val="SunX509"/> -->
                <!-- <setting name="sslContextType" val="TLSv1.2"/> -->
            </section>
        </adapter-configuration>
    </adapter-configurations>
         

Settings

Setting Type Required Description
id string true The value to display in the drop down list and is used to key to this section of the configuration file.
ConnectOnStartup true/false true Connect to the RabbitMQ broker on startup.
Host string true The host address of the RabbitMQ Server.
Username string false The user name to use when connection to the RabbitMQ Server.
Password string false The password to use when connection to the RabbitMQ Server.
VirtualHost string false The virtual host to use when connection to the RabbitMQ Server.
Port integer false The target port to use when connection to the RabbitMQ Server.
AutomaticRecoveryEnabled true/false false Enables or disables automatic connection recovery.
NetworkRecoveryInterval long false Sets connection recovery interval. Default is 5000 milliseconds.
ConnectionTimeout integer false Set the TCP connection timeout in milliseconds; zero for infinite.
HandshakeTimeout integer false Set the AMQP0-9-1 protocol handshake timeout, in milliseconds.
ShutdownTimeout integer false Set the shutdown timeout in milliseconds; zero for infinite; default 10000. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost.
RequestedHeartbeat integer false Set the requested heartbeat timeout, in seconds; zero for none. Heartbeat frames will be sent at about 1/2 the timeout interval.
TopologyRecoveryEnabled true/false false Enables or disables topology recovery.
sslEnabled true/false false Enables SSL for the connection.
sslHostnameVerification true/false false Enable server hostname verification for TLS connections.
sslKeyStorePassword string   The keystore password. This value can be plain text or a value encoded using sbcipher.
sslKeyStoreType string PKCS12 The type of keystore. See the KeyStore section in the Java Cryptography Architecture Standard Algorithm Name Documentation for information about standard keystore types.
sslKeyStoreFile string   The full path to the key store file location.
sslKeyStoreManagerType string SunX509 The standard name of the requested algorithm. See the Java Secure Socket Extension Reference Guide for information about standard algorithm names.
sslTrustStorePassword string   The trust store password. This value can be plain text or a value encoded using sbcipher.
sslTrustStoreType string JKS The type of trust store. See the KeyStore section in the Java Cryptography Architecture Standard Algorithm Name Documentation for information about standard keystore types.
sslTrustStoreFile string   The full path to the trust store file location.
sslTrustStoreManagerType string SunX509 the standard name of the requested trust management algorithm. See the Java Secure Socket Extension Reference Guide for information about standard algorithm names.
sslContextType string TLSv1.2 the standard name of the requested protocol.See the SSLContext section in the Java Cryptography Architecture Standard Algorithm NameDocumentation for information about standard protocol names.

Control Port

Description

Use the control port to send action commands to the adapter.

Control Port Schema

  • command, string. The command to send to the adapter.

    The values are:

    • connect — Connect to the RabbitMQ server.

    • disconnect — Disconnect from the RabbitMQ server.

    • createchannel — Create a channel with the given exchange and queue information.

    • removechannel — Remove a channel using channel number supplied.

  • exchangeName, string. Used with createchannel command, determines the name of the exchange to create or use.

  • exchangeDeclare, boolean. Used with createchannel command to determine whether an exchange should be declared.

  • exchangeDefaults, boolean. Used with createchannel command and exchangeDeclare is true. If true, declares the exchange with RabbitMQ defaults.

  • exchangeType, string, Used with createchannel command and exchangeDeclare is true. Determines the type of exchange to create, valid values are:

    • direct

    • topic

    • fanout

    • headers

  • exchangeDurable, boolean. Used with createchannel command and exchangeDeclare is true. If true, declares the exchange as durable.

  • exchangeAutoDelete, boolean. Used with createchannel command and exchangeDeclare is true. If true, declares the exchange as auto-delete.

  • exchangeArgs. list<tuple<string key, string value>>. Used with createchannel command and exchangeDeclare is true. A list of key value pairs to be used as the exchange arguments.

  • queueName, string. Used with createchannel command. Determines the name of the queue to create or use.

  • queueDeclare, boolean. Used with createchannel command to determine whether an queue should be declared.

  • queueDurable, boolean. Used with createchannel command and queueDeclare is true. If true, declares the queue as durable.

  • queueExclusive, boolean. Used with createchannel command and queueDeclare is true. If true declares the queue as exclusive.

  • queueAutoDelete, boolean. Used with createchannel command and queueDeclare is true. If true declares the queue as auto-delete.

  • queueArgs, list<tuple<string key, string value>>. Used with createchannelcommand and queueDeclare is true. A list of key value pairs to be used as the queue arguments.

  • routingKeys, list<string>. Used with createchannel command, a list of routing keys.

  • channel, integer. Used with removechannel command, the channel ID to remove. The channel ID is output as the object field of the status port when action equals channel.

Data Port

Description

Use the data port to send message data.

Data Port Schema

  • message, string/tuple/blob/custom. The message. If the message is a tuple it is serialized as a binary tuple unless a custom serializer is specified.

  • channel (optional), integer. If the adapter is connected to multiple channels, a channel ID must be specified to determine which channel to send the message on. The channel ID is output as the object field of the status port when action equals channel. If the adapter is only connected to a single channel it will be used by default and this field can be ignored.

  • exchangeName (optional), string. The exchange to which to send the message to. If this field is missing or empty the currently connected exchange is used.

  • routingKey (optional), string. The routing key to use when sending this message. If this field is missing an empty routing key is used.

  • mandatory (optional), boolean. If present and a value of true the message is set as mandatory. If the field is missing the default value of false is used.

  • immediate (optional), boolean. If present and a value of true the message is set as immediate. If the field is missing the default value of false is used.

  • replyTo (optional), string. The replyTo property to set for the message. If this field is missing or empty this value is ignored.

  • correlationId (optional), string. The correlationId property to set for the message. If this field is missing, empty, or null the value is ignored.

  • expiration (optional), integer. The expiration time in milliseconds property to set for the message. If this field is missing, empty, or null the value is ignored.

  • priority (optional), integer. The priority property to set for the message. If this field is missing, empty, or null the value is ignored.

Status Port

Description

The status port is used to send status information tuples downstream to inform the user of changes.

Status Port Schema

  • type, string. The type of status information emitted on this port. Status types are:

    • Error — Indicates this message is related to an error that occurred.

    • Warn — Indicates this message is related to a warning that the user should be aware of.

    • Info — Indicates this message is related to extra status information.

  • action, string.

    • Connected — Connected to requested broker list.

    • Connecting — Starting to connect

    • Disconnecting — Disconnecting from.

    • Connection — Information about the current connection

    • Update Brokers — Information about updating the brokers

    • Command — A user command

    • Send — Trying to send a message

  • object, string. This value may be null. If it is not null, it contains a value relevant to the status message.

  • message, string. This is a formatted human readable message that explains the status message.

  • time, timestamp. The time the status message occurred.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Custom Serializer

When a class is supplied for the message serialization and the input message field is of type tuple you must provide a class that implements the com.streambase.sb.adapter.rabbitmq.IRabbitMQSerializer interface. Here is an example:

package com.streambase.rabbitmq.sample;

import java.util.EnumSet;

import com.streambase.sb.Tuple;
import com.streambase.sb.adapter.rabbitmq.IRabbitMQSerializer;
import com.streambase.sb.TupleJSONUtil;
import com.streambase.sb.TupleJSONUtil.Options;

public class CustomSerializer implements IRabbitMQSerializer {

  @Override
  public byte[] seralize(Tuple tuple, String exchange, String routingKey) throws Exception {
      EnumSet<Options> options = EnumSet.of(Options.INCLUDE_NULL_FIELDS, Options.INCLUDE_NULL_FIELDS, Options.PREFER_MAP);
      String jsonTuple = TupleJSONUtil.toJSONString(tuple, options, "");
      return jsonTuple.getBytes();
  }
}