Contents
The TIBCO StreamBase® Output Adapter for UDP is an embedded adapter that writes data through a socket to another machine or process. It resembles the CSV Socket Writer adapter. Unlike the CSV Socket Writer adapter, though, this socket adapter writes output data to a UDP socket instead of a TCP socket.
When in multicast mode, the adapter will send messages to a multicast group. All listening members of the group will receive messages sent by this adapter.
N.B.: There is no maximum packet size, however if this is being used in conjunction with the UDP Receiver, then only the first 1024 bytes of the message will be received, as that adapter has a packet size limitation.
In addition, UDP is an unreliable protocol where messages may be dropped or received out of order, your application should keep this in mind; larger packets may introduce more unreliability.
Property | Description | Default |
---|---|---|
Port | The UDP port to which this adapter binds locally. This is not the same as the port to which outgoing messages are sent. | 9999 |
Network Interface Address | Specify the network interface for this socket. If blank the default of all interfaces is used. | |
Multicast Mode | If this is checked, the adapter will send messages to the specified multicast group, instead of sending to the address and port supplied on its input port. | Unchecked |
Multicast Group | The host name or IP address which identifies the target multicast group. | 239.0.0.0 |
Multicast Port | The UDP port used to identify the multicast group. | 9000 |
Use Default Charset | Use the default character set for your machine. | Checked |
Charset | The name of the character set to use, if Use Default Charset is Unchecked. | None |
Reconnect Count | The number of times the adapter should try to reconnect to the socket should the connection fail. 0 means attempt to reconnect indefinitely. | 0 |
Use Default Charset | Use the default character set for your machine. | Checked |
Charset | The name of the character set to use, if Use Default Charset is unchecked.
|
None |
Connect on Startup | If checked, the adapter creates the UDP socket during start-up. Otherwise, the socket can be created after start-up by enqueuing
a tuple to the control port with a command field value of connect .
|
Checked |
Log Level | Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. | INFO |
Port Reuse | This is an advanced option, applicable only when in unicast mode. It allows multiple UDPSenders (or in general, UDP Sockets)to be bound to the same port on the same machine. All of the UDPSenders (and any unicast mode UDPReceivers)must have this option checked for them to be able to bind to that port. Similarly, general UDP Sockets must set the SO_REUSEADDR socket option. This may not work on all systems. | Unchecked |
The data transformer allows you to specify how the data will be encoded before being sent. This also allows you to specify a custom data transformer which implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface to encode data into the payload. Please note that UDP is an unreliable transmission protocol and should be dealt with accordingly.
Property | Description | Default |
---|---|---|
Data Type | Determines the type of data that will be received from the UDP Socket. If custom is specified then a Fully-qualified name
of a class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer must be given. The
following data types are available:
|
String |
Custom Data Transformer | The fully qualified path of the class that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface | empty |
Data Transformer Settings | Extra settings that can be passed used with the various data types see Data Transformers | empty |
Typechecking fails in the following circumstances:
-
An illegal port number is specified.
-
The specified multicast group IP address or host name does not resolve to a multicast address, if configured for multicast mode.
-
Use Default Charset is unchecked and the Charset name provided is not supported.
-
The schema of the control input port is missing the required
command
field or contains extraneous fields. -
The schema of the data input port is missing required fields, has fields of the wrong type, or contains extraneous fields.
-
Data (input #1): This is the primary input port, any tuples enqueued on this port will be sent as messages. The required schema for this port will differ depending on whether or not the adapter is in multicast mode.
If not in multicast mode, the required schema will be as follows (the fields may have arbitrary names):
-
First field , <string>
This is the contents of the payload of the packet, as represented by a string.
-
Second field , <string>
The physical IP address of the sender of the packet, in the form "nnn.nnn.nnn.nnn"
-
Third Field , <int>
The port on which the process which sent this packet is operating
If in multicast mode, the required input schema has the form:
-
First field , <string>
This is the contents of the payload of the packet, as represented by a string.
As you can see, the first field requirement is the same, regardless of mode.
-
-
Control (input#2): The input schema for this port must have its first field be of type 'string' and have the name 'command'. When a valid command tuple is enqueued on this port, the adapter will execute the command directive.
The valid commands are as follows:
-
disconnect
This will cause the adapter to leave the multicast group (if in multicast mode) and close its connection to the socket. Any tuples enqueued while the socket is disconnected will not be sent. This socket can be reconnected with the 'connect' command.
-
connect
This command will reconnect the socket and rejoin the multicast group (if in multicast mode), if it had been previously disconnected by the 'disconnect' command. In the event of connection failure and automatic retry failure, this command will not properly reconnect the socket, a restart is required in that instance.
-
status
This command will cause the adapter to output a status tuple detailing the current connection status.
-
-
Status (output#1): This is a secondary output port, through which the adapter will output status tuples. Each status tuple will have a 'TYPE' and an 'ACTION', which will describe the current status.
The 'TYPES' and their associated 'ACTIONS' follow:
-
Data Status
This is used for when an error occurs while trying to parse or output data within Streambase. This will always be paired with the 'Error' action.
-
Connection Status
This is used for any connection related status messages, and can have the actions: 'Connected' or 'Disconnected'
-
Command Status
This is used to confirm or deny the success of a command, entered through the control port. The associated actions are: 'Error', 'Connected' , 'Disconnected'.
-
Suspend/Resume
This status be emitted whenever the adapter has either been suspended or is about to resume operation. It will either have the action 'Suspended' or 'Resuming'
-
Multicast Mode causes several changes in behavior from the default, unicast mode.
First, it changes the required input schema (see UDP Sender Ports).
In Multicast Mode, the paradigm is that many receiving adapters receive messages sent to the group.
The members of the group may change, but the IP and Port identifying this group are fixed. Thus, it doesn't make sense to communicate the recipient's host and port through input tuple fields.
Multiple Multicast Sockets may be bound to the same port on the same machine, so that multiple UDP Sender adapters can be bound to the same port, if in multicast mode; else they must have the "Port Reuse" option checked.
Note
Many firewalls do not allow multicast communication, to prevent overloading the network with traffic. You may need to adjust your firewall settings accordingly. It is recommended that you allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted multicast traffic from outside.
Multicast Mode will cause several changes in behavior from the normal, unicast mode.
First of all, it will change the required input schema (see UDP Sender Ports).
In Multicast Mode, the paradigm is that this adapter (and perhaps others) will be reporting messages to a group of many receiving adapters.
The members of that group may change, but the IP and Port identifying this group are fixed; thus it no longer makes sense to communicate the IP and Port of the recipient in the EventFlow
Multiple Multicast Sockets may be bound to the same port on the same machine, thus multiple UDP Sender adapters can be bound to the same port, if in multicast mode; else they must have the "Port Reuse" option checked.
N.B. Many firewalls do not allow multicast communication, to prevent overloading the network with traffic. You will likely have to adjust your firewall settings accordingly; it is recommended that you limit this to allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted outside multicast traffic.
Data transformers are used to convert data in and out of the system.
com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer
The CSV data transformer will convert incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter, some properties are for the reader and some are for the writer. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
UseDefaultCharset | Reader/Writer | true, false | true | If specified, specifies whether the Java platform default character set is to be used. |
Charset | Reader/Writer | string | System Charset | The name of the character set encoding that the adapter is to use to read input or write output. |
CaptureTransformStrategy | Reader/Writer | Flatten, Nest | Flatten | The strategy to use when transforming capture fields for this operator. |
FieldDelimiter | Reader/Writer | Single character | , | The delimiter used to separate tokens, defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value.
|
NullValueRepresentation | Writer | string | null | String to write when a field is null. |
StringQuote | Writer | QuoteIfNecessary, AlwaysQuote, NeverQuote | QuoteIfNecessary | Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote. |
StringQuoteCharacter | Reader/Writer | Single character | " | Matching pairs of the quote character used to delimit string constants. |
TimestampFormat | Reader | String | yyyy-MM-dd hh:mm:ss.SSSZ | The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by java.text.SimpleDateFormat class described in Java Platform SE reference documentation.
If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message
appears on the console that includes the text |
LenientTimestampParsing | Reader | true, false | true | Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats. |
IncompleteRecords | Reader | Discard, PopulateWithNulls | Discard |
Specifies what should be done when the adapter receives a record with less than the required number of fields.
|
DiscardEmptyRecords | Reader | true, false | true |
This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines. |
LogWarning | Reader | true, false | false |
Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages will be logged for records with less than required number of fields. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer
The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
IncludeNullValues | Writer | true, false | true | Include fields that contain null values. |
EncodeSubType | Writer | list, map | list | The type of transformation that should be used when there are sub tuples to process. |
Separator | Reader/Writer | Single character | \n | The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer
The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
BlobFieldName | Reader/Writer | string | BlobField | For the reader this value is the name of the field that will be filled for each outgoing tuple with the byte data. For the writer this is the field that transformer will read and send the contains of the blob to the socket. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer
The serialized tuple data transformer transforms tuple going into the writer adapter into a byte array of data with a single int value as the header to determine size. On the reader side the adapter will read in a single integer and then read that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.LengthBasedDataTransformer
The length based data transformer will convert incoming bytes into a string. Outgoing tuples are converted into a integer value representing the size of the string followed by the string. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
DataFieldName | Reader/Writer | any | Data | The name of the String or Blob field in the input or output schema which will contain the data. |
ReadLengthFieldName | Reader | any | ReadLength | The name of the field in the output schema which will be used to store the number of bytes read. |
FixedLength | Reader/Writer | Integer | 0 | For a reader this is the integer fixed length number of bytes to read. If this value is zero or negative it will be ignored. If this value is set all other read length parameters are ignored. For a writer this is the integer fixed length number of bytes to write. If this value is 0 or negative it is ignored and the length of the actual data is sent at the front of the packet. If this input data is less than this value the pad value will be used to pad to the length, if the length if greater than only the bytes up to the fixed length will be sent. |
FixedLengthPadByte | Writer | byte | 0 | The byte value to use when the input data value is less than the fixed length. |
ReadMaxLength | Reader | Integer | Max Integer | The maximum number of bytes to read. If this value is greater than zero then if data is encountered that is large a null tuple is produced and all data is discarded. |
ReadLengthFieldOffset | Reader | Integer | 0 | The offset of the length field. |
LengthFieldLength | Reader/Writer | Integer | 4 | The length of the length field. 1, 2, 3, 4, or 8 bytes for the size read or written at the front of the data packet |
ReadLengthAdjustment | Reader | Integer | 0 | The adjustment of the length field when reading data. |
ReadInitialBytesToStrip | Reader | Integer | 4 | The number of bytes to remove from the start of the byte array. |
WriteLengthIncludesLengthFieldLength | Writer | true/false | false | If enabled the size value sent at the start of the packet will include the length field length. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
Custom data transformers allow the end user to build Java code to handle the input and output of the Bi-Directional Adapters. In this section we will cover how the end user can build their own data transformers. There is a single interface that needs to be implemented in order for a class to work with the adapters, that interface is ISocketDataTransformer which we will cover in the next section. Each socket connection established by the adapter creates a new copy of the given data transformer which allows for data to be stored between socket reads and writes for an individual socket connection.
The Bi-Direction socket adapter samples includes two files CustomDataTransformer.java and CustomDataTransformer.sbapp that together demonstrate how to build your own data transformations and use them in an application.
To use the com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface you must make sure that the file streambase/lib/ext/bi-directional-socket-interface.jar is present on your build path.
Now make sure this class is available to the adapter at runtime. The most expedient way of accomplishing this is to put your class in a JAR file and place this JAR file in StreamBase's lib/adapter/ext directory.
The interface make use of java.nio.ByteBuffer and assumes that the end user of this interface has some familiarity with this class.
Most of the complexities of java.nio.ByteBuffer are handled by the adapter but the end user should know how to deal with the buffers position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer will be updated to reflect the amount of data that was read from or written to a buffer.
Example:
Five characters are read into a string during toTuple:
String testData = new String(readBuffer.array(), readBuffer.position(), 5);
then the buffers position should be updates to reflect the new position:
buffer.position(buffer.position() + 5);
NOTE: Some methods of the ByteBuffer update the position for you like buffer.getInt(); Please read the documentation for java.nio.ByteBuffer for further details.
The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface is required to be implemented for your data transformer to work with the adapter. The interface has the following methods that need to be completed:
-
public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);
Description: This method is called when a socket connection is made and the data transformer needs to be initialized.
Variables
-
Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
-
Logger logger - The adapter's logger to allow your transformer to log out information.
-
String host - The host of the socket connection.
-
int port - The port of the socket connection.
-
String socketIdentifier - The optional identifier that may be supplied by the client.
-
boolean isReader - A flag to let the developer know if this init call is for the reader or writer adapter.
Return Value: void
-
-
public Map<String, String> getDefaultSettings();
Description: This method is called by the adapter to try and verify information about the adapter, in future versions of StreamBase it will also be used to try and pre-populate the settings list when a data transformer is selected.
Return Value: Map<String, String> The default settings of the adapter.
-
public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);
Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.
Variables
-
Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
-
Schema schema - This is the schema that will be used to create tuples for outgoing data.
-
boolean isReader - A flag to let the developer know if this validation call is for the reader or writer adapter.
Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.
-
-
public Tuple toTuple(ByteBuffer readBuffer, Schema schema);
Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket including any data that was not used from any previous call to this method. It is expected that the position value of the readBuffer will be updated after reading data. If the position value is updated and a null values is returned the position will be reset back to what it was before entering this method. This method will be called repeatedly until a null value is returned or the buffer has zero bytes remaining.
Variables
-
ByteBuffer readBuffer - The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.
-
Schema schema - This is the schema to be used to create a tuple and fill its values from the incoming data.
Return Value: A valid tuple if data is available or null if a tuple could not be created.
-
-
public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);
Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple will be transformed to bytes and added to the writeBuffer in any format needed. Please note that you must check the write buffer for available space before trying to write to it, if no space is available and you return false the tuple will be discarded.
Variables
-
Tuple tuple - The current tuple that needs to be transformed and added to the writeBuffer.
-
ByteBuffer writeBuffer - This is the data that will be directly sent on the socket. The size of this buffer is set by the getBufferSize() method.
Return Value:
True if this tuple was successfully added to the write buffer.
False if a problem occurred or no space is available. If false is returned the adapter will attempt to write data out to the socket and free space and call this method again one more time with the same tuple before discarding the tuple.
-
-
public int getBufferSize();
Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.
Return Value: An integer value which specifies the size.
-
public String getName();
Description: This method is called to get the name of the data transformer. (This method is not currently used and is for future use)
Return Value: A string representing the name of the data transformer.