JMS Input and Output Adapters

Four JMS Adapters

This topic describes the StreamBase embedded adapters for JMS, which allow StreamBase to integrate with a JMS-compliant message bus. The Java Message Service (JMS) API is a Java message-oriented middleware API for sending messages between two or more clients.

Note

StreamBase provides four JMS-related adapters:

  • JMS Embedded Input Adapter

  • JMS Embedded Output Adapter

  • JMS External Input Adapter

  • JMS External Output Adapter

This page documents the two embedded JMS adapters, which are installed as part of the base StreamBase installation. By contrast, the two external JMS adapters are installed with a separate installation kit. For information on the two external adapters, see JMS External Adapters.

Introduction to JMS

The StreamBase JMS adapters work with a JMS-compliant message bus. Java Message Service (JMS) is an API defined by Sun Microsystems (now Oracle Corporation) for invoking operations on enterprise messaging systems. The JMS specification allows for multiple implementations called JMS providers. The StreamBase JMS adapter works with the JMS implementation of any JMS provider, but must be configured differently for each provider. This document uses configuration examples from four JMS providers:

  • TIBCO Enterprise Messaging Service™ (EMS)

  • Apache ActiveMQ

  • IBM WebSphere Application Server

  • Solace Systems

If you are connecting to a JMS server from another JMS provider, you must adapt the configuration examples and discussion on this page for the particulars of your JMS provider.

Adapter Introduction

The JMS Embedded Input Adapter is called the JMS Reader on this page. The JMS Reader is an embedded adapter that subscribes to messages from a JMS message bus.

The JMS Embedded Output Adapter is called the JMS Writer on this page. The JMS Writer is an embedded adapter that publishes messages to a JMS message bus.

An embedded adapter is an adapter that runs in the same process as StreamBase Server. The JMS Reader listens for messages from a JMS message bus. When it receives a message, the JMS Reader creates a tuple from the message and sends that tuple to the operator immediately downstream in its StreamBase application. Conversely, the JMS Writer receives a tuple from StreamBase Server, creates a JMS message from it, and publishes that message to the JMS message bus.

The JMS Embedded Adapters serialize the messages they receive. That is, the StreamBase Server will process messages in the same order as they are initially received by the JMS Reader, and the JMS broker will be sent messages in the same order that the corresponding tuples are received by the JMS Writer.

Connection To JNDI And JMS Servers

By default, the JMS adapters only try to connect to a JNDI or JMS server one time before giving up. However, you can configure the adapter to retry the connection a certain number of times at set intervals before giving up. To do this, set the server-num-retries and server-reconnect-interval attributes of the relevant <jms-server> section in the adapter's configuration file. The server-num-retries attribute controls the number of times the adapter will try to connect before giving up. The default value of this attribute is 0, which specifies no retries; the maximum value is MAX_INT. The server-reconnect-interval attribute controls the number of seconds to wait between connection attempts. The default value is 30 seconds.

Note that these values apply to both the JNDI and JMS servers configured in this <jms-server> section.

This functionality, used in conjunction with the Command Input and Status Output ports (see Adapter Port Schemas), can be used to implement logic that fully controls how the adapter connects to the JMS servers it is configured to use.
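As a minimal sketch of the retry settings described in this section, the fragment below asks the adapter to retry a failed connection up to five times, waiting ten seconds between attempts. The server name and connection factory name are hypothetical and the elided values are left as placeholders; only the two retry attributes are the point of the example.

```xml
<jms-servers>
  <!-- Hypothetical server entry; replace the "..." placeholders with your provider's values -->
  <jms-server name="MyJMSServer"
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory"
    acknowledge-mode="AUTO_ACKNOWLEDGE"
    server-num-retries="5"
    server-reconnect-interval="10"/>
</jms-servers>
```

Because the values apply to both the JNDI and JMS servers configured in the section, a single pair of attributes covers both connections.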

Message Translation

When the JMS Reader receives a message, it translates the message into a tuple; when the JMS Writer receives a tuple, it translates the tuple into a JMS message. There are five JMS message types: BytesMessage, MapMessage, ObjectMessage, StreamMessage, and TextMessage. Of these, MapMessage is the one into which tuples translate most readily: like a tuple, a MapMessage is a set of name-value pairs, which makes translation between the two relatively straightforward. The JMS Reader includes a class, DefaultFromJMSMapMessageConverter, that converts JMS MapMessages to StreamBase tuples. Similarly, the JMS Writer provides DefaultToJMSMapMessageConverter to translate a tuple into a JMS MapMessage.

Because both tuples and MapMessages are collections of name-value pairs, the JMS Reader pairs the fields held by a JMS MapMessage with the fields in a StreamBase schema that have the same names and compatible types. If a field in the MapMessage has no counterpart with the same name in the StreamBase schema, that field is dropped. Similarly, if a field in the schema has no counterpart in the MapMessage, there is no input value for that field and it is set to null.

In the case of the JMS Writer, all fields in the incoming Tuple are added to the MapMessage.

A class that converts a JMS message to a StreamBase tuple is represented by the interface com.streambase.sb.adapter.common.jms.enqueue.FromJMSMessageConverter. This interface includes one method, Object convertFromJMSMessage(Message message) throws StreamBaseException, which provides the implementation of the translation. You can implement a different translation class and configure the JMS Reader to use it. In this way, you can specify a converter for one of the other JMS message types, or an alternate implementation of a MapMessage converter.

The JMS Writer provides a similar interface, com.streambase.sb.adapter.common.jms.dequeue.ToJMSMessageConverter, whose method Message convertToJMSMessage(Object object) throws StreamBaseException performs the opposite conversion.

Mapping Names, Headers and Properties

A JMS message consists of two primary pieces, a header and a payload (body). In addition, properties can be set on a JMS message.

By default, the JMS Embedded Adapters map the fields of a MapMessage to the fields of a StreamBase schema. The JMS Reader looks for entries in the payload of a MapMessage that have the same names as the fields of the StreamBase schema. The JMS Writer simply creates entries in the outgoing Message to match all the fields of the incoming StreamBase schema.

The JMS Embedded Adapters also provide facilities for handling other mappings, specified in the configuration file. For example, a name map can be used to map a field on a MapMessage to a field on a StreamBase schema that has a different name. Similarly, the fields in the header of a JMS message can be mapped to the fields of a StreamBase schema. In the same way, JMS properties can also be mapped to the fields of a StreamBase schema.

Note

Some JMS message headers are read-only. As such, the JMS Writer will not be able to change them even if there is an entry to that effect in the header map. When configuring header maps for use by the JMS Writer, keep in mind that only the following headers are mutable:

  • JMSCorrelationID

  • JMSReplyTo

  • JMSType

An attempt to map any other headers for the JMS Writer results in an error.

Because the JMS Reader simply reads the JMS headers, all headers can be mapped.

The JMS Reader and Acknowledge Modes

The JMS Reader consumes messages from JMS destinations. The manner in which these messages are acknowledged is configurable within the JMS Reader. The setting for the acknowledge mode can impact the performance of the reader.

The JMS Reader supports the acknowledge modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. It also supports TIBCO's proprietary acknowledge mode, NO_ACKNOWLEDGE, which is available only when the reader is connected to a TIBCO EMS message bus. The acknowledge mode for a given JMS server is specified with the acknowledge-mode attribute in the jms-server section of the configuration file.

Note also that the JMS Reader makes no effort to prevent or in any way handle the delivery of duplicate JMS messages. If a JMS server delivers the same message twice, the JMS Reader will simply consume it twice.
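For illustration, a server entry that selects an acknowledge mode might look like the following sketch. The server name and connection factory name are hypothetical and the elided values are placeholders; the acknowledge-mode value is one of the modes named above.

```xml
<jms-servers>
  <!-- Hypothetical server entry; only acknowledge-mode is the point of this example -->
  <jms-server name="MyJMSServer"
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory"
    acknowledge-mode="DUPS_OK_ACKNOWLEDGE"/>
</jms-servers>
```

Since the JMS Reader does not filter duplicates itself, an application using DUPS_OK_ACKNOWLEDGE should be able to tolerate the same message arriving more than once.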

The JMS Writer and Delivery Modes

The JMS Writer will create JMS messages and deliver them to a JMS message bus. JMS messages can be delivered with either of two delivery modes, DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT. The delivery mode for a JMS destination is specified in the destination section of the configuration file, using the delivery-mode attribute.

The semantics of JMS delivery modes are beyond the scope of this discussion. In general, a delivery mode of PERSISTENT implies a higher quality of service for the JMS destination, at some cost in performance. In particular, if a delivery mode of PERSISTENT is specified and there are durable subscribers for the message, the JMS message must be persisted before the JMS server can acknowledge the message producer.

Note

For TIBCO EMS users: an additional, EMS-specific delivery mode, RELIABLE_DELIVERY, is also supported.
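A minimal sketch of per-destination delivery modes in the configuration file follows. The server and destination names are hypothetical, and the value spellings PERSISTENT and NON_PERSISTENT are assumptions based on the delivery mode names above; check the comments in the skeleton configuration file for the exact values the adapter accepts.

```xml
<jms-servers>
  <jms-server name="MyJMSServer"
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory">
    <destinations>
      <!-- Hypothetical destinations; the value spellings are assumptions -->
      <destination name="orders.audit" delivery-mode="PERSISTENT"/>
      <destination name="quotes.ticks" delivery-mode="NON_PERSISTENT"/>
    </destinations>
  </jms-server>
</jms-servers>
```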

Destination Routing for the JMS Writer

The JMS Writer dequeues tuples from StreamBase streams, translates them into JMS messages, and routes them to JMS destinations. The adapter relies on two fields in the dequeued tuple to determine the JMS server and destination to which the translated message is sent. The two fields must be named __JMSServerName and __JMSDestinationName, and must contain the name of a configured JMS server and the name of a configured destination, respectively. These fields are optional if the configuration specifies only one destination; they are required if it specifies more than one (that is, more than one server, or more than one destination on a single server).
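To make the routing rule concrete, the following sketch shows a configuration with two destinations on one server, which is a case where the routing fields become required. All names are hypothetical and the elided values are placeholders.

```xml
<jms-servers>
  <jms-server name="MyJMSServer"
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory">
    <destinations>
      <!-- Two destinations on one server: each tuple must name its target -->
      <destination name="orders.queue"/>
      <destination name="trades.topic"/>
    </destinations>
  </jms-server>
</jms-servers>
```

With this configuration, a tuple whose __JMSServerName field is MyJMSServer and whose __JMSDestinationName field is trades.topic would be published to the second destination.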

Using a JMS Implementation

When one of the JMS Embedded Adapters executes, it needs to connect to an executing instance of a JMS message bus. Typically, this bus will be from one of the JMS providers. In order to connect to the JMS provider's JMS bus, JAR files supplied by the JMS provider must be made available to the JMS Adapters. This is true whether the adapter is executing from within StreamBase Studio or at a command-line invocation of StreamBase Server:

  • If the adapter is started from Studio, then the vendor's JMS JAR file or files must be imported into the Studio project or added to the Java Build Path of the Studio project.

  • If the adapter is executed from a command-line invocation of StreamBase Server, then the location of the vendor's JAR file or files must be specified in the java-vm section of the server's configuration file.

  • As an alternative, you can instead copy the JAR files to streambase-install-dir/lib/ext, which will make them available to both StreamBase Studio and StreamBase Server.

Examples for ActiveMQ users:

The following JAR files must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to an ActiveMQ server:

  • activemq_installdir/lib/activemq-core-5.0.0.jar

  • activemq_installdir/lib/geronimo-j2ee-management_1.0_spec-1.0.jar

  • activemq_installdir/lib/commons-logging-1.1.jar
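For a command-line invocation of StreamBase Server, the ActiveMQ JAR files listed above might be declared in the java-vm section of the server's configuration file roughly as follows. This is a sketch that assumes the java-vm section accepts jar elements with a file attribute; confirm the element names against the server configuration file reference for your StreamBase release. The paths reuse the activemq_installdir placeholder from the list above.

```xml
<streambase-configuration>
  <java-vm>
    <!-- Assumed syntax: one jar element per vendor JAR file -->
    <jar file="activemq_installdir/lib/activemq-core-5.0.0.jar"/>
    <jar file="activemq_installdir/lib/geronimo-j2ee-management_1.0_spec-1.0.jar"/>
    <jar file="activemq_installdir/lib/commons-logging-1.1.jar"/>
  </java-vm>
</streambase-configuration>
```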

Examples for TIBCO EMS users:

The following JAR file must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to a TIBCO EMS server:

  • TIBCO_EMS_installdir/ems/clients/java/tibjms.jar

Examples for Solace JMS users:

The following JAR files are part of the StreamBase installation kit and can be used to successfully connect to a Solace JMS server:

  • STREAMBASE_installdir/lib/ext/commons-lang-2.2.jar

  • STREAMBASE_installdir/lib/ext/sol-common-5.1.1.1.jar

  • STREAMBASE_installdir/lib/ext/sol-jcsmp-5.1.1.1.jar

  • STREAMBASE_installdir/lib/ext/sol-jms-5.1.1.1.jar

Note

Because the Solace JMS JAR files are already part of the StreamBase installation kit there is no need to add them to your StreamBase Studio project or to explicitly make them available to StreamBase Server.

Important

Regardless of the JMS vendor used, you must also provide a copy of jms.jar. This file should be included in your JMS vendor's product installation alongside its other JAR files such as those mentioned above.

Adapter Properties

Settings are used to control most of the behavior of the adapter. The settings are grouped under several tabs in the Properties view in StreamBase Studio.

In the table in this section, the Property column shows each property name as found in the one or more adapter properties tabs of the Properties view for this adapter.

When using this adapter in a StreamSQL program with the APPLY JAVA statement, you may refer to property names using the StreamSQL parameter name, also included in the Property column.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter Name: A read-only field that shows the formal name of the adapter.

Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this check box is selected, an instance of this adapter starts as part of the containing StreamBase Server. If cleared, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or start the component with StreamBase Manager. With this option cleared, the adapter does not start even if the application as a whole is suspended and later resumed. The default and recommended setting is selected.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

There is only one required property for either of the JMS Embedded Adapters: its configuration file. Configuration of the adapter is fairly involved, and so is not accomplished through its properties, but rather through its configuration file. Keep in mind that just the name of the configuration file, rather than its path, should be specified in the property. The named file will be searched for using the path specified in the configuration for StreamBase itself.

There are also two optional properties to control whether the adapter should accept JMS connection/disconnection commands from the application and whether status tuples should be issued when JMS servers or destinations become available or unavailable.

The JMS Reader is similar to an input stream that gets its input from a JMS Message bus. As with an input stream, a Schema needs to be specified for the JMS Reader. The Schema used by the JMS Reader is specified in the Edit Schema tab of the Properties View in StreamBase Studio.

Property | Description | Default
Configuration File (required string); StreamSQL name: ConfigFile | The name of the adapter configuration file. | None
Log On When Application Starts (optional); StreamSQL name: LogonAtStartup | When set, the adapter attempts to connect to the configured server(s) when the application starts. Otherwise, the adapter waits for an explicit Connect command (see Command Input Port Schema) before attempting to connect. | True
Add Command Input Port (optional); StreamSQL name: UseCommandInputPort | If checked, the adapter exposes an additional input port used to receive commands from the application. The schema and semantics of this port are described in Command Input Port Schema. | None
Add JMS Status Output Port (optional); StreamSQL name: UseStatusOutputPort | If checked, the adapter exposes an additional output port used to send status tuples when interesting events occur on the configured JMS server(s). The schema and semantics of this port are described in Status Output Port Schema. | None
Log Level (optional); StreamSQL name: logLevel | Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL. | INFO
Log Full Callstacks (optional); StreamSQL name: LogFullCallstacks | When set, the adapter prints full call stacks when an exception occurs, regardless of the Log Level setting. Otherwise, only the exception's message is printed unless the Log Level is DEBUG. | False

The JMS Embedded Adapters' Configuration File

The JMS Embedded Adapters are configured using an XML configuration file. This configuration file consists of two types of information, JMS configuration and configuration concerning the translation of JMS messages to tuples and back.

It may be that many JMS destinations need to be specified in the configuration file for the JMS Embedded Adapters. Where applicable, it is possible to specify a default value for a configuration attribute of a destination. The default can then be overridden by specifying a value for a particular destination.

A sample configuration file, skeleton.sbconf, is included with the JMS Reader sample in the StreamBase distribution. This file includes comments describing the XML elements and attributes that make up the configuration, and can be used as a template for your own configuration files. You can also obtain the skeleton configuration file by executing the jmsreader.jar JAR file. For Linux:

java -jar streambase-install-dir/lib/adapter/jmsreader.jar

For Windows:

java -jar "streambase-install-dir\lib\adapter\jmsreader.jar"

Similarly, the JMS Writer has a sample configuration file which can be displayed by executing the jmswriter.jar JAR file. For Linux:

java -jar streambase-install-dir/lib/adapter/jmswriter.jar

For Windows:

java -jar "streambase-install-dir\lib\adapter\jmswriter.jar"

Configuration Examples

Here are some typical settings for connecting to JMS servers from TIBCO EMS, ActiveMQ, IBM WebSphere, and Solace Systems:

TIBCO EMS (using JNDI)

<jms-servers>
  <jms-server name="EMSServer" 
    provider-context-factory="com.tibco.tibjms.naming.TibjmsInitialContextFactory"
    provider-url="tibjmsnaming://localhost:7222" 
    connection-factory-name="GenericConnectionFactory" 
    acknowledge-mode="NO_ACKNOWLEDGE"/>
</jms-servers>

ActiveMQ version 5.x (using JNDI)

<jms-servers>
  <jms-server name="ActiveMQServer" 
    provider-context-factory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
    provider-url="tcp://localhost:61616"
    connection-factory-name="org.apache.activemq.ActiveMQConnectionFactory"
    acknowledge-mode="AUTO_ACKNOWLEDGE"/>
</jms-servers>

IBM WebSphere Application Server version 6.1's Default JMS Implementation (using JNDI)

<jms-servers>
  <jms-server name="server1" 
    provider-context-factory="com.ibm.websphere.naming.WsnInitialContextFactory"
    provider-url="iiop://localhost:2809"
    connection-factory-name="jms/MyConnFactory"
    jndi-initial-context-builder="com.streambase.sb.adapter.common.jms.common.WASJNDIInitialContextBuilder"
    create-destinations="false"
    acknowledge-mode="AUTO_ACKNOWLEDGE"/>
</jms-servers>

Solace default JMS Implementation (using JNDI)

<jms-servers>
  <jms-server name="solaceJms" 
    provider-context-factory="com.solacesystems.jndi.SolJNDIInitialContextFactory"
    provider-url="smf://MyProviderHost:MyProviderPort"
    connection-factory-name="MyConnFactory"
    acknowledge-mode="AUTO_ACKNOWLEDGE"
    jndi-security-principal="MySecurityPrincipal"
    jndi-security-credentials="MySecurityCredentials"
    jndi-initial-context-builder="com.streambase.sb.adapter.common.jms.common.SolaceJNDIInitialContextBuilder"
    username="MyUsername"
    create-destinations="true"
    password="MyPassword" >
    <jndi-extra-properties>
      <extra-property name='Solace_JMS_OptimizeDirect' value='false'/>
      <extra-property name='Solace_JMS_VPN' value='MyJmsVpn'/>
    </jndi-extra-properties>
    <destinations>
      <destination name="topics.test1" name-map="MSFTConversion"/>
    </destinations>
  </jms-server>
</jms-servers>

Adapter Port Schemas

Command Input Port Schema

The command input port may be used by the application to request connections or disconnections to a configured JMS server, or to pause or resume processing for a JMS destination.

If the adapter is configured to expose a command input port, its schema is expected to have the following fields:

Field Name | Type | Description | Required
server | string | The name of the JMS server to which this command applies. | Yes
destination | string | The name of the JMS destination to which this command applies. This field may be left null if the command applies to the entire server instead of a single destination. | Yes
command | string | The actual command to execute. Accepted values are described below. Note that case is unimportant. | Yes

Permitted values for the command field are the following (case is unimportant):

  • Connect: Establish a connection to the specified JMS server. Attempting to connect to an already-connected JMS server has no effect. When using Connect, the destination field is ignored and can be left null.

  • Disconnect: Sever the connection to the specified JMS server. Disconnecting from an already-disconnected JMS server has no effect. When using Disconnect, the destination field is ignored and can be left null.

  • Pause: For an enqueuing adapter, JMS messages are still read from the specified destination on the specified server but are no longer converted to tuples and sent to the application. For a dequeuing adapter, tuples are still dequeued from the application's stream, but are no longer converted to JMS messages and sent to the specified destination on the specified server. Pausing an already-paused destination has no effect.

  • Resume: For an enqueuing adapter, JMS messages read from the specified destination on the specified server are once again converted to tuples and sent to the application. For a dequeuing adapter, tuples dequeued from the application's stream are once again converted to JMS messages and sent to the specified destination on the specified server. Resuming an already-resumed destination has no effect.

  • CreateDestination: This command is only recognized by enqueuing adapters. It will cause the adapter to attempt to create the specified destination on the specified JMS server (or on all configured JMS Servers if the server field is left null). This command will only be successful if the create-destinations="true" attribute was specified in the configuration file's JMS server definition, and a default-destination section was also specified for that server (see Dynamic Endpoints).

Status Output Port Schema

The status output port can be used by the application to receive tuples describing status events for configured JMS servers (connection and disconnection) and, optionally, for destinations (available or unavailable).

If the adapter is configured to expose a status output port, its schema has the following fields:

Field Name | Type | Description | Required
server | string | The name of the JMS server on which this event occurred. | Yes
destination | string | The name of the JMS destination for which this event is generated. This field may be left null if the event applies to the entire server instead of a single destination. | Yes
status | string | The actual event that occurred. Possible values are described below. | Yes
time | timestamp | The time at which this event was detected. | Yes

Possible values for the status field are:

  • Connecting: The adapter is attempting to connect to the named JMS server. Note that the destination field is left null.

  • Connected: The named JMS server is now connected. Note that the destination field is left null.

  • Disconnected: The named JMS server has disconnected. Note that the destination field is left null.

  • Available: The named destination on the named JMS server is now available.

  • Unavailable: The named destination on the named JMS server is now unavailable.

  • Connection Failed: The connection attempt to the named JMS server failed. Note that the destination field is left null.

  • Connection Retries Exceeded: The number of connection attempts specified in the adapter's configuration file for the named JMS server has been exhausted. No more attempts will be made unless a new Connect command is issued (see Command Input Port Schema). Note that the destination field is left null.

Destination Monitoring

Monitoring of JMS server connections is always enabled. However, in order to receive status tuples relating to destinations becoming available or unavailable, the adapter must periodically check whether a destination is active. Because this is an intrusive process (at least minimally), this functionality is disabled by default. To enable monitoring of JMS destinations on a given JMS server, add a destination-monitor-interval attribute to this JMS server's <jms-server> section in the adapter's configuration file. The value for this attribute is the number of milliseconds the adapter should wait between destination health checks. A value of 0 disables the functionality, as does removing the attribute.
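As a sketch, enabling destination monitoring with a five-second check interval might look like this. The server name and connection factory name are hypothetical and the elided values are placeholders; the interval is expressed in milliseconds, as described above.

```xml
<jms-servers>
  <!-- Hypothetical server entry; checks destination health every 5000 ms -->
  <jms-server name="MyJMSServer"
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory"
    acknowledge-mode="AUTO_ACKNOWLEDGE"
    destination-monitor-interval="5000"/>
</jms-servers>
```

Because each health check places some load on the JMS server, a longer interval is the more conservative choice when prompt notification is not essential.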

Dynamic Endpoints

By default, the JMS adapters expect the endpoints (topics or queues) they will use to be defined in the destinations section of the corresponding JMS server definition in the configuration file. However, in some cases it may be desirable to have endpoints created on the fly instead (assuming the JMS server allows it). This section describes how to configure and use the JMS adapters to accomplish this.

Configuring the Adapter to Create Endpoints

First, tell the JMS adapter what kind of destination to create by adding a default-destination entry to the destinations section of your JMS server in the adapter's configuration file. The syntax of this entry is identical to that of a regular destination entry, except that the name attribute is ignored. You must also make sure that the JMS server's definition includes a create-destinations="true" attribute:

<jms-servers>
  <jms-server name="myJMSServer" 
    provider-context-factory="..."
    provider-url="..."
    connection-factory-name="MyConnFactory"
    acknowledge-mode="AUTO_ACKNOWLEDGE"
    jndi-security-principal="MySecurityPrincipal"
    jndi-security-credentials="MySecurityCredentials"
    jndi-initial-context-builder="..."
    create-destinations="true"
    username="MyUsername"
    password="MyPassword" >
    <destinations>
      <default-destination name-map="MyNameMap" use-topics="true"/>
      <destination name="topics.test1" name-map="MyNameMap"/>
    </destinations>
  </jms-server>
</jms-servers>

Creating Endpoints Using the Adapter

The way endpoints are created dynamically at runtime depends on whether you wish to publish or subscribe to the new endpoint — in other words, it depends on whether you are using the JMS Reader or JMS Writer adapter.

JMS Reader

To create, subscribe to, and receive JMS messages from a dynamic endpoint, you must send a CreateDestination command to the adapter's Command port (see Command Input Port Schema).

JMS Writer

To create a dynamic endpoint and send JMS messages to it, send a tuple to the adapter's message port as you normally would, setting __JMSServerName appropriately and setting __JMSDestinationName to the name of the endpoint you wish to create. If the adapter does not find that name among the pre-configured destinations, it creates the destination before sending the message.

Typechecking and Error Handling

The JMS Reader performs two checks during typechecking. The JMS Reader ensures that a Schema has been defined and that a value for its configuration file has been specified.

The JMS Writer performs two checks during typechecking. It ensures that a value for its configuration file has been specified and that the input schema contains the fields __JMSServerName and __JMSDestinationName, if they are required.

If the Add Command Input Port or Add JMS Status Output Port option is selected, the adapters ensure that the corresponding schemas contain the expected fields. See Adapter Port Schemas for a description of the expected schemas for these ports.

Suspend and Resume Behavior

On suspend, if either of the JMS Embedded Adapters is currently processing a message or tuple, it finishes processing it before suspending. That is, the StreamBase application itself finishes processing the message or tuple before the JMS adapter returns from a suspend command. Once this has occurred, the JMS Reader unsubscribes from messages. If any messages (or, for the JMS Writer, tuples) are delivered while the JMS adapter is suspended, the adapter drops them and does not acknowledge them.

On resume, the JMS Reader resubscribes for messages. Each JMS adapter begins processing newly delivered messages or tuples once it has returned from the resume command.