Informatica Ultra Messaging Subscribing Input Adapter

Introduction

Note

This TIBCO StreamBase® Adapter for Informatica Ultra Messaging Subscribing Input was formerly part of the 29West LBM adapter suite.

This adapter allows a StreamBase application to receive messages published to one or more LBM topics. The adapter is embedded in the StreamBase application and has two output ports: one that emits status tuples, and a second that emits tuples representing received LBM messages.

The adapter subscribes to one or more topics during startup, then asynchronously and continuously receives and converts incoming LBM messages into StreamBase tuples, which it sends downstream.

The adapter supports all StreamBase data types, including tuples and lists, though lists may not contain elements of type list. The schema of the LBM message output port is set through the Edit Schema tab of the adapter's Properties view.

The adapter is configured through several properties set in the adapter's Properties view in StreamBase Studio. The properties include an optional configuration file to configure the LBM library, the set of topics to subscribe to at startup, and the adapter's log level.

29West LBM Middleware Dependencies

This adapter requires access to the JAR file that implements the 29West LBM Java API on your system, and any files referenced by that JAR file. You must add your site's version of the LBM implementation JAR files to your Maven repository. The 29West sample includes a pom.xml file with comments on how to install the required JAR files for V6.0 with Maven:

mvn install:install-file -DgroupId=Informatica -DartifactId=UMS -Dversion=6.0.0 \
  -Dpackaging=jar -Dfile=path/to/Informatica/UMQ_6.0/UM_Java_API_6_0/UMS_6.0_jdk1.5.0_12.jar
mvn install:install-file -DgroupId=Informatica -DartifactId=UMSPDM -Dversion=6.0.0 \
  -Dpackaging=jar -Dfile=path/to/Informatica/UMQ_6.0/UM_Java_API_6_0/UMSPDM_6.0_jdk1.5.0_12.jar
mvn install:install-file -DgroupId=Informatica -DartifactId=UMSSDM -Dversion=6.0.0 \
  -Dpackaging=jar -Dfile=path/to/Informatica/UMQ_6.0/UM_Java_API_6_0/UMSSDM_6.0_jdk1.5.0_12.jar        

Before launching any of the sample modules, you must also set up the launch configuration so that the Path variable includes the Informatica installation location. In addition, define an LBM_LICENSE_FILENAME variable that contains the path to the license file.

Note

These variables take effect only when a node is installed. If you are using an existing node, they do not take effect.

Informatica purchased 29West and has since released an update of the LBM API, renamed Ultra Messaging Streaming Edition, or UMS. This adapter works with the UMS 6.x JAR files, but you cannot mix LBM and UMS installations. You must thoroughly uninstall any LBM 4.x kit before installing a UMS 6.x kit. Since UMS is a workalike superset of LBM, this page uses "LBM" to refer to both.

If you get an error message that contains the text can't load lbmj: java.lang.UnsatisfiedLinkError: no lbmj in java.library.path, make sure the directory containing the LBM native libraries is on the path.
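
As a rough diagnostic for the error above, the following Python sketch checks which directories on a search path contain the lbmj native library. The file names follow the JVM's standard library-name mapping (lbmj.dll on Windows, liblbmj.so on Linux, liblbmj.dylib on macOS). The helper names are hypothetical and are not part of StreamBase or the LBM kit.

```python
import os
import sys


def native_library_filename(name, platform=sys.platform):
    """Mirror the JVM's library-name mapping for common platforms."""
    if platform.startswith("win"):
        return name + ".dll"
    if platform == "darwin":
        return "lib" + name + ".dylib"
    return "lib" + name + ".so"


def find_on_path(filename, search_path):
    """Return each directory in an os.pathsep-separated path that holds filename."""
    hits = []
    for directory in search_path.split(os.pathsep):
        # Skip empty entries, which appear when the path has stray separators.
        if directory and os.path.isfile(os.path.join(directory, filename)):
            hits.append(directory)
    return hits
```

For example, find_on_path(native_library_filename("lbmj"), os.environ.get("PATH", "")) returns an empty list when no directory on Path contains the library. On Linux the JVM typically builds java.library.path from LD_LIBRARY_PATH, so that variable may be the one to inspect there.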

The LBM API implementation described in this section is a product of a third party, and its specifications and file names are subject to change by 29West. See your LBM documentation for the latest information.

Adapter Properties

Property Description
LBM Configuration File

The name of an optional LBM configuration file used to override the default settings used by the LBM library. Configuration files can be shared between one or more LBM subscribing and publishing adapter instances. To support such sharing, configuration files can have named sections, which are referenced by the LBM Configuration File Section property below. Named sections are identified by placing a section name inside brackets in a comment line. For example:

#[MySection]

Lines that precede the first section name are considered part of the unnamed (default) section. Lines that follow a section name are considered part of that section.

Section names must be unique within and across all referenced LBM configuration files; a runtime error is thrown when a duplicate section name is encountered. Section names can include letters, digits, underscores, periods, and embedded spaces, and must begin with an upper- or lowercase letter or an underscore.
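
The section rules above can be sketched in Python as follows. This is only an illustration of the described behavior, not the adapter's actual parser: the function name, the regular expression, and the use of ValueError for the duplicate-name error are assumptions.

```python
import re

# A section header is a comment line holding a bracketed name, e.g. "#[MySection]".
# Name rule from the text: begins with a letter or underscore; may contain
# letters, digits, underscores, periods, and embedded (not trailing) spaces.
SECTION_HEADER = re.compile(
    r"^#\[([A-Za-z_](?:[A-Za-z0-9_. ]*[A-Za-z0-9_.])?)\]\s*$"
)


def split_sections(lines, seen=None):
    """Group config lines by section; "" is the unnamed (default) section.

    Pass the same `seen` set for every file so that duplicate names are
    rejected across files as well as within one file.
    """
    seen = set() if seen is None else seen
    sections = {"": []}
    current = ""
    for line in lines:
        match = SECTION_HEADER.match(line.strip())
        if match:
            current = match.group(1)
            if current in seen:
                raise ValueError("duplicate section name: " + current)
            seen.add(current)
            sections[current] = []
        else:
            # Ordinary lines (including plain comments) belong to the
            # section whose header most recently preceded them.
            sections[current].append(line)
    return sections
```

Given the lines "context resolver_multicast_port 12965", "#[MySection]", and "source transport lbtrm" (sample values for illustration only), the first line lands in the unnamed section and the third in MySection.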

LBM Configuration File Section

The name of the LBM configuration file section from which to retrieve the parameters for configuring the LBMContext and LBMReceiver objects. If unspecified, parameters from the unnamed section are used. For enhanced performance, all LBM adapter instances that specify the same configuration file and section name share an LBMContext object.

Initial Subscription Topic(s)

Zero or more topics to subscribe to at startup. Separate multiple topics with spaces. For example, "top.topic1 top.topic2".

Initial Subscription Wildcard Topic(s)

Zero or more wildcard topics to subscribe to at startup. Separate multiple wildcard topics with spaces. Wildcard topics use the form of regular expressions recognized by Perl; see http://perldoc.perl.org/perlrequick.html for details. Example wildcard topics include:
  • ^[xy].?$ -- One- or two-character topics that start with x or y

  • my\.topic -- Topics containing the string my.topic

  • ^my.*topic$ -- Topics starting with the string my and ending with the string topic
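
To try out wildcard patterns before configuring them, a short Python sketch such as the following can help. It relies on Python's re module, which treats these three patterns the same way Perl does, and it assumes an unanchored search, which is consistent with the "topics containing" wording above. The function name and sample topics are hypothetical.

```python
import re

# The three example wildcard topics from the list above.
PATTERNS = {
    "one_or_two_chars": r"^[xy].?$",
    "contains_my_topic": r"my\.topic",
    "my_to_topic": r"^my.*topic$",
}


def matching_topics(pattern, topics):
    """Return the topics in which the pattern matches anywhere (unanchored search)."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.search(topic)]
```

Note that the unescaped period in the third pattern matches any character, so myXtopic matches ^my.*topic$ but does not match my\.topic.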

Topic Field Name

The name of the output stream field to receive the topic value from the received LBM message. The field name Topic is entered as a suggested default.

Flags Field Name

The name of the output stream field to receive the flags from the received LBM message.

Sequence Number Field Name

The name of the output stream field to receive the sequence number from the received LBM message.

Enable Dynamic Subscriptions Port

If enabled (the default), the adapter includes an input port to which tuples are enqueued to subscribe to, or unsubscribe from, topics and/or wildcard topics after the adapter has started.

Capture Transform Strategy

The strategy to use when transforming capture fields for this operator: FLATTEN or NEST.

Log Level

Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Enable Pre-defined Messages

If disabled (the default), the LBM publish adapter sends self-describing messages, in which metadata describing the structure of the message is sent in each message. The LBM subscribe adapter uses this metadata to parse incoming messages and populate tuples.

If enabled, the LBM publish adapter sends pre-defined messages, in which the structure of the message is derived from the schema of the adapter's publish input port. The LBM subscribe adapter uses the schema of its message output port to parse incoming messages and populate tuples.

Schema Matching Policy

If Strict, the LBM publish and subscribe adapters must use identical schemas -- ones that contain the same fields in the same order -- to successfully exchange messages.

If BestVersion, the LBM publish and subscribe adapters must use compatible schemas to successfully exchange messages. Two schemas are compatible if their IDs match and the larger schema can be created simply by adding fields to the end of the smaller one.
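
The two policies can be illustrated with a small Python sketch. It models a schema as an ordered list of (field name, type) pairs and is only a reading of the rules stated above; the function names are hypothetical and the adapters' real checks may consider more than this.

```python
def is_strict_match(publisher_fields, subscriber_fields):
    """Strict: both schemas contain the same fields in the same order."""
    return list(publisher_fields) == list(subscriber_fields)


def is_best_version_compatible(pub_id, publisher_fields, sub_id, subscriber_fields):
    """BestVersion: IDs match and the larger schema only appends fields to the smaller."""
    if pub_id != sub_id:
        return False
    shorter, longer = sorted(
        [list(publisher_fields), list(subscriber_fields)], key=len
    )
    # The smaller schema must be an exact prefix of the larger one.
    return longer[:len(shorter)] == shorter
```

Under this reading, a subscriber whose schema is (Symbol, Price) can exchange messages with a publisher whose schema is (Symbol, Price, Volume) in BestVersion mode when the definition IDs match, but not in Strict mode.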

Definition ID

The message definition ID of the top-level schema used when exchanging pre-defined messages in BestVersion mode.

Major Version

The major version of the message definition when using BestVersion schema matching.

Minor Version

The minor version of the message definition when using BestVersion schema matching.

Nested Message Definition IDs

The name and message definition ID of each tuple field in the output schema used when exchanging pre-defined messages in BestVersion mode.

Enable Persistence

If enabled (not the default), the adapter registers with a UME store and recovers from its previous position in the message stream after a failure.

Enable Explicit ACK Only

If enabled (not the default), you must use the 29West LBM Subscribe ACK output adapter to send an explicit ACK to the store for each message. Enabling this option also enables the 29West configuration option ume_explicit_ack_only. If this option is enabled and the ACK output adapter is not used, memory leaks, because no message is disposed of until the ACK adapter has processed it.

UME Session ID

The session ID used by the Ultra Messaging library to manage recovery.

Recovery Sequence Number Field Name

The name of the optional field in the dynamic subscription input stream used to override the sequence number from which recovery occurs.

Adapter Ports

As shown in the diagram below (depicting one of the adapter's sample applications), the subscribing adapter has one input port and two output ports to communicate with the surrounding application.

The 29West LBM Subscribing Input Adapter's ports are used as follows:

  • DynamicSubscribe: If enabled, this input port receives tuples used to subscribe to, or unsubscribe from, topics and/or wildcard topics after the adapter has started. The DynamicSubscribe port has the following schema:

    • Topic, string: Contains the topic to subscribe to or unsubscribe from.

    • WildcardTopic, string: Contains the wildcard topic to subscribe to or unsubscribe from. Either Topic or WildcardTopic, but not both, must be non-null.

    • Subscribe, bool: true or false to subscribe or unsubscribe, respectively. A null value defaults to true (subscribe).

    • RecoverySequenceNumber, long: This optional field can be used to set the sequence number from which recovery occurs for a topic. If null, recovery occurs from the point in the message stream of the receiver at the time of the failure. This field's name is derived from the adapter's Recovery Sequence Number Field Name property on the Persistence tab.

  • SubscribeStatus: This output port emits status, information, and error tuples. The SubscribeStatus port has the following schema:

    • type, string: Contains one of the following values describing the type of event that occurred:

      • Transport

      • Source Notification

      • Process Tuple

      • Process LBM Message

      • Subscription

      • Suspend/Resume

    • action, string: The action associated with the event, such as a datatype mismatch or adapter suspended or resumed.

    • object, string: The name of the object associated with the event, such as the source and topic from the LBM message, the name of the tuple field that could not be populated, or the name of the adapter that was suspended or resumed.

    • message, string: A human-readable message string.

    • extendedInfo, tuple: This field holds extended information related to persistence and has the following subfields:

      • Topic, string: The topic name

      • ClientToken, string: The token value specified by the StreamBase application

      • SequenceNumber, long: The message sequence number

      • SequenceNumberLast, long: The last message sequence number received in source Message Sequence Number Information events, which is typically the same as the sequence number.

      • SourceRegistrationID, long: The source's registration ID used by the Ultra Messaging library to manage recovery

      • ReceiverRegistrationID, long: The receiver's registration ID used by the Ultra Messaging library to manage recovery

      • ReceiverName, string: The name of the receiver

      • UMEStoreIndex, int: The index of the store a source successfully registered to

      • UMEStoreName, string: The name (IP address) of the store a source successfully registered to

      • Flags, list<string>: Zero or more flags associated with a source event

  • MessagesReceived: The adapter emits a tuple on its second output port for each LBM message received. The port's schema is derived from the Edit Schema tab of the adapter's Properties view and can contain all StreamBase data types, including lists and nested tuples to accommodate hierarchy in LBM messages. LBM message fields are mapped to tuple fields by case-sensitive name. List fields can contain all StreamBase types except list types. StreamBase list fields are used to capture LBM array fields (such as ARRAY_INT8) in the LBM message. Fields not present in an LBM message are sent as null in the tuple.

    In processing incoming LBM messages, the adapter converts LBM data types to StreamBase data types. Each LBM type requires a specific StreamBase type. If the StreamBase field is not the correct type for the corresponding LBM field, the LBM field is discarded, a warning is displayed, a status tuple is emitted, and the corresponding field in the tuple is set to null. LBM fields of type DECIMAL, which contain a mantissa and exponent, are placed in a nested tuple field containing two sub-fields: a long for the mantissa and an int for the exponent. The two sub-fields can appear in either order in the nested tuple. The following table shows the required StreamBase field type for each LBM field type.

    LBM Type          Required StreamBase Type
    BLOB              blob
    BOOLEAN           bool
    DECIMAL           tuple{mantissa: long, exponent: int}
    DOUBLE            double
    FLOAT             double
    INT8              int
    INT16             int
    INT32             int
    INT64             long
    MESSAGE           tuple
    STRING            string
    TIMESTAMP         timestamp
    UINT8             int
    UINT16            int
    UINT32            long
    UINT64            long
    UNICODE           string
    BLOB ARRAY        list<blob>
    BOOLEAN ARRAY     list<bool>
    DECIMAL ARRAY     list<tuple{mantissa: long, exponent: int}>
    DOUBLE ARRAY      list<double>
    FLOAT ARRAY       list<double>
    INT8 ARRAY        list<int>
    INT16 ARRAY       list<int>
    INT32 ARRAY       list<int>
    INT64 ARRAY       list<long>
    MESSAGE ARRAY     list<tuple>
    STRING ARRAY      list<string>
    TIMESTAMP ARRAY   list<timestamp>
    UINT8 ARRAY       list<int>
    UINT16 ARRAY      list<int>
    UINT32 ARRAY      list<long>
    UINT64 ARRAY      list<long>
    UNICODE ARRAY     list<string>
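
The mapping in the table, together with the null-on-mismatch rule, can be expressed as a lookup. The Python sketch below is illustrative only: the function names are hypothetical, type names are treated as plain strings, and the interpretation of a DECIMAL as mantissa × 10^exponent is an assumption to confirm against your LBM documentation.

```python
from decimal import Decimal

# Scalar rows of the table above: LBM type -> required StreamBase type.
REQUIRED_TYPE = {
    "BLOB": "blob", "BOOLEAN": "bool", "DOUBLE": "double", "FLOAT": "double",
    "INT8": "int", "INT16": "int", "INT32": "int", "INT64": "long",
    "MESSAGE": "tuple", "STRING": "string", "TIMESTAMP": "timestamp",
    "UINT8": "int", "UINT16": "int", "UINT32": "long", "UINT64": "long",
    "UNICODE": "string",
    "DECIMAL": "tuple{mantissa: long, exponent: int}",
}


def required_type(lbm_type):
    """Look up the StreamBase type for a scalar or "<TYPE> ARRAY" LBM type."""
    suffix = " ARRAY"
    if lbm_type.endswith(suffix):
        return "list<" + REQUIRED_TYPE[lbm_type[:-len(suffix)]] + ">"
    return REQUIRED_TYPE[lbm_type]


def convert_field(lbm_type, value, streambase_type):
    """Return the value, or None (null) when the declared field type is wrong."""
    return value if required_type(lbm_type) == streambase_type else None


def decimal_value(decimal_tuple):
    """Read a DECIMAL nested tuple as mantissa * 10**exponent (assumed base 10)."""
    return Decimal(decimal_tuple["mantissa"]).scaleb(decimal_tuple["exponent"])
```

Because the sub-fields are looked up by name, decimal_value gives the same result whichever order mantissa and exponent appear in, which mirrors the either-order rule described above.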

Using the Adapter in a StreamBase Application

Add an instance of the adapter to a new StreamBase application as follows:

  1. In StreamBase Studio, create a project, and create an EventFlow application file to host the adapter.

  2. Drag the Input Adapter icon from the Operators & Adapters drawer in the Palette view to the canvas. Select the 29West LBM Subscribing Input Adapter from the list and click OK.

  3. Connect an input stream to the adapter's input port and output streams to the adapter's two output ports.

  4. Configure the schema of the input stream with three fields: Topic: string, WildcardTopic: string, and Subscribe: bool.

  5. Configure the schema of the MessagesReceived output stream (port 2) using the Edit Schema tab of the adapter's Properties view to match the set of fields expected in the incoming LBM messages.

  6. Select the adapter icon, and in the Properties view, select Adapter Settings and fill in any desired properties. None of the properties are required.
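
The three-field input schema from step 4 carries subscription requests to the DynamicSubscribe port. As a sketch of how such a tuple is interpreted under the port rules described earlier (exactly one of Topic or WildcardTopic non-null, and a null Subscribe meaning subscribe), consider the following Python function. It models a tuple as a dict, the function name is hypothetical, and the ValueError stands in for the status tuple that the adapter emits when it rejects a request.

```python
def interpret_subscription(tuple_fields):
    """Turn a DynamicSubscribe tuple (as a dict) into (action, kind, topic)."""
    topic = tuple_fields.get("Topic")
    wildcard = tuple_fields.get("WildcardTopic")
    # Exactly one of the two topic fields may be non-null.
    if (topic is None) == (wildcard is None):
        raise ValueError("exactly one of Topic or WildcardTopic must be non-null")
    subscribe = tuple_fields.get("Subscribe")
    # A null Subscribe value defaults to true (subscribe).
    action = "subscribe" if subscribe is None or subscribe else "unsubscribe"
    if topic is not None:
        return (action, "topic", topic)
    return (action, "wildcard", wildcard)
```

For instance, a tuple with Topic set to top.topic1 and the other two fields null is read as a request to subscribe to the topic top.topic1.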

Typechecking and Error Handling

The 29West LBM Subscribing Input adapter uses typecheck messages to help you configure the adapter in your StreamBase application. In particular, the adapter generates typecheck messages when:

  • An invalid LBM configuration file is specified.

  • No initial subscription topics or wildcard topics are provided and the dynamic subscription port is disabled.

  • An invalid topic field name within the MessagesReceived output port schema is specified.

  • Duplicate topics or wildcard topics are specified.

  • Persistence is enabled and an invalid UME session ID is specified.

  • Persistence is enabled and no UME session ID is specified.

  • Persistence is enabled and an invalid recovery sequence number field name is specified.

  • The schema of the dynamic subscription input port is misconfigured.

  • The schema of the MessagesReceived output port contains one or more list fields with elements of type list.

The adapter generates messages on the status port during runtime under various conditions, including:

  • An LBM event such as a Beginning of Transport Session or End of Transport Session occurs.

  • A dynamic subscription tuple is rejected because it contains no topic or wildcard topic.

  • A new source topic is detected.

  • An error is returned by the LBM library when the adapter attempts to subscribe to, or unsubscribe from, a topic.

  • The adapter is suspended or resumed.

Suspend and Resume Behavior

When suspended, the 29West LBM Subscribing Input Adapter stops emitting tuples on its output ports.

When resumed, the adapter begins emitting tuples on its output ports once again.