Using the HBase Put Operator

Introduction

The TIBCO StreamBase® Adapter for Apache HBase is implemented as a suite of five global Java operators, including the HBase Admin, Delete, Get, Put, and Scan operators.

This page describes the HBase Put operator, which allows a StreamBase application to insert data into a connected HBase database table. The operator uses property values in the project's server configuration file to set up the connection to the HBase database, as described in Configuration File Settings. Multiple HBase operators can share a single instance of an HBase connection by selecting the same HBase configuration setting ID.

Configuration File Settings

This section describes the configuration for an HBase database connection instance that you specify in your project's sbconf file. This configuration is the same for all HBase operator types.

HBase Operator Configuration

The <adapter-configurations> element of a project's sbconf file, despite its name, is used to specify configuration value groups for either operators or adapters.

The HBase configuration section of the sbconf file starts with an <adapter-configurations> element that contains one <adapter-configuration name="hbase"> element. This element, in turn, contains one or more <setting> elements.

Each <section name="hbase"> element must contain one element in the form <setting name="id" val="HBaseConfigName"/>, where HBaseConfigName is the name you assign to a group of settings that uniquely define an individual HBase database connection. All other <setting> elements are optional.

The example configuration below shows a basic configuration to connect to an HBase server. You can have as many configurations as your application requires, but each configuration must have a unique id.

Example 1. Example <adapter-configuration> Section for HBase

  <adapter-configurations>
    <adapter-configuration name="hbase">
      <section name="hbase">
        <setting name="id" val="HBase Sample"/>
        <setting name="connectAtStartup" val="true" />

        <!-- All values below are passed directly to the 
             HBaseConfiguration class unmodified --/>
        <setting name="hbase.master" val="127.0.0.1:60000" />
        <setting name="hbase.zookeeper.quorum" val="127.0.0.1" />
        <setting name="hbase.zookeeper.property.clientPort" val="2181" />
        <setting name="hbase.client.retries.number" val="5" />
        <setting name="zookeeper.session.timeout" val="5000" />
        <setting name="zookeeper.recovery.retry" val="5" />
      </section>
    </adapter-configuration>
  </adapter-configurations>


Configuration Settings

Setting Type Description
id string The value of the id setting displays in the drop-down list in the adapter's Properties view, and is used to uniquely identify this section of the configuration file.
connectAtStartup true or false If true, this operator instance connects to HBase on startup of this operator's containing module.
*** string All other values are directly sent to the HBaseConfiguration class, which is responsible for setting up a connection to the HBase server. See the Apache HBase documentation for the available client configuration options and for further information on setting up a connection to HBase.

Properties View Settings

This section describes the properties you can set for an HBase Put operator, using the various tabs of the Properties view in StreamBase Studio.

General Tab

Name: Use this required field to specify or change the name of this instance of this component, which must be unique in the current EventFlow module. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Operator: A read-only field that shows the formal name of the operator.

Class: Shows the fully qualified class name that implements the functionality of this operator. If you need to reference this class name elsewhere in your application, you can right-click this field and select Copy from the context menu to place the full class name in the system clipboard.

Start with application: If this field is set to Yes (default) or to a module parameter that evaluates to true, this instance of this operator starts as part of the JVM engine that runs this EventFlow module. If this field is set to No or to a module parameter that evaluates to false, the operator instance is loaded with the engine, but does not start until you send an sbadmin resume command, or until you start the component with StreamBase Manager.

Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports to learn about Error Ports.

Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Operator Properties Tab

Property Type Description
HBase Config drop-down list The name of the HBase configuration to use with this operator. The value selected by this drop-down list determines the database connection this operator works against. The values that populate this list are stored in the project's sbconf file, as described in the Configuration File Settings section.
Table Name string The HBase table that this operation is to be performed against.
Serializer Class string The fully qualified name of the class which implements com.streambase.sb.operator.hbase.IHBaseSerializer. If set this class will be called to serialize and deserialize to and from tuples.
Enable Status Port check box If enabled, a status port is made available for this operator instance, which will emit status tuples for various events from this operator.
Log Level INFO Controls the level of verbosity the adapter uses to issue informational traces to the console. This setting is independent of the containing application's overall log level. Available values, in increasing order of verbosity, are: OFF, ERROR, WARN, INFO, DEBUG, TRACE.

