Velocity Analytics Broadcast Server Input Adapter

Introduction

The TIBCO StreamBase® Input Adapter for Velocity Analytics Broadcast Server receives real-time market data from the Velocity Analytics Broadcast Server via its real-time multicast interface. It converts each tick into a StreamBase tuple and sends it to StreamBase Server for processing. It also maintains a set of counters that provide processing statistics, and it logs events and error conditions via the StreamBase logging facility.

Prerequisites

Familiarity with the Thomson Reuters Enterprise Platform for Velocity Analytics is required. In addition, a Velocity Analytics development server environment is required with the following installed:

  • Analytics Engine

  • Broadcast Server

The following Velocity Analytics documentation, available on the Thomson Reuters Customer Zone, provides additional information about Velocity Analytics, its components, its data formats, and the SQL interface used to query the time series database and User Time Series Store (UTSS).

  • Installation Guide — provides instructions on installing Velocity Analytics components.

  • System Admin and Config Guide — provides additional information about data record formats.

  • Hardware and Software Guide — provides information about current hardware and operating system requirements.

  • Using SQL with Velocity Analytics — provides information about querying the Velocity Analytics time series bases and UTSS.

Setting Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this adapter starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the adapter instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Connect On Startup: When selected (default), the adapter automatically connects to the multicast group when the StreamBase application starts. Enabling Connect On Startup is recommended; otherwise, make sure the control port is enabled so that a connect command can be issued.

Enable Control Port: Enables the control and status ports. See ControlStream for further information. The Status port is the last output port and is enabled automatically whenever the control port is enabled.

Enable Statistic Port: Enables the statistic port. See Statistics for further information.

Number of Output Ports: The number of data output ports configured via the Port Management tab; those are the ports that are mapped to flex record definitions. This number does not include the control, status, or statistics port.

Local IP: The DNS name or IP address of the local machine hosting this adapter.

Multicast IP: The DNS name, host name, or IP address of the network card that receives the feed multicast. This address must be on the same subnet as the multicast group IP address for the Velocity Analytics Broadcast Server. See the Velocity Analytics System Admin and Config Guide for more information about group ID and Velocity Analytics Broadcast Server configuration.

Multicast Port: The multicast port for the incoming feed broadcast. This must match the setting of the portnoofstock configuration parameter for the Velocity Analytics Broadcast Server, as defined in the broadcastserver process section of its configuration file. See the Velocity Analytics System Admin and Config Guide for more information about portnoofstock and Velocity Analytics Broadcast Server configuration.

Flex Record XML File: The name of the flex record definition file imported into the application. This file must contain definitions corresponding to the record formats the adapter listens for, and must match the definitions used by the Velocity Analytics Broadcast Server. This file is read whenever StreamBase typechecking is performed. An example flex record definition file is provided as VhayuRecordDefinitions.xml in the Velocity Adapters Sample.

Receive Time Format and Exchange Time Format: The output time formats for the ReceiveTime and ExchangeTime fields, respectively. Supported formats include:

  • SBTIMESTAMP - StreamBase Timestamp

  • ASCII_GMT - ASCII string time value adjusted for the GMT time zone.

  • ASCII_LOCAL - ASCII string time value adjusted for the local time zone.

  • SERIALTIME - Number of seconds since Jan 1st 1970.

  • VHTIME - Number of 100-nanosecond intervals since Jan 1st 1601.

See the Velocity Analytics System Admin and Config Guide for more information about time and date formats.

Log Level: Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Advanced Properties Tab

This section describes the properties on the Advanced Properties tab in the Properties view for the Velocity Analytics Broadcast Server Input adapter.

Multicast Socket Buffer Size: Buffer size, in bytes, of the multicast socket. The default value is 1048576.

Back Channel Retry Time-out: Connection retry timeout, in milliseconds, for back channel TCP request. The default value is 500.

Feed Handlers: Number of threads used for feed processing. The default value is 5 threads.

Max Packet Buffer Size: Maximum size of the internal packet hashmap. The default value is 0, which specifies unlimited size.

Max Tuple Queue Size: Size of the queue used to store the tuples. The default value is 0, which specifies unlimited size.

Statistics Interval: Time interval, in milliseconds, between two consecutive statistics outputs. The default value is 5000 ms. The statistics port must be enabled in order to see output.

Port Management Tab

This section describes the Port Management tab in the Properties view for the Velocity Analytics Broadcast Server Input adapter.

Use this tab to specify the mapping of output ports to flex record definitions. Flex records are defined in the flex record definition file corresponding to the flex records multicast from the Velocity Analytics Broadcast Server you are using. You must import the XML file that defines these record definitions into the Studio project that includes this adapter, and specify that file's name in the Flex Record XML File property. An example flex record definition file is provided as VhayuRecordDefinitions.xml in the Velocity Adapters Sample.

The Port Management tab takes the form of a Port to Flex Record Mapping grid where you enter one port#=FRDef entry per line, with each line corresponding to an output port for this adapter instance. The FRDef value must correspond to the name attribute of an <FRDef> entry in the flex record definition file; examples are Trade, Quote, FRCanCor, Depth, and so on.

The port# value is zero-based. Thus: port0, then port1, then port2, and so on.

Each record mapping line in the grid can be in one of two forms:

Port#=flexRecordDefName

In this case, all fields defined by the specified flex record are emitted to the specified output port. Examples:

port0=Trade
port1=Quote

Port#=flexRecordDefName (Field1, Field2, ... FieldN)

In this case, only the specified fields are emitted on that port. The symbol field is mandatory, and the order of fields in the output tuple for this port follows their order in the list of field names. The same flex record name cannot be assigned to two or more ports. All user-defined fields in flexRecordDefName, as well as the following fixed fields, are available:

  • recvTime — The receive time in VHTime format.

  • recordType — Record type.

  • symbol — The symbol name.

  • defName — Flex record definition name.

  • seqNum — Sequence number of the record provided by the feed.

  • exchTime — The exchange time, as reported by the exchange, in VHTime format.

  • subType — Subtype of the record if so provided by the feed.

For example:

port0=Trade(symbol,recvTime,ListPrice)

Time conversion functions are available for converting exchTime and recvTime to other formats. See Time Conversion Functions for further information.
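
The two mapping forms described above can be checked before they are entered in the grid. The following is a minimal sketch only, not the adapter's own parser: the class PortMappingLine and its parse method are hypothetical names introduced for illustration, and the assumption that the mandatory field is matched as symbol regardless of case is ours.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the adapter: parses one Port Management grid line.
final class PortMappingLine {
    // Groups: 1 = zero-based port index, 2 = flex record name, 3 = optional field list.
    private static final Pattern LINE = Pattern.compile(
            "(?i)port(\\d+)\\s*=\\s*(\\w+)\\s*(?:\\(([^()]*)\\))?");

    final int port;
    final String flexRecordDefName;
    final List<String> fields; // empty list: all fields of the flex record are emitted

    private PortMappingLine(int port, String flexRecordDefName, List<String> fields) {
        this.port = port;
        this.flexRecordDefName = flexRecordDefName;
        this.fields = fields;
    }

    static PortMappingLine parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed mapping: " + line);
        }
        List<String> fields = new ArrayList<>();
        String fieldList = m.group(3);
        if (fieldList != null) {
            // Keep trailing empty strings so "a,,b" and "a," are rejected.
            for (String raw : fieldList.split(",", -1)) {
                String name = raw.trim();
                if (name.isEmpty() || fields.contains(name)) {
                    throw new IllegalArgumentException("Empty or duplicate field in: " + line);
                }
                fields.add(name);
            }
            // Assumption: the mandatory field is matched as "symbol", ignoring case.
            if (fields.stream().noneMatch("symbol"::equalsIgnoreCase)) {
                throw new IllegalArgumentException("Field list must include symbol: " + line);
            }
        }
        return new PortMappingLine(Integer.parseInt(m.group(1)), m.group(2), fields);
    }
}
```

For example, PortMappingLine.parse on the line port0=Trade(symbol,recvTime,ListPrice) yields port 0, the record name Trade, and the three fields in the listed order, which is also the order of the fields in the output tuple.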

Concurrency Tab

For this adapter, it is recommended to keep Multiplicity set at single.

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Using the Adapter in a StreamBase Application

This section demonstrates how to use the Velocity Analytics Broadcast Server Input adapter in a StreamBase application, and describes the use of the adapter's ports. By default, the adapter provides one output port. As shown in the EventFlow diagram below (from one of this adapter's sample applications), the adapter can communicate with the surrounding application by means of one optional input port, a configurable number of output ports, and an optional status output port. As with other StreamBase adapters and operators, you can optionally enable an Error Output Port.

Description of This Adapter's Ports

The Velocity Analytics Broadcast Server Input adapter's ports are used as follows:

  • ControlStream (input): The optional input stream used to control the adapter during runtime. The adapter can be configured to have a control input port and corresponding status output port through the Enable Control Port check box in the Adapter Properties section of the Properties view. When enabled, they are represented by the last input and output port nodes on the adapter icon. The control port must have a schema of a single field named command of type string. The following commands can be sent to the Control port to manage the adapter during runtime or to generate a status report. Commands are case insensitive.

    connect: Join the multicast group of the Velocity Analytics Broadcast Server and start processing data if not already connected.
    disconnect: Leave the multicast group of the Velocity Analytics Broadcast Server and shut down all processing threads. In order to complete processing for all buffered data, it is recommended to issue a disconnect command before stopping the adapter.
    reconnect: Rejoin the multicast group of the Velocity Analytics Broadcast Server and restart processing data.
    enable portNum: Enable portNum to emit data when available. When the adapter is started, all the ports are enabled by default. The changed setting persists after a disconnect.
    enable flexDefName: Enable flexDefName to emit data when available. When the adapter is started, all the ports are enabled by default. The changed setting persists after a disconnect. Flex definition names are specified in the Port Management tab.
    disable portNum: Disable portNum from emitting data when available. When the adapter is started, all the ports are enabled by default. The changed setting persists after a disconnect.
    disable flexDefName: Disable flexDefName from emitting data when available. When the adapter is started, all the ports are enabled by default. The changed setting persists after a disconnect.
    status: Emit the status of the ports and of the connection on the status port. The status port must be connected to an output stream to see its output.
  • Trade and Quote (output): The output streams corresponding to the output data ports of the adapter. Two such ports are shown in the image above as examples, mapped specifically to the Trade and Quote FR definitions. You must configure at least one output data port, and can configure any number of output data ports. See Port Management Tab for information on configuring data output ports.

  • Statistics (output): The optional output stream carrying various adapter statistic counters. The counters in the table below are available through the statistics output port of the adapter. The statistics port can be enabled or disabled via the Enable Statistics Port check box in the Adapter Properties tab of the Properties view. The schema of the Statistics port is shown in the following table.

    AllBytesRecv: Total bytes, including header, received by the Input Adapter.
    AllInvalidFeedsRecv: Total invalid feed lines received by the Input Adapter.
    AllPacketsRecv: Total packets received by the Input Adapter.
    AllValidFeedsRecv: Total valid feed lines received by the Input Adapter.
    BackChannelDataErr: Total GetPacketFromCache backchannel data bad.
    BackChannelFail: Total GetPacketFromCache backchannel cache miss.
    BackChannelMismatch: Total times GetPacketFromCache backchannel closes.
    BackChannelOpen: Total times GetPacketFromCache backchannel opens.
    BackChannelOpenFail: Total times GetPacketFromCache backchannel open failures.
    BackChannelRecv: Total GetPacketFromCache backchannel receives.
    BackChannelRecvFail: Total GetPacketFromCache backchannel receive failures.
    BackChannelRecvLen: Total GetPacketFromCache backchannel receive too short.
    BackChannelSend: Total GetPacketFromCache backchannel sends.
    BackChannelSendFail: Total GetPacketFromCache backchannel send failures.
    CacheConnectTimeout: Count of backchannel retry timeouts.
    DiscardPackets: Total duplicate or discarded packets found.
    MulticastRead: Total multicast reads.
    MulticastReadBadLen: Total multicast reads with inconsistent size.
    MulticastReadErr: Total multicast reads with error.
    MulticastReadLen: Total bytes received successfully through Datagram.
    MulticastReadShort: Total multicast reads too short.
    MulticastReadZero: Total multicast reads with zero length.
    PacketCacheAttempts: Total Velocity Analytics Broadcast Server cache read attempts.
    PacketCacheMiss: Total Velocity Analytics Broadcast Server cache read fails.
    PacketCacheRecv: Total packets received from Velocity Analytics Broadcast Server cache.
    PacketMissOnMapFull: Total packets missed due to full hash map.
    PacketMissOnQueueFull: Total packets missed due to full feed processor queues.
    TotalTuplesSentOut: Total number of tuples sent to the output ports.
    TupleMissOnQueueOverFlow: Total tuples missed due to an overflow in the tuple queue.
  • Status (output): The optional output stream carrying adapter status.
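
An upstream component that builds command strings for the ControlStream port may want to validate them first. The sketch below is one possible way to do that, based only on the command list above; ControlCommand and its members are hypothetical names, and the target of enable or disable is kept as an opaque string because the exact form of portNum is not specified here.

```java
import java.util.Locale;

// Hypothetical helper, not part of the adapter: validates a control-port command string.
final class ControlCommand {
    enum Verb { CONNECT, DISCONNECT, RECONNECT, ENABLE, DISABLE, STATUS }

    final Verb verb;
    final String target; // port number or flex definition name; null if the verb takes none

    private ControlCommand(Verb verb, String target) {
        this.verb = verb;
        this.target = target;
    }

    static ControlCommand parse(String command) {
        String[] parts = command.trim().split("\\s+");
        Verb verb;
        try {
            // Commands are case insensitive, so normalize before the enum lookup.
            verb = Verb.valueOf(parts[0].toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown command: " + command);
        }
        boolean needsTarget = verb == Verb.ENABLE || verb == Verb.DISABLE;
        int expectedParts = needsTarget ? 2 : 1;
        if (parts.length != expectedParts) {
            throw new IllegalArgumentException("Wrong number of arguments: " + command);
        }
        return new ControlCommand(verb, needsTarget ? parts[1] : null);
    }
}
```

A validated command would then be sent as the single string field named command that the control port schema requires.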

Adding the Adapter to an EventFlow Application

Add an instance of the adapter to a new EventFlow application as follows:

  1. In StreamBase Studio, create a project, including an empty StreamBase EventFlow application file to host the adapter.

  2. Insert the adapter into your application. Drag the Input Adapter icon from the Operators and Adapters tray of the Palette view. In the resulting dialog, type velocity to narrow the list of options, then select Velocity Analytics Broadcast Server from the Global Input Adapters section. (As an alternative, you can select Insert > Input Adapter from the top-level menu to open the same dialog.)

  3. Configure the adapter via its various properties tabs. See Setting Properties for further information.

  4. Import the flex record definitions file corresponding to the flex records multicast to the adapter.

  5. Connect output streams to the adapter's output ports, as configured in its output port properties.

  6. Enable the Statistics Port, if desired. See Statistics for information.

  7. Enable the Control and Status Ports. See ControlStream for further information. If the Control port is not enabled, make sure Connect On Startup is enabled on the Adapter Properties tab; otherwise, there is no way to tell the adapter to connect to the Velocity Analytics Broadcast Server.

  8. Make sure the specified Velocity Analytics Broadcast Server is multicasting packets. For testing, configure the Velocity Analytics Broadcast Server to replay an existing feed file. See the Velocity Analytics System Admin and Config Guide for configuring the Velocity Analytics Broadcast Server for feed replay.

  9. Run the EventFlow application by clicking the Run button in the Studio toolbar.

  10. Observe the output streams.

  11. Before stopping the EventFlow application, issue a disconnect command via the Control port. This ensures that all buffered data is processed before the application shuts down.

Logging

The adapter uses the logging facility provided by StreamBase, which provides built-in support for standard loggers such as Log4J and Logback. See Using StreamBase Logging in the Administration Guide.

Logging is controlled through the XML configuration file, logback.xml. Using this configuration file, you can customize logging based on your needs. For example, you can change the log message format, include or remove timestamps, and so on. The logging level for the individual adapter messages can be set through the Log Level drop-down control in the Adapter Properties section of the Properties view.

Typechecking and Error Handling

The adapter uses typechecking messages to help you configure the adapter within your StreamBase application. In particular, the adapter generates typecheck messages for the following reasons:

  • The VhayuRecordDefinitions.xml file is not found.

  • Values in the Properties view tabs are invalid.

  • The flex record definition used in a port mapping is not found in the VhayuRecordDefinitions.xml file.

  • The schema of the Control port is wrong. It must have a single field named command of type string.

  • Two or more ports are mapped to the same flex record definition name.

  • The number of ports specified is more than the number of ports configured in the Port Management tab.

  • The SYMBOL field is missing from the list of fields for any port definitions that list specific fields.

  • Empty or duplicate field names exist for a flex record definition.

  • The syntax of the field list in a port mapping is incorrect. It must have comma separated field names within parentheses.
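
Two of the conditions above concern the mapping grid as a whole rather than a single line: a flex record name mapped to more than one port, and a Number of Output Ports value larger than the number of configured mappings. The following is an illustrative sketch of such a pre-check, not the adapter's typechecking code; MappingGridCheck and firstProblem are hypothetical names.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper, not part of the adapter: grid-level checks on port mappings.
final class MappingGridCheck {
    /**
     * @param numberOfOutputPorts   value of the Number of Output Ports property
     * @param flexRecordNamesByPort flex record name mapped to port0, port1, ... in order
     * @return a description of the first problem found, or empty if both checks pass
     */
    static Optional<String> firstProblem(int numberOfOutputPorts,
                                         List<String> flexRecordNamesByPort) {
        if (numberOfOutputPorts > flexRecordNamesByPort.size()) {
            return Optional.of("More output ports specified than mappings configured");
        }
        Set<String> seen = new HashSet<>();
        for (String name : flexRecordNamesByPort) {
            // Set.add returns false when the name was already present.
            if (!seen.add(name)) {
                return Optional.of("Flex record mapped to more than one port: " + name);
            }
        }
        return Optional.empty();
    }
}
```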

The adapter generates messages during runtime under various conditions, including:

  • Multicast IP is invalid.

  • Local IP is invalid.

  • Multicast group joining fails.

  • Out of Memory error, which can happen due to bottlenecks in the network or anything else that causes data to queue in the adapter's buffers. These messages might also be caused by a data rate that is too high.

Suspend and Resume Behavior

When suspended, the adapter stops emitting tuples, and all new incoming records from the Velocity Analytics Broadcast Server are discarded without any processing. The records already processed by the adapter but not sent through its output ports are cached.

Time Conversion Functions

The custom functions described in this section are available in the following classes:

com.thomsonreuters.sb.adapter.velocityanalytics.broadcastserver.Utility
com.thomsonreuters.sb.adapter.velocityanalytics.utss.Utility

These classes are provided as part of the JAR files that implement the Velocity Analytics adapters. See the Velocity Analytics Samples for examples of calling these functions with the StreamBase expression language's calljava() function.

The following sections list the available time conversion functions based on the source time format. In all instances, time parameters are defined as:

asciiTime: The string representation of a date/time, expressed with the following format: yyyymmddhhmmss[.tttttt][-timezone].

sbTime: StreamBase proprietary time format.

serialTime: UTC time expressed with the ISO C standard time_t type definition. It is the number of seconds since Jan 1, 1970, 00:00:00 (midnight).

vhTime: Velocity Analytics proprietary time format. This is UTC time expressed by the number of tenths of a microsecond (100ns) since Jan 1, 1601, 00:00:00.000,000,0 (midnight). To convert from serial time to VHTime, use the following formula: (time_t * 10000000) + 116444736000000000
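
The serial-to-VHTime formula above can be written out directly. The sketch below is an independent illustration of that arithmetic, not the adapter's Utility class; VhTimeFormula and its method names are hypothetical. The constant 116444736000000000 is the number of 100 ns ticks between Jan 1, 1601 and Jan 1, 1970 (11,644,473,600 seconds times 10,000,000).

```java
// Hypothetical helper, not part of the adapter: the serial time <-> VHTime arithmetic.
final class VhTimeFormula {
    static final long TICKS_PER_SECOND = 10_000_000L;                // 100 ns ticks per second
    static final long EPOCH_OFFSET_TICKS = 116_444_736_000_000_000L; // 1601-01-01 to 1970-01-01

    // (time_t * 10000000) + 116444736000000000, as given in the definition of vhTime.
    static long serialToVh(long serialTime) {
        return serialTime * TICKS_PER_SECOND + EPOCH_OFFSET_TICKS;
    }

    // Inverse direction; sub-second ticks are dropped, rounding toward negative infinity.
    static long vhToSerial(long vhTime) {
        return Math.floorDiv(vhTime - EPOCH_OFFSET_TICKS, TICKS_PER_SECOND);
    }
}
```

The arithmetic is done in long throughout because VHTime values for current dates are far larger than an int can hold.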

Source Time Format: ASCII Time

Functions by destination time format:

  • ASCII Time: String toGMTAscii(String asciiTime) and String toLocalAscii(String asciiTime)

  • StreamBase Time: Timestamp asciiTimeToSBTimestamp(String asciiTime)

  • VH Time: long asciiTimeToVH(String asciiTime)

  • Serial Time: int asciiTimeToSerial(String asciiTime)

Source Time Format: StreamBase Time

Functions by destination time format:

  • ASCII Time: String sbTimestampToLocalAscii(Timestamp sbTime) and String sbTimestampToGMTAscii(Timestamp sbTime)

  • StreamBase Time: N/A

  • VH Time: long sbTimestampToVH(Timestamp sbTime)

  • Serial Time: int sbTimestampToSerial(Timestamp sbTime)

Source Time Format: VH Time

Functions by destination time format:

  • ASCII Time: String vhTimeToLocalAscii(long vhTime) and String vhTimeToGMTAscii(long vhTime)

  • StreamBase Time: Timestamp vhTimeToSBTimestamp(long vhTime)

  • VH Time: N/A

  • Serial Time: int vhTimeToSerial(long vhTime)

Source Time Format: Serial Time

Functions by destination time format:

  • ASCII Time: String serialTimeToLocalAscii(int serialTime) and String serialTimeToGMTAscii(int serialTime)

  • StreamBase Time: Timestamp serialTimeToSBTimestamp(int serialTime)

  • VH Time: long serialTimeToVH(int serialTime)

  • Serial Time: N/A
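
To illustrate what a VHTime-to-ASCII conversion involves, the following sketch uses only the standard java.time API. It is not the implementation behind vhTimeToGMTAscii, and its output may differ from that function in details this document does not specify, such as whether the fractional part or a time zone suffix is emitted. VhTimeAsciiSketch and its methods are hypothetical names; the sketch assumes the yyyymmddhhmmss.tttttt layout with six fractional digits.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the adapter: formats a VHTime value as a GMT string.
final class VhTimeAsciiSketch {
    private static final long TICKS_PER_SECOND = 10_000_000L;                // 100 ns ticks
    private static final long EPOCH_OFFSET_TICKS = 116_444_736_000_000_000L; // 1601 to 1970

    // Assumed layout: yyyymmddhhmmss followed by microseconds, rendered in UTC.
    private static final DateTimeFormatter ASCII_GMT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss.SSSSSS").withZone(ZoneOffset.UTC);

    static Instant toInstant(long vhTime) {
        long ticks = vhTime - EPOCH_OFFSET_TICKS;
        long seconds = Math.floorDiv(ticks, TICKS_PER_SECOND);
        long nanos = Math.floorMod(ticks, TICKS_PER_SECOND) * 100L; // one tick is 100 ns
        return Instant.ofEpochSecond(seconds, nanos);
    }

    static String toGmtAscii(long vhTime) {
        // The final 100 ns digit is truncated because the layout carries six digits.
        return ASCII_GMT.format(toInstant(vhTime));
    }
}
```

In an application that has the adapter JAR files available, the provided functions listed above remain the supported way to perform these conversions.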
