Using the JMS and TIBCO EMS Operators

Note

This document describes using the set of JMS and TIBCO EMS operators introduced in StreamBase 7.5.4. The legacy version of the JMS and TIBCO EMS adapters is deprecated but still available, and are described in Legacy JMS Input and Output Adapters.

Introduction

The TIBCO StreamBase® Adapter for JMS allows StreamBase to integrate with a JMS-compliant message bus, including TIBCO's Enterprise Message Service (EMS) bus. The Java Message Service API is a Java message-oriented middleware API for sending messages between two or more clients.

The JMS adapter is implemented as a suite of several single-purpose global Java operators, each handling one aspect of interacting with a JMS bus. Similarly, the TIBCO EMS adapter is a separate suite of global operators that each handle one aspect of interacting with a TIBCO EMS bus. These operators are shown in the following table:

JMS and TIBCO EMS Operators

JMS Adapter TIBCO EMS Adapter Description
JMS Consumer EMS Consumer Connects to a JMS or EMS destination on the target server and reads messages from it.
JMS Producer EMS Producer Connects to a JMS or EMS destination on the target server and sends messages to it.
JMS Request/Reply EMS Request/Reply Implements the common request-reply design pattern by connecting to a JMS or EMS destination on the target server to send request messages, and waiting for the reply on a different destination (timing out if no reply is forthcoming).
JMS Connect EMS Connect Manually connects to or disconnects from the target JMS or EMS server.
JMS Ack EMS Ack Acknowledges receipt of messages from the connected server (when using the CLIENT_ACKNOWLEDGE or other explicit acknowledgement mode).
JMS Commit EMS Commit Either commits or rolls back transactions on a transacted JMS or EMS session.

The TIBCO EMS operators can only be configured to work with a TIBCO EMS bus. By contrast, the JMS operators can be configured to work with any JMS-compliant message bus.

The TIBCO EMS operators and the generic JMS operators are all delivered as part of the basic StreamBase installation package, but the two sets of operators have different licensing terms. Consult your TIBCO Order Form to determine whether your license includes access to the JMS operators. Consult your TIBCO Sales Representative for further information if you need to use the generic JMS operators.

Because the operation of both sets of operators is essentially identical, the information in this document applies equally to both sets, except where explicitly distinguished. Hereafter, references to "JMS operators" can be taken to mean "JMS or TIBCO EMS operators."

Introduction to JMS

The StreamBase JMS operators work with a JMS-compliant message bus. The Java Message Service is an API defined by Oracle Corporation as part of the Java Platform, Enterprise Edition (J2EE) for invoking operations on enterprise messaging systems. The JMS specification allows for multiple implementations, called JMS providers or brokers. The StreamBase JMS operators work with the JMS implementation of any JMS provider, but must be configured differently for each provider. This document includes configuration examples for four JMS providers:

  • TIBCO Enterprise Messaging Service™ (EMS)

  • Apache ActiveMQ

  • IBM WebSphere Application Server

  • Solace Systems

If you are connecting to a JMS server from another JMS provider, you must adapt the configuration examples and discussion on this page for the particulars of your JMS provider.

JMS Operators Introduction

The StreamBase JMS solution is implemented as a suite of five single-purpose operators, each handling one of the actions described in the JMS and EMS Operator Types table.

The operators have many options for configuring connections and messages which you can set using the JMS Configuration Editor.

Workflow

The expected workflow to configure a connection to a JMS bus and then begin developing your StreamBase application is as follows:

  1. In a new or existing EventFlow module, drag the Adapters, Java Operators icon from the Palette view to the canvas. Type jms or ems in the search field to narrow the list of results, and select a JMS Consumer or EMS Consumer operator, as appropriate.

  2. Select and double-click the newly placed canvas icon to open its Properties view.

  3. Make sure any text-based or forms-based editors of the current project's sbd.sbconf file are closed. If you have the JMS Configuration Editor already open, close it now.

  4. Click the Edit button in the Properties view. This opens the JMS Configuration Editor.

  5. Follow the instructions in JMS and TIBCO EMS Configuration Editor to name and configure the connection information for your site's JMS or EMS server with at least one destination.

  6. Use the lightning-bolt icon in the JMS Configuration Editor to confirm a successful connection to the specified server.

  7. Create a simple EventFlow module to further test connectivity to your server. For example, use a pair of JMS or EMS Producer and Consumer operators to confirm that you can send and receive messages. Use the JMSSimpleSample.sbapp or EMSSimpleSample.sbapp modules in the JMS and EMS Adapter Samples as a guide.

  8. When you can successfully write messages to your JMS bus and read them back, continue developing your StreamBase application, using the name for this configured server connection that you assigned in step 5 in any subsequently-added JMS or EMS operators.

Message Translation

When the JMS Consumer receives a message, it translates this message into a StreamBase Tuple. When the JMS Producer receives a Tuple, it translates it into a JMS message. There are six JMS message types: Message, MapMessage, TextMessage, BytesMessage, ObjectMessage, and StreamMessage. The JMS operators include built-in support for the first four of these; this support is described in this section. You may also implement your own custom converter for the operators to use; this is also discussed below.

The message translation class used by the operator for a given destination is specified using the Message Converter setting in the JMS Configuration Editor.

JMS Message Topology in a StreamBase Tuple

When converting a JMS message to and from a Tuple, the following structure is employed:

