Source code for tibco.liveview.lv_query

from typing import Optional, List, Union


class Query:
    """A Query allows you to build LiveQL expressions.

    See `LiveQL Reference
    <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
    for details.

    Note that this class allows you to create invalid queries. It's up to you
    to create valid queries.

    Every builder method returns the Query itself so calls can be chained.

    Usage::

        q = Query('ItemsSales').select('category', 'Item').where(category='electronics', Item='Surge Protector')
        live_query = client.live_query(q)
        # Or
        # live_query = client.live_query_from_s(q.table_name, q.to_s())
    """

    # Words that must be escaped as #"word" when they are used as identifiers.
    _reserved_tokens = {
        'advance', 'declare', 'from', 'into', 'offset', 'primary', 'stream',
        'using', 'always', 'default', 'gather', 'join', 'on', 'private',
        'table', 'valid', 'and', 'delete', 'group', 'key', 'or', 'public',
        'then', 'vjoin', 'apply', 'desc', 'having', 'limit', 'order',
        'replace', 'time', 'where', 'as', 'duplicate', 'heartbeat', 'lock',
        'outer', 'returning', 'timeout', 'window', 'asc', 'error', 'if',
        'lockset', 'output', 'schema', 'true', 'with', 'between', 'every',
        'implements', 'materialized', 'parallel', 'secondary', 'truncate',
        'within', 'bsort', 'except', 'import', 'merge', 'parameters',
        'select', 'tuples', 'by', 'extends', 'index', 'metronome',
        'partition', 'set', 'union', 'constant', 'false', 'input', 'not',
        'pattern', 'size', 'unlock', 'create', 'foreach', 'insert', 'null',
        'predicate', 'slack', 'update', 'cacheable', 'else', 'define',
        'function', 'command', 'old', 'publishersn', 'for', 'pivot',
        'values', 'publisherid',
    }

    def __init__(self, table_name: str):
        """Create a query over ``table_name`` that selects every column.

        Args:
            table_name (str): The name of the table to query.
        """
        self._table_name: str = table_name
        self._projection: Optional[str] = '*'
        self._predicate: Optional[str] = None
        self._for: Optional[float] = None
        self._colname_timestamp: Optional[str] = None
        # (start, end) pair set by between(); the values are emitted as given.
        self._between: Optional[tuple] = None
        self._group_by_cols: Optional[str] = None
        self._order_by_cols: Optional[str] = None
        self._pivot_expr_list: Optional[str] = None
        self._pivot_for_col: Optional[str] = None
        self._pivot_values: Optional[str] = None
        self._pivot_group_by: Optional[str] = None
        self._limit: Optional[int] = None
        self._having_expr: Optional[str] = None

    @property
    def table_name(self) -> str:
        """The name of this query's table"""
        return self._table_name

    def select(self, *projection: str) -> 'Query':
        """Define a projection over this Query's table.

        If no arguments are given, the projection will default to '*'.
        Column names that are reserved words are escaped as ``#"name"``.

        Args:
            *projection: one or more columns from the table

        Returns:
            This Query object

        Usage::

            query = Query('ItemsSales').select('Item', 'category', 'transactionTime')
        """
        if not projection:
            projection = ('*',)
        self._projection = ', '.join(self._sanitize_token_list(projection))
        return self

    def where(self, str_predicate: str = '', **conj_predicates) -> 'Query':
        """Define a predicate to filter rows from this Query.

        Calling this method again replaces the previously stored predicate.

        Args:
            str_predicate (str): Optional; a string predicate like: ::

                    "Item = 'This' OR Item = 'That' AND NOT category = 'automotive'"

            **conj_predicates: Keyword arg conjunctive predicates that will be
                ANDed together. String values are wrapped in single quotes
                (quotes inside the value are NOT escaped), booleans are
                rendered as ``true`` / ``false``, and any other value is
                formatted as-is. For example: ::

                    Query('CarSales').select('*').where(year=2021, country='France').to_s()

                will give you the string: ::

                    "SELECT * FROM CarSales WHERE year = 2021 AND country = 'France'"

        When both ``str_predicate`` and keyword predicates are given, the
        string predicate is parenthesized and ANDed with the keyword
        predicates, so an ``OR`` inside it cannot change the meaning of the
        combined expression.

        Returns:
            This Query object

        Usage::

            query1 = Query('ItemsSales').select('Item').where(category='Automotive')
            # Or
            query2 = Query('ItemsSales').select('Item').where("NOT category = 'Automotive'")
        """
        # Field names are identifiers and may need reserved-word escaping;
        # values are literals and must never be escaped that way.
        clauses = [
            f"{self._sanitize_token(field)} = {self._format_literal(value)}"
            for field, value in conj_predicates.items()
        ]
        keyword_predicate = ' AND '.join(clauses)
        if str_predicate and keyword_predicate:
            self._predicate = f'({str_predicate}) AND {keyword_predicate}'
        else:
            # At most one of the two is non-empty here.
            self._predicate = str_predicate + keyword_predicate
        return self

    def for_ms(self, milliseconds: int) -> 'Query':
        """Define a time-delay modifier.

        See the Time-Based Data Modifiers section of the `LiveQL Reference
        <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
        for details.

        Args:
            milliseconds (int): The number of milliseconds for which this
                Query's predicate should hold

        Returns:
            This Query object
        """
        self._for = milliseconds
        return self

    def when(self, colname_timestamp: str) -> 'Query':
        """Define a column for this query to provide time-windowed results around.

        To be used with `between()`.

        Args:
            colname_timestamp (str): The name of a column around which a
                time-window will be defined

        Returns:
            This Query object

        Usage::

            from liveview.util import now_timestamp
            time1 = now_timestamp()
            # ... Some logic during which time passes
            time2 = now_timestamp()
            query = Query('ItemsSales').select('Item').when('transactionTime').between(time1, time2)
        """
        self._colname_timestamp = colname_timestamp
        return self

    def between(self, timestamp1, timestamp2) -> 'Query':
        """To be used with `when()`.

        Give two timestamps to define the bounds of this query's
        time-windowed results.

        Args:
            timestamp1: A timestamp which defines the beginning of a time-window
            timestamp2: A timestamp which defines the end of a time-window

        Returns:
            This Query object

        Usage::

            from liveview.util import now_timestamp
            time1 = now_timestamp()
            # ... Some logic during which time passes
            time2 = now_timestamp()
            query = Query('ItemsSales').select('Item').when('transactionTime').between(time1, time2)
        """
        self._between = (timestamp1, timestamp2)
        return self

    def group_by(self, *cols) -> 'Query':
        """Group-by columns.

        Args:
            *cols: One or more columns to group by.

        Returns:
            This Query object

        Usage::

            query = Query('ItemsSales').select('category', 'COUNT(Item) AS itemCount').group_by('category')
            # query.to_s() == "SELECT category, COUNT(Item) AS itemCount FROM ItemsSales GROUP BY category"
        """
        self._group_by_cols = ', '.join(self._sanitize_token_list(cols))
        return self

    def order_by(self, *cols: str, **col_orders) -> 'Query':
        """Define this query's ordering

        Usage::

            # Positional columns default to descending order
            # ORDER BY item DESC, transactionTime DESC
            query.order_by('item', 'transactionTime')

            # Or give an explicit direction per column
            # ORDER BY transactionTime ASC
            query.order_by(transactionTime='ASC')

        Args:
            *cols (str): column names, each ordered ``DESC``
            **col_orders: column name keyword args mapping a column to its
                ordering, e.g. ``transactionTime='ASC'``. The value is
                emitted as given. Keyword columns follow positional ones.

        Returns:
            This Query object
        """
        col_pairs = [(self._sanitize_token(col), 'DESC') for col in cols]
        col_pairs.extend(
            (self._sanitize_token(col), order) for col, order in col_orders.items()
        )
        self._order_by_cols = ', '.join(f'{col} {order}' for col, order in col_pairs)
        return self

    def limit(self, limit: int) -> 'Query':
        """Limit the number of rows that this query will return.

        Args:
            limit (int): the max number of rows that this query should return

        Returns:
            This Query object
        """
        self._limit = limit
        return self

    def pivot(self, expr_list: List[str], values: List, for_col: Optional[str] = None,
              group_by: Union[str, List[str], None] = None) -> 'Query':
        """Have this query return results from a pivot table.

        See the Pivot Modifier section of the `LiveQL Reference
        <https://docs.tibco.com/pub/str/latest/doc/html/lv-reference/lvqueryreference.html>`_
        for details.

        Args:
            expr_list (List[str]): A list of expressions which will fill the
                rows of the pivot table.
            values (List): A list of values found in `for_col`, these values
                will be the column titles of the pivot table. They are
                emitted via ``str()`` without quoting.
            for_col (str): Optional; a column which contains values in the
                `values` list.
            group_by (Union[str, List[str]]): Optional; a string or list of
                strings which are column names to group by.

        Returns:
            This Query object
        """
        # None replaces the old mutable-list default; both mean "no grouping".
        if group_by is None:
            group_by = []

        self._pivot_expr_list = ', '.join(self._sanitize_token_list(expr_list))
        self._pivot_for_col = self._sanitize_token(for_col)
        self._pivot_values = f'[{", ".join(map(str, self._sanitize_token_list(values)))}]'

        # group_by can be a single string or a list of strings
        if isinstance(group_by, str):
            self._pivot_group_by = self._sanitize_token(group_by)
        else:
            self._pivot_group_by = ', '.join(self._sanitize_token_list(group_by))
        return self

    def having(self, having_expr: str) -> 'Query':
        """Provide a HAVING clause to this query.

        Usage::

            query = Query('ItemsSales').select('Item', 'category').order_by('category', Item='ASC').having('AVG(lastSoldPrice) > 5')
            # query.to_s() == 'SELECT Item, category FROM ItemsSales ORDER BY category DESC, Item ASC HAVING AVG(lastSoldPrice) > 5'

        Args:
            having_expr (str): an aggregate HAVING expression

        Returns:
            This Query object
        """
        self._having_expr = self._sanitize_token(having_expr)
        return self

    def to_s(self) -> str:
        """Return LiveQL string built by this Query

        Clauses are appended in a fixed order: WHERE, FOR, WHEN, BETWEEN,
        GROUP BY, ORDER BY, PIVOT, LIMIT, HAVING. A clause is emitted only
        when its stored value is truthy, so for example ``limit(0)`` and
        ``for_ms(0)`` add nothing to the string.

        Returns:
            This Query object as a string which LiveView can parse, if valid.
        """
        # SELECT columns FROM table_name
        query = f'SELECT {self._projection} FROM {self._table_name}'

        # Where predicate
        if self._predicate:
            query += f' WHERE {self._predicate}'

        # Time-delay modifier
        if self._for:
            query += f' FOR {self._for} MILLISECONDS'

        # Time-window modifier
        if self._colname_timestamp:
            query += f' WHEN {self._colname_timestamp}'

        # Between clause
        if self._between:
            query += f' BETWEEN {self._between[0]} AND {self._between[1]}'

        # Group by clause
        if self._group_by_cols:
            query += f' GROUP BY {self._group_by_cols}'

        # Order by clause
        if self._order_by_cols:
            query += f' ORDER BY {self._order_by_cols}'

        # Pivot clause: FOR and GROUP BY are optional parts of it
        if self._pivot_expr_list:
            query += f' PIVOT {self._pivot_expr_list}'
            if self._pivot_for_col:
                query += f' FOR {self._pivot_for_col}'
            query += f' VALUES {self._pivot_values}'
            if self._pivot_group_by:
                query += f' GROUP BY {self._pivot_group_by}'

        # Row result limit
        if self._limit:
            query += f' LIMIT {self._limit}'

        # Having clause
        if self._having_expr:
            query += f' HAVING {self._having_expr}'

        return query

    def __str__(self):
        """Return the same LiveQL string as `to_s()`."""
        return self.to_s()

    @staticmethod
    def _format_literal(value):
        """Render a Python value as the right-hand side of a ``field = value`` clause.

        Booleans become ``true`` / ``false``, strings are wrapped in single
        quotes, and every other value is returned unchanged.
        """
        # bool is checked first so it is never mistaken for a generic object
        # whose str() ('True' / 'False') collides with a reserved word.
        if isinstance(value, bool):
            return 'true' if value else 'false'
        if isinstance(value, str):
            return f"'{value}'"
        return value

    def _sanitize_token(self, arg):
        """Escape ``arg`` as ``#"arg"`` if it is a reserved word (case-insensitive).

        Any other value is returned unchanged, including non-string values.
        """
        if str(arg).lower() in self._reserved_tokens:
            return f'#"{arg}"'
        return arg

    def _sanitize_token_list(self, arg_list):
        """Apply `_sanitize_token` to every element and return the results as a list."""
        return [self._sanitize_token(arg) for arg in arg_list]