Contents
This topic describes how to create enqueue and dequeue client applications for StreamBase applications using the StreamBase Client library for Python. The Python client libraries are shipped for Windows and Linux installations of StreamBase. Mac OS X is not supported.
Important
The StreamBase Client library is not guaranteed to be thread-safe—that is, multiple threads cannot concurrently access the
same streambase.Client
object. You can create and use separate Client
objects in separate threads, even if all those Client
objects are connected to the same server. You can also guard a single Client
object with a mutex, to ensure that only one thread at a time has access to it.
On Linux platforms, the Python Client library is built for four Python versions: 2.4, 2.5, 2.6, and 2.7. A streambase.py
module for each Python version is installed in /opt/tibco/sb-cep/
, where n.m
/lib64/pythonx.x
x.x
is the Python version number.
Set the PYTHONPATH
environment variable to point to the location of the appropriate client library for the Python version on your system. For
example, on 64-bit Red Hat Enterprise Linux 6, where the default Python version is 2.6, use a setting like the following:
export PYTHONPATH=/opt/tibco/sb-cep/n.m
/lib64/python2.6:$PYTHONPATH
You can use the sb-config --pypath command to determine the correct Python library version to use. For example:
export PYTHONPATH=`sb-config --pypath`:$PYTHONPATH
The StreamBase Python Client library supports two Python distributions on Windows platforms:
-
The 64-bit Windows binary installer for Python 2.7.2 or later from python.org.
-
The 64-bit ActivePython release 2.7.2 or later from ActiveState.
Install Python 2.7 into C:\Python27
.
In a StreamBase Command Prompt, set the PYTHONPATH
environment variable to point to the location of the Python Client library:
set PYTHONPATH=%STREAMBASE_HOME%\lib64\python2.7
This section describes how to use the StreamBase Python library to write a client application that enqueues data to StreamBase
Server. A sample file, SimpleEnqueuer.py
, is provided as part of the client
sample shipped with the StreamBase base kit in $STREAMBASE_HOME
/sample/client
.
The basic procedure for enqueuing data into a StreamBase node from Python is:
-
Import the StreamBase Python module:
import streambase as sb
-
Create an instance of the
streambase.Client
class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:client = sb.client("sb://localhost:10000/")
-
Enqueue tuples and terminate.
Your Python program can exploit the homology between StreamBase tuples and Python dictionaries, and between StreamBase lists and Python lists. For example, to enqueue tuples having the schema {list bids {double Price, int Volume}}, your Python client code would be:
import streambase as sb client = sb.Client("sb://localhost") client.enqueue("BidsIn", {'bids': [ {'Price': 10.0, 'Volume': 100}, {'Price': 5.0, 'Volume': 50}]}) client.close()
You can also:
-
Enqueue the tuple onto the stream directly, as a Python tuple or dictionary:
client.enqueue("InputStream", (5, "hello")) client.enqueue("InputStream", {'myint':5, 'mystring':"hello"})
-
Manually build a
streambase.Tuple
object and enqueue it:-
Retrieve a
streambase.Schema
object for each stream to which you want to enqueue. For example:schema = client.getSchemaForStream("InputStream")
-
Create a
streambase.Tuple
object of the specified schema. In this example, the schema of the above stream:sbtuple = sb.Tuple(schema)
-
Set values for each of the tuple's fields, specifying them by name or position. For example:
sbtuple['myint'] = 5 sbtuple['mystring'] = "hello" # ...is equivalent to... sbtuple[0] = 5 sbtuple[1] = "hello" # ...is equivalent to... sbtuple.setInt('myint', 5) sbtuple.setString('mystring', "hello")
-
Enqueue the
streambase.Tuple
onto the stream, which you can reference by name. For example:client.enqueue("InputStream", sbtuple)
-
For details on Python Dictionaries, refer to chapter 3 of the Python Language Reference.
-
On a supported Linux machine where the StreamBase client library and Python library are installed, set the
PYTHONPATH
environment variable as described in Initial Setup for Linux.For Windows, follow the steps in Initial Setup for Windows.
-
Run your StreamBase client in the Python interpreter. For example:
python
mysbclient.py
-
If the Python interpreter fails with an error such as
ImportError: libsbclient.so.9: cannot open shared object file: No such file or directory
, make sure the StreamBase client library is available to your linker by adding its path to yourLD_LIBRARY_PATH
environment variable. For example, on a 64-bit Linux system using Bash and with StreamBase installed its default location, use the following command:export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/tibco/sb-cep/
n.m
/lib64
This section describes how to use the StreamBase Python library to write a client application that dequeues data from StreamBase
Server. A sample file, SimpleDequeuer.py
, is provided as part of the client
sample shipped with the StreamBase base kit in $STREAMBASE_HOME
/sample/client
.
Performance Note
Dequeue (producer) clients that are slow may eventually get disconnected from the StreamBase Server process, sbd. The server configuration file includes the max-client-pages
attribute to the <page-pool>
element. The sbd process will disconnect clients that try to allocate more memory than the limit set by this parameter, and is designed to
protect sbd from a slow or hung dequeue client. For more information, see the <page-pool>
element in the StreamBase Server Configuration XML topic in StreamBase References.
The basic procedure for dequeuing data from a StreamBase node to Python is as follows:
-
Import the StreamBase Python module with a line like the following:
import streambase as sb
-
Create an instance of the
streambase.Client
class. If needed, specify the URI of the desired StreamBase node as a string argument. For example:client = sb.Client("sb://localhost:10000/")
-
Subscribe to each output stream from which you want to dequeue. For example:
client.subscribe("OutputStream")
-
Call the
dequeue()
method on the client, which blocks until a tuple becomes available. This method returns astreambase.DequeueResult
object, which contains a status flag that indicates whether tuples were received.Warning
TIBCO strongly recommends that you set a timeout on the dequeue and loop waiting for a
GOOD
status flag. Otherwise you will not be able to use Ctrl+C to exit your Python program while it is waiting for tuples to dequeue, and you will have to kill it with some other method. The dequeue timeout can be large: you are simply setting the maximum latency between the user issuing a Ctrl+C and the program terminating.DEFAULT_TIMEOUT = 500 # milliseconds result = sb.DequeueResult() while result.getStatus != sb.DequeueResult.GOOD: result = client.dequeue(DEFAULT_TIMEOUT)
-
Once
dequeue()
has received tuples, you can get from theDequeueResult
astreambase.TupleList
object containing thestreambase.Tuple
s dequeued, which behaves as a Python list. For example:tuples = result.getTuples() print "Got " + str(len(tuples)) + " tuple" + ((len(tuples) == 1) and "s" or "") print "First tuple is: " + str(tuples[0])
-
If the result of
dequeue().getStatus()
isstreambase.DequeueResult.CLOSED
, the server (or your client from another thread) has closed the connection, and your program should respond appropriately. -
Having received tuples, you can access them any of several ways. You can:
-
Loop over the
TupleList
. For example:for t in tuples: print t
-
Access the
Tuple
's fields by name or index. For example:print "myint: " + str(t[0]) + ", mystring: " + t[1] print "myint: " + str(t['myint']) + ", mystring: " + t['mystring']
-
-
If you wish to cancel the blocking
dequeue()
call (for example, if your client is shutting down), call theclose()
method on theClient
object. In fact, before exiting, always call thestreambase.Client.close()
method to flush the client network buffers.
The procedure for running Python dequeue clients is the same as described above for enqueue clients. Remember to always call streambase.Client.close()
to flush the client network buffers before exiting.