Kafka Consumer Adapter

Introduction

The TIBCO StreamBase® Input Adapter for Apache Kafka Consumer allows the system to consume data from an Apache Kafka broker.

Each message from the broker contains the topic that the message was sent to, as well as the message, key, offset, and partition. When subscribing to a topic, you can request to start from a specific offset value, or give a timestamp value which the system determines from the closest offset to the given time and produce messages from that point forward.

When requesting offsets by timestamp, there are two values that have meaning:

  • -1 Means no history, and only start receiving messages from this point forward.

  • -2 Means all history, receive all data from history and then all messages going forward.

The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this adapter starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the adapter instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Type Description
Brokers string A comma-separated list of address:port[config=value|config=value] Kafka brokers. Default value is localhost:9092 This value can also be a Azure event hubs connection string. The config=value section of the brokers list allows you to specify advance configuration directly in the broker list. For example if you required a security.protocol and security.mechanism you can specify a broker list like: test.com:9093[security.protocol=SASL_SSL,sasl.mechanism=PLAIN],test2.com:9093
Enable Command Port check box When enabled, the command port allows input tuples to control the adapter. For more information see Command Port. The default state is cleared.
Enable Status Port check box When enabled, the adapter sends informational data on the status port about various states of the adapter. For more information see Status Port. The default state is cleared.
Enable Passthrough Fields check box When enabled, the adapter outputs the same tuple that started the consumer operation for each message that comes from the broker for the subscribed topic.
Output Latest Offset On Subscribe check box When enabled, the adapter will output a status message with the latest offset for the subscriptions topic and partition in the object field of the status message and the action will be Offset. If subscribing without specifying a partition you may received multiple last offset messages, one for each partition that is assigned.
Enable Metrics Port check box When enabled, the adapter allows output for the Kafka connection metrics when a metrics command is input. For more information see Metrics Port.
Output High Water Mark check box When enabled, the adapter will output the current high water mark value with each data tuple.
Log Level INFO Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Advanced Options Tab

Property Type Description
Perform Commit check box If enabled the adapter for perform a commit call for each batch of messages received.
Poll Wait MS int The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. This value must not be negative.
Deserializer Type drop-down list Specifies the type of Deserializer to use when converting message data. Valid values are: Avro, Blob, String, Tuple, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer.
Deserializer Class string The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row.

There are a few values that are passed to the configure() method's Map of configs, as follows:

  • schema — This is the actual schema object that you set, only passed if the data type is tuple. Retrieve this value using:

    Schema schema = (Schema) configs.get("schema");

  • characterSet — A string value containing the character set that has been set in the associated adapter properties. Retrieve this value using:

    String characterSet = (String) configs.get("characterSet");

  • useDefaultCharacterSet — A boolean value indicating whether the default character set should be used. This value comes from the associated adapter properties. Retrieve this value using:

    Boolean useDefaultCharacterSet = (Boolean) configs.get("useDefaultCharacterSet");

  • logger — This is the actual logger object of the associated adapter. Retrieve this value using:

    Logger logger = (Logger) configs.get("logger");

Deserializer Properties string key:value

A set of key value properties that will be passed into the custom Deserializer by the configuration method.

Key Deserializer Type drop-down list Specified the Deserializer type to use when converting key data. Valid values are: Avro, Blob, String, Tuple, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer.
Key Deserializer Class string The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row.

There are a few values that are passed to the configure() method's Map of configs, as described for the Deserializer Class row above.

Key Deserializer Properties string key:value

A set of key value properties that is passed into the custom key Deserializer by the configuration method.

Use Default Character Set check box Select this control to specify the use of the Java platform default character set. You can also leave the control clear and specify the Character Set property in the next row. The default state is selected.
Character Set string When the Use Default Character Set property is clear, this control specifies the character set to use when converting strings. The default value is UTF-8.
Max Failure Count integer The maximum number of connection failures to report before removing a subscription to a topic. The default value is 5.
Advanced Config String Array This key value set of strings allows you to configure any Kafka Consumer Property. NOTE: ClientId of the input tuple if not null will override any client Id entry supplied.

