UDP Sender Output Adapter

Introduction

The TIBCO StreamBase® Output Adapter for UDP is an embedded adapter that writes data through a socket to another machine or process. It resembles the CSV Socket Writer adapter. Unlike the CSV Socket Writer adapter, though, this socket adapter writes output data to a UDP socket instead of a TCP socket.

When in multicast mode, the adapter will send messages to a multicast group. All listening members of the group will receive messages sent by this adapter.

N.B.: The adapter imposes no maximum packet size. However, when it is used in conjunction with the UDP Receiver adapter, only the first 1024 bytes of each message are received, because that adapter has a packet size limitation.

In addition, UDP is an unreliable protocol: messages may be dropped or received out of order, and your application should be designed with this in mind. Larger packets may introduce more unreliability.
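The following is a minimal sketch of a guard an application might run before handing a payload to this adapter when the peer is a UDP Receiver. The class and method names are hypothetical and are not part of StreamBase; the only fact taken from this page is the 1024-byte limit. Note that the limit applies to encoded bytes, so the result depends on the character set in use.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the adapter.
class PayloadSizeCheck {
    // Packet size limit of the UDP Receiver adapter, as stated above.
    static final int RECEIVER_LIMIT_BYTES = 1024;

    // Size of the payload once encoded in the given character set.
    static int encodedLength(String payload, Charset charset) {
        return payload.getBytes(charset).length;
    }

    // True if the whole payload fits within the receiver's limit.
    static boolean fitsReceiverLimit(String payload, Charset charset) {
        return encodedLength(payload, charset) <= RECEIVER_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        String ascii = "a".repeat(1024);        // 1 byte per character in UTF-8
        String accented = "\u00e9".repeat(600); // 2 bytes per character in UTF-8
        System.out.println(fitsReceiverLimit(ascii, StandardCharsets.UTF_8));
        System.out.println(fitsReceiverLimit(accented, StandardCharsets.UTF_8));
    }
}
```

A string with fewer than 1024 characters can still exceed the limit if it contains multi-byte characters, which is why the check is done on the encoded form.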

Properties

Property Description Default
Port The UDP port to which this adapter binds locally. This is not the same as the port to which outgoing messages are sent. 9999
Network Interface Address Specify the network interface for this socket. If blank the default of all interfaces is used.  
Multicast Mode If this is checked, the adapter will send messages to the specified multicast group, instead of sending to the address and port supplied on its input port. Unchecked
Multicast Group The host name or IP address which identifies the target multicast group. 239.0.0.0
Multicast Port The UDP port used to identify the multicast group. 9000
Use Default Charset Use the default character set for your machine. Checked
Charset The name of the character set to use, if Use Default Charset is Unchecked. None
Reconnect Count The number of times the adapter should try to reconnect to the socket should the connection fail. 0 means attempt to reconnect indefinitely. 0
Connect on Startup If checked, the adapter creates the UDP socket during start-up. Otherwise, the socket can be created after start-up by enqueuing a tuple to the control port with a command field value of connect. Checked
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. INFO
Port Reuse This is an advanced option, applicable only in unicast mode. It allows multiple UDP Sender adapters (or, in general, UDP sockets) to be bound to the same port on the same machine. All of the UDP Sender adapters (and any unicast-mode UDP Receiver adapters) must have this option checked to be able to bind to that port. Similarly, general UDP sockets must set the SO_REUSEADDR socket option. This may not work on all systems. Unchecked
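For a general UDP socket that needs to share a port with an adapter using Port Reuse, the following sketch shows one way to set SO_REUSEADDR with the standard java.net API. The class name is hypothetical, and this is not the adapter's own implementation. The option is set on an unbound socket before binding.

```java
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;

// Hypothetical helper, not part of the adapter.
class ReusableUdpSocket {
    // Creates an unbound socket, enables SO_REUSEADDR, then binds to the port.
    static DatagramSocket open(int port) {
        try {
            DatagramSocket socket = new DatagramSocket((SocketAddress) null); // unbound
            socket.setReuseAddress(true);              // must precede bind()
            socket.bind(new InetSocketAddress(port));  // wildcard address
            return socket;
        } catch (SocketException e) {
            throw new IllegalStateException("Could not bind UDP port " + port, e);
        }
    }
}
```

Whether a second socket can actually bind to the same port depends on the operating system, as noted in the Port Reuse description.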

Data Transformer

The data transformer allows you to specify how the data will be encoded before being sent. This also allows you to specify a custom data transformer which implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface to encode data into the payload. Please note that UDP is an unreliable transmission protocol and should be dealt with accordingly.

Property Description Default
Data Type Determines the type of data that will be sent to the UDP socket. If Custom is specified, then the fully qualified name of a class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer must be given. The following data types are available:

  • String - Each tuple will be encoded as a string.

  • Blob - Each tuple will be encoded as binary data.

  • Custom - The given class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer will be used to encode the data with the input tuple's schema. The data transformer settings properties can be used to supply various settings if required.

  • Length Based - Each tuple will be encoded as an integer and string.

  • CSV - Each tuple will be encoded as a CSV-formatted string.

  • JSON - Each tuple will be encoded as a JSON-formatted string.

  • Serialized Tuple - Each tuple will be encoded as a serialized binary tuple.

String
Custom Data Transformer The fully qualified name of the class that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface. empty
Data Transformer Settings Extra settings that can be passed to the selected data type; see Data Transformers. empty

Typechecking and Error Handling

Typechecking fails in the following circumstances:

  • An illegal port number is specified.

  • The specified multicast group IP address or host name does not resolve to a multicast address, if configured for multicast mode.

  • Use Default Charset is unchecked and the Charset name provided is not supported.

  • The schema of the control input port is missing the required command field or contains extraneous fields.

  • The schema of the data input port is missing required fields, has fields of the wrong type, or contains extraneous fields.
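The non-schema checks above can be approximated with standard Java calls. The sketch below is illustrative only: the class and method names are hypothetical, and the legal port range (0 to 65535) is an assumption, since this page does not define what the adapter treats as an illegal port.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;

// Hypothetical checks, not the adapter's actual typecheck code.
class SenderSettingsCheck {
    // Assumption: any value in the 16-bit port range is legal.
    static boolean isLegalPort(int port) {
        return port >= 0 && port <= 65535;
    }

    // True if the host name or IP address resolves to a multicast address.
    static boolean isMulticastGroup(String hostOrIp) {
        try {
            return InetAddress.getByName(hostOrIp).isMulticastAddress();
        } catch (UnknownHostException e) {
            return false; // does not resolve at all
        }
    }

    // True if the JVM supports the named character set.
    static boolean isSupportedCharset(String name) {
        try {
            return Charset.isSupported(name);
        } catch (IllegalArgumentException e) {
            return false; // null or syntactically illegal charset name
        }
    }

    public static void main(String[] args) {
        System.out.println(isLegalPort(9999));
        System.out.println(isMulticastGroup("239.0.0.0"));
        System.out.println(isSupportedCharset("UTF-8"));
    }
}
```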

Description of this Adapter's Ports

  • Data (input #1): This is the primary input port. Any tuples enqueued on this port are sent as messages. The required schema for this port differs depending on whether or not the adapter is in multicast mode.

    If not in multicast mode, the required schema will be as follows (the fields may have arbitrary names):

    • First field, <string>

      This is the contents of the payload of the packet, as represented by a string.

    • Second field, <string>

      The IP address of the recipient of the packet, in the form "nnn.nnn.nnn.nnn".

    • Third field, <int>

      The port on which the process that is to receive this packet is listening.

    If in multicast mode, the required input schema has the form:

    • First field, <string>

      This is the contents of the payload of the packet, as represented by a string.

    As you can see, the first field requirement is the same, regardless of mode.

  • Control (input#2): The input schema for this port must have its first field be of type 'string' and have the name 'command'. When a valid command tuple is enqueued on this port, the adapter will execute the command directive.

    The valid commands are as follows:

    • disconnect

      This will cause the adapter to leave the multicast group (if in multicast mode) and close its connection to the socket. Any tuples enqueued while the socket is disconnected will not be sent. This socket can be reconnected with the 'connect' command.

    • connect

      This command will reconnect the socket and rejoin the multicast group (if in multicast mode), if it had been previously disconnected by the 'disconnect' command. In the event of connection failure and automatic retry failure, this command will not properly reconnect the socket; a restart is required in that case.

    • status

      This command will cause the adapter to output a status tuple detailing the current connection status.

  • Status (output#1): This is a secondary output port, through which the adapter will output status tuples. Each status tuple will have a 'TYPE' and an 'ACTION', which will describe the current status.

    The 'TYPES' and their associated 'ACTIONS' follow:

    • Data Status

      This is used when an error occurs while trying to parse or output data within StreamBase. This will always be paired with the 'Error' action.

    • Connection Status

      This is used for any connection related status messages, and can have the actions: 'Connected' or 'Disconnected'

    • Command Status

      This is used to confirm or deny the success of a command entered through the control port. The associated actions are: 'Error', 'Connected', 'Disconnected'.

    • Suspend/Resume

      This status is emitted whenever the adapter has been suspended or is about to resume operation. It has either the action 'Suspended' or 'Resuming'.
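To illustrate how the three unicast data fields relate to an outgoing datagram, here is a minimal sketch using java.net.DatagramPacket. It assumes that the second and third fields carry the destination address and port, as the Multicast Mode property description suggests ("the address and port supplied on its input port"). The class and method names are hypothetical; the adapter's internal code is not shown on this page.

```java
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not the adapter's implementation.
class UnicastPacketSketch {
    // Builds a datagram from the three unicast input fields.
    static DatagramPacket toPacket(String payload, String address, int port, Charset charset) {
        byte[] data = payload.getBytes(charset); // first field: payload
        try {
            InetAddress target = InetAddress.getByName(address); // second field
            return new DatagramPacket(data, data.length, target, port); // third field
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Unresolvable address: " + address, e);
        }
    }

    public static void main(String[] args) {
        DatagramPacket p = toPacket("hello", "127.0.0.1", 9001, StandardCharsets.UTF_8);
        System.out.println(p.getAddress().getHostAddress() + ":" + p.getPort()
                + " (" + p.getLength() + " bytes)");
    }
}
```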

Multicast Mode

Multicast Mode causes several changes in behavior from the default, unicast mode.

First, it changes the required input schema (see Description of this Adapter's Ports).

In Multicast Mode, the paradigm is that this adapter (and perhaps others) sends messages to a group of many receiving adapters.

The members of that group may change, but the IP address and port identifying the group are fixed. It therefore no longer makes sense to communicate the recipient's host and port through input tuple fields.

Multiple multicast sockets may be bound to the same port on the same machine, so multiple UDP Sender adapters can be bound to the same port if they are in multicast mode. Otherwise, they must have the "Port Reuse" option checked.

N.B. Many firewalls do not allow multicast communication, to prevent overloading the network with traffic. You may need to adjust your firewall settings accordingly. It is recommended that you allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted multicast traffic from outside.
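As a point of comparison, the sketch below sends one datagram to a multicast group using only the standard java.net API. It is not the adapter's implementation, and the class and method names are hypothetical. The group and port in the comments are the property defaults listed on this page. A plain DatagramSocket is enough for sending; only receivers need to join the group.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not the adapter's implementation.
class MulticastSendSketch {
    // Builds a packet addressed to the group; rejects non-multicast addresses.
    static DatagramPacket groupPacket(String payload, String group, int groupPort) {
        try {
            InetAddress address = InetAddress.getByName(group);
            if (!address.isMulticastAddress()) {
                throw new IllegalArgumentException(group + " is not a multicast address");
            }
            byte[] data = payload.getBytes(StandardCharsets.UTF_8);
            return new DatagramPacket(data, data.length, address, groupPort);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sends the payload to every listening member of the group,
    // for example send("tick", "239.0.0.0", 9000).
    static void send(String payload, String group, int groupPort) {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(groupPacket(payload, group, groupPort));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the destination is fixed by the group address and port, the only per-message input is the payload, which matches the single-field schema required in multicast mode.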

Data Transformers

Data transformers are used to convert data in and out of the system.

Built-In Data Transformers

CSV Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer

The CSV data transformer will convert incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter; some properties apply to the reader and some to the writer. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
UseDefaultCharset Reader/Writer true, false true If specified, specifies whether the Java platform default character set is to be used.
Charset Reader/Writer string System Charset The name of the character set encoding that the adapter is to use to read input or write output.
CaptureTransformStrategy Reader/Writer Flatten, Nest Flatten The strategy to use when transforming capture fields for this operator.
FieldDelimiter Reader/Writer Single character , The delimiter used to separate tokens, defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value.
NullValueRepresentation Writer string null String to write when a field is null.
StringQuote Writer QuoteIfNecessary, AlwaysQuote, NeverQuote QuoteIfNecessary Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote.
StringQuoteCharacter Reader/Writer Single character " Matching pairs of the quote character used to delimit string constants.
TimestampFormat Reader String yyyy-MM-dd hh:mm:ss.SSSZ The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by java.text.SimpleDateFormat class described in Java Platform SE reference documentation. If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message appears on the console that includes the text invalid timestamp value.
LenientTimestampParsing Reader true, false true Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats.
IncompleteRecords Reader Discard, PopulateWithNulls Discard Specifies what should be done when the adapter receives a record with less than the required number of fields. Discard: Discard records with less than the required number of fields. PopulateWithNulls: When records with less than the required number of fields are encountered, process the records after populating the unspecified fields with nulls.
DiscardEmptyRecords Reader true, false true This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines.
LogWarning Reader true, false false Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages will be logged for records with less than required number of fields.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.
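The writer-side settings FieldDelimiter, NullValueRepresentation, StringQuote and StringQuoteCharacter can be pictured with the following sketch of a QuoteIfNecessary encoder. The exact quoting rules of the built-in transformer are not documented here, so the rules used (quote a field when it contains the delimiter, the quote character, or a line break, and double any embedded quote characters) are an assumption borrowed from common CSV practice. The class name is hypothetical.

```java
// Hypothetical illustration of QuoteIfNecessary; the built-in
// transformer's exact rules may differ.
class CsvLineSketch {
    static String encode(String[] fields, char delimiter, char quote, String nullValue) {
        StringBuilder line = new StringBuilder();
        String q = String.valueOf(quote);
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                line.append(delimiter);
            }
            String field = fields[i];
            if (field == null) {
                line.append(nullValue); // NullValueRepresentation
                continue;
            }
            boolean needsQuote = field.indexOf(delimiter) >= 0
                    || field.indexOf(quote) >= 0
                    || field.indexOf('\n') >= 0
                    || field.indexOf('\r') >= 0;
            if (needsQuote) {
                // Assumed escaping: double each embedded quote character.
                line.append(quote).append(field.replace(q, q + q)).append(quote);
            } else {
                line.append(field);
            }
        }
        return line.toString();
    }

    public static void main(String[] args) {
        String[] fields = {"IBM", null, "a,b"};
        System.out.println(encode(fields, ',', '"', "null"));
    }
}
```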

JSON Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer

The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
IncludeNullValues Writer true, false true Include fields that contain null values.
EncodeSubType Writer list, map list The type of transformation that should be used when there are sub tuples to process.
Separator Reader/Writer Single character \n The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03.
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

BLOB Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer

The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
BlobFieldName Reader/Writer string BlobField For the reader, this value is the name of the field that will be filled with the byte data for each outgoing tuple. For the writer, this is the field that the transformer will read, sending the contents of the blob to the socket.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Serialized Tuple Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer

The serialized tuple data transformer transforms each tuple going into the writer adapter into a byte array of data, with a single int value as the header to indicate its size. On the reader side, the adapter will read in a single integer and then read that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Length Based Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.LengthBasedDataTransformer

The length based data transformer will convert incoming bytes into a string. Outgoing tuples are converted into an integer value representing the size of the string, followed by the string. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
DataFieldName Reader/Writer any Data The name of the String or Blob field in the input or output schema which will contain the data.
ReadLengthFieldName Reader any ReadLength The name of the field in the output schema which will be used to store the number of bytes read.
FixedLength Reader/Writer Integer 0 For a reader, this is the fixed number of bytes to read. If this value is zero or negative, it is ignored. If this value is set, all other read length parameters are ignored. For a writer, this is the fixed number of bytes to write. If this value is zero or negative, it is ignored and the length of the actual data is sent at the front of the packet. If the input data is shorter than this value, the pad value is used to pad it to the fixed length; if it is longer, only the bytes up to the fixed length are sent.
FixedLengthPadByte Writer byte 0 The byte value to use when the input data value is less than the fixed length.
ReadMaxLength Reader Integer Max Integer The maximum number of bytes to read. If this value is greater than zero and data larger than it is encountered, a null tuple is produced and all data is discarded.
ReadLengthFieldOffset Reader Integer 0 The offset of the length field.
LengthFieldLength Reader/Writer Integer 4 The length of the length field. 1, 2, 3, 4, or 8 bytes for the size read or written at the front of the data packet
ReadLengthAdjustment Reader Integer 0 The adjustment of the length field when reading data.
ReadInitialBytesToStrip Reader Integer 4 The number of bytes to remove from the start of the byte array.
WriteLengthIncludesLengthFieldLength Writer true/false false If enabled the size value sent at the start of the packet will include the length field length.
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.
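The wire layout implied by the defaults above (a 4-byte length field followed by the data) can be sketched with java.nio.ByteBuffer. This is an illustration under stated assumptions, not the transformer's source: the class and method names are hypothetical, only a 4-byte LengthFieldLength is handled, and the reader side assumes the length does not include the header.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration of length-prefixed framing.
class LengthPrefixSketch {
    static final int HEADER_BYTES = Integer.BYTES; // LengthFieldLength = 4

    // Writer side: length field first, then the payload bytes.
    static byte[] frame(byte[] payload, ByteOrder order, boolean lengthIncludesHeader) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length).order(order);
        int length = lengthIncludesHeader ? payload.length + HEADER_BYTES : payload.length;
        buf.putInt(length);
        buf.put(payload);
        return buf.array();
    }

    // Reader side: returns the payload, or null (position restored)
    // if the buffer does not yet hold a complete frame.
    static byte[] unframe(ByteBuffer in) {
        int start = in.position();
        if (in.remaining() < HEADER_BYTES) {
            return null;
        }
        int length = in.getInt();
        if (length < 0 || in.remaining() < length) {
            in.position(start);
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }
}
```

The ByteOrder setting matters because sender and receiver must agree on how the length field is laid out; a mismatch makes the reader interpret the length incorrectly.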

Building Custom Data Transformers

Introduction

Custom data transformers allow the end user to write Java code to handle the input and output of the Bi-Directional Socket adapters. This section covers how to build your own data transformers. A single interface, ISocketDataTransformer, must be implemented for a class to work with the adapters; it is covered in a later section. Each socket connection established by the adapter creates a new copy of the given data transformer, which allows data to be stored between socket reads and writes for an individual socket connection.

The Bi-Directional Socket adapter sample includes two files, CustomDataTransformer.java and CustomDataTransformer.sbapp, that together demonstrate how to build your own data transformers and use them in an application.

Setup

To use the com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface you must make sure that the file streambase/lib/ext/bi-directional-socket-interface.jar is present on your build path.

Now make sure this class is available to the adapter at runtime. The most expedient way of accomplishing this is to put your class in a JAR file and place this JAR file in StreamBase's lib/adapter/ext directory.

java.nio.ByteBuffer

The interface makes use of java.nio.ByteBuffer and assumes that the end user of this interface has some familiarity with this class.

Most of the complexities of java.nio.ByteBuffer are handled by the adapter, but the end user should know how to deal with the buffer's position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer will be updated to reflect the amount of data that was read from or written to the buffer.

Example:

Five characters are read into a string during toTuple:

String testData = new String(readBuffer.array(), readBuffer.position(), 5);

Then the buffer's position should be updated to reflect the new position:

readBuffer.position(readBuffer.position() + 5);

NOTE: Some ByteBuffer methods, such as buffer.getInt(), update the position for you. Please read the documentation for java.nio.ByteBuffer for further details.
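The difference between the two cases can be seen in a small stand-alone sketch. The class and method names are hypothetical. Reading through array() leaves the position untouched, so it is advanced by hand, whereas the relative getInt() advances it automatically.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of ByteBuffer position handling.
class BufferPositionSketch {
    // Reads `count` bytes as ASCII text via the backing array, then
    // advances the position manually because array() does not move it.
    static String readChars(ByteBuffer readBuffer, int count) {
        String text = new String(readBuffer.array(),
                readBuffer.arrayOffset() + readBuffer.position(),
                count, StandardCharsets.US_ASCII);
        readBuffer.position(readBuffer.position() + count);
        return text;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("hello".getBytes(StandardCharsets.US_ASCII));
        buf.putInt(42);
        buf.flip(); // switch from writing to reading: position 0, limit 9

        System.out.println(readChars(buf, 5)); // position moved by hand
        System.out.println(buf.getInt());      // position moved by getInt()
        System.out.println(buf.remaining());   // bytes left to read
    }
}
```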

com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer

The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface is required to be implemented for your data transformer to work with the adapter. The interface has the following methods that need to be completed:

  • public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);

    Description: This method is called when a socket connection is made and the data transformer needs to be initialized.

    Variables

    • Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.

    • Logger logger - The adapter's logger to allow your transformer to log out information.

    • String host - The host of the socket connection.

    • int port - The port of the socket connection.

    • String socketIdentifier - The optional identifier that may be supplied by the client.

    • boolean isReader - A flag to let the developer know if this init call is for the reader or writer adapter.

    Return Value: void

  • public Map<String, String> getDefaultSettings();

    Description: This method is called by the adapter to try to verify information about the adapter. In future versions of StreamBase, it will also be used to try to pre-populate the settings list when a data transformer is selected.

    Return Value: Map<String, String> The default settings of the adapter.

  • public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);

    Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.

    Variables

    • Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.

    • Schema schema - This is the schema that will be used to create tuples for outgoing data.

    • boolean isReader - A flag to let the developer know if this validation call is for the reader or writer adapter.

    Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.

  • public Tuple toTuple(ByteBuffer readBuffer, Schema schema);

    Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket, including any data that was not used by any previous call to this method. It is expected that the position value of the readBuffer will be updated after reading data. If the position value is updated and a null value is returned, the position will be reset to what it was before entering this method. This method will be called repeatedly until a null value is returned or the buffer has zero bytes remaining.

    Variables

    • ByteBuffer readBuffer - The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.

    • Schema schema - This is the schema to be used to create a tuple and fill its values from the incoming data.

    Return Value: A valid tuple if data is available or null if a tuple could not be created.

  • public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);

    Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple will be transformed to bytes and added to the writeBuffer in any format needed. Please note that you must check the write buffer for available space before trying to write to it. If no space is available and you return false, the tuple will be discarded.

    Variables

    • Tuple tuple - The current tuple that needs to be transformed and added to the writeBuffer.

    • ByteBuffer writeBuffer - This is the data that will be directly sent on the socket. The size of this buffer is set by the getBufferSize() method.

    Return Value:

    True if this tuple was successfully added to the write buffer.

    False if a problem occurred or no space is available. If false is returned the adapter will attempt to write data out to the socket and free space and call this method again one more time with the same tuple before discarding the tuple.

  • public int getBufferSize();

    Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.

    Return Value: An integer value which specifies the size.

  • public String getName();

    Description: This method is called to get the name of the data transformer. (This method is not currently used and is for future use)

    Return Value: A string representing the name of the data transformer.
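The buffer-handling rules described for toTuple and fromTuple can be sketched without the StreamBase classes. The class below is a hypothetical stand-in: it does not implement ISocketDataTransformer, and it works on plain strings rather than Tuple and Schema objects so that it compiles on its own. It treats each record as a newline-terminated UTF-8 string, which is an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in that mirrors the toTuple/fromTuple buffer rules.
class LineCodecSketch {
    // Like toTuple: consume one complete record and advance the position,
    // or return null and leave the position untouched.
    static String decode(ByteBuffer readBuffer) {
        int start = readBuffer.position();
        for (int i = start; i < readBuffer.limit(); i++) {
            if (readBuffer.get(i) == '\n') {    // absolute get: position unchanged
                byte[] record = new byte[i - start];
                readBuffer.get(record);         // relative get: advances position
                readBuffer.get();               // skip the newline itself
                return new String(record, StandardCharsets.UTF_8);
            }
        }
        return null; // incomplete record; wait for more data
    }

    // Like fromTuple: check for space first and return false if there is none.
    static boolean encode(String record, ByteBuffer writeBuffer) {
        byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
        if (writeBuffer.remaining() < bytes.length) {
            return false; // nothing written; caller may flush and retry
        }
        writeBuffer.put(bytes);
        return true;
    }
}
```

A real transformer would apply the same pattern inside toTuple and fromTuple, building a Tuple from the decoded bytes and serializing tuple fields into the write buffer.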