UDP Receiver Input Adapter

Introduction

Spotfire Streaming Input Adapter for UDP is an embedded adapter that receives raw string data by listening on a UDP port. It resembles the Socket Reader adapter.

Unlike the Socket Reader adapter, though, this adapter reads input data from a UDP socket with no associated connection IP address. Also, since the data sent through the UDP socket is represented as simple strings, this adapter has no formatting properties (such as field delimiter or string quote character).

In Multicast Mode (described below), the UDP socket is specifically a Multicast socket and does have an IP address associated with its Multicast Group. This group can consist of many members, who are all capable of receiving messages sent to the group. Furthermore, any address can send messages to the group, not just other members.

This adapter will truncate any received packet greater than 1024 bytes long.

In addition, UDP is an unreliable protocol: messages may be dropped or received out of order, and your application should be designed with this in mind. Larger packets may introduce more unreliability.
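The truncation behavior can be reproduced with plain java.net sockets. The following is a minimal sketch, not the adapter's implementation: the class name UdpTruncationSketch and the method receivedLength are hypothetical, and the 1024-byte limit is simply passed in as the size of the receive buffer. It sends one datagram over the loopback interface and reports how many bytes the receiver actually keeps.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

// Hypothetical helper, for illustration only; it does not use any adapter classes.
class UdpTruncationSketch {

    /** Sends one datagram of payloadSize bytes over loopback and returns the length the receiver sees. */
    static int receivedLength(int payloadSize, int maxPacketSize) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);   // port 0 = any free port
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // do not block forever if the datagram is dropped

            byte[] payload = new byte[payloadSize];
            sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            // The receive buffer caps what is kept; bytes beyond it are silently discarded.
            DatagramPacket packet = new DatagramPacket(new byte[maxPacketSize], maxPacketSize);
            receiver.receive(packet);
            return packet.getLength();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(receivedLength(2000, 1024)); // oversized datagram
        System.out.println(receivedLength(100, 1024));  // datagram that fits
    }
}
```

A sender that needs to deliver more than the configured maximum would have to split its data across several datagrams and tolerate loss or reordering of the pieces.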

Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Description Default
Receive Port This is the communication port number to which the UDP socket binds. It also serves to help identify the multicast group, when in multicast mode. 9000
Network Interface Address Specify the network interface for this socket. If blank, the default of all interfaces is used.
Mode If checked, the adapter joins the specified multicast group and receives only messages sent to that group. Unchecked
Port Reuse This is an advanced option, applicable only in unicast mode. It allows multiple UDPReceivers (or, in general, UDP sockets) to be bound to the same port on the same machine. All of the UDPReceivers (and any unicast-mode UDP Sender Output Adapter) must have this option checked to be able to bind to that port. Similarly, general UDP sockets must set the SO_REUSEADDR socket option. This may not work on all systems. Unchecked
Multicast Group The hostname or IP address which is used, along with the port number, to identify the multicast group. Valid multicast IP addresses range from 224.0.0.1 to 239.255.255.255, inclusive. 239.0.0.0
Use Default Charset Use the default character set for your machine. Checked
Charset The name of the character set to use, if Use Default Charset is Unchecked. None
Max Packet Size (bytes) The maximum size in bytes of the packet that can be received. 1024
Receive Buffer Size (bytes) Sets the SO_RCVBUF option to the specified value for this DatagramSocket. The SO_RCVBUF option is used by the network implementation as a hint to size the underlying network I/O buffers. The SO_RCVBUF setting may also be used by the network implementation to determine the maximum size of the packet that can be received on this socket. Because SO_RCVBUF is a hint, applications that want to verify what size the buffers were set to should call getReceiveBufferSize(). Increasing SO_RCVBUF may allow the network implementation to buffer multiple packets when packets arrive faster than are being received using receive(DatagramPacket). Note: It is implementation specific if a packet larger than SO_RCVBUF can be received. 1024
Connect on Startup If checked, the adapter creates the UDP socket during start-up. Otherwise, the socket can be created after start-up by enqueuing a tuple to the control port with a command field value of connect. Checked
Log Level Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level will be used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. INFO
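For readers who are writing a plain Java UDP peer alongside this adapter, the Port Reuse and Receive Buffer Size properties correspond to the standard SO_REUSEADDR and SO_RCVBUF socket options. The sketch below shows one way to set them with java.net.DatagramSocket. The class and method names are hypothetical, and this is not the adapter's own code. Note that SO_REUSEADDR must be set before the socket is bound, which is why the socket is created unbound.

```java
import java.io.UncheckedIOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;

// Hypothetical helper, for illustration only.
class UdpSocketOptionsSketch {

    /**
     * Opens a UDP socket with the given options and returns the receive buffer
     * size the network implementation actually granted (SO_RCVBUF is only a hint).
     */
    static int effectiveReceiveBufferSize(int port, int requestedBytes, boolean reusePort) {
        // Passing a null address creates an unbound socket, so options can be set before bind().
        try (DatagramSocket socket = new DatagramSocket((SocketAddress) null)) {
            socket.setReuseAddress(reusePort);            // SO_REUSEADDR
            socket.setReceiveBufferSize(requestedBytes);  // SO_RCVBUF hint
            socket.bind(new InetSocketAddress(port));     // port 0 lets the system pick a free port
            return socket.getReceiveBufferSize();
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int granted = effectiveReceiveBufferSize(0, 65536, true);
        System.out.println("Requested 65536 bytes, granted " + granted);
    }
}
```

The granted size can differ from the requested size, which is why the value is read back with getReceiveBufferSize() rather than assumed.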

Data Transformer Tab

The data transformer allows you to specify how the data is parsed before it is sent via the output port. You can also specify a custom data transformer that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface to parse data into the payload. Note that UDP is an unreliable transmission protocol and should be dealt with accordingly.

Property Description Default
Data Type Determines the type of data that will be received from the UDP socket. If Custom is specified, the fully qualified name of a class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer must be given. The following data types are available:

  • String - Each packet of data received will be output directly as a string field in the output tuple.

  • Blob - Each packet of data received will be output directly as a binary blob field in the output tuple.

  • Custom - The given class that implements com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer is used to transform the data before it is output as a tuple with the schema given by the output schema on the Edit Schema tab. The Data Transformer Settings property can be used to supply settings if required.

  • Length Based - Each packet of data is expected to start with a length integer; that value is then used to read that many bytes of data, which are converted to a string.

  • CSV - Each packet is expected to be a csv formatted string and will be converted to the tuple format given by the output schema.

  • JSON - Each packet is expected to be a json formatted string and will be converted to the tuple format given by the output schema.

  • Serialized Tuple - Each packet is expected to be a serialized tuple and will be converted to the tuple format given by the output schema. See table below for optional settings.

String
Custom Data Transformer The fully qualified name of the class that implements the com.streambase.sb.adapter.bidirectionalsocket.common.ISocketDataTransformer interface. empty
Data Transformer Settings Extra settings that can be passed to the selected data transformer; see Data Transformers. empty

Edit Schema Tab

Use the Edit Schema tab to specify the schema of the output tuple for this adapter. For general instructions on using the Edit Schema tab, see the Properties: Edit Schema Tab section of the Defining Input Streams page.

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Typechecking and Error Handling

Typechecking fails in the following circumstances:

  • An illegal receive port number is specified.

  • The specified multicast group IP address or host name does not resolve to a multicast address, if configured for multicast mode.

  • Use Default Charset is unchecked and the Charset name provided is not supported.

  • The schema of the control input port is missing the required command field or contains extraneous fields.
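The first three checks above can be approximated in plain Java. The following is a sketch under stated assumptions rather than the adapter's actual typecheck logic: the class UdpReceiverConfigCheck and its validate method are hypothetical, and the legal port range is assumed to be 1 through 65535. It uses InetAddress.isMulticastAddress() and Charset.isSupported(), both standard JDK calls.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator, for illustration only.
class UdpReceiverConfigCheck {

    /** Returns a list of problems; an empty list means the settings look valid. */
    static List<String> validate(int port, boolean multicastMode, String multicastGroup,
                                 boolean useDefaultCharset, String charsetName) {
        List<String> errors = new ArrayList<>();

        // Assumption: legal receive ports are 1..65535.
        if (port < 1 || port > 65535) {
            errors.add("Illegal receive port: " + port);
        }

        if (multicastMode) {
            try {
                // For an IP literal this performs no DNS lookup; a host name is resolved.
                if (!InetAddress.getByName(multicastGroup).isMulticastAddress()) {
                    errors.add("Not a multicast address: " + multicastGroup);
                }
            } catch (UnknownHostException e) {
                errors.add("Cannot resolve multicast group: " + multicastGroup);
            }
        }

        if (!useDefaultCharset) {
            boolean supported;
            try {
                supported = Charset.isSupported(charsetName);
            } catch (IllegalArgumentException e) { // null or syntactically illegal name
                supported = false;
            }
            if (!supported) {
                errors.add("Unsupported charset: " + charsetName);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(validate(9000, true, "239.0.0.0", true, null));
        System.out.println(validate(70000, true, "192.168.1.1", false, "no-such-charset-xyz"));
    }
}
```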

Data Transformers

Data transformers are used to convert data in and out of the system.

Built-In Data Transformers

CSV Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer

The CSV data transformer converts incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter; some properties apply to the reader and some to the writer. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
UseDefaultCharset Reader/Writer true, false true Specifies whether the Java platform default character set is to be used.
Charset Reader/Writer string System Charset The name of the character set encoding that the adapter is to use to read input or write output.
CaptureTransformStrategy Reader/Writer Flatten, Nest Flatten The strategy to use when transforming capture fields for this operator.
FieldDelimiter Reader/Writer Single character , The delimiter used to separate tokens, defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value.
NullValueRepresentation Writer string null String to write when a field is null.
StringQuote Writer QuoteIfNecessary, AlwaysQuote, NeverQuote QuoteIfNecessary Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote.
StringQuoteCharacter Reader/Writer Single character " Matching pairs of the quote character used to delimit string constants.
TimestampFormat Reader String yyyy-MM-dd hh:mm:ss.SSSZ The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by the java.text.SimpleDateFormat class, described in the Java Platform SE reference documentation.

If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message appears on the console that includes the text invalid timestamp value.

LenientTimestampParsing Reader true, false true Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats.
IncompleteRecords Reader Discard, PopulateWithNulls Discard

Specifies what should be done when the adapter receives a record with fewer than the required number of fields.

Discard

Discard records with fewer than the required number of fields.

PopulateWithNulls

When records with fewer than the required number of fields are encountered, process the records after populating the unspecified fields with nulls.

DiscardEmptyRecords Reader true, false true

This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines.

LogWarning Reader true, false false

Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages are logged for records with fewer than the required number of fields.

BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.
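To see what the default TimestampFormat pattern accepts, it can help to try it directly with java.text.SimpleDateFormat. The sketch below is illustrative only: the class CsvTimestampSketch and the method parseOrNull are hypothetical, and returning null merely stands in for the transformer discarding the record. Note that in SimpleDateFormat patterns, hh is the 12-hour clock field and Z is an RFC 822 time zone offset such as +0000.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical helper, for illustration only.
class CsvTimestampSketch {

    // The default pattern listed in the settings table above.
    static final String DEFAULT_FORMAT = "yyyy-MM-dd hh:mm:ss.SSSZ";

    /** Returns the parsed time in epoch milliseconds, or null if the text does not match the pattern. */
    static Long parseOrNull(String text, String pattern) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            return null; // stand-in for "record discarded, WARN logged"
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrNull("2024-01-15 10:30:00.000+0000", DEFAULT_FORMAT));
        System.out.println(parseOrNull("15/01/2024", DEFAULT_FORMAT));
    }
}
```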

JSON Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer

The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
IncludeNullValues Writer true, false true Include fields that contain null values.
EncodeSubType Writer list, map list The type of transformation that should be used when there are sub tuples to process.
Separator Reader/Writer Single character \n The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03.
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

BLOB Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer

The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
BlobFieldName Reader/Writer string BlobField For the reader, this value is the name of the field that is filled with the byte data for each outgoing tuple. For the writer, this is the field that the transformer reads; the contents of the blob are sent to the socket.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Serialized Tuple Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer

The serialized tuple data transformer transforms each tuple going into the writer adapter into a byte array of data with a single int value as the header to determine size. On the reader side, the adapter reads a single integer and then reads that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.

Length Based Data Transformer

com.streambase.sb.adapter.bidirectionalsocket.transform.LengthBasedDataTransformer

The length based data transformer converts incoming bytes into a string. Outgoing tuples are converted into an integer value representing the size of the string, followed by the string. If a setting is not present, the default is used.

Setting Applies to Reader/Writer Allowed Values Default Description
DataFieldName Reader/Writer any Data The name of the String or Blob field in the input or output schema which will contain the data.
ReadLengthFieldName Reader any ReadLength The name of the field in the output schema which will be used to store the number of bytes read.
FixedLength Reader/Writer Integer 0 For a reader, this is the fixed number of bytes to read. If this value is zero or negative, it is ignored. If this value is set, all other read length parameters are ignored. For a writer, this is the fixed number of bytes to write. If this value is zero or negative, it is ignored and the length of the actual data is sent at the front of the packet. If the input data is shorter than this value, the pad value is used to pad it to the fixed length; if it is longer, only the bytes up to the fixed length are sent.
FixedLengthPadByte Writer byte 0 The byte value to use as padding when the input data is shorter than the fixed length.
ReadMaxLength Reader Integer Max Integer The maximum number of bytes to read. If this value is greater than zero and data larger than this value is encountered, a null tuple is produced and all data is discarded.
ReadLengthFieldOffset Reader Integer 0 The offset of the length field.
LengthFieldLength Reader/Writer Integer 4 The length of the length field: 1, 2, 3, 4, or 8 bytes for the size read or written at the front of the data packet.
ReadLengthAdjustment Reader Integer 0 The adjustment of the length field when reading data.
ReadInitialBytesToStrip Reader Integer 4 The number of bytes to remove from the start of the byte array.
WriteLengthIncludesLengthFieldLength Writer true/false false If enabled the size value sent at the start of the packet will include the length field length.
ByteOrder Reader/Writer big-endian, little-endian native order The byte order to use when converting data.
BufferSize Reader/Writer int 10240 The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer.
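The basic idea of length-prefixed framing, and why ByteOrder must match on both ends, can be sketched with java.nio.ByteBuffer. This is a simplified illustration, not the LengthBasedDataTransformer itself: the class LengthPrefixSketch is hypothetical, and it assumes a 4-byte length field at offset 0 with no adjustment or padding.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: 4-byte length field, then the data.
class LengthPrefixSketch {

    /** Writer side: a 4-byte length followed by the UTF-8 bytes of the string. */
    static byte[] encode(String data, ByteOrder order) {
        byte[] body = data.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + body.length).order(order);
        buffer.putInt(body.length).put(body);
        return buffer.array();
    }

    /** Reader side: returns the string, or null if the packet is incomplete or the length is implausible. */
    static String decode(byte[] packet, ByteOrder order) {
        ByteBuffer buffer = ByteBuffer.wrap(packet).order(order);
        if (buffer.remaining() < 4) {
            return null; // not even a full length field
        }
        int length = buffer.getInt(); // advances the position by 4
        if (length < 0 || length > buffer.remaining()) {
            return null; // truncated packet, or byte order mismatch
        }
        byte[] body = new byte[length];
        buffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] packet = encode("hello", ByteOrder.BIG_ENDIAN);
        System.out.println(decode(packet, ByteOrder.BIG_ENDIAN));    // same order on both sides
        System.out.println(decode(packet, ByteOrder.LITTLE_ENDIAN)); // mismatched order
    }
}
```

Because the default ByteOrder is the native order of the machine, a sender and receiver on different architectures may need the setting specified explicitly on both sides.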

Building Custom Data Transformers

Introduction

Custom data transformers allow you to build Java code to handle the input and output of the Bi-Directional Adapters. This section covers how you can build your own data transformers. There is a single interface that must be implemented for a class to work with the adapters — ISocketDataTransformer — which is covered in the next section. Each socket connection the adapter establishes creates a new copy of the given data transformer. This allows for data storage between socket reads and writes for an individual socket connection.

For more information about the Bi-Directional socket adapters, see their sample which demonstrates how you can build your own data transformations and use them in an application.

Setup

The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface becomes available in your project when you add a Bi-Directional Socket adapter to the Studio canvas. This adds the necessary adapter dependency in your project's pom.xml.

java.nio.ByteBuffer

The interface makes use of java.nio.ByteBuffer and assumes you have some familiarity with this class.

The adapter handles most of the complexities of java.nio.ByteBuffer. Regardless, you should have some familiarity with the buffer's position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer is updated to reflect the amount of data that was read from or written to the buffer.

Example:

Five characters are read into a string during toTuple:

String testData = new String(readBuffer.array(), readBuffer.position(), 5);

Then the buffer's position should be updated to reflect the new position:

readBuffer.position(readBuffer.position() + 5);

Note

Some ByteBuffer methods update the position for you, such as buffer.getInt(). See the Oracle documentation for java.nio.ByteBuffer for details.
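The difference between reading through the backing array (which leaves the position unchanged) and using a relative method such as getInt() (which advances it) can be seen in a small standalone program. The class name BufferPositionSketch is hypothetical and the buffer contents are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo, for illustration only.
class BufferPositionSketch {

    /** Returns the buffer position after an array-based read, a manual advance, and a getInt(). */
    static int[] positionsAfterReads() {
        ByteBuffer readBuffer = ByteBuffer.allocate(16);
        readBuffer.put("hello".getBytes(StandardCharsets.US_ASCII)).putInt(42);
        readBuffer.flip(); // switch from writing to reading: position = 0, limit = 9

        // Reading through the backing array does NOT move the position.
        String text = new String(readBuffer.array(), readBuffer.position(), 5, StandardCharsets.US_ASCII);
        int afterArrayRead = readBuffer.position();

        // So the position must be advanced by hand.
        readBuffer.position(readBuffer.position() + 5);
        int afterManualAdvance = readBuffer.position();

        // Relative get methods advance the position themselves (4 bytes for an int).
        int value = readBuffer.getInt();
        int afterGetInt = readBuffer.position();

        System.out.println(text + " " + value);
        return new int[] {afterArrayRead, afterManualAdvance, afterGetInt};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(positionsAfterReads()));
    }
}
```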

com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer

The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface must be implemented for your data transformer to work with the adapter. Complete the following methods for the interface:

  • public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);

    Description: This method is called when a socket connection is made and the data transformer needs to be initialized.

    Variables

    • Map<String, String> settings — The settings you supplied in the adapter Data Transformer tab.

    • Logger logger — The adapter's logger to allow your transformer to log out information.

    • String host — The host of the socket connection.

    • int port — The port of the socket connection.

    • String socketIdentifier — The optional identifier that the client may supply.

    • boolean isReader — A flag to indicate whether this init call applies to the reader or writer adapter.

    Return Value: void

  • public Map<String, String> getDefaultSettings();

    Description: This method is called by the adapter to try and verify information about the adapter; in future versions of StreamBase it may also be used to try and pre-populate the settings list when a data transformer is selected.

    Return Value: Map<String, String> The default settings of the adapter.

  • public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);

    Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.

    Variables

    • Map<String, String> settings — The settings you supply in the Data Transformer tab for the adapter.

    • Schema schema — The schema used to create tuples for outgoing data.

    • boolean isReader — A flag to indicate whether this validation call applies to the reader or writer adapter.

    Return Value: A set of exception messages to display to the end user, or null or empty set if no errors were detected.

  • public Tuple toTuple(ByteBuffer readBuffer, Schema schema);

    Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket, including any data that was not used by a previous call to this method. It is expected that the position value of the readBuffer is updated after reading data. If the position value is updated and a null value is returned, the position is reset to what it was before entering this method. This method is called repeatedly until a null value is returned or the buffer has zero bytes remaining.

    Variables

    • ByteBuffer readBuffer — The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.

    • Schema schema — This is the schema to be used to create a tuple and fill its values from the incoming data.

    Return Value: A valid tuple if data is available or null if a tuple could not be created.

  • public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);

    Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple is transformed to bytes and added to the writeBuffer in any format needed. Please note that you must check the write buffer for available space before trying to write to it. If no space is available and you return false, the tuple is discarded.

    Variables

    • Tuple tuple — The current tuple that needs to be transformed and added to the writeBuffer.

    • ByteBuffer writeBuffer — This is the data that will be directly sent on the socket. The getBufferSize() method sets this buffer size.

    Return Value:

    True if this tuple was successfully added to the write buffer.

    False if a problem occurred or no space is available. If false is returned, the adapter attempts to write data out to the socket to free space, and then calls this method once more with the same tuple before discarding the tuple.

  • public int getBufferSize();

    Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.

    Return Value: An integer value which specifies the size.

  • public String getName();

    Description: This method is called to get the name of the data transformer (this method is not currently used and is for future use).

    Return Value: A string representing the name of the data transformer.
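The buffer-handling contract of toTuple and fromTuple described above can be illustrated without any StreamBase classes. The sketch below is a stand-in, not an implementation of ISocketDataTransformer: the class LineTransformerSketch and its methods toRecord, fromRecord, and drain are hypothetical, records are plain newline-terminated strings rather than tuples, and drain only imitates the way the adapter is described as calling toTuple repeatedly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a custom transformer; String replaces Tuple for illustration.
class LineTransformerSketch {

    /** Like toTuple: consume one complete record and advance the position, or return null and consume nothing. */
    static String toRecord(ByteBuffer readBuffer) {
        for (int i = readBuffer.position(); i < readBuffer.limit(); i++) {
            if (readBuffer.get(i) == '\n') {                    // absolute get: position unchanged
                byte[] line = new byte[i - readBuffer.position()];
                readBuffer.get(line);                           // relative get: position moves to the newline
                readBuffer.position(readBuffer.position() + 1); // skip the newline itself
                return new String(line, StandardCharsets.UTF_8);
            }
        }
        return null; // incomplete record: leave the bytes for the next call
    }

    /** Like fromTuple: check for space first, and return false rather than overflow the buffer. */
    static boolean fromRecord(String record, ByteBuffer writeBuffer) {
        byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
        if (writeBuffer.remaining() < bytes.length) {
            return false;
        }
        writeBuffer.put(bytes);
        return true;
    }

    /** Imitates the calling loop: repeat until null is returned or no bytes remain. */
    static List<String> drain(ByteBuffer readBuffer) {
        List<String> records = new ArrayList<>();
        while (readBuffer.hasRemaining()) {
            String record = toRecord(readBuffer);
            if (record == null) {
                break;
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap("a,1\nb,2\npart".getBytes(StandardCharsets.UTF_8));
        System.out.println(drain(in));
        System.out.println(in.remaining() + " bytes left for the next read");
    }
}
```

A real transformer would build a Tuple from the supplied Schema instead of returning a String, and would report its preferred buffer size through getBufferSize().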

Suspend and Resume Behavior

On suspend, this adapter closes its input socket and, if in multicast mode, leaves the multicast group.

On resumption, it reconnects its socket, rejoins the group (if in multicast mode) and continues reading tuples from it.

This adapter does not leave its socket open during suspend primarily so that the port which it is bound to will be freed for other use while it is suspended. Unlike the CSV Socket Reader, this socket is UDP and thus will simply drop incoming packets if it is connected and suspended; there would be no buffering or memory issues.

Ports

  • Control (input#1): The input schema for this port must have its first field be of type 'string' and have the name 'command'. When a valid command tuple is enqueued on this port, the adapter will run the command directive.

    The valid commands are as follows:

    • disconnect

      This will cause the adapter to leave the multicast group and close its connection to the socket. Any tuples sent to that group while the socket is disconnected will not be received. This socket can be reconnected with the 'connect' command.

    • connect

      This command reconnects the socket and rejoins the multicast group after it has been disconnected by the 'disconnect' command. In the event of connection failure and automatic retry failure, this command will not properly reconnect the socket; a restart is required in that instance.

    • status

      This command will cause the adapter to output a status tuple detailing the current connection status.

  • Data (output #1): This is the primary output port, through which the adapter outputs a tuple for each packet received on the UDP socket. The type of the payload is determined by the Data Type property.

    The schema for this port will have three fields:

    • "Payload" , various

      This is the contents of the payload of the packet, the type is determined by the data type in the Data Transformer properties.

    • "SenderIP" , <string>

      The physical IP address of the sender of the packet, in the form "nnn.nnn.nnn.nnn"

    • "SenderPort" , <int>

      The port on which the process which sent this packet is operating

  • Status (output): This is a secondary output port, through which the adapter will output status tuples. Each status tuple will have a TYPE and an ACTION, which will describe the current status. In addition, it will have a message, timestamp and in some cases the tuple that caused the status report.

    The TYPES and their associated ACTIONS follow:

    • Data Status

      This is used when an error occurs while trying to parse or output data within StreamBase. This is always paired with the 'Error' action.

    • Connection Status

      This is used for any connection related status messages, and can have the actions: 'Connected' or 'Disconnected'

    • Command Status

      This is used to confirm or deny the success of a command entered through the control port. The associated actions are: Error, Connected, Disconnected.

    • Suspend/Resume

      This status is emitted whenever the adapter has either been suspended or is about to resume operation. It has either the action Suspended or Resuming.
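In plain java.net terms, the three fields of the Data port correspond to information that every received DatagramPacket carries. The sketch below shows where equivalent values come from when receiving with a DatagramSocket. It is an assumption that the adapter populates its fields in a comparable way; the class SenderFieldsSketch and the method receiveOne are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo, for illustration only.
class SenderFieldsSketch {

    /** Returns {payload, sender IP, sender port, the sender socket's actual local port}. */
    static String[] receiveOne(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket(0, loopback)) {
            receiver.setSoTimeout(2000); // do not block forever if the datagram is dropped

            byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(bytes, bytes.length, loopback, receiver.getLocalPort()));

            DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
            receiver.receive(packet);

            // Equivalent of "Payload": only getLength() bytes of the buffer are valid.
            String payload = new String(packet.getData(), packet.getOffset(),
                    packet.getLength(), StandardCharsets.UTF_8);
            // Equivalent of "SenderIP" and "SenderPort": filled in by receive().
            String senderIp = packet.getAddress().getHostAddress();
            int senderPort = packet.getPort();

            return new String[] {payload, senderIp, Integer.toString(senderPort),
                    Integer.toString(sender.getLocalPort())};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(" | ", receiveOne("hello")));
    }
}
```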

Multicast Mode

Multicast Mode will cause several changes in behavior from the normal, unicast mode.

In Multicast Mode, the paradigm is that this adapter will subscribe to a multicast group, and receive any messages sent to that group's IP and Port.

In either mode, messages can be received from any sender, via UDP protocol, on the specified Port.

Multiple Multicast Sockets may be bound to the same port on the same machine, thus multiple UDP Receiver adapters may be bound to the same Port if in multicast mode. If they are in unicast mode, then the Port Reuse option must be checked to bind multiple Receivers to the same port.

N.B. Many firewalls do not allow multicast communication, to prevent overloading the network with traffic. You will likely have to adjust your firewall settings accordingly; it is recommended that you limit this to allow internal multicast communication only, unless otherwise necessary, to prevent receiving unwanted outside multicast traffic.
