Round Robin Dequeuer Sample

Introduction

The Round Robin Dequeuer sample is a set of advanced StreamBase modules that presumes you are familiar with:

  • StreamSQL programming

  • StreamBase containers and the system container

  • Adding applications to containers in a running server

  • Using output predicates on output streams

  • Editing StreamBase Server configuration files

The application modules in this sample demonstrate the use of round robin dequeuing. Round robin dequeuer applications control the destination of output tuples based on the connected clients, so that each output message is delivered to a single, subscribed client.

Note

Round robin dequeuing may lose messages as client programs connect and disconnect.

You can configure a StreamBase application to work as a round robin dequeuer by adding the following features:

Subscriptions table

A subscriptions table receives an event whenever a client subscribes to or unsubscribes from a stream. The table keeps track of the currently connected dequeuers, so that we can rotate output among them in round robin order. The subscriptions table is maintained in the system container. Its schema must contain at least the following fields:

  • connectionid (string), the client connection identifier. Each connected client has a connection ID, which is a random hash value. To see connection IDs at run time, enter the command sbadmin listConnections.

  • path (string), the path of the stream.

  • logicalPath (string), the logical name of the stream. This is the same as the path, unless filtered subscribe is used (logical stream names are described in Narrowing Dequeue Results with Filtered Subscribe in the API Guide).

Output processing module

Your dequeuer application must include an output processing module that tracks the current subscription set based on the Subscriptions table.

Output stream destination control

Connect the processing module to an output stream in your application. In the output stream, set a predicate to control which clients will receive its tuples. The predicate can reference per-client fields, such as connectionid. A tuple is delivered only to clients for which the predicate is true. If the identifier is null, then the tuple is delivered to all connected clients, as usual.
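
The delivery rule described above can be modeled outside StreamBase. The following Python sketch is not StreamBase code and does not use any StreamBase API; the function name recipients and its parameters are hypothetical, chosen only to illustrate how a predicate on connectionid narrows delivery to one client, and how a null identifier falls back to normal delivery.

```python
from typing import List, Optional


def recipients(connected_ids: List[str], target_id: Optional[str]) -> List[str]:
    """Return the connection IDs that would receive one output tuple.

    connected_ids: IDs of the clients currently dequeuing from the stream.
    target_id:     the connection ID carried in the tuple, or None for null.
    """
    if target_id is None:
        # Per the text above: a null identifier means the tuple goes to
        # all connected clients, as usual.
        return list(connected_ids)
    # Otherwise the predicate is true only for the matching connection.
    return [cid for cid in connected_ids if cid == target_id]
```

In this model, a tuple whose target ID matches no connected client is delivered to nobody, which is one way messages can be lost when a client disconnects.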

Server configuration

Edit the server configuration file to identify the location of the processing module. For example, in this sample, the processing module is in the same location as the server configuration file:

<global>
  <module-search directory="."/>
  ...
</global>

With these features in place, you can write your own round robin logic to determine where to send output messages.

This Sample's Files

This sample includes several applications and supporting files:

  • roundrobin.sbapp: EventFlow version of the roundrobin example.

  • roundrobin.ssql: StreamSQL version of the roundrobin example.

  • roundrobinwide.sbapp: EventFlow version of the roundrobin example with many fields.

  • roundrobinwide.ssql: StreamSQL version of the roundrobin example with many fields.

  • systemtables.sbapp: EventFlow version of the output processing module used by the roundrobin modules to maintain a subscriptions table.

  • systemtables.ssql: StreamSQL version of the output processing module.

How the Round Robin Sample Works

This section uses the StreamSQL version of the roundrobin sample to describe how the application works. The EventFlow versions follow the same logic. The purpose of the output processing module is to process subscribe and unsubscribe events to maintain its table of current subscriptions.

Output Processing Module

The following discussion examines the output processing module, which is defined in systemtables.ssql. Bracketed numbers on the right are not part of the code, but instead link to lines of commentary below.

create input stream subscriptions(connectionid string, path string,  [1]
  logicalPath string, subscribe boolean, time timestamp);

create output memory table subscriptionstab (id long,                [2]
  connectionid string,   
  path string, logicalPath string, time timestamp,  
  primary key(connectionid, logicalPath) using btree);

create index subidindex on subscriptionstab using btree (id);        [3]

create memory table serialid(id int, serialnum long, 
  primary key(id) using hash);                                       [4]

create stream subscriptionsinsert as select * from subscriptions     [5]
  where subscribe == true;

create stream subscriptionswithid;                                   [6]

insert into serialid (id, serialnum) select 1 as id,                 [7]
  long(1) as serialnum        
  from subscriptionsinsert
    on duplicate key update serialnum = serialid.serialnum+1
    returning subscriptionsinsert.*, 
    serialid.serialnum as subscriptionid 
    INTO subscriptionswithid;

insert into subscriptionstab select subscriptionid,                  [8]
  connectionid, path, logicalPath,   
  time from subscriptionswithid;

create stream subscriptionsdelete as select * from subscriptions     [9]
  where subscribe == false;

delete from subscriptionstab using subscriptionsdelete              [10]
  where subscriptionstab.connectionid = subscriptionsdelete.connectionid 
  and subscriptionstab.logicalPath = subscriptionsdelete.logicalPath;

The following annotations describe points of interest in the systemtables module:

  1. Create input stream for subscriptions. The stream schema includes these fields:

    • The required connectionid, path, and logicalPath fields.

    • subscribe: A flag indicating either a subscribe or an unsubscribe event.

    • time: The timestamp of the event.

  2. Create the shared subscriptions table. The table schema adds an id field for the subscription sequence number.

  3. Define a btree index on the id field to do lookups on.

  4. Create table for the serial number for the subscriptions table.

  5. Create an intermediate stream so that we don't access the query table on each insert.

  6. Stream of subscriptions with an ID.

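  7. Insert serial number 1 if the serialid table has no row; otherwise increment the existing serial number. In either case, return the subscription together with the resulting serial number as subscriptionid, and send the result to subscriptionswithid.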

  8. Insert subscriptions into subscriptionstab.

  9. Create an intermediate stream to handle deleted subscriptions.

  10. Unsubscribe: delete from subscriptionstab.
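
As a rough illustration of the bookkeeping that the systemtables module performs, the following Python sketch models the serial number counter and the subscriptions table in memory. It is not StreamBase code. The class names Subscription and SubscriptionTable and the method names are hypothetical, and the handling of a repeated subscribe for the same key (the row is replaced) is an assumption of this model rather than documented StreamSQL behavior.

```python
from typing import Dict, List, NamedTuple, Tuple


class Subscription(NamedTuple):
    id: int            # subscription sequence number
    connectionid: str
    path: str
    logicalPath: str
    time: float        # event time; a plain float in this model


class SubscriptionTable:
    """Hypothetical in-memory model of serialid plus subscriptionstab."""

    def __init__(self) -> None:
        # Models the single row of the serialid table; 0 means "no row yet".
        self.serialnum = 0
        # Keyed like primary key(connectionid, logicalPath).
        self.rows: Dict[Tuple[str, str], Subscription] = {}

    def on_event(self, connectionid: str, path: str, logicalPath: str,
                 subscribe: bool, time: float) -> None:
        key = (connectionid, logicalPath)
        if subscribe:
            # First subscribe yields 1, later ones increment (annotation 7).
            self.serialnum += 1
            # Assumption: a repeated subscribe replaces the existing row.
            self.rows[key] = Subscription(
                self.serialnum, connectionid, path, logicalPath, time)
        else:
            # Unsubscribe: delete the matching row, if any (annotation 10).
            self.rows.pop(key, None)

    def ids(self) -> List[int]:
        """Return the current subscription IDs in ascending order."""
        return sorted(row.id for row in self.rows.values())
```

Because serial numbers are never reused in this model, a client that reconnects receives a new, higher subscription ID and takes its place at the end of the rotation.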

Main Module

This section examines the StreamSQL version of the dequeuer application itself, roundrobin.ssql, which applies the systemtables module.

create input stream subscriptions(connectionid string, path string,      [1]
  logicalPath string, subscribe boolean, time timestamp);

create table subscriptionstab;                                           [2]

APPLY MODULE "systemtables.ssql" FROM subscriptions=subscriptions        [3]
 INTO subscriptionstab=subscriptionstab;

create input stream in(a int);                                           [4]

create output stream out(a int, nextsubid long, nextconnectionid string) [5]
    with output predicate system.connectionid = nextconnectionid;

declare lastsubid long default 0                                         [6]
    update from (select nextsubid from out);

create output stream nextid AS                                           [7]
  select in.*, 
      subscriptionstab.id as nextsubid, 
      subscriptionstab.connectionid as nextconnectionid
  from in outer join subscriptionstab
  where subscriptionstab.id > lastsubid and path = getPath('out')
    order by id limit 1;
    
select nextid.a,                                                         [8]
  coalesce(nextid.nextsubid, subscriptionstab.id) as nextsubid, 
  coalesce(nextid.nextconnectionid, subscriptionstab.connectionid) 
    as nextconnectionid
    from nextid, subscriptionstab
    where path = getPath('out')
    order by id limit 1 into out;

The following annotations describe points of interest in the roundrobin.ssql application:

  1. Note that this schema matches that of the output processing module's input stream.

  2. This is a placeholder table for the shared table in the systemtables module.

  3. Apply the systemtables module, connecting its subscriptions input stream and importing its shared table.

  4. The input stream for the actual data, in this example a simple int field.

  5. Create the output stream now so that we can use it to define a dynamic variable later. Output fields include:

    • a: The application data

    • nextsubid: The next subscriber client that we want to send output to.

    • nextconnectionid: The subscriber's connection ID.

    Finally, declare a predicate. connectionid is one of several special fields, described in Defining Output Streams in the Authoring Guide, that can be used in output stream predicates to control dequeuing. It specifies that output is to be sent only to a particular connection.

  6. Declare the lastsubid dynamic variable, which will contain the ID of the last subscription that was sent a tuple from the output stream. It is updated from the nextsubid field of each tuple emitted on out.

  7. Create an intermediate output stream that contains the result of a lookup for the nextid in the subscriptions table. That is, the next subscription whose ID is greater than the last subscription we used. The lookup returns null if there are no subscriptions, or if we have sent a tuple to the last subscription in the table.

    The getPath function returns the full path of the out stream, ensuring that the right container is used at run time, as described below.

  8. If the preceding lookup returns null, then look for the ID from the start of the table. As in standard SQL, the coalesce function here returns the first non-null value in the list of values.
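
The selection logic in annotations 6 through 8 can be sketched in Python as follows. This is a simplified model, not StreamBase code: the function names next_subscriber and dispatch are hypothetical, subscriptions are reduced to (id, connectionid) pairs, and the filter on path is omitted. Dropping a tuple when there are no subscriptions is an assumption of this model.

```python
from typing import List, Optional, Tuple

Sub = Tuple[int, str]  # (subscription id, connection id)


def next_subscriber(subs: List[Sub], lastsubid: int) -> Optional[Sub]:
    """Pick the subscription with the lowest id greater than lastsubid.

    If there is none (end of table reached), wrap around to the lowest id,
    which mirrors the coalesce fallback in annotation 8.
    """
    ordered = sorted(subs)
    for sub in ordered:
        if sub[0] > lastsubid:
            return sub
    return ordered[0] if ordered else None


def dispatch(values: List[int], subs: List[Sub]) -> List[Tuple[int, str]]:
    """Assign each input value to one connection in round robin order."""
    lastsubid = 0  # same default as the lastsubid dynamic variable
    out = []
    for a in values:
        chosen = next_subscriber(subs, lastsubid)
        if chosen is None:
            continue  # assumption: with no subscribers, the value is dropped
        lastsubid = chosen[0]  # models "update from (select nextsubid from out)"
        out.append((a, chosen[1]))
    return out
```

With two subscriptions, dispatch([10, 20, 30, 40], [(1, "c1"), (2, "c2")]) assigns the values to c1, c2, c1, c2 in turn.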

Running the Round Robin Sample in Terminal Windows

This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.

  1. Open four terminal windows on UNIX, or four StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described in Sample Location.

  2. In window 1, start StreamBase Server without an application, but specify the configuration file that is installed with the sample. For example, use this command:

    sbd -f sbd.sbconf

  3. In window 2, add the primary module to a named container, making sure to connect the system.subscriptions stream with the application's subscriptions stream:

    sbadmin addContainer rr roundrobin.sbapp rr.subscriptions=system.subscriptions

    where rr is a name you choose for the new container. (At this point, the server may return a warning message in window 1, which can be safely ignored for purposes of this sample.)

  4. In both windows 2 and 3, start dequeuers against the output stream with the same command in both windows:

    sbc -u sb://localhost/rr dequeue rr.out

    You can add more windows to see more dequeuers.

  5. In window 4, start a feed simulation:

    sbfeedsim -u sb://localhost/rr

    Observe data emitted by each of the dequeuers in windows 2 and 3, printing in round robin fashion with one dequeued line going to window 2, the next to window 3, the next to window 2, and so on.

  6. To shut down the sbfeedsim operation in window 4, press Ctrl+C.

  7. To terminate the server and dequeuers, enter this command in window 4:

    sbadmin shutdown

Importing This Sample into StreamBase Studio

In StreamBase Studio, import this sample with the following steps:

  • From the top menu, select File > Load StreamBase Sample.

  • Select Round robin dequeuing from the Applications category.

  • Click OK.

StreamBase Studio creates a single project containing this sample's files.

Sample Location

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace, which is normally part of your home directory, with full access rights.

Important

Load this sample in StreamBase Studio, and thereafter use the Studio workspace copy of the sample to run and test it, even when running from the command prompt.

Using the workspace copy of the sample avoids permission problems. The default workspace location for this sample is:

studio-workspace/sample_roundrobin

See Default Installation Directories for the default location of studio-workspace on your system.

In the default TIBCO StreamBase installation, this sample's files are initially installed in:

streambase-install-dir/sample/roundrobin

See Default Installation Directories for the default location of streambase-install-dir on your system.