Creating StreamBase Python Clients

This topic describes how to create enqueue and dequeue client applications for StreamBase applications using the StreamBase Client library for Python. The Python client libraries are shipped for Linux installations of StreamBase. Windows and macOS are not supported.

Important

The StreamBase Client library is not guaranteed to be thread-safe: multiple threads must not concurrently access the same streambase.Client object. You can create and use separate Client objects in separate threads, even if all those Client objects are connected to the same server. You can also guard a single Client object with a mutex to ensure that only one thread at a time has access to it.
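The mutex approach can be sketched as follows. This is a minimal illustration, not part of the StreamBase library: LockedClient and FakeClient are hypothetical names, and FakeClient stands in for streambase.Client so that the sketch runs without a server.

```python
import threading


class LockedClient(object):
    """Serialize access to one client object with a mutex.

    `client` is any object with enqueue() and close() methods; in real use
    it would be a streambase.Client. LockedClient is a hypothetical helper.
    """

    def __init__(self, client):
        self._client = client
        self._lock = threading.Lock()

    def enqueue(self, stream, tup):
        # Only one thread at a time may touch the underlying client.
        with self._lock:
            return self._client.enqueue(stream, tup)

    def close(self):
        with self._lock:
            return self._client.close()


class FakeClient(object):
    """Stand-in for streambase.Client (assumption: enqueue/close only)."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def enqueue(self, stream, tup):
        self.sent.append((stream, tup))

    def close(self):
        self.closed = True


def run_demo(num_threads=4, per_thread=25):
    """Share one guarded client among several enqueuing threads."""
    fake = FakeClient()
    shared = LockedClient(fake)

    def worker(worker_id):
        for i in range(per_thread):
            shared.enqueue("InputStream", (worker_id, "msg-%d" % i))

    threads = [threading.Thread(target=worker, args=(n,))
               for n in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    shared.close()
    return fake


demo_client = run_demo()
```

The sketch guards only enqueue() and close(). Holding the same lock across a blocking dequeue() would keep other threads from calling close() until the dequeue returned, so a dequeue client probably needs a different arrangement, such as one Client object per thread.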

Initial Setup for Linux

On Linux platforms, the Python Client library is built only for Python version 2.7. A streambase.py module is installed in /opt/tibco/str/11.1/lib64/python2.7.

Set the PYTHONPATH environment variable to point to the location of the appropriate client library for the Python version on your system. For example, on 64-bit Red Hat Enterprise Linux 6, where the default Python version is 2.6, use a setting like the following:

export PYTHONPATH=/opt/tibco/str/11.1/lib64/python2.7:$PYTHONPATH

You can use the sb-config --pypath command to determine the correct Python library version to use. For example:

export PYTHONPATH=`sb-config --pypath`:$PYTHONPATH
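To confirm that the setting took effect, you can check which directories on the module search path contain streambase.py. The following is a minimal sketch. The helper name find_streambase_dirs is hypothetical, and the demonstration uses a throwaway directory rather than a real StreamBase installation.

```python
import os
import shutil
import sys
import tempfile


def find_streambase_dirs(paths=None):
    """Return the search-path entries that contain a streambase.py file.

    With no argument, inspects sys.path, which Python builds in part
    from the PYTHONPATH environment variable.
    """
    if paths is None:
        paths = sys.path
    hits = []
    for p in paths:
        # An empty entry in sys.path means the current directory.
        candidate = os.path.join(p or os.curdir, "streambase.py")
        if os.path.isfile(candidate):
            hits.append(p)
    return hits


# Demonstration against a temporary directory (not a real installation).
demo_dir = tempfile.mkdtemp()
try:
    open(os.path.join(demo_dir, "streambase.py"), "w").close()
    demo_hits = find_streambase_dirs(
        [demo_dir, os.path.join(demo_dir, "missing")])
finally:
    shutil.rmtree(demo_dir)
```

Calling find_streambase_dirs() with no arguments in the interpreter you plan to use should return a non-empty list once PYTHONPATH is set correctly.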

Writing Python Enqueue Clients

This section describes how to use the StreamBase Python library to write a client application that enqueues data to StreamBase Server. A sample file, SimpleEnqueuer.py, is provided as part of the client sample included in the StreamBase base kit in $STREAMBASE_HOME/sample/client.

The basic procedure for enqueuing data into a StreamBase node from Python is:

  1. Import the StreamBase Python module:

    import streambase as sb

  2. Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:

    client = sb.Client("sb://localhost:10000/")

  3. Enqueue tuples and terminate.

Your Python program can take advantage of the correspondence between StreamBase tuples and Python dictionaries, and between StreamBase lists and Python lists. For example, to enqueue tuples that have the schema {list bids {double Price, int Volume}}, your Python client code would be:

import streambase as sb

client = sb.Client("sb://localhost")
client.enqueue("BidsIn", {'bids': [ {'Price': 10.0, 'Volume': 100}, {'Price': 5.0, 'Volume': 50}]})
client.close()
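When the bid data comes from elsewhere in your program, the nested dictionary can be built programmatically. The following is a minimal sketch for the schema above. The helper name make_bids_tuple is hypothetical, and the enqueue call is shown only as a comment because it needs a running server.

```python
def make_bids_tuple(bids):
    """Build a dictionary matching {list bids {double Price, int Volume}}.

    `bids` is an iterable of (price, volume) pairs.
    """
    return {
        'bids': [
            # Coerce to the field types the schema declares.
            {'Price': float(price), 'Volume': int(volume)}
            for price, volume in bids
        ]
    }


payload = make_bids_tuple([(10, 100), (5.0, 50)])

# With a connected client, the dictionary is enqueued as in the example above:
#     client.enqueue("BidsIn", payload)
```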

You can also:

  1. Enqueue the tuple onto the stream directly, as a Python tuple or dictionary:

    client.enqueue("InputStream", (5, "hello"))
    client.enqueue("InputStream", {'myint':5, 'mystring':"hello"})
  2. Manually build a streambase.Tuple object and enqueue it:

    1. Retrieve a streambase.Schema object for each stream to which you want to enqueue. For example:

      schema = client.getSchemaForStream("InputStream")
    2. Create a streambase.Tuple object of the specified schema. In this example, the schema of the above stream:

      sbtuple = sb.Tuple(schema)
    3. Set values for each of the tuple's fields, specifying them by name or position. For example:

      sbtuple['myint'] = 5
      sbtuple['mystring'] = "hello"
      
      #  ...is equivalent to...
      
      sbtuple[0] = 5
      sbtuple[1] = "hello"
      
      #  ...is equivalent to...
      
      sbtuple.setInt('myint', 5)
      sbtuple.setString('mystring', "hello")  
    4. Enqueue the streambase.Tuple onto the stream, which you can reference by name. For example:

      client.enqueue("InputStream", sbtuple)

For details on Python dictionaries, refer to chapter 3 of the Python Language Reference.
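The enqueue procedure can be wrapped so that close() is always called, even if an enqueue raises an exception. The following is a minimal sketch. The function name enqueue_all and the RecordingClient stand-in are hypothetical. In real use you would pass a streambase.Client object instead.

```python
def enqueue_all(client, stream, rows):
    """Enqueue every row onto `stream`, then close the client.

    `client` is any object with enqueue(stream, row) and close() methods.
    Returns the number of rows enqueued.
    """
    count = 0
    try:
        for row in rows:
            client.enqueue(stream, row)
            count += 1
    finally:
        # Runs on success and on failure, so the client is always closed.
        client.close()
    return count


class RecordingClient(object):
    """Stand-in for streambase.Client so the sketch runs without a server."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def enqueue(self, stream, row):
        self.sent.append((stream, row))

    def close(self):
        self.closed = True


demo = RecordingClient()
sent_count = enqueue_all(demo, "InputStream", [
    (5, "hello"),                          # row as a Python tuple
    {'myint': 6, 'mystring': "world"},     # row as a dictionary
])
```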

Running a StreamBase Python Client

  1. On a supported Linux machine where the StreamBase client library and Python library are installed, set the PYTHONPATH environment variable as described in Initial Setup for Linux.

  2. Run your StreamBase client in the Python interpreter. For example:

    python mysbclient.py

  3. If the Python interpreter fails with a library import error, make sure the StreamBase client library is available to your linker by adding its path to your LD_LIBRARY_PATH environment variable. For example, on a 64-bit Linux system using Bash and with StreamBase installed in its default location, use the following command:

    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/tibco/str/11.1/lib64

Writing Python Dequeue Clients

This section describes how to use the StreamBase Python library to write a client application that dequeues data from StreamBase Server. A sample file, SimpleDequeuer.py, is provided as part of the client sample included in the StreamBase base kit in $STREAMBASE_HOME/sample/client.

Performance Note

Dequeue (consumer) clients that are slow may eventually get disconnected from the EventFlow engine. Use a configuration file of type com.tibco.ep.streambase.configuration.sbclientapilistener to specify the pagePool > maxClientPages property. EventFlow engines disconnect clients that try to allocate more memory than the limit set by this property, which is designed to protect engines from a slow or hung dequeue client. For more information, see the pagePool object in StreamBase Client API Listener Configuration.

The basic procedure for dequeuing data from a StreamBase node to Python is as follows:

  1. Import the StreamBase Python module with a line like the following:

    import streambase as sb
  2. Create an instance of the streambase.Client class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:

    client = sb.Client("sb://localhost:10000/")
  3. Subscribe to each output stream from which you want to dequeue. For example:

    client.subscribe("OutputStream")

  4. Call the dequeue() method on the client, which blocks until a tuple becomes available. This method returns a streambase.DequeueResult object, which contains a status flag that indicates whether tuples were received.

    Warning

    As a best practice, set a timeout on the dequeue and loop until you receive a GOOD status flag. Otherwise, you cannot use Ctrl+C to exit your Python program while it is waiting for tuples to dequeue, and you must kill it by some other method. The dequeue timeout can be large: it only sets the maximum latency between the user pressing Ctrl+C and the program terminating.

    DEFAULT_TIMEOUT = 500       # milliseconds
    result = sb.DequeueResult()
    while result.getStatus() != sb.DequeueResult.GOOD:
        result = client.dequeue(DEFAULT_TIMEOUT)
  5. Once dequeue() has received tuples, you can get from the DequeueResult a streambase.TupleList object containing the streambase.Tuples dequeued, which behaves as a Python list. For example:

    tuples = result.getTuples()
    print "Got " + str(len(tuples)) + " tuple" + ((len(tuples) != 1) and "s" or "")
    print "First tuple is: " + str(tuples[0])
  6. If the result of dequeue().getStatus() is streambase.DequeueResult.CLOSED, the server (or your client from another thread) has closed the connection, and your program should respond appropriately.

  7. Having received tuples, you can access them any of several ways. You can:

    1. Loop over the TupleList. For example:

      for t in tuples:
          print t
    2. Access the Tuple's fields by name or index. For example:

      print "myint: " + str(t[0]) + ", mystring: " + t[1]
      print "myint: " + str(t['myint']) + ", mystring: " + t['mystring']
  8. If you wish to cancel the blocking dequeue() call (for example, if your client is shutting down), call the close() method on the Client object. In fact, before exiting, always call the streambase.Client.close() method to flush the client network buffers.
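The steps above can be combined into one polling loop that handles the GOOD and CLOSED statuses, exits on Ctrl+C, and always closes the client. The following is a minimal sketch. The function name dequeue_until_closed and the FakeResult and FakeClient stand-ins are hypothetical, and the status values are passed in as parameters so that the sketch runs without the streambase module. The string "timeout" is a placeholder for any status that is neither GOOD nor CLOSED.

```python
def dequeue_until_closed(client, good, closed, timeout_ms=500, on_tuple=None):
    """Poll client.dequeue() until the connection closes or Ctrl+C is pressed.

    `good` and `closed` are the status values to compare against.
    Returns the list of tuples received.
    """
    received = []
    try:
        while True:
            result = client.dequeue(timeout_ms)
            status = result.getStatus()
            if status == closed:
                break              # server or another thread closed the link
            if status != good:
                continue           # nothing arrived in time; poll again
            for t in result.getTuples():
                received.append(t)
                if on_tuple is not None:
                    on_tuple(t)
    except KeyboardInterrupt:
        pass                       # Ctrl+C: fall through to close()
    finally:
        client.close()
    return received


class FakeResult(object):
    """Stand-in for streambase.DequeueResult."""

    def __init__(self, status, tuples=()):
        self._status = status
        self._tuples = list(tuples)

    def getStatus(self):
        return self._status

    def getTuples(self):
        return self._tuples


class FakeClient(object):
    """Stand-in for streambase.Client that replays scripted results."""

    def __init__(self, results):
        self._results = list(results)
        self.closed = False

    def dequeue(self, timeout_ms):
        return self._results.pop(0)

    def close(self):
        self.closed = True


GOOD, TIMEOUT, CLOSED = "good", "timeout", "closed"   # placeholder statuses
fake = FakeClient([
    FakeResult(TIMEOUT),
    FakeResult(GOOD, [(5, "hello")]),
    FakeResult(GOOD, [(6, "world"), (7, "again")]),
    FakeResult(CLOSED),
])
got = dequeue_until_closed(fake, GOOD, CLOSED)
```

With a real client, you would pass sb.DequeueResult.GOOD and sb.DequeueResult.CLOSED as the good and closed arguments, after subscribing to the output streams of interest.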

Running Python Dequeue Clients

The procedure for running Python dequeue clients is the same as described above for enqueue clients. Remember to always call streambase.Client.close() to flush the client network buffers before exiting.