# Source code for tibco.liveview.models.live_query

import json
import sseclient
import requests
import time
import re
from typing import Optional, List, Dict

from ..exceptions import LiveViewException


class LiveQuery:
    """Container class for a LiveQuery.

    It's recommended that you don't create a LiveQuery directly, but instead follow the below usage.

    Usage::

        import liveview

        # Assume Hello LiveView is running at lv://localhost:11080
        liveview_server_uri = 'lv://localhost:11080'
        client = liveview.get_client(liveview_server_uri)

        with client.live_query_from_s('ItemsSales', 'SELECT * FROM ItemsSales') as live_query:
            results = live_query.take(20)
            print(results)
    """

    def __init__(self, uri: str) -> None:
        """Store the live-query URI; no network request is made here.

        Args:
            uri: The URI that is streamed from when the query is iterated.
        """
        self._uri: str = uri
        # The query starts "terminated"; entering the ``with`` block opens it.
        self._terminate_query: bool = True
        # Cache filled by get_schema().
        self._schema: Optional[Dict] = None
        # Schema taken from the most recent ``begin_snapshot`` event (set while iterating).
        # NOTE(review): this is a separate attribute from the ``_schema`` cache above;
        # confirm whether the two are meant to be unified.
        self.schema = None
        self._sse_client: Optional[sseclient.SSEClient] = None

    def get_schema(self) -> Dict:
        """Get this query's schema.

        The result is cached, so the HTTP request is made at most once per
        successful call.

        Returns (dict):
            A mapping from column names to data types.

        Raises:
            LiveViewException: If the query URI cannot be parsed, the schema
                request does not return status 200, or the response carries no
                ``X-Query-Schema`` header.
        """
        # Try to return the cached schema for this query
        if self._schema is not None:
            return self._schema

        # Check the match explicitly rather than catching AttributeError around the
        # whole request, so unrelated attribute errors are not misreported as a bad URI.
        match = re.search(r'(^.*/lv/api/v1/tables/.+/tuples)/live\?(query=.*)', self._uri)
        if match is None:
            raise LiveViewException(f'Failed to get schema because unable to parse URI {self._uri}')
        get_schema_uri, query = match.groups()

        response = requests.head(f'{get_schema_uri}?{query}')
        if response.status_code != 200:
            raise LiveViewException(f'Schema request {response.url} failed with error: {response.status_code} '
                                    f'{response.text}')

        raw_schema = response.headers.get('X-Query-Schema')
        if raw_schema is None:
            # Without this guard json.loads(None) would raise an unhelpful TypeError.
            raise LiveViewException(f'Schema request {response.url} did not return an X-Query-Schema header')

        self._schema = json.loads(raw_schema)
        return self._schema

    def take(self, num_tuples: int) -> List:
        """Get up to ``num_tuples`` results from this query. Blocks.

        Each call starts a fresh iterator via :meth:`iterator` and closes it
        before returning.

        Args:
            num_tuples: Maximum number of results to collect. Zero or a
                negative value yields an empty list.

        Returns (list):
            The collected results. Shorter than ``num_tuples`` if the stream
            ends first.
        """
        iterator = self.iterator()
        results: List = []
        try:
            for _ in range(num_tuples):
                try:
                    results.append(next(iterator))
                except StopIteration:
                    # Stream ended early: return what we have instead of leaking
                    # StopIteration into the caller.
                    break
        finally:
            # Close the generator now so its ``finally`` clause (which closes the
            # SSE connection) runs deterministically rather than at garbage collection.
            close_iterator = getattr(iterator, 'close', None)
            if close_iterator is not None:
                close_iterator()
        return results

    def __enter__(self):
        """Mark the query as open and return it."""
        self._terminate_query = False
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the query on leaving the ``with`` block; exceptions are not suppressed."""
        self.close()

    def iterator(self):
        """Get an iterator which yields results from this live query."""
        return self.__iter__()

    def __iter__(self):
        """Stream events from the query URI, yielding each event's decoded JSON payload.

        Raises:
            LiveViewException: If the query has not been opened with ``with live_query``.
            requests.exceptions.ConnectionError: Re-raised after a short pause
                when the connection fails.
            ConnectionError: If a ``StopIteration`` escapes while consuming events.
        """
        if self._terminate_query:
            raise LiveViewException('LiveQuery not open. Please use syntax `with live_query: ...`')
        try:
            self._sse_client = sseclient.SSEClient(requests.get(self._uri, stream=True))
            for event in self._sse_client.events():
                if self._terminate_query:
                    break
                data = json.loads(event.data)
                if data.get('type') == 'begin_snapshot':
                    # Tolerate a begin_snapshot event that carries no ``data`` payload.
                    self.schema = (data.get('data') or {}).get('schema')
                yield data
        except requests.exceptions.ConnectionError:
            # Sleep for a moment in the event of a connection error
            time.sleep(0.5)
            raise
        except StopIteration:
            raise ConnectionError(f'Connection to event source {self._uri} stopped yielding events')
        finally:
            if self._sse_client:
                self._sse_client.close()

    def close(self):
        """Close this query.

        You don't need to call this method if you use the ``with live_query`` syntax.
        """
        self._terminate_query = True
        if self._sse_client:
            self._sse_client.close()
            self._sse_client = None