Creating StreamBase Java Clients

You can use the StreamBase Client wizard to create a Java client enqueuer or dequeuer program that will interact with a StreamBase application. The generated code has commented sections that you can later edit to add the functionality you want. When you want to deploy the client to use with your StreamBase applications, you can package your code into a JAR file and export it to your file system.

Using the StreamBase Client Wizard

To open the wizard, click File>New>StreamBase Client. Edit these fields in the wizard:

Type

Indicate the type of client you want to generate: Choose one of the following:

Dequeuer

A program that dequeues data from StreamBase Server, sometimes called a consumer client. You might use dequeue clients as part of your testing procedure, or as an integral part of your deployed application.

Enqueuer

A program that feeds data into StreamBase Server, and sometimes called a producer client. An enqueuer might be part of your testing procedure (to simulate streaming data in a way that sbfeedsim cannot) or part of the implementation of your deployed application.

Source folder

The path to the src/main/java subfolder of an existing project folder in your workspace.

Package

While you could use the default package, it is recommended that you specify a different one, such as com.example.mysbapps.clients.

Name

A name for your class, such as MyEnqueueClient.

For example:

When you are ready, click Next to display the next page (either the enqueuer or dequeuer client page).

New StreamBase Client Dequeuer

Use this dialog to enter options for your new dequeue client. A dequeue client gets results from one or more output streams in a StreamBase application.

  1. Choose one of the stream options:

    All Streams

    The generated code will retrieve tuples from all of the output streams of the running StreamBase application.

    Specified Streams

    Allows you to specify which of the application's output streams to dequeue data from. For each stream:

    1. Enter the stream name in the Add a Stream field, and click Add.

    2. Optionally enter a predicate expression that restricts the tuples dequeued to those matching the expression. For example, PRICE > 50 dequeues only prices greater than 50.

  2. Specify the host name of the computer running the StreamBase application, and the TCP/IP port on which to access the output streams.

  3. Optionally select the Use a background thread option. (For guidance, see the Note on this subject in the next section.)

  4. Click Finish.

The wizard generates the Java file, and opens it in StreamBase Studio. All the required package and import statements are provided and the StreamBase Java Client library class and method definitions are started (based on your selections in the dialogs). Just edit the // TODO comments to complete the coding.

New StreamBase Client Enqueuer

Use the New StreamBase Client Enqueuer page to enter options for your new enqueuer client. An enqueuer client submits data to one or more input streams in a StreamBase application.

  1. Choose one of the following options:

    All Streams

    The generated code will retrieve a listing of all available input streams from the StreamBaseClient proxy instance once it has connected to the StreamBase application.

    Specified Streams

    Allows you to specify which of the application's input streams the client will submit data to. Enter a stream name in the Add a Stream field, and click Add.

  2. Specify the host name of the computer running the StreamBase application, and the TCP/IP port on which the application will receive input messages.

  3. Optionally choose any of these options:

    Use a background thread

    Note

    Use threading with caution. You can create and use separate StreamBaseClient objects in separate threads, provided that no two threads use the same StreamBaseClient concurrently. Refer to the Class StreamBaseClient in the Java Client library reference documentation for further information.

    Enable buffering

    For details, see Buffering Options.

    Enable batching

    Specify a batch size in number of tuples to batch and send at once. The default is 10.

  4. Click Finish.

The wizard generates the Java file, and opens it in StreamBase Studio. All the required package and import statements are provided and the StreamBase Java Client library class and method definitions are started (based on your selections in the dialogs). Just edit the // TODO comments to complete the coding.

Buffering Options

For enqueue clients, the dialog presents options to enable:

  • The buffer size (number of tuples per buffer)

  • The buffer's flush interval

In the generated code, you can also explicitly flush the buffer of a specified stream, or flush all buffers.

By enabling tuple buffering and experimenting with these parameters, you may be able to improve the efficiency and performance of your enqueue code. A single enqueue of a buffer containing (for example) 100 tuples should be more efficient than making 100 separate enqueue operations.

The example below demonstrates how to use this feature. The buffer size is set to 100 tuples and the buffer flush interval is set to 1000 milliseconds (one second).

package com.mycompany.mysbapps.clients;

import java.util.*;

import com.streambase.sb.*;
import com.streambase.sb.client.*;
   .
   .
   .
public class MyEnqueuerClient {

   private final static String SB_URI = "sb://localhost:10000";
   private static StreamBaseClient client = null;
   private String[] streamNames = null;
   private Collection tuples = null;
   private int buf_size = 100;
   private int flush_interval = 1000;
   .
   .
   .
   public MyEnqueuerClient(StreamBaseURI uri) throws StreamBaseException {
      System.out.println("Connecting to " + uri.toString() + "...");
      client = new StreamBaseClient(uri);
      System.out.println("Connected to " + uri.toString());

      // Turn on buffering. A WakeAndFlushBuffer thread is only
      // started if flush_interval > 0.
      client.enableBuffering(buf_size, flush_interval);
   }

In the StreamBase Client libraries, buffering is only turned on if the value for the buf_size parameter is greater than zero. (The buf_size parameter specifies the number of tuples, not a byte limit.) Also the buffer is flushed at a regular interval specified by the flush_interval parameter. If the flush_interval is not greater than zero, the buffer will only be flushed when it reaches capacity (it is filled with tuples).

If buffering is enabled and the flush_interval is set, the buffer will be flushed periodically based on the specified interval. However, to have more control over enqueuing buffered tuples, you can also use the flushBuffer(stream_name) method of the StreamBaseClient class to enqueue the contents of a buffer immediately. Or, you can use the flushAllBuffers() method to enqueue the contents of all buffers. These methods assume that the caller has a lock on _buffers_lock.

Generally you want to flush a buffer when you are concerned that the tuples in the buffer may become stale (staleness is relative to the particular application). Let's say, for example, your buffer size is 1000 and your flush_interval is 0 (this value turns off periodic flushing). The buffer will automatically be flushed when it is filled. That is, when the 1000th tuple has been enqueued. If 300 tuples have just been enqueued and it is unclear when or if more tuples will arrive to fill the buffer, it would be a good idea to call flushBuffer() or flushAllBuffers().

Related Topics

  • For general information about using the StreamBase API for Java clients, start in API Guide.