Field Name Field Type Description
jmsHeader tuple Nested tuple containing all the message's header field values. If this field is omitted in an input tuple to the JMS Producer, no header values will be set in the outgoing JMS message. If jmsHeader is present but some fields are missing, the corresponding header fields are left unset in the outgoing JMS message.

JMS headers contain a fixed set of values, each of which has a StreamBase field name.

^ jmsCorreleationID string The JMS header's JMSCorrelationID field.
jmsDestination string The JMS header's JMSDestination field.
jmsDeliveryMode int The JMS header's JMSDeliveryMode field.
jmsExpiration long The JMS header's JMSExpiration field.
jmsMessageID string The JMS header's JMSMessageID field.
jmsPriority int The JMS header's JMSPriority field.
jmsRedelivered bool The JMS header's JMSRedelivered field.
jmsReplyTo string The JMS header's JMSReplyTo field. In the JMS message, this is set to the actual javax.jms.Destination object instance, but in the Tuple it is simply set to the destination's name, with the to and from conversions happening automatically.
jmsTimestamp timestamp The JMS header's JMSTimestamp field.
jmsType string The JMS header's JMSType field.
jmsProperties tuple Nested tuple containing the JMS message's Property values. If this field is omitted in an input tuple to the JMS Producer, no property values are set in the outgoing JMS message. For the JMS Consumer, any incoming JMS property that has a corresponding field in the jmsProperties nested tuple is set; incoming JMS properties for which no corresponding StreamBase field is found are ignored.
^ <jmsproperty>   Every field in the jmsProperties nested tuple represents a JMS property field of the same name in the corresponding JMS message.
messageAckID string (JMS Consumer only)

When one of the manual acknowledgement modes is used (such as CLIENT_ACKNOWLEDGE) this field is set to an opaque unique identifier. Once the message has been processed to completion by your application's logic, you must manually acknowledge the message by passing this ID to the input port of the JMS Ack operator. See The JMS Consumer and Acknowledge Modes.

sessionID string When dealing with a transacted JMS session, use this ID as input to the JMS Commit operator along with a value of commit or rollback for its command field in order to complete a transaction.
<all other fields> - All other fields in the tuple are treated as part of the payload of the message. For Map Messages, this means corresponding fields are used on the JMS message to get and set values, for example.

Message Converter Classes

Conversion of a JMS message to and from a Tuple is executed by a class that implements the com.streambase.sb.adapter.jms2.converters.FromJMSMessageConverter interface (JMS Consumer) or the com.streambase.sb.adapter.jms2.converters.ToJMSMessageConverter interface (JMS Producer). StreamBase provides default implementations of these converters for the most commonly-used JMS message types, as described in this section.

MapMessage Converters

MapMessage is the type into which tuples translate most readily. Like a tuple, a MapMessage is a set of name-value pairs, making the translation of these two message types relatively straightforward. In the default implementation of a MapMessage converter provided by StreamBase, the JMS Consumer pairs the fields held by a JMS MapMessage with fields in a StreamBase schema that have the same names and compatible types. If the name of a field in the MapMessage does not exist in the StreamBase schema, then that field is ignored. Similarly, if the name of a field in the Schema does not exist in the MapMessage, then there is no input value for that field and it is nulled.

In the case of the JMS Producer, all fields in the incoming Tuple are added to the MapMessage. You can also specify that even Tuple fields that have a null value are to be added to the outgoing message, although by default these are simply omitted in the outgoing JMS message.

The default implementation of a MapMessage converter provided by StreamBase is:

  • JMSConsumer: com.streambase.sb.adapter.jms2.converters.DefaultFromJMSMapMessageConverter

  • JMSProducer: com.streambase.sb.adapter.jms2.converters.DefaultToJMSMapMessageConverter

TextMessage Converters

TextMessages only contain one field, used to carry a text-only payload. In the default implementation of a TextMessage converter provided by StreamBase, the JMS Consumer looks for a suitable field (of type string) in its output schema to contain the payload of an incoming message using the following heuristics:

  1. If a Converter Custom Settings value was specified for this destination, its value is taken as the name of the StreamBase field to use.

  2. If the above search fails, but a field named text or payload (regardless of case) is present, this is used.

  3. If neither of the above searches have yielded an appropriate candidate, the first field of type string in the schema is used.

The same heuristics are employed by the JMS Producer to locate the payload of incoming tuples to place in the outgoing JMS message.

Note

A common usage idiom for JMS TextMessages is to place XML data in the text payload of the message. This can easily be accommodated in a StreamBase application by combining the JMS operators with the XML-To-Tuple and Tuple-To-XML operators to decode and encode the XML payloads.

The default implementation of a TextMessage converter provided by StreamBase is:

  • JMSConsumer: com.streambase.sb.adapter.jms2.converters.DefaultFromJMSTextMessageConverter

  • JMSProducer: com.streambase.sb.adapter.jms2.converters.DefaultToJMSTextMessageConverter

BytesMessage Converters

BytesMessages only contain one field, used to carry a binary payload. In the default implementation of a BytesMessage converter provided by StreamBase, the JMS Consumer uses similar heuristics as the TextMessage converters to look for a suitable field (of type blob) in its output schema to contain the payload of an incoming message:

  1. If a converter-custom-settings attribute is specified for this destination, its value is taken as the name of the StreamBase field to use.

  2. If the above search fails but a field named bytes or payload (regardless of case) is present, this is used.

  3. If neither of the above searches have yielded an appropriate candidate, the first field of type blob in the schema is used.

