Kafka Consumer Adapter

Introduction

The TIBCO StreamBase® Input Adapter for Apache Kafka Consumer allows the system to consume data from an Apache Kafka broker.

Each message from the broker contains the topic that the message was sent to, as well as the message, key, offset, and partition. When subscribing to a topic, you can request to start from a specific offset value, or give a timestamp value which the system determines from the closest offset to the given time and produce messages from that point forward.

When requesting offsets by timestamp, there are two values that have meaning:

  • -1 Means no history, and only start receiving messages from this point forward.

  • -2 Means all history, receive all data from history and then all messages going forward.

The Apache Kafka adapter suite is implemented against the version of the Kafka libraries listed on the Supported Configurations page.

Adapter Properties

This section describes the properties you can set for this adapter, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Adapter: A read-only field that shows the formal name of the adapter.

Class: A field that shows the fully qualified class name that implements the functionality of this adapter. Use this class name when loading the adapter in StreamSQL programs with the APPLY JAVA statement. You can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes or to a module parameter that evaluates to true, an instance of this adapter starts as part of the containing StreamBase Server. If this field is set to No or to a module parameter that evaluates to false, the adapter is loaded with the server, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager. With this option set to No or false, the adapter does not start even if the application as a whole is suspended and later resumed. The recommended setting is selected by default.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Adapter Properties Tab

Property Type Description
Brokers string A comma-separated list of address:port Kafka brokers. Default value is localhost:9092
Enable Command Port check box When enabled, the command port allows input tuples to control the adapter. For more information see Command Port. The default state is cleared.
Enable Status Port check box When enabled, the adapter sends informational data on the status port about various states of the adapter. For more information see Status Port. The default state is cleared.
Enable Passthrough Fields check box When enabled, the adapter outputs the same tuple that started the consumer operation for each message that comes from the broker for the subscribed topic.
Output Latest Offset On Subscribe check box When enabled, the adapter will output a status message with the latest offset for the subscriptions topic and partition in the object field of the status message and the action will be Offset.
Output High Water Mark check box When enabled, the adapter will output the current high water mark value with each data tuple.
Log Level Drop-down list Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.

Advanced Options Tab

Property Type Description
Deserializer Type drop-down list Specifies the type of deserializer to use when converting message data. Valid values are: String, Tuple, Blob, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer.
Deserializer Class string The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row.

There are a few values that are passed to the configure() method's Map of configs, as follows:

  • schema — This is the actual schema object that you set, only passed if the data type is tuple. Retrieve this value using:

    Schema schema = (Schema) configs.get("schema");

  • characterSet — A string value containing the character set that has been set in the associated adapter properties. Retrieve this value using:

    String characterSet = (String) configs.get("characterSet");

  • useDefaultCharacterSet — A boolean value indicating whether the default character set should be used. This value comes from the associated adapter properties. Retrieve this value using:

    Boolean useDefaultCharacterSet = (Boolean) configs.get("useDefaultCharacterSet");

  • logger — This is the actual logger object of the associated adapter. Retrieve this value using:

    Logger logger = (Logger) configs.get("logger");

Deserializer Properties string key:value

A set of key value properties that will be passed into the custom deserializer by the config method.

Key Deserializer Type drop-down list Specified the deserializer type to use when converting key data. Valid values are: String, Tuple, Blob, and Custom. Use Custom to supply your own Java class that implements org.apache.kafka.common.serialization.Deserializer.
Key Deserializer Class string The Java class that implements org.apache.kafka.common.serialization.Deserializer, used when specifying Custom in the previous row.

There are a few values that are passed to the configure() method's Map of configs, as described for the Deserializer Class row above.

Key Deserializer Properties string key:value

A set of key value properties that is passed into the custom key deserializer by the configuration method.

Use Default Character Set check box Select this control to specify the use of the Java platform default character set. You can also leave the control clear and specify the Character Set property in the next row. The default state is selected.
Character Set string When the Use Default Character Set property is clear, this control specifies the character set to use when converting strings. The default value is UTF-8.
Default Buffer Size integer Specifies the default buffer size in bytes to use when fetching data for a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 65536.
Default Fetch Size integer Specifies the default fetch size in bytes to use when fetching records from a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 100,000.
Default Timeout integer The default timeout in milliseconds to use when fetching records from a topic. This value can be overridden per topic when performing a subscribe on the control port. The default value is 100,000 (100 minutes).
Max Failure Count integer The maximum number of connection failures to report before removing a subscription to a topic. The default value is 5.

Edit Schemas Tab

Property Type Description
Output Schema schema The schema used to convert Kafka messages into tuples.
Key Output Schema schema The schema used to convert Kafka message keys into tuples.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Command Input Port

Description

You can optionally enable the command input port for this adapter instance by means of the Enable Command Port property in the Adapter Properties page of the Properties view. Use the command port to send action commands to the adapter.

