import pprint as pp
import threading
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional

import pandas as pd

from ..exceptions import LiveViewException
from ..models import LiveQuery
from ..util import convert_tuple_with_schema
from .lv_listener import QueryListener
class LiveResult(QueryListener):
    """A LiveResult is a QueryListener that listens to a LiveQuery and maintains a result set of tuples.

    It implements callback handlers for add, remove, and update events so that when
    ``LiveResult#get_columns()`` is called with desired columns as arguments, data from the
    current result set is retrieved.

    **Warning**: The result set may grow infinitely. This class is only intended to be used
    with live queries that produce finite result sets.

    Usage:
    ::

        import liveview as lv
        import time
        from liveview.listeners import LiveResult

        # Assume a LiveView server is running 'Hello LiveView' locally on port 11080
        client = lv.get_client('lv://localhost:11080')
        live_query = client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales')

        with LiveResult(live_query) as live_result:
            time.sleep(3)
            categories, items = live_result.get_columns('category', 'Item')
    """

    def __init__(self, live_query: LiveQuery, exception_raised: Optional[Callable] = None,
                 convert_with_schema: bool = True):
        """Register this object's add/update/remove handlers with the base listener.

        Args:
            live_query: the query to listen to
            exception_raised: optional callback forwarded unchanged to ``QueryListener``
            convert_with_schema: when True, incoming tuple data is passed through
                ``convert_tuple_with_schema`` with the query's schema before being stored
        """
        super().__init__(live_query,
                         tuple_added=self._tuple_added,
                         tuple_updated=self._tuple_updated,
                         tuple_removed=self._tuple_removed,
                         exception_raised=exception_raised)
        # _tuple_dict is an ordered dictionary that stores a mapping from tuple primary keys to their data.
        self._tuple_dict: Dict[int, Dict[str, Any]] = OrderedDict()
        # Guards every read-modify-write of _tuple_dict performed by this class.
        self._read_lock = threading.Lock()
        self._convert_with_schema = convert_with_schema

    def get_tuples(self) -> Dict[int, Dict[str, Any]]:
        """Get a mapping from tuple primary keys to tuple data.

        The internal dictionary itself is returned (not a copy), so it keeps changing as
        add/update/remove events are handled.
        """
        return self._tuple_dict

    def get_columns(self, *columns) -> List[List[Any]]:
        """Get columns of live data as a list of lists.

        Usage:
        ::

            temperatures, room_names = live_result.get_columns('Temperature', 'Room')
            # temperatures == [70.2, 105.3, 75.3]
            # room_names == ['Kitchen', 'Sauna', 'Living Room']

            # Get a single column, unpack the list of lists.
            temperatures, *_ = live_result.get_columns('Temperature')

        Args:
            *columns: one or more tuple field names

        Returns:
            The columns in the same order as requested in the arguments. When the result set
            is empty this is a list of empty lists; otherwise it is the transposed numpy array
            produced by pandas (one row per requested column).
        """
        # The emptiness check and the snapshot must happen under the same lock acquisition;
        # otherwise the result set could be emptied in between and ``.loc`` would raise KeyError.
        with self._read_lock:
            # If we have no elements, return an empty list of lists (one for each column)
            if not self._tuple_dict:
                return [[] for _ in columns]
            # Return a matrix of the columns requested
            frame = pd.DataFrame(list(self._tuple_dict.values()))
            return frame.loc[:, list(columns)].to_numpy().T

    def _convert_tuple_to_schema(self, lv_tuple):
        """Convert a tuple using the LiveQuery's schema (no-op when conversion is disabled)."""
        if not self._convert_with_schema:
            return lv_tuple
        return convert_tuple_with_schema(lv_tuple, self._live_query.get_schema())

    def _tuple_added(self, event):
        """Store the (optionally schema-converted) data of an add event under its key.

        Raises:
            LiveViewException: if the event has no ``key`` or no ``data``
        """
        key = event.get('key')
        data = event.get('data')
        if key is None:
            raise LiveViewException(f'Tuple added event <{pp.pformat(event)}> does not contain key')
        if data is None:
            raise LiveViewException(f'Tuple added event <{pp.pformat(event)}> does not contain data')
        with self._read_lock:
            self._tuple_dict[key] = self._convert_tuple_to_schema(data)

    def _tuple_updated(self, event):
        """Merge the fields carried by an update event into the stored tuple.

        Raises:
            LiveViewException: if the event has no ``key`` or no ``data``, or if the key is
                not present in the result set
        """
        key: int = event.get('key')
        data: dict = event.get('data')
        if key is None:
            raise LiveViewException(f'Tuple updated event <{pp.pformat(event)}> does not contain key')
        if data is None:
            raise LiveViewException(f'Tuple updated event <{pp.pformat(event)}> does not contain data')
        # Look up and mutate under one lock acquisition so the tuple cannot be removed
        # between the existence check and the write.
        with self._read_lock:
            stored = self._tuple_dict.get(key)
            if stored is None:
                raise LiveViewException(
                    f'Tuple updated event <{pp.pformat(event)}> contains key missing from LiveResult')
            # Store the converted values (consistent with _tuple_added), not the raw event data.
            # NOTE(review): assumes convert_tuple_with_schema returns a field -> value mapping,
            # as its use in _tuple_added suggests - confirm.
            stored.update(self._convert_tuple_to_schema(data))

    def _tuple_removed(self, event):
        """Drop the tuple identified by a remove event from the result set.

        Raises:
            LiveViewException: if the event has no ``key`` or the key is not in the result set
        """
        key = event.get('key')
        if key is None:
            raise LiveViewException(f'Tuple removed event <{pp.pformat(event)}> does not contain key')
        # Check and delete atomically with respect to the other handlers and get_columns.
        with self._read_lock:
            if key not in self._tuple_dict:
                raise LiveViewException(f'Tuple removed event <{pp.pformat(event)}> attempted to remove tuple'
                                        ' not in LiveResult')
            del self._tuple_dict[key]