TIBCO Rendezvous Subscribing Input Adapter

Introduction

The TIBCO StreamBase® Input Adapter for TIBCO Rendezvous® allows a StreamBase application to receive messages published to one or more Rendezvous subjects. The adapter is embedded in the StreamBase application and has two output ports: one that emits status tuples, and a second that emits tuples representing received Tibrv messages. An optional control port can also be used to control the connection to Tibrv.

The adapter subscribes to one or more subjects during startup. It then receives incoming Tibrv messages asynchronously and continuously, converts them into StreamBase tuples, and sends those tuples downstream.

The adapter supports all StreamBase data types except function. List fields may not contain elements of type list. The schema of the Tibrv message output port is set through the Edit Schema tab of the adapter's Properties view.

The adapter is configured through several properties set in the adapter's Properties view in StreamBase Studio. The properties include parameters used to connect to the Rendezvous daemon and the set of subjects to subscribe to at startup.

TIBCO Middleware Dependencies

This adapter requires access to the JAR file that implements the TIBCO Rendezvous Java API on your system, and any files referenced by that JAR file. The StreamBase installation kit includes version 8.1.2 of the tibrvj.jar JAR file. If your site's Rendezvous implementation requires a newer version of that file, copy the file from $TIBRV_HOME/lib to $STREAMBASE_HOME/lib/ext.

The tibrvj.jar file, in turn, relies on the tibrvj.dll and tibrv.dll files for Windows, and for UNIX, on several .so files in /usr/tibco/tibrv/lib. These DLL or .so files are supplied as part of your TIBCO Rendezvous installation, and are not shipped with StreamBase. Make sure these files are locatable by the TIBCO JAR file on the PATH or by means of the TIBRV_HOME environment variable.

If you get an error message whose text refers to Library not found: tibrvj, audit your system for multiple versions of the TIBCO JAR and DLL (or .so) files. For example, on Windows, you might have an older TIBCO release's DLL files in C:\Windows\System32, and a newer set in %TIBRV_HOME%\lib. Make sure the StreamBase TIBCO adapter sees a consistent TIBCO implementation, with the tibrvj.jar, tibrvj.dll, and tibrv.dll files all from the same TIBCO release.
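The audit described above can be partly automated. The following is a minimal sketch, not a TIBCO or StreamBase tool: the function names are hypothetical, and the set of directories to scan (the PATH entries plus TIBRV_HOME/lib) is an assumption based on the locations named in this section. It reports every directory that contains one of the library files, so that duplicates from different releases stand out.

```python
import os
from collections import defaultdict

# File names taken from the text above; add the UNIX .so names used at your site.
LIBRARY_NAMES = ("tibrvj.jar", "tibrvj.dll", "tibrv.dll")


def find_library_copies(directories, names=LIBRARY_NAMES):
    """Return {file name: [directories that contain it]} for the given names."""
    wanted = {name.lower() for name in names}
    found = defaultdict(list)
    # dict.fromkeys drops repeated directories while keeping their order.
    for directory in dict.fromkeys(directories):
        try:
            entries = os.listdir(directory)
        except OSError:
            continue  # skip missing or unreadable directories
        for entry in entries:
            if entry.lower() in wanted:
                found[entry.lower()].append(directory)
    return dict(found)


def candidate_directories(environ=os.environ):
    """Collect the PATH entries plus TIBRV_HOME/lib, if TIBRV_HOME is set (an assumption)."""
    directories = [d for d in environ.get("PATH", "").split(os.pathsep) if d]
    home = environ.get("TIBRV_HOME")
    if home:
        directories.append(os.path.join(home, "lib"))
    return directories


if __name__ == "__main__":
    copies = find_library_copies(candidate_directories())
    for name, places in sorted(copies.items()):
        flag = "  <-- multiple copies" if len(places) > 1 else ""
        print(f"{name}: {places}{flag}")
```

A file that appears in more than one directory is a candidate for the version mismatch described above. The script only locates files; it does not compare release versions.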

Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property: Description
Service: The Rendezvous service name. Leave blank to use the default value of 7500.
Network: The network interface. Leave blank to use the primary network interface for the host computer.
Daemon: The Rendezvous daemon. Leave blank to use the daemon on the local computer listening on the default port.
Use Unique Queue: If enabled, the adapter uses a per-adapter-instance event queue rather than the default Tibrv event queue. Using a per-instance queue can lower latency in some situations.
Subject(s): One or more subjects to subscribe to at startup. Separate multiple subjects with spaces. For example, "top.subject1 top.subject2". Use an asterisk wildcard to subscribe to all subjects at a given level. For example, "top.*".
Subject Field Name: The name of the output stream field to receive the subject value from the published Tibrv message.
Ignore Advisory Messages: If enabled, the adapter discards messages with subjects that start with _RV.
Raw Message Field Name: The name of the output stream string or blob field to receive the raw Tibrv message.
Find Fields By ID: If enabled, the adapter first tries to map a Tibrv message field to a StreamBase field by field ID. A Tibrv field with an ID of 123 would match a StreamBase field named _123. If no ID match is found, the adapter uses its default field mapping mechanism, using the Tibrv field name to find the StreamBase field.
Enable Dynamic Subscriptions Port: If enabled, the adapter includes an input port to which tuples are enqueued to subscribe to, or unsubscribe from, subjects after the adapter has started.
Enable Control Port: If enabled, the adapter includes an input port for controlling the connection.
Auto Connect At Startup: If enabled, the adapter automatically tries to connect at startup.
Auto Reconnect: If enabled, the adapter automatically tries to reconnect if a failure occurs.
Auto Reconnect Wait: The number of milliseconds to wait after a failed connection attempt before trying to connect again.
Request Cached Values: If enabled, the adapter requests the cached message for each subscribed subject and includes an additional boolean field, _IsCachedValue, on its second output port, which contains true if the tuple represents a cached message and false otherwise. Cache requests are processed in the background, and the adapter may emit one or more tuples containing non-cached messages before emitting a tuple with a cached message for a given subject. The StreamBase application must be prepared to discard or merge stale cached messages. When a cache request for a given subject times out, the adapter emits a no cached message available status tuple.
Cache Request Timeout: Specifies the time, in milliseconds, to wait for a response from the cache server. The total time required to request the cached messages for a burst of subscriptions increases linearly with the Cache Request Timeout (assuming a significant number of subjects are not cached).
Cache Request Max Thread Count: Specifies the maximum number of threads to use in servicing cache requests. These threads are created on demand and remain active until there are no cache requests to service, at which point they exit. The total time required to request the cached messages for a burst of subscriptions decreases linearly with the Max Thread Count (assuming a significant number of subjects are not cached).
Log Level: Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
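To illustrate how the Subject(s) and Ignore Advisory Messages settings behave, here is a minimal sketch. It assumes that an asterisk matches exactly one dot-separated element, which is how the "at a given level" wording above reads. The adapter itself delegates subject matching to Rendezvous, so the function names here are hypothetical, and other Rendezvous wildcards are not modeled.

```python
def parse_subjects(property_value):
    """Split the space-separated Subject(s) property value into a list."""
    return property_value.split()


def subject_matches(pattern, subject):
    """Return True if subject matches pattern; '*' stands for exactly one element."""
    pattern_parts = pattern.split(".")
    subject_parts = subject.split(".")
    if len(pattern_parts) != len(subject_parts):
        return False
    return all(p == "*" or p == s for p, s in zip(pattern_parts, subject_parts))


def is_advisory(subject):
    """Advisory subjects start with _RV (see Ignore Advisory Messages)."""
    return subject.startswith("_RV")


subjects = parse_subjects("top.subject1 top.*")
print(subjects)
print(subject_matches("top.*", "top.prices"))      # same depth, wildcard level
print(subject_matches("top.*", "top.prices.usd"))  # one level too deep
```

Under this reading, "top.*" covers top.prices but not top.prices.usd, so deeper levels need their own subscription entries.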

Edit Schema Tab

Use the Edit Schema tab to specify the schema of the output tuple for this adapter. For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.
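When naming schema fields, it can help to see which field would receive a given Tibrv field. The sketch below models the rules stated on this page: the Find Fields By ID lookup, the underscore prefix for Tibrv field names that start with a digit, and case-sensitive name matching. It is an illustration only, not the adapter's implementation, and the function names are hypothetical.

```python
def streambase_name_for(tibrv_name):
    """Name a schema field needs in order to receive a Tibrv field with this name."""
    return "_" + tibrv_name if tibrv_name[:1].isdigit() else tibrv_name


def resolve_field(schema_fields, tibrv_name, tibrv_id=None, find_by_id=False):
    """Return the schema field that would receive the Tibrv field, or None."""
    # With Find Fields By ID enabled, an ID of 123 is tried as field _123 first.
    if find_by_id and tibrv_id is not None:
        by_id = f"_{tibrv_id}"
        if by_id in schema_fields:
            return by_id
    # Otherwise fall back to the case-sensitive name lookup.
    by_name = streambase_name_for(tibrv_name)
    return by_name if by_name in schema_fields else None


schema = {"price", "_123", "_42"}
print(resolve_field(schema, "123"))                                  # _123
print(resolve_field(schema, "price", tibrv_id=42, find_by_id=True))  # _42
print(resolve_field(schema, "Price"))                                # None
```

The last call returns None because the lookup is case-sensitive. In the adapter, a schema field with no matching Tibrv field is emitted as null.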

Cluster Aware Tab

Use the settings in this tab to allow this operator or adapter to start and stop based on conditions that occur at runtime in a cluster with more than one node. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with TIBCO Streaming releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Using the Adapter in a StreamBase Application

This section discusses how to use the TIBCO Rendezvous Subscribing Input Adapter in a StreamBase application. As shown in the lower left corner of the diagram below (depicting one of the adapter's sample applications), the subscribing adapter by default has no input ports and two output ports to communicate with the surrounding application. Enabling the Dynamic Subscriptions port or the Control port adds the corresponding input port.

The TIBCO Rendezvous Subscribing Input Adapter's ports are used as follows:

  • DynamicSubscribe (input): If enabled, this input port receives tuples used to subscribe to, or unsubscribe from, subjects after the adapter has started. The DynamicSubscribe port has the following schema:

    • Subject, string: Contains the subject to subscribe to or unsubscribe from.

    • Subscribe, bool: true or false to subscribe or unsubscribe, respectively. A null value defaults to true (subscribe).

  • Control (input): If enabled, this input port receives tuples used to connect and disconnect. The control port has the following schema:

    • Command, string: The command to send to the adapter, either Connect or Disconnect. When a disconnect is performed, the current subscriptions are saved, and the adapter resubscribes to them on the next connect unless an unsubscribe is received in the meantime.

  • Status (output): This output port emits status, information, and error tuples. The Status port has the following schema:

    • type, string: Contains one of the following values describing the type of event that occurred:

      • Transport

      • Process Tuple

      • Process Tibrv Message

      • Suspend/Resume

      • Subscription

    • object, string: The name of the object associated with the event, such as the Rendezvous transport parameters (service, network, and daemon), the contents of the Tibrv message that wasn't processed successfully, or the name of the adapter that was suspended or resumed.

    • action, string: The action associated with the event, which may be any of the following values:

      • Created

      • Subscribed

      • Unsubscribed

      • Suspending

      • Resumed

      • Failed

      • Disconnected

      • Rejected

      • Datatype mismatch

      • No cached message available

    • message, string: A human-readable message string.

  • Tibrv Messages (output): The adapter emits a tuple on its second output port for each Tibrv message received. The port's schema is set on the Edit Schema tab of the adapter's Properties view and can contain all StreamBase data types the adapter supports, including lists and (nested) tuples to accommodate hierarchy in Tibrv messages. Tibrv message fields are mapped to tuple fields by case-sensitive name. If a Tibrv field name starts with a digit, the matching StreamBase field name must start with an underscore: for example, the Tibrv field 123 maps to the StreamBase field _123. List fields may contain all StreamBase types except list types. StreamBase list fields capture Tibrv array fields (such as I8ARRAY) in the Tibrv message, as well as non-array Tibrv fields that occur multiple times with the same name. Fields not present in a Tibrv message are sent as null in the tuple.

    Note

    When configured to request cached values, the adapter includes an additional boolean field, _IsCachedValue, on its second output port, which contains true if the tuple represents a cached message and false otherwise.

    In processing incoming Tibrv messages, the adapter converts Tibrv data types to StreamBase data types. Not all possible conversions are valid. When an invalid conversion is attempted, the adapter logs a one-time warning for the field, emits a status tuple, and sets the corresponding field in the output tuple to null. The following table shows the set of valid conversions.

    Tibrv Type Valid StreamBase Types
    BOOL bool, list<bool>
    DATETIME timestamp, list<timestamp>
    ENCRYPTED (none)
    F32 double, string, list<double>, list<string>
    F32ARRAY list<double>, list<string>
    F64 double, string, list<double>, list<string>
    F64ARRAY list<double>, list<string>
    I8 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I8ARRAY blob, list<bool>, list<int>, list<long>, list<string>
    I16 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I16ARRAY list<bool>, list<int>, list<long>, list<string>
    I32 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I32ARRAY list<bool>, list<int>, list<long>, list<string>
    I64 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    I64ARRAY list<bool>, list<int>, list<long>, list<string>
    IPPORT16 (none)
    IPADDR32 (none)
    MSG tuple, list<tuple>
    MSGARRAY list<tuple>
    OPAQUE blob, string, list<bool>
    STRING blob, string, list<blob>, list<string>
    STRINGARRAY list<string>
    U8 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U8ARRAY blob, list<bool>, list<int>, list<long>, list<string>
    U16 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U16ARRAY list<bool>, list<int>, list<long>, list<string>
    U32 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U32ARRAY list<bool>, list<int>, list<long>, list<string>
    U64 bool, int, long, string, list<bool>, list<int>, list<long>, list<string>
    U64ARRAY list<bool>, list<int>, list<long>, list<string>
    XML string, blob, list<string>
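The conversion table above can be encoded as a lookup to check a planned schema before deploying it. The following is a minimal sketch that transcribes the table as printed, including the rows with no valid conversions. The names VALID and can_convert are hypothetical and are not part of the adapter.

```python
INTEGER_SCALAR = {"bool", "int", "long", "string",
                  "list<bool>", "list<int>", "list<long>", "list<string>"}
INTEGER_ARRAY = {"list<bool>", "list<int>", "list<long>", "list<string>"}
FLOAT_SCALAR = {"double", "string", "list<double>", "list<string>"}
FLOAT_ARRAY = {"list<double>", "list<string>"}

# Tibrv type -> set of valid StreamBase types, transcribed from the table above.
VALID = {
    "BOOL": {"bool", "list<bool>"},
    "DATETIME": {"timestamp", "list<timestamp>"},
    "ENCRYPTED": set(),
    "IPPORT16": set(),
    "IPADDR32": set(),
    "MSG": {"tuple", "list<tuple>"},
    "MSGARRAY": {"list<tuple>"},
    "OPAQUE": {"blob", "string", "list<bool>"},
    "STRING": {"blob", "string", "list<blob>", "list<string>"},
    "STRINGARRAY": {"list<string>"},
    "XML": {"string", "blob", "list<string>"},
}
for prefix in ("F32", "F64"):
    VALID[prefix] = FLOAT_SCALAR
    VALID[prefix + "ARRAY"] = FLOAT_ARRAY
for prefix in ("I8", "I16", "I32", "I64", "U8", "U16", "U32", "U64"):
    VALID[prefix] = INTEGER_SCALAR
    VALID[prefix + "ARRAY"] = INTEGER_ARRAY
# The 8-bit array types can additionally be captured as a blob.
for name in ("I8ARRAY", "U8ARRAY"):
    VALID[name] = INTEGER_ARRAY | {"blob"}


def can_convert(tibrv_type, streambase_type):
    """Return True if the table lists streambase_type as valid for tibrv_type."""
    return streambase_type in VALID.get(tibrv_type, set())


print(can_convert("I8ARRAY", "blob"))   # True
print(can_convert("I16ARRAY", "blob"))  # False
print(can_convert("F32", "int"))        # False
```

A False result corresponds to the runtime case described above, in which the adapter logs a warning, emits a status tuple, and sets the field to null.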

Add an instance of the adapter to a new StreamBase application as follows:

  1. In StreamBase Studio, create a project, and create an EventFlow application file to host the adapter.

  2. Drag an instance of the TIBCO Rendezvous Subscribing Input Adapter from the Operators and Adapters drawer in the Palette view to the canvas.

  3. Connect Output streams to the adapter's two output ports.

  4. Configure the schema of the Tibrv message output stream (port 2) using the Edit Schema tab of the adapter's Properties view to match the set of fields expected in the incoming Tibrv messages.

  5. Select the adapter icon, and in the Properties view, select the Adapter Properties tab and fill in any desired properties. Subject(s) is the only required property.

Typechecking and Error Handling

The TIBCO Rendezvous Subscribing Input Adapter uses typecheck messages to help you configure the adapter in your StreamBase application. In particular, the adapter generates typecheck messages when no subjects are provided or when a list field in the Tibrv message output port uses an unsupported element type.

The adapter generates messages on the status port during runtime under various conditions, including:

  • The adapter creates or fails to create a transport to the Rendezvous daemon during startup.

  • A data type mismatch is detected between a field in an incoming Tibrv message and the corresponding tuple field.

  • The adapter is suspended or resumed.

Suspend and Resume Behavior

When suspended, the TIBCO Rendezvous Subscribing Input Adapter unsubscribes from all subscriptions, disconnects, and no longer receives Tibrv messages. The current subscriptions are saved so that they can be restored when the adapter is resumed.

When resumed, the adapter connects and resubscribes to all subjects that it was subscribed to before the suspend was invoked.
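The interplay between saved subscriptions, the Control port, and the DynamicSubscribe port can be pictured with a small state model. This is a toy sketch under stated assumptions, not the adapter's code: the class and method names are hypothetical, and it assumes that an unsubscribe received while disconnected removes the subject from the saved set.

```python
class SubscriptionState:
    """Toy model of how saved subscriptions survive a disconnect or suspend."""

    def __init__(self, startup_subjects):
        self.saved = list(startup_subjects)  # subjects to (re)subscribe on connect
        self.active = []                     # subjects currently subscribed
        self.connected = False

    def connect(self):
        """Models a Connect command or a resume."""
        self.connected = True
        self.active = list(self.saved)

    def disconnect(self):
        """Models a Disconnect command or a suspend; saved subjects are kept."""
        self.connected = False
        self.active = []

    def dynamic_subscribe(self, subject, subscribe=None):
        """Apply a DynamicSubscribe tuple; a null Subscribe value means subscribe."""
        if subscribe is None or subscribe:
            if subject not in self.saved:
                self.saved.append(subject)
        elif subject in self.saved:
            self.saved.remove(subject)
        if self.connected:
            self.active = list(self.saved)


state = SubscriptionState(["top.a"])
state.connect()
state.dynamic_subscribe("top.b")          # null Subscribe defaults to subscribe
state.disconnect()                        # subscriptions saved, none active
state.dynamic_subscribe("top.a", False)   # unsubscribe while disconnected
state.connect()
print(state.active)                       # ['top.b']
```

The design point is that the saved set, not the active set, is the source of truth: connecting or resuming always rebuilds the active subscriptions from what was saved.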