Field Map Tab

Property Type Description
Data Field Name string The field in the incoming schema that contains the tuple or list of tuples to use for the Put operation. If this field is blank, the full incoming tuple is used.
Row Id Field Name string The field in the incoming schema that contains the field to be used as the row ID.
Field Map field grid The field map used to determine which fields of the incoming schema map to the family:columns of the HBase table. If this contains no rows, auto-matching is used. Auto-matching requires the input schema have only tuples for the upper level fields that represent the family names. The sub fields of the tuples represent the columns. The input schema may also be a list of tuples containing family, column, and value fields.

Concurrency Tab

Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.

Caution

Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.

Input Schema

This section describes the input schema for the HBase Put operator, which allows for two different kinds of schema inputs. One method is a list of family and column value pairs, while the other is a well-defined set of family tuples with sub-data.

Family and Column Tuple Input Schema

This form of input schema must contain only Tuples on the top level, with the exception of a rowId field. The Tuples provided represent the family names in the table. The sub-fields of these tuples represent the column names and values to be created or updated.

Example Schema

  rowId (anyType),
  Family1 tuple<
    column1 string,
    column2 double,
    column3 int
  >,
  Family2 tuple<
    column1 timestamp,
    column2 long,
    column3 blob
  >

Family and Column Value List Input Schema

This form of input schema allows for a single list containing a predefined tuple schema with family, column, and value fields.

Example Schema

  rowId (anyType)
  familyColumnValues list<
    tuple<
      family string,
      column string,
      value anyType,
    >
  >

Serialize Interface

Interface Description

The serialize interface (com.streambase.sb.operator.hbase.IHBaseSerializer) is used create custom serialization for the tuples. This class will allow you to store and receive the tuples in any format you create to and from the hbase server.

When you have compiled a Java class to serve as a serializer, provide the fully qualified name of your class in the Serializer Class field in the Operator Properties tab in your HBase Scan, Get, or Put operator's Properties view.

The next section provides an example class file that takes the tuples implements the methods required to perform a serialize call to and from the custom data format.

Serializer Example

package com.streambase.sb.hbase.serialize;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.util.Bytes;

import com.streambase.sb.ByteArrayView;
import com.streambase.sb.CompleteDataType;
import com.streambase.sb.Schema;
import com.streambase.sb.Schema.Field;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Timestamp;
import com.streambase.sb.Tuple;
import com.streambase.sb.TupleException;
import com.streambase.sb.operator.TypecheckException;
import com.streambase.sb.operator.hbase.IHBaseSerializer;

public class DemoSerializer implements IHBaseSerializer {

    private static final String FIELD_COLUMNS = "columns";
    private static final String FIELD_COLUMN = "column";
    private static final String FIELD_FAMILY = "family";
    private static final String FIELD_VALUE = "value";

    private static final String FAMILY1 = "Family1";    
    private static final String COLUMN1 = "Column1";
    private static final String COLUMN2 = "Column2";
    private static final String COLUMN3 = "Column3";
    private static final String COLUMN4 = "Column4";
    private static final String COLUMN5 = "Column5";
    private static final String COLUMN6 = "Column6";
    private static final String COLUMN7 = "Column7";
            
    Map<String, Map<String, CompleteDataType>> families;
    
    public DemoSerializer() {
        families = new HashMap<String, Map<String, CompleteDataType>>();
        Map<String, CompleteDataType> family1Columns = new HashMap<String, CompleteDataType>();
        family1Columns.put(COLUMN1, CompleteDataType.forInt());
        family1Columns.put(COLUMN2, CompleteDataType.forLong());
        family1Columns.put(COLUMN3, CompleteDataType.forDouble());
        family1Columns.put(COLUMN4, CompleteDataType.forString());
        family1Columns.put(COLUMN5, CompleteDataType.forBlob());
        family1Columns.put(COLUMN6, CompleteDataType.forTimestamp());
        family1Columns.put(COLUMN7, CompleteDataType.forBoolean());
        families.put(FAMILY1, family1Columns);        
    }
    