The same heuristics are employed by the JMS Producer to locate the blob payload of incoming tuples to place in the outgoing JMS message.

The default implementation of a BytesMessage converter provided by StreamBase is:

  • JMSConsumer: com.streambase.sb.adapter.jms2.converters.DefaultFromJMSBytesMessageConverter

  • JMSProducer: com.streambase.sb.adapter.jms2.converters.DefaultToJMSBytesMessageConverter

"Simple" Message Converters

So-called "simple" JMS messages (of class javax.jms.Message, returned when calling javax.jms.Session.createMessage()) contain no body fields. In the default implementation of a simple Message converter provided by StreamBase, the JMS Consumer issues a tuple containing only the message's header and property fields (if any are configured in the schema).

Similarly, the JMS Producer operator sends a JMS message with no body payload.

The default implementation of a simple Message converter provided by StreamBase is:

  • JMSConsumer: com.streambase.sb.adapter.jms2.converters.DefaultFromJMSSimpleMessageConverter

  • JMSProducer: com.streambase.sb.adapter.jms2.converters.DefaultToJMSSimpleMessageConverter

Custom Converters

Interface class names:

  • com.streambase.sb.adapter.jms2.converters.FromJMSMessageConverter (JMS Consumer)

  • com.streambase.sb.adapter.jms2.converters.ToJMSMessageConverter (JMS Producer)

An interface is used to represent a class that can convert a JMS Message to a StreamBase tuple. This interface, com.streambase.sb.adapter.jms2.converters.FromJMSMessageConverter, includes a method, Tuple convertFromJMSMessage(Message message) throws StreamBaseException that provides the implementation of the translation. A different translation class can be implemented, and the JMS Consumer can then be configured to use this implementation. In this way, a converter to translate one of the other JMS message types can be specified, or an alternate implementation of an existing converter can be specified.

As expected, the JMS Producer also provides a similar interface, com.streambase.sb.adapter.jms2.converters.ToJMSMessageConverter which includes the Message convertToJMSMessage(Tuple t) throws StreamBaseException method to do the opposite conversion.

The JMS Consumer and Acknowledge Modes

The JMS Consumer reads messages from JMS destinations. The manner in which these messages are acknowledged (so the server knows it has been received and can be safely removed from the message queue) is configurable for every destination. The setting for the acknowledge mode can impact the performance of the Consumer.

The JMS Consumer transparently supports acknowledge modes of AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. The Consumer also transparently supports the use of TIBCO EMS' proprietary acknowledge mode, NO_ACKNOWLEDGE. Of course, use of this acknowledge mode implies that TIBCO's EMS message bus is being used.

For the CLIENT_ACKNOWLEDGE, TIBCO EMS' EXPLICIT_CLIENT_ACKNOWLEDGE and Apache ActiveMQ's INDIVIDUAL_ACKNOWLEDGE, additional steps are required in order to acknowledge incoming messages. Specifically, when a JMS message is received and the destination has been configured with a manual acknowledgement mode, a special field named messageAckID is added to the outgoing tuple and this value should be passed to the JMS Ack operator once processing of the message is complete.

JAR File Requirements

When one of the JMS operators executes, it needs to connect to an executing instance of a JMS message bus. In order to connect to the JMS provider's JMS bus, JAR files that implement JMS client functionality and supplied by the JMS provider must be made available to the JMS operators. This is true whether the application is executing from within StreamBase Studio or at a command-line invocation of StreamBase Server:

  • If the adapter is started from Studio, then the vendor's JMS JAR file or files must be imported into the Studio project or added to the Java Build Path of the Studio project.

  • If the adapter is executed from a command-line invocation of StreamBase Server, then the location of the vendor's JAR file or files must be specified in the <java-vm> section of the server's configuration file, using <jar> or <dir> child elements.

  • As an alternative, you can instead copy the JAR files to streambase-install-dir/lib/ext, which will make them available to both StreamBase Studio and StreamBase Server. However, this alternative must be repeated after every installation of a StreamBase maintenance release, and is not considered a best practice.

Examples for TIBCO EMS users:

The following JAR files must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to a TIBCO EMS server. These files are normally found in the lib folder of your TIBCO EMS installation directory.

  • jms-2.0.jar

  • tibjms.jar

  • tibcrypt.jar (Only required for EMS 8.3 and earlier, and only if SSL connections are used. As of EMS 8.4.0, the functionality previously provided by tibcrypt.jar is now provided by Java JRE distributions.)

Examples for ActiveMQ users:

The following JAR files must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to an ActiveMQ server. These files are normally found in the lib folder of your ActiveMQ installation directory.

  • activemq-core-5.10.0.jar (adjusting the filename for your installed version of ActiveMQ)

  • geronimo-j2ee-management_1.1_spec-1.0.1.jar

  • hawtbuf-1.10.jar

Examples for Solace JMS users:

The following JAR files must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to a Solace JMS server:

  • commons-lang-2.2.jar

  • sol-common-5.1.1.1.jar (adjusting filename for your installed version of Solace)

  • sol-jcsmp-5.1.1.1.jar (adjusting filename for your installed version of Solace)

  • sol-jms-5.1.1.1.jar (adjusting filename for your installed version of Solace)

Important

Regardless of the JMS vendor used, you must also provide a copy of jms.jar. This file should be included in your JMS vendor's product installation alongside its other JAR files such as those mentioned above.

Properties View Settings