Edit Schemas Tab

Property Type Description
Output Schema schema The schema used to convert Kafka messages into tuples.
Key Output Schema schema The schema used to convert Kafka message keys into tuples.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Command Input Port

Description

You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter.

Command Port Schema

  • command, string. The command to send to the adapter. Valid values are:

    • subscribe — Subscribe to a topic and partition at a specific offset or time given by the input tuple.

    • unsubscribe — Unsubscribe from a topic and partition.

    • pause — Suspend fetching from the requested partitions. If the input tuple topic and partition are null then all current subscriptions are suspended. If not null, then only the supplied topic and partition are suspended.

    • resume — Resume requested partitions which have been paused with pause. If the input tuple topic and partition are null then all current subscriptions are resumed, if not null then only the supplied topic and partition are resumed.

    • updateBrokers — Update the broker list to which this adapter connects. This command only updates the broker list. You must resubscribe to the topics for the new broker list to take effect.

    • metrics — If this command port is enabled, the adapter produces a list of Kafka metrics associated with the current consumer.

  • topic, string. The topic from which to subscribe or unsubscribe. This value is also used optionally for pause and resume.

  • pattern, string. The pattern can be used instead of topic to subscribe to topics based on a regular expression. The topic and partition fields are ignored if this field is not empty.

  • partition (optional), integer. The partition to subscribe to or unsubscribe from. If this value is null, a consumer group is created and this subscribed is assigned one or more partitions by the server. This value is also used optionally for pause and resume.

  • time (optional), long. The timestamp long value to start the subscription from. -1 means start from latest, -2 means to start from beginning. If this value is null a value of -1 is assumed. If offset is not null and greater than or equal to zero then this value is ignored. If both time and offset are missing or null then the latest offset is assumed.

  • offset (optional), long. The exact offset to start from when subscribing. This value overrides any time value supplied. If both time and offset are missing or null then the latest offset is assumed.

  • clientId (optional), string. The client ID to send with the subscription.

  • brokers (optional), list[tuple[string host,int port]]. List of brokers to use with the updateBrokers command.

  • advancedConfig (optional), list[tuple[string key,string value]]. A list of consumer configuration properties to set per subscription. The order of configuration settings are first the default, second the advance configuration from the properties overwrite any defaults, and third the values from the input tuple overwrite any existing values.

Data Output Port

The data port is the default output port for the Kafka Consumer adapter, and is always enabled. Use the data port to receive message data from the connected Apache Broker.

The default schema for the data output port is:

  • topic, string. The topic for this message.

  • offset, long. The offset value where this message was stored.

  • message, string, tuple, blob, or custom. The content of this message.

  • partition, integer. The partition where this message was stored.

  • consumerName, string. The fully qualified name of the consumer that consumed this message. This name can be used to optional pass into the Consumer Commit adapter to commit specific messages.

  • key, string, tuple, blob, or custom. The key for this message.

  • passThrough, tuple. If pass through fields are enabled, this field is the control tuple that was input to start the subscription to this topic.

Status Output Port

Description

You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter.

