Source code for tibco.liveview.listeners.lv_listener

import threading
import time
from typing import Callable, Optional

from ..models import LiveQuery
from ..exceptions import LiveViewException


[docs]class QueryListener: """A QueryListener listens to a live query and responds to events according to their type. It provides no built-in callback functions and allows you to customize how events are handled. See ``liveview.listeners.LiveResult`` for an example of a QueryListener with callback functions implemented. """ def __init__(self, live_query: LiveQuery, tuple_added: Callable = None, tuple_removed: Callable = None, tuple_updated: Callable = None, delete_begin: Callable = None, delete_end: Callable = None, exception_raised: Callable = None, snapshot_begin: Callable = None, snapshot_end: Callable = None): """Initialize a listener which listens for events from a LiveQuery and calls user-defined callback functions on them, depending on the event's type. Args: live_query (LiveQuery): The query to be listened to tuple_added (Callable): Function to be called when tuple_added event is received tuple_removed (Callable): Function to be called when tuple_removed event is received tuple_updated (Callable): Function to be called when tuple_updated event is received delete_begin (Callable): Function to be called when delete_begin event is received delete_end (Callable): Function to be called when delete_end event is received exception_raised (Callable): Function to be called when an exception is raised snapshot_begin (Callable): Function to be called when snapshot_end event is received snapshot_end (Callable): Function to be called when snapshot_end event is received """ # Store callback functions as members self._callback_dict = { 'add': tuple_added, 'remove': tuple_removed, 'update': tuple_updated, 'begin_snapshot': snapshot_begin, 'end_snapshot': snapshot_end, 'exception': exception_raised, 'begin_delete': delete_begin, 'end_delete': delete_end, } self._live_query = live_query # Declare threading state self._thread: Optional[threading.Thread] = None self._should_run = False def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop()
[docs] def start(self) -> None: """ Start listening for events and calling the callback on incoming data. Returns: None """ if self._thread: raise LiveViewException('Error in QueryListener.start(): thread already running.') try: self._live_query.get_schema() except LiveViewException as e: raise LiveViewException(f'Invalid query: {e}') def listen(): self._should_run = True # Run until the thread is told to stop while self._should_run: try: with self._live_query: for event in self._live_query: # Retrieve the appropriate callback function, determined by the event type callback_func = self._callback_dict[event['type']] # Call it, if it was provided if callback_func: callback_func(event) except Exception as err: err_func = self._callback_dict['exception'] if err_func: err_func(err) time.sleep(1) self._thread = threading.Thread(target=listen, daemon=True) self._thread.start()
[docs] def stop(self) -> None: """Stop handling events and close the thread. Returns: None """ self._should_run = False self._live_query.close() if self._thread is None: raise LiveViewException('Error in QueryListener.stop(): thread not started') self._thread.join() self._thread = None