    /***
     * This method is used to determine if the class should serialize and deserialize this family column
     * @param family - Will this class handle this family column 
     * @return true if the class can handle this family otherwise false
     */
    @Override
    public boolean canSerializeFamily(String family) { 
        return families.containsKey(family);
    }
    
    /***
     * This method is used to get the schemas that should be set for the families during deserialize.  This method is called at typecheck time as well as runtime.
     * @param maxVersions - The maximum number of versions that the end user is going to request from the server.
     * @return A map of schemas for the families that this class supports
     */
    @Override
    public Map<String, Schema> getSchemas(int maxVersions) {
        Map<String, Schema> schemas = new HashMap<String, Schema>();
        for (Entry<String, Map<String, CompleteDataType>> family : families.entrySet()) {
            List<Field> fields = new ArrayList<Field>(); 
            for (Entry<String, CompleteDataType> columns : family.getValue().entrySet()) {
                fields.add(new Schema.Field(columns.getKey(), maxVersions <= 1 ? columns.getValue() : CompleteDataType.forList(columns.getValue())));
            }
            Schema schema = new Schema("", fields);
            schemas.put(family.getKey(), schema);
        }
        return schemas;
    }
    
    /***
     * This method is called to deserialize a family
     * @param schema - The schema of the family
     * @param family - The family
     * @param columnBytes - The family column bytes for each column under the family.  The value NavigableMap<byte[], NavigableMap<Long, byte[]>>. the byte[] is the column name (convert to String via Bytes.toString({value})), the Long is the version timestamp which can be converted to a Streambase timestamp via Timestamp.msecs(Timestamp.TIMESTAMP, {Long time value})
     * @param maxVersions - The maximum number of versions that the end user has requested from the server.
     * @return A tuple fill with the family column data
     */
    @Override
    public Tuple deserialize(Schema schema, String family, NavigableMap<byte[], NavigableMap<Long, byte[]>> columnBytes, int maxVersions) throws StreamBaseException {
        if (family.equals(FAMILY1)) {
            Tuple tuple = schema.createTuple();
            for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnBytes.entrySet()) {
                String column = Bytes.toString(entry.getKey());
                List<Object> data = new ArrayList<Object>();
                for (Entry<Long, byte[]> dataEntry : entry.getValue().entrySet()) {
                    if (dataEntry.getValue() == null || dataEntry.getValue().length <= 0) {
                        break;
                    }
                    if (column.equals(COLUMN1)) {
                        data.add(Bytes.toInt(dataEntry.getValue()));
                    } else if (column.equals(COLUMN2)) {
                        data.add(Bytes.toLong(dataEntry.getValue()));
                    } else if (column.equals(COLUMN3)) {
                        data.add(Bytes.toDouble(dataEntry.getValue()));
                    } else if (column.equals(COLUMN4)) {
                        data.add(Bytes.toString(dataEntry.getValue()));
                    } else if (column.equals(COLUMN5)) {
                        data.add(ByteArrayView.makeCopiedView(dataEntry.getValue()));
                    } else if (column.equals(COLUMN6)) {
                        data.add(Timestamp.msecs(Timestamp.TIMESTAMP, Bytes.toLong(dataEntry.getValue())));
                    } else if (column.equals(COLUMN7)) {
                        data.add(Bytes.toBoolean(dataEntry.getValue()));
                    }                        
                }
                if (data != null && data.size() > 0) {
                    tuple.setField(column, maxVersions <= 1 ? data.get(0) : data);
                }
            }            

            return tuple;
        }
        return null;
    }

    /***
     * This method is called only for the PUT operations to determine if the input schema provided by the user is valid for the serialize method
     * @param family - The family, this value may be an empty string if the user input is a generic input schema which consists of tuple(columns list((column string, family string, value string))).
     * @param schema - The schema given by the end user for the family
     * @throws TypecheckException - Throw a typecheck exception if the schema is invalid
     */
    @Override
    public void typecheckSerialize(String family, Schema schema) throws TypecheckException {
        if (family.isEmpty()) { // no family may happen if a generic schema is given tuple(columns list((column string, family string, value string))), you can choose to support this or not
            // we support the generic case only with string as the value type
            try {
                if (!schema.getField(FIELD_COLUMNS).getElementType().getSchema().getField(FIELD_VALUE).getCompleteDataType().equals(CompleteDataType.forString())) {
                    throw new TypecheckException("Generic fields are only supported with a string type");    
                }
            } catch (TupleException e) {
                throw new TypecheckException("Error checking generic fields: " + e.getMessage(), e);
            }
            return;
        }
        Map<String, CompleteDataType> family1Columns = families.get(family);
        if (family1Columns != null) { // check to make sure that the schema field types match
              for (Field field : schema.fields()) {
                  CompleteDataType completeDataType = family1Columns.get(field.getName());
                  if (completeDataType == null) {
                      throw new TypecheckException("Invalid field [" + field.getName() + "] found in family [" + family + "]");
                  }
                  if (!field.getCompleteDataType().equals(completeDataType)) {
                      throw new TypecheckException("Field [" + field.getName() + "] must have a datatype of [" + completeDataType + "]");
                  }
              }
        }
    }

    /***
     * This method is called to serialize a tuple into data for the put operation
     * @param family - The family to serialize
     * @param tuple - The input tuple to convert
     * @return A map of family column names and their associated data 
     */
    @Override
    public Map<String, byte[]> serialize(String family, Tuple tuple) throws StreamBaseException {        
        if (family.equals(FAMILY1) && tuple != null) {
            Map<String, byte[]> familyData = new HashMap<String, byte[]>();
            if (tuple.getSchema().hasField(FIELD_COLUMNS)) { // check for the generic form of a tuple that we have stated via typecheck that we support
                List<?> columns = tuple.getList(FIELD_COLUMNS);
                for (Object familyColumnObject : columns) {
                    Tuple familyColumnTuple = (Tuple)familyColumnObject;
                    String genericFamily = familyColumnTuple.isNull(FIELD_FAMILY) ? null : familyColumnTuple.getString(FIELD_FAMILY);
                    String genericColumn = familyColumnTuple.isNull(FIELD_COLUMN) ? null : familyColumnTuple.getString(FIELD_COLUMN);
                    String genericValue = familyColumnTuple.isNull(FIELD_VALUE) ? null : familyColumnTuple.getString(FIELD_VALUE);
                    if (genericFamily != null && genericFamily.equals(family) && genericColumn != null && genericValue != null) {
                        familyData.put(genericColumn, Bytes.toBytes(genericValue));
                    }
                }
            } else {                
                for (Field field : tuple.getSchema().fields()) {
                    if (tuple.isNull(field)) {
                        continue;
                    }
                    if (field.getName().equals(COLUMN1)) {
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getInt(field)));                        
                    } else if (field.getName().equals(COLUMN2)) {
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getLong(field)));                        
                    } else if (field.getName().equals(COLUMN3)) {
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getDouble(field)));                        
                    } else if (field.getName().equals(COLUMN4)) {
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getString(field)));
                    } else if (field.getName().equals(COLUMN5)) {
                        familyData.put(field.getName(), tuple.getBlobBuffer(field).copyBytes());
                    } else if (field.getName().equals(COLUMN6)) {
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getTimestamp(field).toMsecs()));
                    } else if (field.getName().equals(COLUMN7)) {                        
                        familyData.put(field.getName(), Bytes.toBytes(tuple.getBoolean(field)));
                    } 
                }
            }
            return familyData;
        }
        return null;
    }
}