Status Port Schema

  • type, string. The type of status information emitted. Status types are:

    • Error — This message relates to an error that occurred.

    • Warn — This message relates to a warning that the user should be aware of.

    • Info — This message relates to extra status information.

  • action, string. Valid values are:

    • Subscribe — This message relates to subscribing to a topic and partition. If the level is INFO, it will contain a subscription-successful message. If it is a WARN message, it will contain information as to why the subscription may have not performed successfully. If no partition is given during the subscribe command, this status will be given for the overall topic as well as for the initial assignment of topics given by the server.

    • Unsubscribe — This message relates to unsubscribing from a topic and partition. If the level is INFO, it will contain a unsubscribe successful message. If it is a WARN message, it will contain information as to why unsubscribing was not successful.

    • Rebalance Assinged — This message relates to partition assignments given by the server. After the initial assignment any change in partitions will produce assignment status messages and will inform of an assigned partition.

    • Rebalance Revoked — This message relates to partition assignments given by the server. After the initial assignment any change in partitions will produce assignment status messages and will inform of a revoked partition.

    • UpdateBrokers — This message related to updating brokers. If the level is WARN the message will contain information as to why this action may have not been performed correctly.

    • Command — This message relates to an input command sent on the command port.

    • Convert — This message relates to errors that occur converting messages into the StreamBase tuple schema.

    • Fetch — This message contains information when trying to find a topic and partitions offset

    • Process — This message contains information when trying to process a message for a subscription.

    • Pause — This message contains information when pausing a subscription.

    • Resume — This message contains information when resuming a subscription.

    • Offset — This message contains the current offset of a topic and partition when a subscription happens and the option Output Latest Offset On Subscribe is enabled.

    • HighWaterMark — This message contains the current high water mark level in the object field and is sent when the current read offset requested is less than the current high water mark for a topic and partition.

  • object, string. This value may be null. If not null, it contains a value relevant to the status message.

  • message, string. This is a formatted human readable message that explains the status message.

  • time, timestamp. The time the status message occurred.

Metrics Output Port

Description

You can optionally enable the metrics output port for this adapter instance by means of the Enable Metrics Port property in the Adapter Properties page of the Properties view. Use the metrics port to retrieve metrics output messages sent from the adapter in response to a metrics command sent to its command port. A set of perhaps 40 or 50 metrics messages are sent in response to this command.

Metrics Port Schema

  • metrics, list(tuple). One list member for each metrics message. The schema of the tuple is:

    • name, string — The name of this metric.

    • description, string — A text description of what this metric measures.

    • value, double — The value of this metric measurement.

Suspend and Resume

Suspend

When this operator is suspended it will close all connections and record the current subscriptions it contains.

Resume

When this operator is resumed it will attempt to recreate the subscriptions that had been active before being suspended.

Custom Deserializer

When a class is supplied for either message or key serialization, you must provide a class that implements the org.apache.kafka.common.serialization.Deserializer interface. The following is an example of such a class:

package com.streambase.sb.adapter.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;

public class DemoDeserializer implements org.apache.kafka.common
  .serialization.Deserializer<Tuple> {

  private static final String CONFIG_SCHEMA = "schema";
  private static final String CONFIG_CHARACTER_SET = "characterSet";
  private static final String CONFIG_USE_DEFAULT_CHARACTER_SET = 
    "useDefaultCharacterSet";
  private static final String CONFIG_LOGGER = "logger";
    
  private Schema schema;
  private String characterSet;
  private boolean useDefaultCharacterSet = true;
  private Logger logger;
    
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schema = (Schema) configs.get(CONFIG_SCHEMA);
    characterSet = (String) configs.get(CONFIG_CHARACTER_SET);
    useDefaultCharacterSet = (Boolean) 
      configs.get(CONFIG_USE_DEFAULT_CHARACTER_SET);
    logger = (Logger) configs.get(CONFIG_LOGGER);
  }

  @Override
  public Tuple deserialize(String topic, byte[] data) {
    Tuple tuple = schema.createTuple();
    try {
        String tupleJSON = useDefaultCharacterSet ? new String(data) : 
          new String(data, characterSet);
        logger.info("Deserializing tuple for topic '" + 
          topic + "' from string: " + tupleJSON);
        TupleJSONUtil.setTupleFromJSONLoose(tuple, tupleJSON, "");            
      } catch (UnsupportedEncodingException | StreamBaseException e) {
          logger.error("Error deserializing topic '" + topic + 
            "': " + e.getMessage(), e);
      }
      return tuple;
  }

  @Override
  public void close() {
  }

}