Kafka Producer Adapter

Introduction

The Spotfire Streaming Output Adapter for Apache Kafka Producer allows StreamBase applications to connect to an Apache Kafka Broker and to send messages to the broker on specific topics.

Messages can be sent in various formats such as tuple, string, blob, or a custom format that you provide. The schema of the input tuple determines the data type of the message and of the key. Like the message, the key can be specified in any of these formats.

You can provide a partition number to indicate the partition in which the message should be stored, or you can leave it blank to let Kafka assign the partition. When the Enable Passthrough Fields option is enabled, the producer emits an output tuple once a message is delivered to the broker, containing the actual partition and offset for that message. Kafka preserves message order within each partition.

The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.

Kafka Defaults

The Kafka Producer sets the following defaults when connecting to Kafka. All of these values can be overridden through the Advanced Config property on the Advanced Options tab.

Property Value
client.id 'SBClient_producer_' plus a UUID
value.serializer org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer org.apache.kafka.common.serialization.ByteArraySerializer
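
As a rough illustration, the defaults above correspond to a standard Kafka producer configuration along the lines of the sketch below. This is not the adapter's own code: the class name ProducerDefaults is hypothetical, and the bootstrap.servers value is an assumption taken from the default of the Brokers property (localhost:9092).

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper that only mirrors the defaults listed in the table above.
final class ProducerDefaults {

    static Properties build() {
        Properties props = new Properties();
        // Assumption: the default value of the Brokers adapter property.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // The default client.id is this prefix plus a UUID.
        props.setProperty("client.id", "SBClient_producer_" + UUID.randomUUID());
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each default so it can be compared with the table.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Any of these keys can be replaced by supplying the same key with a different value in the adapter's advanced configuration.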

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component. The name must be unique within the current EventFlow module. The name can contain alphanumeric characters, underscores, and escaped special characters. Special characters can be escaped as described in Identifier Naming Rules. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class name: Shows the fully qualified class name that implements the functionality of this adapter. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start options: This field provides a link to the Cluster Aware tab, where you configure the conditions under which this adapter starts.

Enable Error Output Port: Select this checkbox to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally, enter text to briefly describe the purpose and function of the component. In the EventFlow Editor canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Type Description
Brokers string A comma-separated list of Kafka brokers in the form address:port[config=value|config=value]. The default value is localhost:9092. This value can also be an Azure Event Hubs connection string. The optional config=value section lets you specify advanced configuration directly in the broker list. For example, if you require security.protocol and sasl.mechanism settings, you can specify a broker list such as: test.com:9093[security.protocol=SASL_SSL,sasl.mechanism=PLAIN],test2.com:9093
Connect On Startup check box When enabled, the adapter attempts to connect to a Kafka broker on startup. If not enabled, you must send a connect command on the command port; tuples published before the adapter is connected fail.
Enable Command Port check box When enabled, the command port allows input tuples to control the adapter. For more information see Command Port.
Enable Status Port check box When enabled, the adapter sends informational data on the status port about various adapter states. For more information see Status Port.
Enable Metrics Port check box When enabled, the adapter outputs the Kafka connection metrics on the metrics port when it receives a metrics command. For more information see Metrics Port.
Enable Passthrough Fields check box When enabled, the adapter outputs the data tuple when the message is successfully published to the Kafka broker.
Close Connection On Error check box When enabled, the producer connection closes when any exception occurs.
Synchronous Send check box When enabled, the producer blocks until sending completes.
Log Level INFO Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.
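
To make the structure of the Brokers value concrete, the following sketch splits a broker list into host, port, and per-broker configuration. It is a hypothetical parser written for illustration, not the adapter's implementation; the names BrokerListSketch and Broker are assumptions. Because the format description shows | between config entries while the example uses a comma, the sketch accepts either separator inside the brackets. It does not handle Azure Event Hubs connection strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser for the address:port[config=value...] broker list format.
final class BrokerListSketch {

    record Broker(String host, int port, Map<String, String> config) {}

    static List<Broker> parse(String brokers) {
        List<Broker> result = new ArrayList<>();
        int depth = 0;   // greater than zero while inside [...]
        int start = 0;   // start index of the current broker entry
        for (int i = 0; i <= brokers.length(); i++) {
            boolean atEnd = i == brokers.length();
            char c = atEnd ? ',' : brokers.charAt(i);
            if (!atEnd && c == '[') {
                depth++;
            } else if (!atEnd && c == ']') {
                depth--;
            } else if (c == ',' && (atEnd || depth == 0)) {
                // A comma outside brackets (or end of input) ends one entry.
                String entry = brokers.substring(start, i).trim();
                if (!entry.isEmpty()) {
                    result.add(parseEntry(entry));
                }
                start = i + 1;
            }
        }
        return result;
    }

    private static Broker parseEntry(String entry) {
        Map<String, String> config = new LinkedHashMap<>();
        String address = entry;
        int open = entry.indexOf('[');
        if (open >= 0) {
            int close = entry.lastIndexOf(']');
            if (close < open) {
                throw new IllegalArgumentException("Unclosed '[' in: " + entry);
            }
            address = entry.substring(0, open);
            // Accept either '|' or ',' between config=value pairs.
            for (String pair : entry.substring(open + 1, close).split("[|,]")) {
                String[] kv = pair.split("=", 2);
                if (kv.length != 2) {
                    throw new IllegalArgumentException("Expected config=value: " + pair);
                }
                config.put(kv[0].trim(), kv[1].trim());
            }
        }
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected address:port: " + address);
        }
        return new Broker(address.substring(0, colon).trim(),
                Integer.parseInt(address.substring(colon + 1).trim()), config);
    }
}
```

Parsing the example broker list from the Brokers row yields two entries: test.com on port 9093 with two configuration values, and test2.com on port 9093 with none.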

Advanced Options Tab

Property Type Description
Serializer Type Enum This value specifies how the input message field will be serialized. If Custom is selected and a serializer class is supplied, it is used. If no serializer class is supplied then the input field type determines how the message is serialized (tuple, blob, string). If Avro is selected, the input message field must be a tuple, which is serialized to an Avro record.
Serializer Class string

Specifies the class that implements the org.apache.kafka.common.serialization.Serializer interface for messages sent by this adapter instance. If empty, the adapter determines the serializer type based on the input schema; in this case, the input message type must be one of the following: String, Tuple, or Blob.

There are a few values that are passed to the configure() method's Map of configs. For example:

  • schema—This is the actual schema object that you set. It is only passed if the data type is tuple. You can retrieve this value by using:

    Schema schema = (Schema) configs.get("schema");

  • logger—This is the actual logger object of the associated adapter. You can retrieve this value with:

    Logger logger = (Logger) configs.get("logger");

Serializer Properties string key:value

A set of key-value properties that are passed to the custom serializer through its configure() method.

Key Serializer Type Enum This value specifies how the input key field will be serialized. If Custom is selected and a serializer class is supplied, it is used. If no serializer class is supplied, the input field type determines how the key is serialized (tuple, blob, string). If Avro is selected, the input key field must be a tuple, which is serialized to an Avro record.
Key Serializer Class string

Specifies the class that implements the org.apache.kafka.common.serialization.Serializer interface for the key. If empty, the adapter determines the serializer type based on the input schema; in this case, the input key type must be one of the following: String, Tuple, or Blob.

The values passed to the configure() method's Map of configs are the same as described for the Serializer Class property.

Key Serializer Properties string key:value

A set of key-value properties that are passed to the custom key serializer through its configure() method.

Compression Type drop-down list The compression type for all data generated by this producer instance. Compression is applied to full batches of data, so the efficacy of batching also affects the compression ratio (more batching means better compression). Available values: None, GZip, Snappy.
Advanced Config String Array This set of key-value strings allows you to configure any Kafka producer property.
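
The Serializer Properties and Key Serializer Properties rows above say that key:value pairs reach a custom serializer through configure(). The sketch below shows one way a serializer might read such a property. It rests on several assumptions: the property name charset is hypothetical, the sketch assumes the property appears as a top-level entry in the configs map, and the class deliberately omits the implements org.apache.kafka.common.serialization.Serializer<String> clause so that it compiles without the Kafka library, although its methods use the same signatures as that interface.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical string serializer configured by a "charset" serializer property.
final class CharsetStringSerializerSketch {

    // Used when no "charset" property is supplied.
    private Charset charset = StandardCharsets.UTF_8;

    public void configure(Map<String, ?> configs, boolean isKey) {
        // Assumption: serializer properties arrive as entries of this map.
        Object name = configs.get("charset");
        if (name != null) {
            charset = Charset.forName(name.toString());
        }
    }

    public byte[] serialize(String topic, String data) {
        // Kafka serializers conventionally map null data to a null payload.
        return data == null ? null : data.getBytes(charset);
    }

    public void close() {
        // Nothing to release.
    }
}
```

With a property such as charset:UTF-16BE, each character of a basic Latin string is encoded in two bytes instead of one.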

Cluster Aware Tab

Use the settings in this tab to enable this operator or adapter for runtime start and stop conditions in a multi-node cluster. During initial development of the fragment that contains this operator or adapter, and for maximum compatibility with releases before 10.5.0, leave the Cluster start policy control in its default setting, Start with module.

Cluster awareness is an advanced topic that requires an understanding of StreamBase Runtime architecture features, including clusters, quorums, availability zones, and partitions. See Cluster Awareness Tab Settings on the Using Cluster Awareness page for instructions on configuring this tab.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Command Input Port

Description

You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter.

Command Port Schema

  • command, string, the command to send to the adapter.

    The valid values are: connect, disconnect, metrics, updateBrokers

    • connect—Connect to the specified Kafka broker.

    • disconnect—Disconnect from the broker.

    • metrics—If the metrics port is enabled, the adapter produces a list of Kafka metrics associated with the current connection. Most values are null until at least one message is sent.

    • updateBrokers—Update the list of brokers to which this adapter will connect. This command updates only the broker list. You must disconnect and reconnect for the new broker list to take effect.

  • brokers (optional), list[tuple[string,integer]]—a list of brokers to use with the updateBrokers command.
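
The command schema above can be modeled as follows. This is a sketch only: the class and record names are hypothetical, the field names host and port inside the broker tuple are assumptions (the schema gives only the types string and integer), and requiring a non-empty broker list for updateBrokers is likewise an assumption. The switchBrokers helper illustrates the point that a new broker list takes effect only after a disconnect and reconnect.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of tuples sent to the command input port.
final class CommandTupleSketch {

    static final Set<String> VALID_COMMANDS =
            Set.of("connect", "disconnect", "metrics", "updateBrokers");

    // Assumption: field names for the tuple[string,integer] broker entries.
    record Broker(String host, int port) {}

    record Command(String command, List<Broker> brokers) {
        Command {
            if (command == null || !VALID_COMMANDS.contains(command)) {
                throw new IllegalArgumentException("Unknown command: " + command);
            }
            // Assumption: updateBrokers is only meaningful with a broker list.
            if (command.equals("updateBrokers") && (brokers == null || brokers.isEmpty())) {
                throw new IllegalArgumentException("updateBrokers needs a broker list");
            }
        }
    }

    // Command sequence for moving the adapter to a new set of brokers.
    static List<Command> switchBrokers(List<Broker> newBrokers) {
        return List.of(
                new Command("updateBrokers", newBrokers),
                new Command("disconnect", null),
                new Command("connect", null));
    }
}
```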

Data Input Port

The data port is the default input port for the Kafka Producer adapter, and is always enabled. Use the data port to send message data to the connected Apache Kafka broker.

The default schema for the data input port is:

  • topic, string. The topic for this message.

  • message, string, tuple, blob, or custom. The message content.

  • myKey (optional), string, tuple, blob, or custom. The key for this message.

  • partition (optional), integer. The partition where this message is stored. If not supplied, Kafka determines the partition.

Status Output Port

Description

You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter.

Status Port Schema

  • type, string. The type of status information emitted. Status types are:

    • Error—This message relates to an error that occurred.

    • Warn—This message relates to a warning that the user should be aware of.

    • Info—This message relates to extra status information.

  • action, string. Valid values are:

    • Send—This message relates to an attempt to send a message to the broker.

    • Connected—The adapter has successfully connected to the broker.

    • Connecting—The adapter is currently trying to connect to the broker.

    • Disconnecting—The adapter is currently disconnecting from the broker.

    • Update Brokers—The broker list is being updated.

    • Command—This message relates to an input command sent on the command port.

  • object, string. This value may be null. If not null, it contains a value relevant to the status message.

  • message, string. This is a formatted human readable message that explains the status message.

  • time, timestamp. The time the status message occurred.

Metrics Output Port

Description

You can optionally enable the metrics output port for this adapter instance by means of the Enable Metrics Port property in the Adapter Properties page of the Properties view. Use the metrics port to retrieve metrics output messages sent from the adapter in response to a metrics command sent to its command port. Roughly 40 to 50 metrics messages are sent in response to this command. Most metrics values are null until at least one message has been sent through the currently connected broker.

Metrics Port Schema

  • metrics, list(tuple). One list member for each metrics message. The schema of the tuple is:

    • name, string—The name of this metric.

    • description, string—A text description of what this metric measures.

    • value, double—The value of this metric measurement.
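
Because most metric values are null until a message has been sent, code that consumes the metrics list usually needs to skip null values. The following sketch shows one way to do that. The names MetricsSketch and Metric are hypothetical; the record simply mirrors the name, description, and value fields of the schema above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer of the metrics list emitted on the metrics port.
final class MetricsSketch {

    // Mirrors the tuple schema; value is a nullable double.
    record Metric(String name, String description, Double value) {}

    // Collects metrics that currently have a value, keyed by metric name.
    static Map<String, Double> nonNullValues(List<Metric> metrics) {
        Map<String, Double> byName = new LinkedHashMap<>();
        for (Metric m : metrics) {
            if (m.value() != null) {
                byName.put(m.name(), m.value());
            }
        }
        return byName;
    }
}
```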

Suspend and Resume

Suspend

When this adapter is suspended, it closes all connections.

Resume

When this adapter is resumed, it attempts to reconnect if it was connected when it was suspended.
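
The suspend and resume behavior described above amounts to remembering the connection state at the moment of suspension. The sketch below models that logic with a plain boolean in place of a real broker connection; the class SuspendResumeSketch and its method names are hypothetical and do not correspond to the adapter's API.

```java
// Hypothetical model of the suspend/resume connection logic.
final class SuspendResumeSketch {

    private boolean connected;
    private boolean wasConnectedAtSuspend;

    void connect() { connected = true; }

    void disconnect() { connected = false; }

    boolean isConnected() { return connected; }

    void suspend() {
        // Remember the state, then close all connections.
        wasConnectedAtSuspend = connected;
        connected = false;
    }

    void resume() {
        // Reconnect only if there was a connection when suspended.
        if (wasConnectedAtSuspend) {
            connect();
        }
        wasConnectedAtSuspend = false;
    }
}
```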

Custom Serializer

When you select Custom for either the message or key serialization, you must provide a class that implements the org.apache.kafka.common.serialization.Serializer interface. For example:

package com.streambase.sb.adapter.kafka;

import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;
import com.streambase.sb.TupleJSONUtil.Options;

// Serializes each tuple to a JSON string and returns its UTF-8 bytes.
public class DemoSerializer implements Serializer<Tuple> {

  // Keys under which the adapter passes objects in the configure() map.
  private static final String CONFIG_SCHEMA = "schema";
  private static final String CONFIG_LOGGER = "logger";

  private Schema schema;
  private Logger logger;

  @Override
  public void close() {
    // No resources to release.
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // The schema is passed only when the data type is tuple.
    if (configs.containsKey(CONFIG_SCHEMA)) {
      schema = (Schema) configs.get(CONFIG_SCHEMA);
    }
    if (configs.containsKey(CONFIG_LOGGER)) {
      logger = (Logger) configs.get(CONFIG_LOGGER);
    }
  }

  @Override
  public byte[] serialize(String topic, Tuple tuple) {
    EnumSet<Options> options = EnumSet.of(Options.INCLUDE_NULL_FIELDS, Options.PREFER_MAP);
    String jsonTuple = TupleJSONUtil.toJSONString(tuple, options, "");
    // Use an explicit charset rather than the platform default.
    return jsonTuple.getBytes(StandardCharsets.UTF_8);
  }
}