Source code for tibco.liveview.listeners.lv_live_result

import pprint as pp
import threading
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional

import pandas as pd

from ..models import LiveQuery
from ..exceptions import LiveViewException
from .lv_listener import QueryListener
from ..util import convert_tuple_with_schema


class LiveResult(QueryListener):
    """A QueryListener that maintains the current result set of a LiveQuery.

    It registers callback handlers for add, update, and remove events and
    keeps a mapping from tuple key to tuple data up to date, so that
    ``get_columns()`` can return the requested columns of the current result
    set at any time.

    **Warning**: The result set may grow infinitely. This class is only
    intended to be used with live queries that produce finite result sets.

    Usage:
    ::

        import liveview as lv
        import time
        from liveview.listeners import LiveResult

        # Assume a LiveView server is running 'Hello LiveView' locally on port 11080
        client = lv.get_client('lv://localhost:11080')
        live_query = client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales')

        with LiveResult(live_query) as live_result:
            time.sleep(3)
            categories, items = live_result.get_columns('category', 'Item')
    """

    def __init__(self,
                 live_query: LiveQuery,
                 exception_raised: Optional[Callable] = None,
                 convert_with_schema: bool = True):
        """Create a LiveResult for ``live_query``.

        Args:
            live_query: the query whose events populate the result set.
            exception_raised: optional callback forwarded unchanged to the
                base ``QueryListener``.
            convert_with_schema: when True (the default), tuple data is passed
                through ``convert_tuple_with_schema`` with the query's schema
                before it is stored.
        """
        super().__init__(live_query,
                         tuple_added=self._tuple_added,
                         tuple_updated=self._tuple_updated,
                         tuple_removed=self._tuple_removed,
                         exception_raised=exception_raised)
        # _tuple_dict is an ordered dictionary that stores a mapping from
        # tuple primary keys to their data.
        self._tuple_dict: Dict[int, Dict[str, Any]] = OrderedDict()
        # Guards every mutation of _tuple_dict and the snapshot taken by
        # get_columns().
        self._read_lock = threading.Lock()
        self._convert_with_schema = convert_with_schema

    def get_tuples(self) -> Dict[int, Dict[str, Any]]:
        """Get a mapping from tuple primary keys to tuple data.

        Returns:
            The internal mapping itself (not a copy); it keeps changing as
            add/update/remove events are handled.
        """
        return self._tuple_dict

    def get_columns(self, *columns) -> List[List[Any]]:
        """Get columns of live data, one sequence per requested column.

        Usage:
        ::

            temperatures, room_names = live_result.get_columns('Temperature', 'Room')
            # temperatures == [70.2, 105.3, 75.3]
            # room_names == ['Kitchen', 'Sauna', 'Living Room']

            # Get a single column, unpack the list of lists.
            temperatures, *_ = live_result.get_columns('Temperature')

        Args:
            *columns: one or more tuple field names

        Returns:
            The columns in the same order as requested in the arguments. When
            the result set is empty this is a list of empty lists; otherwise
            it is a 2-D NumPy array with one row per requested column.
        """
        with self._read_lock:
            # The emptiness check lives inside the lock so that a concurrent
            # removal cannot empty the result set between the check and the
            # snapshot below.
            if not self._tuple_dict:
                return [[] for _ in columns]
            # Building the frame copies the row data, so the column selection
            # can safely happen after the lock is released.
            snapshot = pd.DataFrame(list(self._tuple_dict.values()))

        # A list (rather than the raw *columns tuple) is the unambiguous way
        # to select several columns with .loc.
        return snapshot.loc[:, list(columns)].to_numpy().T

    def _convert_tuple_to_schema(self, lv_tuple):
        """Convert a tuple using the LiveQuery's schema.

        Returns ``lv_tuple`` untouched when schema conversion was disabled in
        the constructor.
        """
        if not self._convert_with_schema:
            return lv_tuple
        return convert_tuple_with_schema(lv_tuple, self._live_query.get_schema())

    def _tuple_added(self, event):
        """Store the data of an added tuple under its key.

        Raises:
            LiveViewException: if the event lacks a key or data.
        """
        key = event.get('key')
        data = event.get('data')
        if key is None:
            raise LiveViewException(f'Tuple added event <{pp.pformat(event)}> does not contain key')
        if data is None:
            raise LiveViewException(f'Tuple added event <{pp.pformat(event)}> does not contain data')

        converted = self._convert_tuple_to_schema(data)
        with self._read_lock:
            self._tuple_dict[key] = converted

    def _tuple_updated(self, event):
        """Merge the fields carried by an update event into the stored tuple.

        Raises:
            LiveViewException: if the event lacks a key or data, or if the key
                is not present in the result set.
        """
        key: int = event.get('key')
        data: dict = event.get('data')
        if key is None:
            raise LiveViewException(f'Tuple updated event <{pp.pformat(event)}> does not contain key')
        if data is None:
            raise LiveViewException(f'Tuple updated event <{pp.pformat(event)}> does not contain data')

        # Store the schema-converted values (not the raw event data) so that
        # updated fields go through the same conversion as added ones.
        converted = self._convert_tuple_to_schema(data)
        with self._read_lock:
            row = self._tuple_dict.get(key)
            if row is None:
                raise LiveViewException(
                    f'Tuple updated event <{pp.pformat(event)}> contains key missing from LiveResult')
            row.update(converted)

    def _tuple_removed(self, event):
        """Drop the tuple identified by the event's key from the result set.

        Raises:
            LiveViewException: if the event lacks a key, or if the key is not
                present in the result set.
        """
        key = event.get('key')
        if key is None:
            raise LiveViewException(f'Tuple removed event <{pp.pformat(event)}> does not contain key')

        with self._read_lock:
            if key not in self._tuple_dict:
                raise LiveViewException(f'Tuple removed event <{pp.pformat(event)}> attempted to remove tuple'
                                        ' not in LiveResult')
            del self._tuple_dict[key]