Source code for tibco.liveview.models.buffered_publisher

import time
from queue import Empty, Queue
from threading import Thread
from typing import List, Optional

from ..exceptions import LiveViewException
from ..liveview_generated_client.swagger_client import V11Api


class BufferedLVPublisher:
    """Asynchronously publishes tuples to a LiveView table using a FIFO buffer.

    Tuples handed to :meth:`publish` are queued and sent to the server in one
    batch by a background daemon thread every ``publish_interval`` seconds.

    Initialize with::

        import liveview
        client = liveview.get_client('lv://localhost:11080')
        publisher = client.get_buffered_publisher(table_name='ItemsSales', publish_interval=1)
    """

    def __init__(self, api_client: 'V11Api', host: str, table_name: str, publish_interval: float = 1):
        """Not recommended to initialize directly, instead do::

            import liveview
            client = liveview.get_client('lv://localhost:11080')
            publisher = client.get_buffered_publisher(table_name='ItemsSales', publish_interval=1)

        Args:
            api_client: Swagger API client
            host: Hostname of LiveView server
            table_name: Table to publish tuples to
            publish_interval: Interval in seconds at which tuples will be automatically published
        """
        self.publish_interval: float = publish_interval
        self.uri: str = f'{host}/tables/{table_name}/tuples'
        self._api: V11Api = api_client
        self._buffer: Queue = Queue()
        self._table_name: str = table_name
        self._thread: Optional[Thread] = None
        # Error from the most recent failed publish attempt; None once a later attempt succeeds.
        self._thread_exception: Optional[Exception] = None
        self._stopped: bool = True
        self.start()

    def publish(self, lv_tuple):
        """Insert a tuple or list of tuples into the buffer.

        Buffered tuples are sent to the server the next time the publish
        interval elapses, or when :meth:`flush_buffer` is called.

        Args:
            lv_tuple: A single tuple, or a list/tuple of tuples which are
                enqueued one by one in order.

        Raises:
            LiveViewException: If the publisher has been stopped, or if the
                most recent attempt to publish to the server failed.
        """
        if self._stopped:
            raise LiveViewException('Publisher manually stopped; call start() again or create a new publisher')
        if self._thread_exception is not None:
            raise self._thread_exception
        if isinstance(lv_tuple, (list, tuple)):
            for t in lv_tuple:
                self._buffer.put(t)
        else:
            self._buffer.put(lv_tuple)

    def flush_buffer(self) -> List:
        """Empty out the buffer and send it to the server.

        Useful for publishing tuples at the end of a program, when
        publish_interval seconds may not pass before Python exits.

        Returns:
            List of tuples published

        Raises:
            LiveViewException: If the most recent publish attempt failed.
        """
        tuples_published = self._flush_impl()
        if self._thread_exception is not None:
            raise self._thread_exception
        return tuples_published

    def start(self) -> None:
        """Publish any incoming tuples at an interval. Called when object is initialized.

        Does nothing if the worker thread is already running, so repeated
        calls never leave more than one worker publishing from the buffer.
        """
        if not self._stopped and self._thread is not None and self._thread.is_alive():
            return
        self._thread = Thread(target=self._worker, daemon=True)
        # Clear the flag before starting so the worker's loop condition holds on entry.
        self._stopped = False
        self._thread.start()

    def stop(self) -> None:
        """Stop the publisher from automatically publishing at an interval.

        Blocks until the worker thread has exited, which can take up to one
        publish interval because the worker sleeps between checks.

        Raises:
            LiveViewException: If the publisher is already stopped.
        """
        if self._stopped:
            raise LiveViewException('Already stopped')
        self._stopped = True
        self._thread.join()

    def buffer_size(self) -> int:
        """Return the number of tuples currently in the publish queue"""
        return self._buffer.qsize()

    def _worker(self):
        """Background loop: sleep one interval, then flush if anything is buffered."""
        while not self._stopped:
            time.sleep(self.publish_interval)
            if not self._buffer.empty():
                self._flush_impl()

    def _flush_impl(self) -> List:
        """Publish tuples to LV server.

        Called internally by worker thread or by a manual call to self.flush_buffer()

        Returns:
            A list of tuples successfully published.
        """
        # Drain without blocking: the worker thread and a manual flush may both
        # be draining, and a blocking get() after an empty() check would hang
        # forever if the other caller took the last item in between.
        lv_tuples = []
        while True:
            try:
                lv_tuples.append(self._buffer.get_nowait())
            except Empty:
                break

        # Nothing buffered -> nothing to send; skip the request to the server.
        if not lv_tuples:
            return lv_tuples

        try:
            self._api.publish_to_table(self._table_name, lv_tuples)
        except Exception as e:
            # Note the failure so publish()/flush_buffer() can report it to the caller
            failure = LiveViewException(f'Failed to publish tuples to {self.uri} with error {e}')
            failure.__cause__ = e
            self._thread_exception = failure
            # Put the tuples back in the buffer so the next flush retries them
            for t in lv_tuples:
                self._buffer.put(t)
            # This function returns the tuples it published.
            # We published no tuples -> we return an empty list.
            return []

        # The buffered tuples (including any re-queued after an earlier
        # failure) reached the server, so a previously recorded error is stale.
        self._thread_exception = None
        return lv_tuples