The Spotfire Streaming Bidirectional Socket Reader Adapter was created to allow for two-way socket communication using a pair of adapters, one for reading, and one for writing. The pair of adapters work together by sharing the connections that are made. As the name implies, the reader allows for data to be read in from one or more sockets and then converted and sent downstream as a tuple.
This adapter allows you to create your own data transformations, or use pre-built ones to transform the data on the wire into a tuple. There are currently five pre-built data transformations available: CSV, JSON, BLOB, serialized tuple, and length-based. The data transformers convert bytes read from the socket into a tuple of the provided schema.
This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.
Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.
Adapter: A read-only field that shows the formal name of the adapter.
Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.
Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.
Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.
Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
Property | Description |
---|---|
Port (int) | The port to use. When in server mode, this is the port to listen on. When in client mode, this is the default port to use when making new connections. |
Server Mode (check box) | This option determines whether this adapter runs as a client (check box cleared) or as a server (check box selected). |
Enable Control Port (check box) | This option determines whether the control port is available. The control port allows commands to be sent to the adapter to start or stop in server mode, or to connect or disconnect in client mode. |
Enable Status (check box) | This option enables a status port with status information about the state of the adapter during run time. Some common status tuples include connection and disconnection information. |
Socket Identifier Field Name (string) | This required field determines the name of the field added to all outgoing data tuples. Each tuple emitted on the data output port will contain this field with a socket identifier for the connection in the format host:port:socketidentifier, or if no socket identifier is specified for the connection, then just host:port. |
Log Level | Controls the level of verbosity the adapter uses to send notifications to the console. This setting can be higher than the containing application's log level. If set lower, the system log level is used. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE. |
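The socket identifier format described in the table above (host:port, or host:port:socketidentifier) can be split back into its parts downstream. The following is a minimal sketch only: the `SocketId` class is a hypothetical helper and not part of the adapter API, and it assumes the host portion contains no colon characters (for example, no IPv6 literals).

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the adapter API. */
final class SocketId {
    final String host;
    final int port;
    final Optional<String> identifier;

    private SocketId(String host, int port, Optional<String> identifier) {
        this.host = host;
        this.port = port;
        this.identifier = identifier;
    }

    /**
     * Parses "host:port" or "host:port:socketidentifier".
     * Assumption: the host itself contains no ':' characters.
     */
    static SocketId parse(String value) {
        // Limit of 3 keeps any further ':' characters inside the identifier part.
        String[] parts = value.split(":", 3);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                "Expected host:port[:socketidentifier], got: " + value);
        }
        int port = Integer.parseInt(parts[1]);
        Optional<String> id =
            parts.length == 3 ? Optional.of(parts[2]) : Optional.empty();
        return new SocketId(parts[0], port, id);
    }
}
```

For example, `SocketId.parse("localhost:9000:feedA")` yields the host `localhost`, the port `9000`, and the identifier `feedA`, while `SocketId.parse("localhost:9000")` yields an empty identifier.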
Property | Description |
---|---|
Max Connection Count (int) | The maximum number of incoming connections that this adapter allows at any given time. If a client tries to connect when the maximum number of connections is already reached, the connection is rejected and a status tuple is emitted. |
Connection Backlog (int) | The backlog parameter is the maximum number of pending connections on the socket. Its exact semantics are implementation specific. In particular, an implementation may impose a maximum length or may choose to ignore the parameter altogether. If the backlog parameter has the value 0, or a negative value, then an implementation specific default is used. |
Start Server At Startup (check box) | Determines whether the adapter starts the server when the application starts. If disabled, the end-user must enable the control port and emit a command tuple to start and stop the server. |
Property | Description |
---|---|
Default Host Name (string) | The default host name to connect to. This value is used when the Connect at startup value is selected or when a command tuple is received that has null for the host value. |
Connect at startup (check box) | Determines whether the adapter connects to the default host and port when the adapter starts. |
Reconnect Count (int) | Determines how many times this adapter retries a connection to a host. |
Timeout Period (int) | The number of seconds to wait before retrying after a failed connection attempt. |
Property | Description |
---|---|
Data Transformer Class (string) | The fully qualified path to the data transformer class that will be used to convert data from bytes into a tuple and from tuples to bytes. |
Data Transformer Settings (key value pair of strings) | These settings are supplied to the data transformer. The settings are a key value pair of strings. |
This schema is used to define how the data is translated from byte information into a tuple.
-
Use the dropdown list to select the name of a named schema previously defined in or imported into this module. The drop-down list is empty unless you have defined or imported at least one named schema for the current module.
When you select a named schema, its fields are loaded into the schema grid, overriding any schema fields already present. Once you import a named schema, the schema grid is dimmed and can no longer be edited. To restore the ability to edit the schema grid, re-select Private Schema from the dropdown list.
Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.
Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
Use the control port to send action commands to the adapter. The adapter can be run either in server mode (where it listens for connections) or client mode (where it makes outbound connections), and each mode has its own set of commands. Tuples enqueued on this port cause the adapter to start or stop a server, or to connect or disconnect clients.
- Command, string. The command to send to the adapter.
  - If the adapter is in server mode, the values are:
    - START -- This tells the adapter to start the server and listen on the specified or default host and port. Any new connection established has a host:port key value, which is output on each data tuple.
    - STOP -- This tells the adapter to stop the server and close all client connections.
    - DISCONNECT -- This tells the adapter to disconnect a client from the server. The socketidentifier must be included and match a current connection.
  - If the adapter is in client mode, the values are:
    - CONNECT -- This tells the adapter to make a new connection to the specified or default host and port. The associated host:port:socketidentifier values are used to identify this connection for further actions; those values are output as the socket identifier on all emitted data tuples.
    - DISCONNECT -- This tells the adapter to disconnect from the specified or default host and port. The host:port:socketidentifier combination must match the values used to initially make the connection.
- Host, string. The host to connect to. This value is only valid when sending commands to adapters in client mode. If the value is null, the default value of the adapter is used.
- Port, int. The port to use when making connections. If the value is null, the default value of the adapter is used. If the adapter is in server mode, the port value determines which port to listen on. If the adapter is in client mode, the port is used for outgoing connections.
- SocketIdentifier (optional), string. Only used when the adapter is in client mode. If specified, this value is added to the socket identifier value emitted on the status port as the object field, with the format Host:Port:SocketIdentifier.
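The command rules above can be summarized in a small validation sketch. This is illustrative only: `ControlCommands` and its `build` method are hypothetical names, and a plain map stands in for the control-port tuple, which in a real application would be created with the StreamBase tuple API. The field names follow the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for building a control-port tuple; not adapter API. */
final class ControlCommands {
    static final Set<String> SERVER_COMMANDS = Set.of("START", "STOP", "DISCONNECT");
    static final Set<String> CLIENT_COMMANDS = Set.of("CONNECT", "DISCONNECT");

    /** Returns the field values for a command, rejecting commands the mode does not accept. */
    static Map<String, Object> build(boolean serverMode, String command,
                                     String host, Integer port, String socketIdentifier) {
        Set<String> allowed = serverMode ? SERVER_COMMANDS : CLIENT_COMMANDS;
        if (!allowed.contains(command)) {
            throw new IllegalArgumentException(
                command + " is not valid in " + (serverMode ? "server" : "client") + " mode");
        }
        // In server mode, DISCONNECT must name the connection to close.
        if (serverMode && command.equals("DISCONNECT") && socketIdentifier == null) {
            throw new IllegalArgumentException("DISCONNECT in server mode needs a socket identifier");
        }
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("Command", command);
        fields.put("Host", host);                 // null means: use the adapter default
        fields.put("Port", port);                 // null means: use the adapter default
        fields.put("SocketIdentifier", socketIdentifier);
        return fields;
    }
}
```

A call such as `ControlCommands.build(false, "CONNECT", "localhost", 9000, "feedA")` produces the values for a client-mode connection request, while passing `"CONNECT"` with `serverMode` set to `true` is rejected.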
The status port is used to send status information tuples downstream to inform the user of changes.
- type, string. The type of status information emitted on this port. Status types are:
  - Connection -- Indicates this message is about a connection.
  - Server -- Indicates this message is about the listening server.
- action, string. The action being reported:
  - Connected -- Occurs when a new connection is made, either as a client or as a server. The socket identifier is output as the object of this message. For a server, the socket identifier is host:port; for a client, it is host:port, followed by :socketidentifier if one was supplied.
  - Disconnected -- Occurs when a connection disconnects, either as a client or as a server.
  - Failed -- A failure occurred; the failure message is included.
  - Limit -- Server mode only. Emitted when the maximum number of connections to the server has been reached.
  - Stopped -- Server mode only. Sent when the server has been stopped.
  - Started -- Server mode only. Sent when the server has been started.
- object, string. This value may be null. If it is not null, it contains a value relevant to the status message. On new connections, this value contains the socket identifier of the new connection.
- message, string. A formatted, human-readable message that explains the status.
- inputTuple. Not used for this adapter.
Data transformers are used to convert data in and out of the system.
com.streambase.sb.adapter.bidirectionalsocket.transform.CSVDataTransformer
The CSV data transformer converts incoming bytes into tuples of the supplied schema. This transformer has the same properties as the CSVSocketReader adapter; some properties apply to the reader and some to the writer. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
UseDefaultCharset | Reader/Writer | true, false | true | Specifies whether the Java platform default character set is to be used. |
Charset | Reader/Writer | string | System Charset | The name of the character set encoding that the adapter is to use to read input or write output. |
CaptureTransformStrategy | Reader/Writer | Flatten, Nest | Flatten | The strategy to use when transforming capture fields for this operator. |
FieldDelimiter | Reader/Writer | Single character | , | The delimiter used to separate tokens; defaults to a comma. Control characters can be entered as &#ddd; where ddd is the character's ASCII value. |
NullValueRepresentation | Writer | string | null | String to write when a field is null. |
StringQuote | Writer | QuoteIfNecessary, AlwaysQuote, NeverQuote | QuoteIfNecessary | Determines when string fields are quoted in the CSV file: Quote if necessary, Always quote, or Never quote. |
StringQuoteCharacter | Reader/Writer | Single character | " | Matching pairs of the quote character used to delimit string constants. |
TimestampFormat | Reader | String | yyyy-MM-dd hh:mm:ss.SSSZ | The format used to parse timestamp fields extracted from the input tuples. This should be in the form expected by the java.text.SimpleDateFormat class described in the Java Platform SE reference documentation. If a timestamp value is read that does not match the specified format string, the entire record is discarded and a WARN message appears on the console. |
LenientTimestampParsing | Reader | true, false | true | Set this to true if you would like to parse timestamp values that do not conform to the specified format using default formats. |
IncompleteRecords | Reader | Discard, PopulateWithNulls | Discard | Specifies what should be done when the adapter receives a record with fewer than the required number of fields. |
DiscardEmptyRecords | Reader | true, false | true | This is a special case to handle empty lines. If rows with some fields must send output, but not empty lines, leave this true. Set to false to send empty tuples for empty lines. |
LogWarning | Reader | true, false | false | Set this to true if warning messages should be logged when incomplete records are encountered. If false, no warning messages are logged for records with fewer than the required number of fields. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.JSONDataTransformer
The JSON data transformer will convert incoming bytes into tuples of the supplied schema. Outgoing tuples are converted into escaped JSON strings with a separator character to delimit tuples. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
IncludeNullValues | Writer | true, false | true | Include fields that contain null values. |
EncodeSubType | Writer | list, map | list | The type of transformation that should be used when there are sub tuples to process. |
Separator | Reader/Writer | Single character | \n | The separator character used to determine the end of a JSON value. Hex values starting with 0x are allowed such as 0x03. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
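To illustrate how a Separator value delimits JSON messages on the wire, the sketch below interprets the setting (a literal character, or a hex value such as 0x03) and extracts complete messages from a buffer. It is not the transformer's actual implementation: the class and method names are hypothetical, and UTF-8 decoding is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; names are hypothetical. */
final class SeparatorFraming {
    /** Interprets a Separator setting: "0x03"-style hex, or a single literal character. */
    static byte parseSeparator(String setting) {
        if (setting.startsWith("0x")) {
            return (byte) Integer.parseInt(setting.substring(2), 16);
        }
        return (byte) setting.charAt(0);
    }

    /**
     * Extracts every complete, separator-terminated message.
     * A trailing partial message is left unread in the buffer.
     */
    static List<String> drain(ByteBuffer buffer, byte separator) {
        List<String> messages = new ArrayList<>();
        int start = buffer.position();
        for (int i = start; i < buffer.limit(); i++) {
            if (buffer.get(i) == separator) {      // absolute get: position unchanged
                byte[] bytes = new byte[i - start];
                buffer.position(start);
                buffer.get(bytes);                 // relative get: advances to the separator
                buffer.get();                      // skip the separator itself
                messages.add(new String(bytes, StandardCharsets.UTF_8)); // assumed charset
                start = buffer.position();
            }
        }
        buffer.position(start);                    // keep any partial message for later
        return messages;
    }
}
```

Leaving the partial message in the buffer means the next socket read can append to it before another attempt is made.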
com.streambase.sb.adapter.bidirectionalsocket.transform.BLOBDataTransformer
The BLOB data transformer is one of the most basic data transformers. For incoming data it will take each packet of byte information received from a socket and output a tuple with a blob field containing the data. For outgoing data a specified field of type blob is read and the data from that field is sent directly out on the socket. If a setting is not present the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
BlobFieldName | Reader/Writer | string | BlobField | For the reader, this value is the name of the field that is filled with the byte data for each outgoing tuple. For the writer, this is the field that the transformer reads; the contents of the blob are sent to the socket. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
com.streambase.sb.adapter.bidirectionalsocket.transform.SerializedTupleDataTransformer
The serialized tuple data transformer transforms each tuple going into the writer adapter into a byte array of data with a single int value as the header to determine size. On the reader side, the adapter reads a single integer and then reads that many bytes of data to create a new tuple based on the supplied schema. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
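The framing described above, an int header giving the size followed by that many bytes, can be sketched with java.nio.ByteBuffer as follows. This shows the framing only: the payload is treated as opaque bytes because the tuple serialization format is not described here, and the class name is hypothetical. The header is read and written in whatever byte order the buffer is set to.

```java
import java.nio.ByteBuffer;

/** Illustrative length-prefixed framing; not the transformer's actual code. */
final class LengthPrefixedFraming {
    /** Writes a 4-byte length header and the payload; returns false if space is short. */
    static boolean write(ByteBuffer out, byte[] payload) {
        if (out.remaining() < Integer.BYTES + payload.length) {
            return false;
        }
        out.putInt(payload.length).put(payload);
        return true;
    }

    /** Returns the next payload, or null with the position unchanged if the frame is incomplete. */
    static byte[] read(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            return null;
        }
        int length = in.getInt(in.position());     // absolute get: does not move the position
        if (length < 0) {
            throw new IllegalStateException("Negative frame length: " + length);
        }
        if (in.remaining() < Integer.BYTES + length) {
            return null;                           // wait for more data from the socket
        }
        in.getInt();                               // consume the header
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }
}
```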
com.streambase.sb.adapter.bidirectionalsocket.transform.LengthBasedDataTransformer
The length based data transformer converts incoming bytes into a string. Outgoing tuples are converted into an integer value representing the size of the string, followed by the string. If a setting is not present, the default is used.
Setting | Applies to Reader/Writer | Allowed Values | Default | Description |
---|---|---|---|---|
DataFieldName | Reader/Writer | any | Data | The name of the String or Blob field in the input or output schema which will contain the data. |
ReadLengthFieldName | Reader | any | ReadLength | The name of the field in the output schema which will be used to store the number of bytes read. |
FixedLength | Reader/Writer | Integer | 0 | For a reader, this is the fixed number of bytes to read. If this value is zero or negative, it is ignored. If this value is set, all other read length parameters are ignored. For a writer, this is the fixed number of bytes to write. If this value is zero or negative, it is ignored and the length of the actual data is sent at the front of the packet. If the input data is shorter than this value, the pad value is used to pad to the length; if the input data is longer, only the bytes up to the fixed length are sent. |
FixedLengthPadByte | Writer | byte | 0 | The byte value to use when the input data value is less than the fixed length. |
ReadMaxLength | Reader | Integer | Max Integer | The maximum number of bytes to read. If this value is greater than zero and data is encountered that is larger than this value, a null tuple is produced and all data is discarded. |
ReadLengthFieldOffset | Reader | Integer | 0 | The offset of the length field. |
LengthFieldLength | Reader/Writer | Integer | 4 | The length of the length field: 1, 2, 3, 4, or 8 bytes for the size read or written at the front of the data packet. |
ReadLengthAdjustment | Reader | Integer | 0 | The adjustment of the length field when reading data. |
ReadInitialBytesToStrip | Reader | Integer | 4 | The number of bytes to remove from the start of the byte array. |
WriteLengthIncludesLengthFieldLength | Writer | true/false | false | If enabled the size value sent at the start of the packet will include the length field length. |
ByteOrder | Reader/Writer | big-endian, little-endian | native order | The byte order to use when converting data. |
BufferSize | Reader/Writer | int | 10240 | The size of the read or write buffer used to hold data in transition from the socket to the data transformer. If this value is too small the application will emit error messages because of a full buffer. |
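The writer-side behavior of FixedLength and FixedLengthPadByte described in the table can be sketched as below. This is an illustration of the pad-or-truncate rule as stated, not the transformer's source; the `FixedLengthPadding` class is a hypothetical name.

```java
import java.util.Arrays;

/** Illustrative sketch of the FixedLength writer rule; names are hypothetical. */
final class FixedLengthPadding {
    /**
     * Pads with padByte, or truncates, so the result is exactly fixedLength bytes.
     * A fixedLength of zero or less means "ignored": the data is returned as-is.
     */
    static byte[] fit(byte[] data, int fixedLength, byte padByte) {
        if (fixedLength <= 0) {
            return data;
        }
        // copyOf truncates longer input and zero-extends shorter input.
        byte[] out = Arrays.copyOf(data, fixedLength);
        if (data.length < fixedLength) {
            Arrays.fill(out, data.length, fixedLength, padByte);
        }
        return out;
    }
}
```

With a fixed length of 4 and a pad byte of 0x20, the two-byte input `{1, 2}` becomes `{1, 2, 0x20, 0x20}`; with a fixed length of 3, the five-byte input `{1, 2, 3, 4, 5}` is cut to `{1, 2, 3}`.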
Custom data transformers allow you to build Java code to handle the input and output of the Bi-Directional Adapters. This section covers how you can build your own data transformers. There is a single interface that must be implemented for a class to work with the adapters — ISocketDataTransformer — which is covered in the next section. Each socket connection the adapter establishes creates a new copy of the given data transformer. This allows for data storage between socket reads and writes for an individual socket connection.
For more information about the Bi-Directional socket adapters, see their sample which demonstrates how you can build your own data transformations and use them in an application.
The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface becomes available in your project when you add a Bi-Directional Socket adapter to the Studio canvas. This adds the necessary adapter dependency in your project's pom.xml.
The interface makes use of java.nio.ByteBuffer and assumes you have some familiarity with this class.
The adapter handles most of the complexities of java.nio.ByteBuffer. Regardless, you should have some familiarity with the buffer's position, limit, and capacity methods. Calls to toTuple and fromTuple assume that the position of the buffer is updated to reflect the amount of data that was read from or written to the buffer.
Example:
Five characters are read into a string during toTuple:
String testData = new String(readBuffer.array(), readBuffer.position(), 5);
Then the buffer's position should be updated to reflect the new position:
readBuffer.position(readBuffer.position() + 5);
Note
Some ByteBuffer methods update the position for you, such as buffer.getInt(). See the Oracle documentation for java.nio.ByteBuffer for details.
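The difference between updating the position yourself and letting a relative ByteBuffer method do it can be seen in the following small sketch. The class and method names are hypothetical, the ASCII charset is an assumption, and the manual variant assumes an array-backed (heap) buffer so that array() is available.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative comparison of manual and automatic position updates. */
final class PositionDemo {
    /** Reads count bytes through the backing array, then advances the position by hand. */
    static String readManually(ByteBuffer buffer, int count) {
        // arrayOffset() is added defensively for buffers created by slicing.
        String text = new String(buffer.array(),
                buffer.arrayOffset() + buffer.position(), count,
                StandardCharsets.US_ASCII);
        buffer.position(buffer.position() + count);
        return text;
    }

    /** Reads count bytes with a relative bulk get, which advances the position itself. */
    static String readRelative(ByteBuffer buffer, int count) {
        byte[] bytes = new byte[count];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.US_ASCII);
    }
}
```

Both methods leave the buffer positioned just past the bytes they consumed, which is what toTuple is expected to do.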
The com.streambase.sb.adapter.bidirectionalsocket.transform.ISocketDataTransformer interface must be implemented for your data transformer to work with the adapter. Complete the following methods for the interface:
- public void init(Map<String, String> settings, Logger logger, String host, int port, String socketIdentifier, boolean isReader);
  - Description: This method is called when a socket connection is made and the data transformer needs to be initialized.
  - Variables:
    - Map<String, String> settings: The settings you supplied in the adapter Data Transformer tab.
    - Logger logger: The adapter's logger, which allows your transformer to log information.
    - String host: The host of the socket connection.
    - int port: The port of the socket connection.
    - String socketIdentifier: The optional identifier that the client may supply.
    - boolean isReader: A flag to indicate whether this init call applies to the reader or writer adapter.
  - Return Value: void
- public Map<String, String> getDefaultSettings();
  - Description: This method is called by the adapter to try and verify information about the adapter; in future versions of StreamBase it may also be used to try and pre-populate the settings list when a data transformer is selected.
  - Return Value: Map<String, String>. The default settings of the adapter.
- public Set<String> validateSettings(Map<String, String> settings, Schema schema, boolean isReader);
  - Description: This method is called when the adapter needs to validate all settings. This method gives the developer the chance to inform the end user that some information is required or invalid.
  - Variables:
    - Map<String, String> settings: The settings you supply in the Data Transformer tab for the adapter.
    - Schema schema: The schema used to create tuples for outgoing data.
    - boolean isReader: A flag to indicate whether this validation call applies to the reader or writer adapter.
  - Return Value: A set of exception messages to display to the end user, or null or an empty set if no errors were detected.
- public Tuple toTuple(ByteBuffer readBuffer, Schema schema);
  - Description: This method is called for each read from the socket. The readBuffer contains the current data read from the socket, including any data that was not used by a previous call to this method. It is expected that the position value of the readBuffer is updated after reading data. If the position value is updated and a null value is returned, the position is reset to what it was before entering this method. This method is called repeatedly until a null value is returned or the buffer has zero bytes remaining.
  - Variables:
    - ByteBuffer readBuffer: The current data to be used to create a tuple. The size of this buffer is set by the getBufferSize() method.
    - Schema schema: The schema to be used to create a tuple and fill its values from the incoming data.
  - Return Value: A valid tuple if data is available, or null if a tuple could not be created.
- public boolean fromTuple(Tuple tuple, ByteBuffer writeBuffer);
  - Description: This method is called for each tuple that is received by the writer adapter. It is expected that the tuple is transformed to bytes and added to the writeBuffer in any format needed. Note that you must check the write buffer for available space before trying to write to it. If no space is available and you return false, the tuple is discarded.
  - Variables:
    - Tuple tuple: The current tuple that needs to be transformed and added to the writeBuffer.
    - ByteBuffer writeBuffer: The data that will be sent directly on the socket. The getBufferSize() method sets this buffer size.
  - Return Value:
    - True if this tuple was successfully added to the write buffer.
    - False if a problem occurred or no space is available. If false is returned, the adapter attempts to write data out to the socket to free space, then calls this method once more with the same tuple before discarding the tuple.
- public int getBufferSize();
  - Description: This method is called to determine the size of the read and write buffers for each client connection. This method is called once when a connection is made.
  - Return Value: An integer value which specifies the size.
- public String getName();
  - Description: This method is called to get the name of the data transformer (this method is not currently used and is for future use).
  - Return Value: A string representing the name of the data transformer.
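The calling contract described for toTuple (repeated calls until null is returned or the buffer is empty, with the position restored after a null) can be modeled without the StreamBase classes. The sketch below is an assumption-laden model of that loop, not the adapter's code: `ToTupleLoop` is a hypothetical name, and a generic decoder function stands in for a toTuple implementation. It assumes the decoder consumes at least one byte whenever it returns a value.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Illustrative model of the documented toTuple calling contract. */
final class ToTupleLoop {
    /**
     * Calls decoder until it returns null or the buffer is empty.
     * After a null, the position is restored so unread bytes are kept.
     */
    static <T> List<T> drain(ByteBuffer readBuffer, Function<ByteBuffer, T> decoder) {
        List<T> results = new ArrayList<>();
        while (readBuffer.hasRemaining()) {
            int before = readBuffer.position();
            T item = decoder.apply(readBuffer);
            if (item == null) {
                // Undo any partial read; the data waits for the next socket read.
                readBuffer.position(before);
                break;
            }
            results.add(item);
        }
        return results;
    }
}
```

Because of the restore step, a decoder can read speculatively and simply return null when it finds that the message is not yet complete.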