Command Port Schema

  • command, string. The command to send to the adapter. Valid values are:

    • subscribe — Subscribe to a topic and partition at a specific offset or time given by the input tuple.

    • unsubscribe — Unsubscribe from a topic and partition.

    • pause — Causes the adapter to stop reading data from all currently subscribed topics and partitions.

    • resume — If the adapter is currently paused, resumes reading from the already subscribed-to list of topics and partitions.

    • updateBrokers — Update the broker list to which this adapter connects. This command only updates the broker list. You must unsubscribe and resubscribe to the topics for the new broker list to take effect.

  • topic, string. The topic from which to subscribe or unsubscribe.

  • partition (optional), integer. The partition to subscribe or unsubscribe from, if this value is null the default of 0 is used.

  • time (optional), long. The timestamp long value to start the subscription from. -1 means start from latest, -2 means to start from beginning. If this value is null a value of -1 is assumed. If offset is not null and greater than or equal to zero then this value is ignored. If both time and offset are missing or null then the latest offset is assumed.

  • offset (optional), long. The exact offset to start from when subscribing. This value overrides any time value supplied. If both time and offset are missing or null then the latest offset is assumed.

  • bufferSize (optional), integer. Overrides the default buffer size for the topic addressed in this command.

  • fetchSize (optional), integer. Overrides the default fetch size for the topic addressed in this command.

  • timeout (optional), integer. Overrides the default timeout for the topic addressed in this command.

  • clientId (optional), string. The client ID to send with the subscription.

  • brokers (optional), list[tuple[string,int]]. List of brokers to use with the updateBrokers command.

Data Output Port

The data port is the default output port for the Kafka Consumer adapter, and is always enabled. Use the data port to receive message data from the connected Apache Broker.

The default schema for the data output port is:

  • topic, string. The topic for this message.

  • offest, long. The offset value where this message was stored.

  • message, string, tuple, blob, or custom. The content of this message.

  • partition, integer. The partition where this message was stored.

  • key, string, tuple, blob, or custom. The key for this message.

  • passThrough, tuple. If pass through fields are enabled, this field is the control tuple that was input to start the subscription to this topic.

Status Output Port

Description

You can optionally enable the status output port for this adapter instance by means of the Enable Status Port property in the Adapter Properties page of the Properties view. Use the status port to retrieve status output messages from the adapter.

Status Port Schema

  • type, string. The type of status information emitted. Status types are:

    • Error — This message relates to an error that occurred.

    • Warn — This message relates to a warning that the user should be aware of.

    • Info — This message relates to extra status information.

  • action, string. Valid values are:

    • Subscribe — This message relates to subscribing to a topic and partition, If the level is INFO it will contain a subscription successful message, if it is a WARN message it will contain information as to why the subscription may have not performed successfully.

    • Unsubscribe — This message relates to unsubscribing from a topic and partition, If the level is INFO it will contain a unsubscribe successful message, if it is a WARN message it will contain information as to why the subscription may have not unsubscribed successfully.

    • UpdateBrokers — This message related to updating brokers. If the level is WARN the message will contain information as to why this action may have not been performed correctly.

    • Command — This message relates to an input command sent on the command port.

    • Convert — This message relates to errors that occur converting messages into the StreamBase tuple schema.

    • NewLeader — This message relates to new leadership election for a subscription. If the level is ERROR it will contain information on why the leadership failed. If the level if WARN the message will contain information about leadership discovery. If the level if INFO it will contain an information message that we are trying to find new leadership.

    • FindLeader — This message relates to finding a leader for the subscription. If the level is ERROR it will contain information on why finding leadership failed.

    • Fetch — This message contains information when trying to find a topic and partitions offset

    • Process — This message contains information when trying to process a message for a subscription.

    • Offset — This message contains the current offset of a topic and partition when a subscription happens and the option "Output Latest Offset On Subscribe" is enabled.

    • HighWaterMark — This message contains the current high water mark level in the object field and is sent when the current read offset requested is less than the current high water mark for a topic and partition.

  • object, string. This value may be null. If not null, it contains a value relevant to the status message.

  • message, string. This is a formatted human readable message that explains the status message.

  • time, timestamp. The time the status message occurred.

Custom Deserializer

When a class is supplied for either message or key serialization, you must provide a class that implements the org.apache.kafka.common.serialization.Deserializer interface. The following is an example of such a class:

package com.streambase.sb.adapter.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.slf4j.Logger;

import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleJSONUtil;

public class DemoDeserializer implements org.apache.kafka.common
  .serialization.Deserializer<Tuple> {

  private static final String CONFIG_SCHEMA = "schema";
  private static final String CONFIG_CHARACTER_SET = "characterSet";
  private static final String CONFIG_USE_DEFAULT_CHARACTER_SET = 
    "useDefaultCharacterSet";
  private static final String CONFIG_LOGGER = "logger";
    
  private Schema schema;
  private String characterSet;
  private boolean useDefaultCharacterSet = true;
  private Logger logger;
    
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schema = (Schema) configs.get(CONFIG_SCHEMA);
    characterSet = (String) configs.get(CONFIG_CHARACTER_SET);
    useDefaultCharacterSet = (Boolean) 
      configs.get(CONFIG_USE_DEFAULT_CHARACTER_SET);
    logger = (Logger) configs.get(CONFIG_LOGGER);
  }

  @Override
  public Tuple deserialize(String topic, byte[] data) {
    Tuple tuple = schema.createTuple();
    try {
        String tupleJSON = useDefaultCharacterSet ? new String(data) : 
          new String(data, characterSet);
        logger.info("Deserializing tuple for topic '" + 
          topic + "' from string: " + tupleJSON);
        TupleJSONUtil.setTupleFromJSONLoose(tuple, tupleJSON, "");            
      } catch (UnsupportedEncodingException | StreamBaseException e) {
          logger.error("Error deserializing topic '" + topic + 
            "': " + e.getMessage(), e);
      }
      return tuple;
  }

  @Override
  public void close() {
  }

}