import threading
import time
from typing import Callable, Optional
from ..models import LiveQuery
from ..exceptions import LiveViewException
[docs]class QueryListener:
"""A QueryListener listens to a live query and responds to events according to their type.
It provides no built-in callback functions and allows you to customize how events are handled.
See ``liveview.listeners.LiveResult`` for an example of a QueryListener with callback functions implemented.
"""
def __init__(self,
live_query: LiveQuery,
tuple_added: Callable = None,
tuple_removed: Callable = None,
tuple_updated: Callable = None,
delete_begin: Callable = None,
delete_end: Callable = None,
exception_raised: Callable = None,
snapshot_begin: Callable = None,
snapshot_end: Callable = None):
"""Initialize a listener which listens for events from a LiveQuery and calls user-defined callback functions
on them, depending on the event's type.
Args:
live_query (LiveQuery): The query to be listened to
tuple_added (Callable): Function to be called when tuple_added event is received
tuple_removed (Callable): Function to be called when tuple_removed event is received
tuple_updated (Callable): Function to be called when tuple_updated event is received
delete_begin (Callable): Function to be called when delete_begin event is received
delete_end (Callable): Function to be called when delete_end event is received
exception_raised (Callable): Function to be called when an exception is raised
snapshot_begin (Callable): Function to be called when snapshot_end event is received
snapshot_end (Callable): Function to be called when snapshot_end event is received
"""
# Store callback functions as members
self._callback_dict = {
'add': tuple_added,
'remove': tuple_removed,
'update': tuple_updated,
'begin_snapshot': snapshot_begin,
'end_snapshot': snapshot_end,
'exception': exception_raised,
'begin_delete': delete_begin,
'end_delete': delete_end,
}
self._live_query = live_query
# Declare threading state
self._thread: Optional[threading.Thread] = None
self._should_run = False
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
[docs] def start(self) -> None:
"""
Start listening for events and calling the callback on incoming data.
Returns: None
"""
if self._thread:
raise LiveViewException('Error in QueryListener.start(): thread already running.')
try:
self._live_query.get_schema()
except LiveViewException as e:
raise LiveViewException(f'Invalid query: {e}')
def listen():
self._should_run = True
# Run until the thread is told to stop
while self._should_run:
try:
with self._live_query:
for event in self._live_query:
# Retrieve the appropriate callback function, determined by the event type
callback_func = self._callback_dict[event['type']]
# Call it, if it was provided
if callback_func:
callback_func(event)
except Exception as err:
err_func = self._callback_dict['exception']
if err_func:
err_func(err)
time.sleep(1)
self._thread = threading.Thread(target=listen, daemon=True)
self._thread.start()
[docs] def stop(self) -> None:
"""Stop handling events and close the thread.
Returns: None
"""
self._should_run = False
self._live_query.close()
if self._thread is None:
raise LiveViewException('Error in QueryListener.stop(): thread not started')
self._thread.join()
self._thread = None