Settings are used to control most of the behavior of the JMS operators. The settings are grouped under several tabs in the Properties view in StreamBase Studio.

In the table in this section, the Property column shows each property name as found in the one or more operator properties tabs of the Properties view for this adapter.

When using this adapter in a StreamSQL program with the APPLY JAVA statement, you may refer to property names using the StreamSQL parameter name, also included in the Property column.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this adapter starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the adapter instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Each operator in the JMS suite (Consumer, Producer, Connect, Ack, Commit) has its own set of properties to be configured:

Property Operators Description Default
Server Name (required string)

StreamSQL name: serverName

Consumer
Producer
Request/Reply
Connect
Ack
Commit
The name of the JMS server (as defined in the configuration settings) to which to connect. None
Destination Definition (required string)

StreamSQL name: destinationType

Consumer
Producer
Whether the destination to which to connect is predefined in the configuration (Preconfigured) or will dynamically specified at runtime (Dynamic). If the latter is chosen, an input port is automatically added to the operator to receive subscription requests to connect to destinations on the fly. Preconfigured
Configured Destination Name (required string)

StreamSQL name: destinationName

Consumer
Producer
If Preconfigured is selected above, this designates the name of the JMS destination (as defined in the configuration settings) to which to connect.

If Dynamic is selected in the previous row, this can be set to a preconfigured destination to serve as the settings blueprint for dynamic subscriptions. For example, if the preconfigured destination has an acknowledgement mode of CLIENT_ACKNOWLEDGE, the operator uses that setting when subscribing dynamically to a destination, unless the setting is overridden in the subscription tuple. If no blueprint destination is specified, default values for all destination settings are used unless overridden by the subscription tuple.

None
Configured Destination Name For Requests (required string)

StreamSQL name: destinationName

Request/Reply
Designates the name of the JMS destination (as defined in the configuration settings) to which to connect for sending requests. None
Configured Destination Name For Replies (required string)

StreamSQL name: replyDestinationName

Request/Reply
Designates the name of the JMS destination (as defined in the configuration settings) to which to connect for waiting for replies. None
Connect On Startup (optional)

StreamSQL name: connectOnStartup

Consumer
Producer
Request/Reply
Connect
When set, the adapter attempts to connect to the configured servers when the applications is started. Otherwise the adapter waits for an explicit connect command before attempting connection. See Input Port Schemas. True
Enable Status Port (optional)

StreamSQL name: enableStatusPort

Consumer
Producer
If selected, this option causes the adapter to expose a new output port used to send status tuples when interesting events occur (such as error messages from the server, or notification of JMS messages sent).

The schema and semantics of this port are described below.

False
Timeout For Replies (in ms) (optional int)

StreamSQL name: replyTimeoutInMS

Request/Reply
The maximum number of milliseconds to wait for a reply to a given request before timing out. None
Number Of Concurrent Processing Threads (optional int)

StreamSQL name: numProcessingThreads

Request/Reply
Each outgoing request sent to the operator will be queued up to be processed by a worker thread. This value specifies the number of such threads to use. None
Max # Of Messages In Flight (optional)

StreamSQL name: maxMsgsInFlight

Consumer
If set, this specifies the maximum number of messages that are allowed to go unacknowledged before the consumer stop reading in new messages from the server. This in effect creates back-pressure on the JMS server, which may be useful in some use cases. Once some of the messages currently in flight in the application are acknowledged (using the JMSAck operator), message consumption resumes. Leaving this setting empty or setting it to -1 means the operator will keep reading messages from the server regardless of the number unacknowledged messages.

This setting is only meaningful if a manual acknowledgement mode is used, such as CLIENT_ACKNOWLEDGE or TIBCO EMS' EXPLICIT_CLIENT_ACKNOWLEDGE.

No value, which means "no limit"
Max # Of Messages In Flight (optional)

StreamSQL name: maxMsgsInFlight

Request/Reply
If set, this specifies the maximum number of messages that are allowed to go unacknowledged before the consumer stop reading in new messages from the server. This in effect creates back-pressure on the JMS server, which may be useful in some use cases. Once some of the messages currently in flight in the application are acknowledged (using the JMSAck operator), message consumption resumes. Leaving this setting empty or setting it to -1 means the operator will keep reading messages from the server regardless of the number unacknowledged messages.

This setting is only meaningful if a manual acknowledgement mode is used, such as CLIENT_ACKNOWLEDGE or TIBCO EMS' EXPLICIT_CLIENT_ACKNOWLEDGE.

No value, which means "no limit"
Emit Tuples Synchronously (optional)

StreamSQL name: synchronousOperation

Consumer
By default the Consumer will process incoming messages as quickly as possible, without waiting for the resulting tuples to finish processing before dequeueing more JMS messages.

However it may be desirable to send tuples in a synchronous manner, so that it gets fully processed by the StreamBase application before a new JMS message is processed from the server. This allows the application to implicitly apply backpressure on the server should the incoming message rate become too high.

Note

