TIBCO StreamBase® Input Adapter for UDP is an embedded adapter that receives raw string data by listening on a UDP port. It resembles the Socket Reader adapter. Unlike the Socket Reader adapter, though, this adapter reads input data from a UDP socket with no associated connection IP address. Also, because the data sent through the UDP socket is represented as simple strings, this adapter has no formatting properties (such as field delimiter or string quote character).
In Multicast Mode (described below), the UDP socket is specifically a Multicast socket and does have an IP address associated with its Multicast Group. This group can consist of many members, who are all capable of receiving messages sent to the group. Furthermore, any address can send messages to the group, not just other members.
This adapter truncates any received packet longer than 1024 bytes.
In addition, UDP is an unreliable protocol: messages may be dropped or received out of order, and larger packets may introduce more unreliability. Design your application with this in mind.
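The truncation behavior can be reproduced with plain java.net sockets. The following is a minimal loopback sketch, not the adapter's own code: the class and method names are hypothetical, and it only assumes that the receiving side reads each datagram into a fixed-size buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

/** Hypothetical loopback demonstration of datagram truncation. */
class UdpTruncationDemo {

    /**
     * Sends payloadSize bytes to a local socket that reads with a
     * bufferSize-byte buffer, and returns how many bytes were delivered.
     */
    static int receivedLength(int payloadSize, int bufferSize) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // do not block forever if the packet is lost

            byte[] payload = new byte[payloadSize];
            sender.send(new DatagramPacket(payload, payload.length,
                    loopback, receiver.getLocalPort()));

            // Bytes beyond the buffer length are silently discarded by receive().
            DatagramPacket packet = new DatagramPacket(new byte[bufferSize], bufferSize);
            receiver.receive(packet);
            return packet.getLength();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `UdpTruncationDemo.receivedLength(2000, 1024)` delivers only the first 1024 bytes; the remainder of the datagram is lost without any error being raised.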
Property | Description | Default |
---|---|---|
Receive Port | This is the communication port number to which the UDP socket binds. It also serves to help identify the multicast group, when in multicast mode. | 9000 |
Network Interface Address | Specify the network interface for this socket. If blank the default of all interfaces is used. | |
Multicast Mode | If checked, the adapter will join the specified multicast group and only receive messages sent to that group. | Unchecked |
Port Reuse | This is an advanced option, applicable only in unicast mode. It allows multiple UDPReceivers (or, in general, UDP sockets) to be bound to the same port on the same machine. All of the UDPReceivers (and any unicast mode UDP Sender Output Adapter) must have this option checked to be able to bind to that port. Similarly, general UDP sockets must set the SO_REUSEADDR socket option. This may not work on all systems. | Unchecked |
Multicast Group | The hostname or IP address which is used, along with the port number, to identify the multicast group. Valid multicast IP addresses range from 224.0.0.1 to 239.255.255.255, inclusive. | 239.0.0.0 |
Use Default Charset | Use the default character set for your machine. | Checked |
Charset | The name of the character set to use, if Use Default Charset is Unchecked. | None |
Max Packet Size (bytes) | The maximum size in bytes of the packet that can be received. | 1024 |
Receive Buffer Size (bytes) | Sets the SO_RCVBUF option to the specified value for this DatagramSocket. The SO_RCVBUF option is used by the network implementation as a hint to size the underlying network I/O buffers. The SO_RCVBUF setting may also be used by the network implementation to determine the maximum size of the packet that can be received on this socket. Because SO_RCVBUF is a hint, applications that want to verify what size the buffers were set to should call getReceiveBufferSize(). Increasing SO_RCVBUF may allow the network implementation to buffer multiple packets when packets arrive faster than are being received using receive(DatagramPacket). Note: It is implementation specific if a packet larger than SO_RCVBUF can be received. | 1024 |
Connect on Startup | If checked, the adapter creates the UDP socket during start-up. Otherwise, the socket can be created after start-up by enqueuing a tuple to the control port with a command field value of connect. | Checked |
Log Level | Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. | INFO |
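Because SO_RCVBUF is only a hint, the value in effect can differ from the value requested. The sketch below shows how this can be checked with the standard java.net.DatagramSocket API. The class and method names are hypothetical and the code is independent of the adapter.

```java
import java.net.DatagramSocket;
import java.net.SocketException;

/** Hypothetical helper showing that SO_RCVBUF is a request, not a guarantee. */
class ReceiveBufferSketch {

    /** Requests a receive buffer size and returns the size actually in effect. */
    static int effectiveReceiveBuffer(int requestedBytes) {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setReceiveBufferSize(requestedBytes); // sets SO_RCVBUF (a hint)
            return socket.getReceiveBufferSize();        // what the platform granted
        } catch (SocketException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Comparing the returned value with the requested one is a quick way to see whether the operating system rounded or capped the request.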
The data transformer allows you to specify how the data is parsed before being sent to the output port. You can also specify a custom data transformer that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface to parse data into the payload. Note that UDP is an unreliable transmission protocol and should be dealt with accordingly.
Property | Description | Default |
---|---|---|
Data Type | Determines the type of data that will be received from the UDP socket. If custom is specified, then the fully qualified name of a class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer must be given. The following data types are available: | String |
Custom Data Transformer | The fully qualified name of the class that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface. | empty |
Data Transformer Settings | Extra settings that can be used with the various data types; see Data Transformers. | empty |
Typechecking fails in the following circumstances:
- An illegal receive port number is specified.
- The specified multicast group IP address or host name does not resolve to a multicast address, if configured for multicast mode.
- Use Default Charset is unchecked and the Charset name provided is not supported.
- The schema of the control input port is missing the required command field or contains extraneous fields.
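The first three checks can be approximated outside StreamBase with standard Java calls. This is a sketch under stated assumptions, not the adapter's typecheck code: the class name is hypothetical, and the legal port range of 1 to 65535 is an assumption, because the source does not say which port numbers count as illegal.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;

/** Hypothetical stand-ins for the adapter's typecheck rules. */
class TypecheckSketch {

    /** Assumed range; the adapter's exact rule is not documented here. */
    static boolean isLegalPort(int port) {
        return port >= 1 && port <= 65535;
    }

    /** True if the host name or IP literal resolves to a multicast address. */
    static boolean isMulticastGroup(String hostOrIp) {
        try {
            return InetAddress.getByName(hostOrIp).isMulticastAddress();
        } catch (UnknownHostException e) {
            return false; // an unresolvable name cannot identify a group
        }
    }

    /** True if the JVM supports the named character set. */
    static boolean isSupportedCharset(String name) {
        try {
            return Charset.isSupported(name);
        } catch (IllegalArgumentException e) {
            return false; // null or syntactically illegal charset name
        }
    }
}
```

For example, `isMulticastGroup("239.0.0.0")` is true while `isMulticastGroup("192.168.1.10")` is false, which mirrors the second rule in the list.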
Data transformers are used to convert data in and out of the system.
com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer
The CSV data transformer converts incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter; some properties apply to the reader and some to the writer. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
UseDefaultCharset | Reader/Writer | true, false | true | Specifies whether the Java platform default character set is to be used. |
Charset | Reader/Writer | string | System Charset | The name of the character set encoding that the adapter is to use to read input or write output. |
CaptureTransformStrategy | Reader/Writer | Flatten, Nest | Flatten | The strategy to use when transforming capture fields for this operator. |
FieldDelimiter | Reader/Writer | Single character | , | The delimiter used to separate tokens; defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value. |
NullValueRepresentation | Writer | string | null | String to write when a field is null. |
StringQuote | Writer | QuoteIfNecessary, AlwaysQuote, NeverQuote | QuoteIfNecessary | Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote. |
StringQuoteCharacter | Reader/Writer | Single character | " | Matching pairs of the quote character used to delimit string constants. |
TimestampFormat | Reader | String | yyyy-MM-dd hh:mm:ss.SSSZ | The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by the java.text.SimpleDateFormat class described in the Java Platform SE reference documentation. If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message appears on the console that includes the text |
LenientTimestampParsing | Reader | true, false | true | Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats. |
IncompleteRecords | Reader | Discard, PopulateWithNulls | Discard | Specifies what should be done when the adapter receives a record with fewer than the required number of fields. |
DiscardEmptyRecords | Reader | true, false | true | This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines. |
LogWarning | Reader | true, false | false | Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages are logged for records with fewer than the required number of fields. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
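To see what the default TimestampFormat pattern accepts, it can be tried directly against java.text.SimpleDateFormat. The sketch below is illustrative only: the class and method names are hypothetical, and returning null for a mismatch stands in for the adapter's behavior of discarding the record. Note that in SimpleDateFormat the letters hh denote a 12-hour clock, so an hour without an AM/PM marker is read as a morning hour.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper for experimenting with TimestampFormat patterns. */
class TimestampFormatSketch {

    /** The default pattern listed in the settings table. */
    static final String DEFAULT_FORMAT = "yyyy-MM-dd hh:mm:ss.SSSZ";

    /** Returns the parsed date, or null when the text does not match the pattern. */
    static Date parseOrNull(String text, String pattern) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        try {
            return format.parse(text);
        } catch (ParseException e) {
            return null; // the adapter would discard the record in this case
        }
    }
}
```

With the default pattern, `"2024-01-15 10:30:00.000+0000"` parses successfully, while `"15/01/2024"` does not and yields null.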
com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer
The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
IncludeNullValues | Writer | true, false | true | Include fields that contain null values. |
EncodeSubType | Writer | list, map | list | The type of transformation that should be used when there are sub tuples to process. |
Separator | Reader/Writer | Single character | \n | The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer
The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
BlobFieldName | Reader/Writer | string | BlobField | For the reader, this value is the name of the field that will be filled with the byte data for each outgoing tuple. For the writer, this is the blob field whose contents the transformer reads and sends to the socket. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer
The serialized tuple data transformer transforms each tuple going into the writer adapter into a byte array of data, with a single int value as the header to determine size. On the reader side, the adapter reads a single integer and then reads that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.LengthBasedDataTransformer
The length-based data transformer converts incoming bytes into a string. Outgoing tuples are converted into an integer value representing the size of the string, followed by the string. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
DataFieldName | Reader/Writer | any | Data | The name of the String or Blob field in the input or output schema which will contain the data. |
ReadLengthFieldName | Reader | any | ReadLength | The name of the field in the output schema which will be used to store the number of bytes read. |
FixedLength | Reader/Writer | Integer | 0 | For a reader, this is the fixed number of bytes to read. If this value is zero or negative, it is ignored. If this value is set, all other read length parameters are ignored. For a writer, this is the fixed number of bytes to write. If this value is zero or negative, it is ignored and the length of the actual data is sent at the front of the packet. If the input data is shorter than this value, the pad value is used to pad to the fixed length; if it is longer, only the bytes up to the fixed length are sent. |
FixedLengthPadByte | Writer | byte | 0 | The byte value to use when the input data value is less than the fixed length. |
ReadMaxLength | Reader | Integer | Max Integer | The maximum number of bytes to read. If this value is greater than zero and data larger than this value is encountered, a null tuple is produced and all data is discarded. |
ReadLengthFieldOffset | Reader | Integer | 0 | The offset of the length field. |
LengthFieldLength | Reader/Writer | Integer | 4 | The length of the length field: 1, 2, 3, 4, or 8 bytes for the size read or written at the front of the data packet. |
ReadLengthAdjustment | Reader | Integer | 0 | The adjustment of the length field when reading data. |
ReadInitialBytesToStrip | Reader | Integer | 4 | The number of bytes to remove from the start of the byte array. |
WriteLengthIncludesLengthFieldLength | Writer | true/false | false | If enabled the size value sent at the start of the packet will include the length field length. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
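The general idea of length-prefixed framing, and the effect of the ByteOrder setting on the length field, can be illustrated with java.nio.ByteBuffer alone. This is a minimal sketch and not the LengthBasedDataTransformer implementation: the class and method names are hypothetical, and it assumes a 4-byte length field (the LengthFieldLength default) followed by UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of 4-byte length-prefixed framing. */
class LengthPrefixSketch {

    /** Encodes text as a 4-byte length followed by the UTF-8 bytes. */
    static ByteBuffer encode(String text, ByteOrder order) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + data.length).order(order);
        buffer.putInt(data.length).put(data);
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    /** Decodes one frame, or returns null (position unchanged) if it is incomplete. */
    static String decode(ByteBuffer buffer, ByteOrder order) {
        buffer.order(order);
        if (buffer.remaining() < 4) {
            return null; // the length field itself has not fully arrived
        }
        int start = buffer.position();
        int length = buffer.getInt(); // advances the position by 4
        if (length < 0 || buffer.remaining() < length) {
            buffer.position(start);   // rewind so the frame can be retried later
            return null;
        }
        byte[] data = new byte[length];
        buffer.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

Encoding "hi" in big-endian order puts the length byte 2 last in the 4-byte header, while little-endian order puts it first, which is why both ends of a connection need the same ByteOrder setting.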
Custom data transformers allow the end user to build Java code to handle the input and output of the Bi-Directional Adapters. This section covers how to build your own data transformers. A class must implement a single interface, ISocketDataTransformer, to work with the adapters; the interface is covered in the next section. Each socket connection established by the adapter creates a new copy of the given data transformer, which allows data to be stored between socket reads and writes for an individual socket connection.
The Bi-Directional Socket adapter samples include two files, CustomDataTransformer.java and CustomDataTransformer.sbapp, that together demonstrate how to build your own data transformations and use them in an application.
To use the com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface you must make sure that the file streambase/lib/ext/bi-directional-socket-interface.jar is present on your build path.
Now make sure this class is available to the adapter at runtime. The most expedient way of accomplishing this is to put your class in a JAR file and place this JAR file in StreamBase's lib/adapter/ext directory.
The interface makes use of java.nio.ByteBuffer and assumes that the end user of this interface has some familiarity with this class.
Most of the complexities of java.nio.ByteBuffer are handled by the adapter, but the end user should know how to deal with the buffer's position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer will be updated to reflect the amount of data that was read from or written to the buffer.
Example:
Five characters are read into a string during toTuple:
String testData = new String(readBuffer.array(), readBuffer.position(), 5);
Then the buffer's position should be updated to reflect the new position:
readBuffer.position(readBuffer.position() + 5);
NOTE: Some ByteBuffer methods, such as readBuffer.getInt(), update the position for you. See the documentation for java.nio.ByteBuffer for further details.
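The two steps above can be combined into a small runnable helper. The sketch below uses only java.nio; the class and method names are hypothetical, and it assumes a heap-backed buffer (one for which hasArray() returns true) containing ASCII text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that reads text and keeps the buffer position in step. */
class BufferPositionSketch {

    /** Reads count bytes as text from the current position and advances past them. */
    static String readChars(ByteBuffer readBuffer, int count) {
        // array() exposes the backing array; arrayOffset() accounts for sliced buffers.
        String text = new String(readBuffer.array(),
                readBuffer.arrayOffset() + readBuffer.position(), count,
                StandardCharsets.US_ASCII);
        // Reading through array() does not move the position, so advance it explicitly.
        readBuffer.position(readBuffer.position() + count);
        return text;
    }
}
```

After `readChars(buffer, 5)` on a buffer wrapping "hello world", the call returns "hello" and the buffer's position is 5, leaving " world" for the next read.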
The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface is required to be implemented for your data transformer to work with the adapter. The interface has the following methods that need to be completed:
- public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);
  Description: This method is called when a socket connection is made and the data transformer needs to be initialized.
  Variables:
  - Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
  - Logger logger - The adapter's logger to allow your transformer to log out information.
  - String host - The host of the socket connection.
  - int port - The port of the socket connection.
  - String socketIdentifier - The optional identifier that may be supplied by the client.
  - boolean isReader - A flag to let the developer know if this init call is for the reader or writer adapter.
  Return Value: void
- public Map<String, String> getDefaultSettings();
  Description: This method is called by the adapter to try to verify information about the adapter. In future versions of StreamBase it will also be used to pre-populate the settings list when a data transformer is selected.
  Return Value: Map<String, String> The default settings of the adapter.
- public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);
  Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.
  Variables:
  - Map<String, String> settings - The settings from the adapter "Data Transformer" tab which the user has supplied.
  - Schema schema - This is the schema that will be used to create tuples for outgoing data.
  - boolean isReader - A flag to let the developer know if this validation call is for the reader or writer adapter.
  Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.
- public Tuple toTuple(ByteBuffer readBuffer, Schema schema);
  Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket, including any data that was not used by any previous call to this method. It is expected that the position value of the readBuffer will be updated after reading data. If the position value is updated and a null value is returned, the position is reset to what it was before entering this method. This method is called repeatedly until a null value is returned or the buffer has zero bytes remaining.
  Variables:
  - ByteBuffer readBuffer - The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.
  - Schema schema - This is the schema to be used to create a tuple and fill its values from the incoming data.
  Return Value: A valid tuple if data is available or null if a tuple could not be created.
- public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);
  Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple will be transformed to bytes and added to the writeBuffer in any format needed. Note that you must check the write buffer for available space before trying to write to it. If no space is available and you return false, the tuple will be discarded.
  Variables:
  - Tuple tuple - The current tuple that needs to be transformed and added to the writeBuffer.
  - ByteBuffer writeBuffer - This is the data that will be directly sent on the socket. The size of this buffer is set by the getBufferSize() method.
  Return Value:
  True if this tuple was successfully added to the write buffer.
  False if a problem occurred or no space is available. If false is returned, the adapter will attempt to write data out to the socket to free space, and will call this method one more time with the same tuple before discarding the tuple.
- public int getBufferSize();
  Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.
  Return Value: An integer value which specifies the size.
- public String getName();
  Description: This method is called to get the name of the data transformer. (This method is not currently used and is reserved for future use.)
  Return Value: A string representing the name of the data transformer.
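The buffer-handling contract described for toTuple and fromTuple can be practiced without the StreamBase classes. The sketch below is a simplified, hypothetical stand-in: it works on plain strings instead of Tuple objects, uses a newline as an assumed record separator, and does not implement ISocketDataTransformer. It only demonstrates returning null with the position untouched when a record is incomplete, and returning false when the write buffer lacks space.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical newline-delimited framing that follows the toTuple/fromTuple contract. */
class LineFramingSketch {

    /** toTuple-style read: returns one line, or null if no complete line is buffered. */
    static String readLine(ByteBuffer readBuffer) {
        for (int i = readBuffer.position(); i < readBuffer.limit(); i++) {
            if (readBuffer.get(i) == '\n') {           // absolute get: position untouched
                byte[] line = new byte[i - readBuffer.position()];
                readBuffer.get(line);                  // relative get: advances the position
                readBuffer.get();                      // consume the newline itself
                return new String(line, StandardCharsets.UTF_8);
            }
        }
        return null; // incomplete record: position left where it was
    }

    /** fromTuple-style write: returns false when the line does not fit. */
    static boolean writeLine(String line, ByteBuffer writeBuffer) {
        byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8);
        if (writeBuffer.remaining() < data.length) {
            return false; // caller may flush the buffer and retry
        }
        writeBuffer.put(data);
        return true;
    }
}
```

Given a buffer holding "a,b\nc,", the first `readLine` call returns "a,b" and the second returns null, leaving the partial record "c," in place until more data arrives.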
On suspend, this adapter closes its input socket and, if in multicast mode, leaves the multicast group.
On resumption, it reconnects its socket, rejoins the group (if in multicast mode) and continues reading tuples from it.
This adapter does not leave its socket open during suspend primarily so that the port which it is bound to will be freed for other use while it is suspended. Unlike the CSV Socket Reader, this socket is UDP and thus will simply drop incoming packets if it is connected and suspended; there would be no buffering or memory issues.
- Control (input #1): The first field of the input schema for this port must be of type string and have the name 'command'. When a valid command tuple is enqueued on this port, the adapter executes the command.
  The valid commands are as follows:
  - disconnect
    This causes the adapter to leave the multicast group and close its connection to the socket. Any tuples sent to that group while the socket is disconnected will not be received. The socket can be reconnected with the 'connect' command.
  - connect
    This command reconnects the socket and rejoins the multicast group after it has been disconnected by the 'disconnect' command. In the event of connection failure and automatic retry failure, this command will not properly reconnect the socket; a restart is required in that instance.
  - status
    This command causes the adapter to output a status tuple detailing the current connection status.
- Data (output #1): This is the primary output port, through which data received on the socket is output as tuples. The data type is determined by the data type property selected.
  The schema for this port has three fields:
  - "Payload", various
    The contents of the payload of the packet. The type is determined by the data type in the Data Transformer properties.
  - "SenderIP", <string>
    The physical IP address of the sender of the packet, in the form "nnn.nnn.nnn.nnn".
  - "SenderPort", <int>
    The port on which the process that sent this packet is operating.
- Status (output): This is a secondary output port, through which the adapter outputs status tuples. Each status tuple has a TYPE and an ACTION, which describe the current status. In addition, it has a message, a timestamp, and in some cases the tuple that caused the status report.
  The TYPES and their associated ACTIONS follow:
  - Data Status
    This is used when an error occurs while trying to parse or output data within StreamBase. It is always paired with the 'Error' action.
  - Connection Status
    This is used for any connection-related status messages, and can have the actions 'Connected' or 'Disconnected'.
  - Command Status
    This is used to confirm or deny the success of a command entered through the control port. The associated actions are: Error, Connected, Disconnected.
  - Suspend/Resume
    This status is emitted whenever the adapter has either been suspended or is about to resume operation. It has either the action Suspended or Resuming.
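The three data port fields correspond to information that any received UDP datagram carries. As a rough illustration, and not the adapter's code, the sketch below pulls the same three values out of a java.net.DatagramPacket. The class name, the method name, and the output string format are hypothetical, and the payload is assumed to be UTF-8 text.

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;

/** Hypothetical mapping from a received datagram to the data port's three fields. */
class SenderInfoSketch {

    /** Formats the payload, sender address, and sender port of a packet. */
    static String describe(DatagramPacket packet) {
        // Only the first getLength() bytes of the buffer belong to this datagram.
        String payload = new String(packet.getData(), packet.getOffset(),
                packet.getLength(), StandardCharsets.UTF_8);
        // For a packet filled in by receive(), these identify the sender.
        String senderIp = packet.getAddress().getHostAddress();
        int senderPort = packet.getPort();
        return "Payload=" + payload + ", SenderIP=" + senderIp + ", SenderPort=" + senderPort;
    }
}
```

The sender port is normally an ephemeral port chosen by the sending host's operating system, so it usually differs from the adapter's Receive Port.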
Multicast Mode causes several changes in behavior from the default, unicast mode.
In Multicast Mode, the paradigm is that this adapter (and perhaps others) will be reporting messages to a group of many receiving adapters.
Multiple Multicast Sockets may be bound to the same port on the same machine, so that multiple UDP Sender adapters can be bound to the same port, if in multicast mode; else they must have the "Port Reuse" option checked.
Note
Many firewalls do not allow multicast communication to prevent overloading the network with traffic. You may need to adjust your firewall settings accordingly. TIBCO recommends that you allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted external multicast traffic.
Multicast Mode will cause several changes in behavior from the normal, unicast mode.
In Multicast Mode, the paradigm is that this adapter will subscribe to a multicast group, and receive any messages sent to that group's IP and Port.
In either mode, messages can be received from any sender, via UDP protocol, on the specified Port.
Multiple Multicast Sockets may be bound to the same port on the same machine, thus multiple UDP Receiver adapters may be bound to the same Port if in multicast mode. If they are in unicast mode, then the Port Reuse option must be checked to bind multiple Receivers to the same port.
N.B. Many firewalls do not allow multicast communication, to prevent overloading the network with traffic. You will likely have to adjust your firewall settings accordingly; it is recommended that you limit this to allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted outside multicast traffic.
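For testing a multicast setup outside StreamBase, a plain Java receiver can join the same group and port. The following is a minimal sketch using java.net.MulticastSocket, not the adapter's implementation. The class and method names are hypothetical, and passing a null network interface relies on the platform's default interface, which may not be the one you intend on a multi-homed machine.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone multicast receiver for testing a group and port. */
class MulticastReceiveSketch {

    /** Resolves the group and rejects anything that is not a multicast address. */
    static InetSocketAddress groupAddress(String group, int port) {
        InetSocketAddress address = new InetSocketAddress(group, port);
        if (address.isUnresolved() || !address.getAddress().isMulticastAddress()) {
            throw new IllegalArgumentException(group + " is not a multicast address");
        }
        return address;
    }

    /** Joins the group, blocks until one datagram arrives, then leaves the group. */
    static String receiveOne(String group, int port, NetworkInterface netIf) throws IOException {
        InetSocketAddress address = groupAddress(group, port);
        try (MulticastSocket socket = new MulticastSocket(port)) {
            socket.joinGroup(address, netIf); // netIf may be null to use the default interface
            try {
                DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
                socket.receive(packet);
                return new String(packet.getData(), packet.getOffset(),
                        packet.getLength(), StandardCharsets.UTF_8);
            } finally {
                socket.leaveGroup(address, netIf);
            }
        }
    }
}
```

A call such as `MulticastReceiveSketch.receiveOne("239.0.0.0", 9000, null)` waits for the next message sent to the adapter's default group and port; whether it arrives depends on the firewall and interface considerations described above.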