Using this setting to force synchronous operation may cause issues with some JMS providers if their client API does not support re-entrancy. Your application may experience deadlocks if synchronous operation is selected and you attempt for example to explicitly acknowledge a message downstream (since this causes another call into the JMS client's API. TIBCO EMS is known to support re-entrancy so this setting will not pose a problem if you use it; however you will need to experiment to ensure your JMS provider of choice also works correctly if you elect to enable this setting.

false (i.e. send tuples asynchronously)
Publish Null Fields (optional)

StreamSQL name: publishNulls

Producer
Request/Reply
By default, if a field of an incoming tuple is null, it is ignored and not included in the JMS message to be sent. If this option is selected, a corresponding field is added to the JMS message with a value of null. False
Emit "Message Sent" Status Tuples (optional)

StreamSQL name: emitMsgSentTuples

Producer
If selected, this option causes the adapter to emit a tuple on the status port every time a JMS message is successfully sent. False
Log Level (optional)

StreamSQL name: logLevel

Consumer
Producer
Request/Reply
Connect
Ack
Commit
Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. INFO

Edit Schemas Tab

Use the Edit Schema tab to specify the schema of the output tuple for this adapter. For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.

The Edit Schemas tab appears only in the Properties view of the JMS or EMS Consumer, or JMS or EMS Request/Reply operators.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Configuration

You can edit the list of JMS server and destination connections used by the JMS operators by using the JMS Configuration Editor. To do this, click the Edit button next to the Server Name setting of any operator in the JMS suite.

To import server and destination definitions from a previous StreamBase application that was using the Legacy JMS Adapters, use the Import button next to the Server Name setting of any operator in the JMS suite. This lets you select an old .jmsconf file to translate its configuration settings into the new format.

The following sections show some typical basic settings for connecting to JMS servers using JNDI for TIBCO EMS, Apache ActiveMQ, IBM WebSphere, and Solace Systems JMS Providers.

Configuration Example: TIBCO EMS, No Authentication

TIBCO EMS, where connections are made without authentication.

  • JNDI Context Factory: com.tibco.tibjms.naming.TibjmsInitialContextFactory

  • JNDI Provider URL: tibjmsnaming://localhost:7222

  • JNDI Connection Factory: ConnectionFactory

Configuration Example: TIBCO EMS, Simple Authentication

TIBCO EMS with connections made using StreamBase simple authentication.

  • The tibemsd.conf file contains:

    authorization = enabled
    user_auth     = local
  • The users.conf file contains one or more lines in the following format:

    username:enciphered-password:description

    Generate an enciphered password with the tibemsadmin command:

    set password username password
  • Connection Type:

    • JNDI Context Factory: com.tibco.tibjms.naming.TibjmsInitialContextFactory

    • JNDI Provider URL: tibjmsnaming://localhost:7222

    • JNDI Connection Factory: ConnectionFactory

  • Advanced JNDI Options:

    • Security Principal: username

    • Security Credentials: password

  • Connection Options:

    • Username: username

    • Password: password

Configuration Example: TIBCO EMS, SSL Connection

TIBCO EMS (SSL Connection). Connections via SSL are vendor-specific so settings will vary; here are typical values for TIBCO EMS:

  • JNDI Context Factory: com.tibco.tibjms.naming.TibjmsInitialContextFactory

  • JNDI Provider URL: tibjmsnaming://localhost:7222

  • JNDI Connection Factory: SSLConnectionFactory

  • Advanced JNDI Options:

    • Security Principal: YourUserName

    • Security Credentials: YourPassword

    • Initial Context Builder: Leave blank if using the EMS adapter, or set to com.streambase.sb.adapter.jms2.TIBCOEMSJNDIInitialContextBuilder if using the JMS adapter

    • Additional Properties: One or more of the following, as required by your EMS server's configuration:

      • java.naming.factory.url.pkgs: com.tibco.tibjms.naming

      • com.tibco.tibjms.naming.security_protocol: ssl

      • com.tibco.tibjms.naming.ssl_vendor: j2se or entrust6

      • com.tibco.tibjms.naming.ssl_trusted_certs: certFile1;certfile2

      • com.tibco.tibjms.naming.ssl_identity: identityFileName

      • com.tibco.tibjms.naming.ssl_password: passwordOrFileName

      • com.tibco.tibjms.naming.ssl_key: fileName

      • com.tibco.tibjms.naming.ssl_auth_only: true or false

      • com.tibco.tibjms.naming.ssl_enable_verify_host: true or false

      • com.tibco.tibjms.naming.ssl_enable_verify_hostname: true or false

      • com.tibco.tibjms.naming.ssl_expected_hostname: hostName

      • com.tibco.tibjms.naming.ssl_trace: true or false

      • com.tibco.tibjms.naming.ssl_debug_trace: true or false

Configuration Example: ActiveMQ

ActiveMQ (version 5.0 and above).

  • JNDI Context Factory: org.apache.activemq.jndi.ActiveMQInitialContextFactory

  • JNDI Provider URL: tcp://localhost:61616

  • JNDI Connection Factory: ConnectionFactory

Configuration Example: IBM WebsSphere

IBM WebSphere Application Server version 6.1's default JMS implementation.

  • JNDI Context Factory: com.ibm.websphere.naming.WsnInitialContextFactory

  • JNDI Provider URL: iiop://localhost:2809

  • JNDI Connection Factory: jms/MyConnFactory

  • Initial Context Builder: com.streambase.sb.adapter.jms2.WASJNDIInitialContextBuilder

Configuration Example: Solace

Solace default JMS implementation.

  • JNDI Context Factory: com.solacesystems.jndi.SolJNDIInitialContextFactory

  • JNDI Provider URL: smf://localhost:1234

  • JNDI Connection Factory: MyConnFactory

  • Security Principal: MySecurityPrincipal

  • Security Credentials: MySecurityCredentials

  • Initial Context Builder: com.streambase.sb.adapter.jms2.SolaceJNDIInitialContextBuilder

  • Username: MyUserName

  • Password: MyPassword

  • Additional Properties:

    Name Value
    Solace_JMS_OptimizeDirect false
    Solace_JMS_VPN MyJmsVpn

Operator Port Schemas

These sections describe the schemas for the input and output ports of the five JMS and five TIBCO EMS operators.

Input Port Schemas

Request/Reply Operator

The JMS Request/Reply operator has one input port to receive outgoing JMS messages. The schema for this port describes the JMS message to be sent, and is discussed in detail in the Message Translation section above.

Producer Operator

The JMS Producer operator has one input port to receive outgoing JMS messages. The schema for this port describes the JMS message to be sent, and is discussed in detail in the Message Translation section above.

If the operator's Destination Definition setting is set to Dynamic, a second input port is added to receive destination subscription commands. The schema for this port can include the following fields:

Field Name Type Description Required
command string Either subscribe or unsubscribe. Yes
destinationName string The name of the JMS destination to which this command applies. This should not be null unless the isTempDestination field is set to true, or the destinationID field is set. No
messageConverter string Indicates the fully qualified name of the ToJMSMessageConverter implementation to use to convert Tuples to JMS messages. For more information and possible values, see Message Converter Classes above.

The default value of this setting is com.streambase.sb.adapter.jms2.converters.DefaultToJMSMapMessageConverter.

No
messageConverterCustomSettings string Contains extra information to be passed along to the message converter class when it is instantiated. The value of this settings is not examined by the operator in any way and is passed verbatim to the converter class. No
isTopic bool Whether this destination is a Topic (true) or a Queue (false). Default value is false. No
isTransacted bool Whether this destination should be connected using a transacted session. Default value is false.

When this value is set to true, the status tuple that will be emitted by the operator to confirm the subscription will also contain a non-null value for the sessionID field to uniquely identify this transacted session. This value can be passed along to other Consumers and Producers downstream when subscribing to their destinations to signify that this session should be used (and thus that these operators are part of the same transacted session).

No
isTempDestination bool Whether this destination is temporary. Default value is false. No
subscriberName string If this is non-null, a durable subscription will be made using the value as a subscription name. Otherwise a normal (non-durable) subscription will be made. No
timestampFormat string The time format (specified using a standard Java DateTime format string) used to convert Tuple Timestamp fields into strings. Default value is "MM/dd/yyyy hh:mm:ss aa". No
timeToLive int The length of time in milliseconds from its dispatch time that a produced message should be retained by the message system. No
deliveryMode string The delivery mode to use with this destination. Available values are:
  • PERSISTENT

  • NON_PERSISTENT

  • RELIABLE_DELIVERY (TIBCO EMS only)

Default value is NON_PERSISTENT.

No
priority int The producer's priority. The JMS API defines ten levels of priority value, with 0 as the lowest priority and 9 as the highest. Default value is 4. No
disableMsgID bool Whether message IDs are set or not in the outgoing message. Default is false. No
disableMsgTimestamp bool Whether message timestamps are set or not in the outgoing message. Default is false. No
destinationID string Used to subscribe to the destination specified in a previously-received JMS message's JMSReplyTo field. This is used to send reply messages in Request-Reply scenarios. Default is null. No
sessionID string Used to specify the session to use when subscribing to the destination. See isTransacted above. Default is null. No

Unless otherwise indicated above, any of the non-required fields can be omitted from the schema (or left null). In that case, if the operator's Configured Destination Name property is specified, the corresponding value is used for this field; if not, a reasonable default is provided.

Consumer Operator

By default, the JMS Consumer has no input port. But if its Destination Definition setting is set to Dynamic, an input port is added to receive destination subscription commands. The schema for this port can include the following fields:

Field Name Type Description Required
command string Either subscribe or unsubscribe. Yes
destinationName string The name of the JMS destination to which this command applies. This should not be null unless the isTempDestination field is set to true, or the destinationID field is set. No
messageConverter string Indicates the fully qualified name of the FromJMSMessageConverter implementation to use to convert JMS messages to Tuples. For more information and possible values, see Message Converter Classes above.

The default value of this setting is com.streambase.sb.adapter.jms2.converters.DefaultFromJMSMapMessageConverter.

No
messageConverterCustomSettings string Contains extra information to be passed along to the message converter class when it is instantiated. The value of this settings is not examined by the operator in any way and is passed verbatim to the converter class. No
isTopic bool Whether this destination is a Topic (true) or a Queue (false). Default value is false. No
selector string The selector expression to use when subscribing to the destination. This value is passed verbatim to the JMS API. Default value is null. No
isTransacted bool Whether this destination should be connected using a transacted session. Default value is false.

When this value is set to true, the status tuple that will be emitted by the operator to confirm the subscription will also contain a non-null value for the sessionID field to uniquely identify this transacted session. This value can be passed along to other Consumers and Producers downstream when subscribing to their destinations to signify that this session should be used (and thus that these operators are part of the same transacted session).

No
isTempDestination bool Whether this destination is temporary. Default value is false. No
subscriberName string If this is non-null, a durable subscription will be made using the value as a subscription name. Otherwise a normal (non-durable) subscription will be made. No
timestampFormat string The time format (specified using a standard Java DateTime format string) used to convert JMS time values to Tuple Timestamp fields. Default value is "MM/dd/yyyy hh:mm:ss aa". No
ackMode string The acknowledgement mode to use with this destination. Available values are:
  • AUTO_ACKNOWLEDGE

  • CLIENT_ACKNOWLEDGE

  • DUPS_OK_ACKNOWLEDGE

  • NO_ACKNOWLEDGE (TIBCO EMS only)

  • EXPLICIT_CLIENT_ACKNOWLEDGE (TIBCO EMS only)

  • EXPLICIT_CLIENT_DUPS_OK_ACKNOWLEDGE (TIBCO EMS only)

  • INDIVIDUAL_ACKNOWLEDGE (Apache ActiveMQ only)

Default value is AUTO_ACKNOWLEDGE.

No
destinationID string Used to subscribe to the destination specified in a previously-received JMS message's JMSReplyTo field. This is used to send reply messages in Request-Reply scenarios. Default is null. No
sessionID string Used to specify the session to use when subscribing to the destination. See isTransacted above. Default is null. No

Unless otherwise indicated above, any of the non-required fields can be omitted from the schema (or left null). In that case, if the operator's Configured Destination Name property was specified, the corresponding value is used for this field; if not, a reasonable default is provided.

Connect Operator

This operator has one input port with the following field:

Field Name Type Description Required
command string Either connect or disconnect. Yes

Ack Operator

This operator has one input port with the following field:

Field Name Type Description Required
messageAckID string The acknowledgement ID of the message to be acknowledged. Take this value from a JMS Consumer operator's output tuple field of the same name when a message is received. Yes

Commit Operator

This operator has one input port with the following fields:

Field Name Type Description Required
command string Either commit or rollback. Yes
sessionID string The ID of the session to commit or roll back. Take the value for this field from the sessionID field of the Destination Subscription Successful status output tuple of a JMS Consumer or Producer operator. Yes

Output Port Schemas

Request/Reply Operator

The JMS Request/Reply operator has two output ports, the first of which is used to emit incoming JMS messages. The schema for this port describes the received JMS message, and is discussed in detail in the Message Translation section above. If no reply was received within the specified timeout period, a tuple will be emitted with all fields null except for jmsHeader.jmsCorrelationID.

The second output port allows the operator to send status tuples when interesting events occur (such as when an error occurs, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Connected: The operator successfully connected to the JMS server.

  • Disconnected: The operator successfully disconnected to the JMS server.

  • Connection Lost: The operator's connection to the JMS server was interrupted unexpectedly.

  • JMS Server Exception: An exception was received from the JMS server. The details of the exception are listed in the extraInfo field.

  • JMS Server Status Received: The JMS server issued a status message. The details of this message are listed in the extraInfo field.

  • Destination Subscription Successful: The operator successfully subscribed to the JMS destination.

  • Destination Subscription Failed: The operator failed to subscribe to the JMS destination. The details of this error are listed in the extraInfo field.

  • Error Consuming Message: An error prevented the Consumer from reading a JMS message. The details of this error are listed in the extraInfo field.

Yes
serverName string The name of the JMS server to which this status applies. No
destinationName string The name of the JMS destination to which this status applies. No
destinationID string The ID of the JMS destination to which this status applies. This will only be set when dynamically subscribing to destinations. No
sessionID string The ID of the JMS session to which this status applies. This will only be set when using transacted sessions. No
messageID string Unique identifies the JMS message to which this status applies. If a manual acknowledgement mode was specified when subscribing to the destination, this value can be used later as input to an Ack operator to acknowledge this message. No
extraInfo list(tuple(string name,string value)) Contains additional information about this status message. No
originalCommand tuple Contains the tuple that was sent to the Subscription input port of the operator to trigger this status tuple. No
originalMessage tuple Unused; always null. No

Producer Operator

By default the JMS Producer operator has no output ports.

If the operator's Enable Status Port setting is selected, an output port is added to allow the operator to send status tuples when interesting events occur (such as when a destination subscription succeeds or fails, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Connected: The operator successfully connected to the JMS server.

  • Disconnected: The operator successfully disconnected to the JMS server.

  • Connection Lost: The operator's connection to the JMS server was interrupted unexpectedly.

  • JMS Server Exception: An exception was received from the JMS server. The details of the exception are listed in the extraInfo field.

  • JMS Server Status Received: The JMS server issued a status message. The details of this message are listed in the extraInfo field.

  • Destination Subscription Successful: The operator successfully subscribed to the JMS destination.

  • Destination Subscription Failed: The operator failed to subscribe to the JMS destination. The details of this error are listed in the extraInfo field.

  • Message NOT Sent: An error prevented the Producer from sending a JMS message. The details of this error are listed in the extraInfo field.

  • Message Conversion Failed: The operator could not convert the incoming tuple to a JMS message. The details of this error are listed in the extraInfo field.

  • Message Sent: The operator successfully sent a JMS message. These status tuples will only be generated if the operator's Emit 'Message Sent' Status Tuples property is enabled.

Yes
serverName string The name of the JMS server to which this status applies. No
destinationName string The name of the JMS destination to which this status applies. No
destinationID string The ID of the JMS destination to which this status applies. This will only be set when dynamically subscribing to destinations. No
sessionID string The ID of the JMS session to which this status applies. This will only be set when using transacted sessions. No
messageID string Unique identifies the JMS message to which this status applies. Used exclusively for the "Message Sent" status tuples. No
extraInfo list(tuple(string name,string value)) Contains additional information about this status message. No
originalCommand tuple Contains the tuple that was sent to the Subscription input port of the operator to trigger this status tuple. No
originalMessage tuple Contains the tuple that was sent to the Message input port of the operator to trigger this status tuple. No

Consumer Operator

The JMS Consumer operator has one output port to emit incoming JMS messages. The schema for this port describes the received JMS message, and is discussed in detail in the Message Translation section above.

If the operator's Enable Status Port setting is set, an output port is added to allow the operator to send status tuples when interesting events occur (such as when a destination subscription succeeds or fails, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Connected: The operator successfully connected to the JMS server.

  • Disconnected: The operator successfully disconnected to the JMS server.

  • Connection Lost: The operator's connection to the JMS server was interrupted unexpectedly.

  • JMS Server Exception: An exception was received from the JMS server. The details of the exception are listed in the extraInfo field.

  • JMS Server Status Received: The JMS server issued a status message. The details of this message are listed in the extraInfo field.

  • Destination Subscription Successful: The operator successfully subscribed to the JMS destination.

  • Destination Subscription Failed: The operator failed to subscribe to the JMS destination. The details of this error are listed in the extraInfo field.

  • Error Consuming Message: An error prevented the Consumer from reading a JMS message. The details of this error are listed in the extraInfo field.

Yes
serverName string The name of the JMS server to which this status applies. No
destinationName string The name of the JMS destination to which this status applies. No
destinationID string The ID of the JMS destination to which this status applies. This will only be set when dynamically subscribing to destinations. No
sessionID string The ID of the JMS session to which this status applies. This will only be set when using transacted sessions. No
messageID string Unique identifies the JMS message to which this status applies. If a manual acknowledgement mode was specified when subscribing to the destination, this value can be used later as input to an Ack operator to acknowledge this message. No
extraInfo list(tuple(string name,string value)) Contains additional information about this status message. No
originalCommand tuple Contains the tuple that was sent to the Subscription input port of the operator to trigger this status tuple. No
originalMessage tuple Unused; always null. No

Connect Operator

By default the JMS Connect operator has no output ports.

If the operator's Enable Status Port setting is set, an output port is added to allow the operator to send status tuples when interesting events occur (such as when a connection attempt succeeds or fails, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Connected: The operator successfully connected to the JMS server.

  • Disconnected: The operator successfully disconnected to the JMS server.

  • Connection Lost: The operator's connection to the JMS server was interrupted unexpectedly.

  • JMS Server Exception: An exception was received from the JMS server. The details of the exception are listed in the extraInfo field.

  • JMS Server Status Received: The JMS server issued a status message. The details of this message are listed in the extraInfo field.

Yes
serverName string The name of the JMS server to which this status applies. Yes
destinationName string Unused; always null. No
destinationID string Unused; always null. No
sessionID string Unused; always null. No
messageID string Unused; always null. No
extraInfo list(tuple(string name,string value)) Contains additional information about this status message. No
originalCommand tuple Unused; always null. No
originalMessage tuple Unused; always null. No

Ack Operator

By default the JMS Ack operator has no output ports.

If the operator's Enable Status Port setting is set, an output port is added to allow the operator to send status tuples when interesting events occur (such as when a message was successfully acknowledged, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Message Acknowledged: The operator successfully acknowledged the specified JMS message.

Yes
serverName string Unused; always null. No
destinationName string Unused; always null. No
destinationID string Unused; always null. No
sessionID string Unused; always null. No
messageID string The unique ID of the message that was acknowledged. Yes
extraInfo list(tuple(string name,string value)) Unused; always null. No
originalCommand tuple Unused; always null. No
originalMessage tuple Unused; always null. No

Commit Operator

By default the JMS Commit operator has no output ports.

If the operator's Enable Status Port setting is set, an output port is added to allow the operator to send status tuples when interesting events occur (such as when a message was successfully acknowledged, for example). The schema for this port has the following fields:

Field Name Type Description Always Set?
status string The event for which this tuple was produced. Valid values include:
  • Transaction Commit Successful: The operator successfully committed the transaction.

  • Transaction Commit Failed: An error occurred while committing the transaction.

  • Transaction Rollback Successful: The operator successfully rolled back the transaction.

  • Transaction Rollback Failed: An error occurred while rolling back the transaction.

Yes
serverName string Unused; always null. No
destinationName string Unused; always null. No
destinationID string Unused; always null. No
sessionID string The ID of the session on which the operation was made. Yes
messageID string Unused; always null. No
extraInfo list(tuple(string name,string value)) Unused; always null. No
originalCommand tuple Unused; always null. No
originalMessage tuple Unused; always null. No

Suspend and Resume Behavior

On suspend, if either the JMS Consumer or Producer is currently processing a message, it finishes processing that message or tuple to completion before suspending. That is, the StreamBase application itself finishes processing the message or tuple before the JMS operator returns from a suspend command. Once this has occurred, if any messages (or tuples for the JMS Producer) are delivered while the operator is suspended, it drops the messages or tuples and fails to acknowledge them.

Legacy JMS and EMS Adapters

The JMS and EMS Input Adapter and Output Adapter from StreamBase release 7.5.3 and earlier are still present but deprecated in release 7.5.4 and later. Your StreamBase applications that use these adapters still compile and run under current StreamBase Server releases.

TIBCO strongly recommends migrating your JMS and EMS applications to the new operator suite so that you can take advantage of their greater flexibility. However, if you must maintain an application that continues to use the 7.5.3 and earlier adapters, see Using Legacy JMS and EMS Adapter Configurations for further details.