diff --git a/Cargo.lock b/Cargo.lock index 715aad50cf..f3dc5223c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -756,6 +756,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + [[package]] name = "fastrand" version = "2.1.0" @@ -940,7 +946,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d" dependencies = [ - "fallible-iterator", + "fallible-iterator 0.2.0", "indexmap 1.9.3", "stable_deref_trait", ] @@ -1991,9 +1997,14 @@ version = "3.8.0" dependencies = [ "async-lock", "axum", + "fallible-iterator 0.3.0", "futures", + "indexmap 2.2.6", "perspective-client", "perspective-server", + "prost", + "serde", + "serde_json", "tokio", "tracing", ] @@ -2078,9 +2089,12 @@ name = "perspective-python" version = "3.8.0" dependencies = [ "async-lock", + "bytes", + "chrono", "cmake", "extend", "futures", + "indexmap 2.2.6", "macro_rules_attribute", "num_cpus", "perspective-client", @@ -2091,6 +2105,7 @@ dependencies = [ "pyo3-build-config 0.22.6", "python-config-rs", "pythonize", + "serde", "tokio", "tracing", "tracing-subscriber", @@ -2103,11 +2118,16 @@ dependencies = [ "async-lock", "cmake", "futures", + "indexmap 2.2.6", "link-cplusplus", "num_cpus", "perspective-client", + "prost", "protobuf-src", + "serde", + "serde_json", "shlex", + "thiserror 1.0.61", "tracing", ] diff --git a/examples/python-duckdb-virtual/index.html b/examples/python-duckdb-virtual/index.html new file mode 100644 index 0000000000..e2bbc565b7 --- /dev/null +++ b/examples/python-duckdb-virtual/index.html @@ -0,0 +1,31 @@ + + + + + + + + + + + + diff --git a/examples/python-duckdb-virtual/package.json b/examples/python-duckdb-virtual/package.json new file mode 100644 index 0000000000..db4a75d5da --- /dev/null +++ b/examples/python-duckdb-virtual/package.json @@ -0,0 +1,22 @@ +{ + "name": "python-duckdb-virtual", + "private": true, + "version": "3.7.4", + "description": "An example of streaming a `perspective-python` server to the browser.", + "scripts": { + "start": "PYTHONPATH=../../python/perspective python3 server.py" + }, + "keywords": [], + "license": "Apache-2.0", + "dependencies": { + "@finos/perspective": "workspace:^", + "@finos/perspective-viewer": "workspace:^", + "@finos/perspective-viewer-d3fc": "workspace:^", + "@finos/perspective-viewer-datagrid": "workspace:^", + "@finos/perspective-workspace": "workspace:^", + "superstore-arrow": "catalog:" + }, + "devDependencies": { + "npm-run-all": "catalog:" + } +} diff --git a/examples/python-duckdb-virtual/server.py b/examples/python-duckdb-virtual/server.py new file mode 100644 index 0000000000..8538a92aa7 --- /dev/null +++ b/examples/python-duckdb-virtual/server.py @@ -0,0 +1,66 @@ +# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +# ┃ Copyright (c) 2017, the Perspective Authors. ┃ +# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +# ┃ This file is part of the Perspective library, distributed under the terms ┃ +# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +from pathlib import Path + +import duckdb +import perspective +import perspective.handlers.tornado +import perspective.virtual_servers.duckdb +import tornado.ioloop +import tornado.web +import tornado.websocket + +from loguru import logger +from tornado.web import StaticFileHandler + + +INPUT_FILE = ( + Path(__file__).parent.resolve() + / "node_modules" + / "superstore-arrow" + / "superstore.parquet" +) + + +if __name__ == "__main__": + db = duckdb.connect(":memory:perspective") + db.sql( + f""" + SET default_null_order=NULLS_FIRST_ON_ASC_LAST_ON_DESC; + CREATE TABLE data_source_one AS + SELECT * FROM '{INPUT_FILE}'; + """, + ) + + virtual_server = perspective.virtual_servers.duckdb.DuckDBVirtualServer(db) + app = tornado.web.Application( + [ + ( + r"/websocket", + perspective.handlers.tornado.PerspectiveTornadoHandler, + {"perspective_server": virtual_server}, + ), + (r"/node_modules/(.*)", StaticFileHandler, {"path": "../../node_modules/"}), + ( + r"/(.*)", + StaticFileHandler, + {"path": "./", "default_filename": "index.html"}, + ), + ], + websocket_max_message_size=100 * 1024 * 1024, + ) + + app.listen(3000) + logger.info("Listening on http://localhost:3000") + loop = tornado.ioloop.IOLoop.current() + loop.start() diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 81b5620d87..53614beab6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -410,6 +410,31 @@ importers: examples/python-aiohttp: {} + examples/python-duckdb-virtual: + dependencies: + '@finos/perspective': + specifier: workspace:^ + version: link:../../rust/perspective-js + '@finos/perspective-viewer': + specifier: workspace:^ + version: link:../../rust/perspective-viewer + '@finos/perspective-viewer-d3fc': + specifier: workspace:^ + version: link:../../packages/perspective-viewer-d3fc + '@finos/perspective-viewer-datagrid': + specifier: workspace:^ + version: link:../../packages/perspective-viewer-datagrid + '@finos/perspective-workspace': + specifier: workspace:^ + version: link:../../packages/perspective-workspace + superstore-arrow: + specifier: 'catalog:' + version: 3.2.0 + devDependencies: + npm-run-all: + specifier: 'catalog:' + version: 4.1.5 + examples/python-starlette: {} examples/python-tornado: diff --git a/rust/perspective-client/src/rust/lib.rs b/rust/perspective-client/src/rust/lib.rs index 4fdc1ee333..08db32126e 100644 --- a/rust/perspective-client/src/rust/lib.rs +++ b/rust/perspective-client/src/rust/lib.rs @@ -44,11 +44,12 @@ pub mod config; #[rustfmt::skip] #[allow(clippy::all)] -mod proto; +pub mod proto; pub mod utils; pub use crate::client::{Client, ClientHandler, Features, ReconnectCallback, SystemInfo}; +use crate::proto::HostedTable; pub use crate::session::{ProxySession, Session}; pub use crate::table::{ DeleteOptions, ExprValidationResult, Table, TableInitOptions, TableReadFormat, UpdateOptions, @@ -66,6 +67,16 @@ pub mod vendor { pub use paste; } +impl From<&str> for HostedTable { + fn from(entity_id: &str) -> Self { + HostedTable { + entity_id: entity_id.to_string(), + index: None, + limit: None, + } + } +} + /// Assert that an implementation of domain language wrapper for [`Table`] /// implements the expected API. As domain languages have different API needs, /// a trait isn't useful for asserting that the entire API is implemented, diff --git a/rust/perspective-python/Cargo.toml b/rust/perspective-python/Cargo.toml index 49432ac686..f59ccecdad 100644 --- a/rust/perspective-python/Cargo.toml +++ b/rust/perspective-python/Cargo.toml @@ -59,11 +59,15 @@ python-config-rs = "0.1.2" [dependencies] perspective-client = { version = "3.8.0" } perspective-server = { version = "3.8.0" } +bytes = "1.10.1" +chrono = "0.4" macro_rules_attribute = "0.2.0" async-lock = "2.5.0" pollster = "0.3.0" extend = "1.1.2" +indexmap = "2.2.6" futures = "0.3.28" +serde = { version = "1.0" } pyo3 = { version = "0.25.1", features = [ "experimental-async", "extension-module", diff --git a/rust/perspective-python/perspective/__init__.py b/rust/perspective-python/perspective/__init__.py index 34bcc1bd18..be664ea576 100644 --- a/rust/perspective-python/perspective/__init__.py +++ b/rust/perspective-python/perspective/__init__.py @@ -21,6 +21,7 @@ "ProxySession", "AsyncClient", "AsyncServer", + "VirtualServer", "num_cpus", "set_num_cpus", "system_info", @@ -351,6 +352,7 @@ def delete_callback(): Server, AsyncServer, AsyncClient, + VirtualServer, # NOTE: these are classes without constructors, # so we import them just for type hinting Table, # noqa: F401 diff --git a/rust/perspective-python/perspective/virtual_servers/__init__.py b/rust/perspective-python/perspective/virtual_servers/__init__.py new file mode 100644 index 0000000000..faf9b4773b --- /dev/null +++ b/rust/perspective-python/perspective/virtual_servers/__init__.py @@ -0,0 +1,137 @@ +# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +# ┃ Copyright (c) 2017, the Perspective Authors. ┃ +# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +# ┃ This file is part of the Perspective library, distributed under the terms ┃ +# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + + +class VirtualSessionModel: + """ + An interface for implementing a Perspective `VirtualServer`. It operates + thusly: + + - A table is selected by name (validated via `get_hosted_tables`). + + - The UI will ask the model to create a temporary table with the results + of querying this table with a specific query `config`, a simple struct + which reflects the UI configurable fields (see `get_features`). + + - The UI will query slices of the temporary table as it needs them to + render. This may be a rectangular slice, a whole column or the entire + set, and it is returned from teh model via a custom push-only + struct `PerspectiveColumn` for now, though in the future we will support + e.g. Polars and other arrow-native formats directly. + + - The UI will delete its own temporary tables via `view_delete` but it is + ok for them to die intermittently, the UI will recover automatically. + """ + + def get_features(self): + """ + [OPTIONAL] Toggle UI features through data model support. For example, + setting `"group_by": False` would hide the "Group By" UI control, as + well as prevent this field from appearing in `config` dicts later + provided to `table_make_view`. + + This API defaults to just "columns", e.g. a simple flat datagrid in + which you can just scroll, select and format columns. + + # Example + + ```python + return { + "group_by": True, + "split_by": True, + "sort": True, + "expressions": True, + "filter_ops": { + "integer": ["==", "<"], + }, + "aggregates": { + "string": ["count"], + "float": ["count", "sum"], + }, + } + ``` + """ + + pass + + def get_hosted_tables(self) -> list[str]: + """ + List of `Table` names available to query from. + """ + + pass + + def table_schema(self, table_name): + """ + Get the _Perspective Schema_ for a `Table`, a mapping of column name to + Perspective column types, a simplified set of six visually-relevant + types mapped from DuckDB's much richer type system. Optionally, + a model may also implement `view_schema` which describes temporary + tables, but for DuckDB this method is identical. + """ + + pass + + def table_columns_size(self, table_name, config): + pass + + def table_size(self, table_name): + """ + Get a table's row count. Optionally, a model may also implement the + `view_size` method to get the row count for temporary tables, but for + DuckDB this method is identical. + """ + + pass + + def view_schema(self, view_name, config): + return self.table_schema(view_name) + + def view_size(self, view_name): + return self.table_size(view_name) + + def table_make_view(self, table_name, view_name, config): + """ + Create a temporary table `view_name` from the results of querying + `table_name` with a query configuration `config`. + """ + + pass + + def table_validate_expression(self, view_name, expression): + """ + [OPTIONAL] Given a temporary table `view_name`, validate the type of + a column expression string `expression`, or raise an error if the + expression is invalid. This is enabeld by `"expressions"` via + `get_features` and defaults to allow all expressions. + """ + + pass + + def view_delete(self, view_name): + """ + Delete a temporary table. The UI will do this automatically, and it + can recover. + """ + + pass + + def view_get_data(self, view_name, config, viewport, data): + """ + Serialize a rectangular slice `viewport` from temporary table + `view_name`, into the `PerspectiveColumn` serialization API injected + via `data`. The push-only `PerspectiveColumn` type can handle casting + Python types as input, but once a type is pushed to a column name it + must not be changed. + """ + + pass diff --git a/rust/perspective-python/perspective/virtual_servers/duckdb.py b/rust/perspective-python/perspective/virtual_servers/duckdb.py new file mode 100644 index 0000000000..f605fab227 --- /dev/null +++ b/rust/perspective-python/perspective/virtual_servers/duckdb.py @@ -0,0 +1,442 @@ +# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +# ┃ Copyright (c) 2017, the Perspective Authors. ┃ +# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +# ┃ This file is part of the Perspective library, distributed under the terms ┃ +# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +import duckdb +import perspective + +from datetime import datetime +from loguru import logger + +from perspective.virtual_servers import VirtualSessionModel + +# TODO(texodus): Missing these features +# +# - `min_max` API for value-coloring and value-sizing. +# +# - row expand/collapse in the datagrid needs datamodel support, this is +# likely a "collapsed" boolean column in the temp table we `UPDATE`. +# +# - `on_update` real-time support will be method which takes sa view name and +# a handler and calls the handler when the view needs to be recalculated. +# +# Nice to have: +# +# - Optional `view_change` method can be implemented for engine optimization, +# defaulting to just delete & recreate (as Perspective engine does now). +# +# - Would like to add a metadata API so that e.g. Viewer debug panel could +# show internal generated SQL. + + +NUMBER_AGGS = [ + "sum", + "count", + "any_value", + "arbitrary", + # "arg_max", + # "arg_max_null", + # "arg_min", + # "arg_min_null", + "array_agg", + "avg", + "bit_and", + "bit_or", + "bit_xor", + "bitstring_agg", + "bool_and", + "bool_or", + "countif", + "favg", + "fsum", + "geomean", + # "histogram", + # "histogram_values", + "kahan_sum", + "last", + # "list" + "max", + # "max_by" + "min", + # "min_by" + "product", + "string_agg", + "sumkahan", + # "weighted_avg", +] + +STRING_AGGS = [ + "count", + "any_value", + "arbitrary", + "first", + "countif", + "last", + "string_agg", +] + +FILTER_OPS = [ + "==", + "!=", + "LIKE", + "IS DISTINCT FROM", + "IS NOT DISTINCT FROM", + ">=", + "<=", + ">", + "<", +] + + +class DuckDBVirtualSession: + def __init__(self, callback, db): + self.session = perspective.VirtualServer(DuckDBVirtualSessionModel(db)) + self.callback = callback + + def handle_request(self, msg): + self.callback(self.session.handle_request(msg)) + + +class DuckDBVirtualServer: + def __init__(self, db): + self.db = db + + def new_session(self, callback): + return DuckDBVirtualSession(callback, self.db) + + +class DuckDBVirtualSessionModel(VirtualSessionModel): + """ + An implementation of a `perspective.VirtualSessionModel` for DuckDB. + """ + + def __init__(self, db): + self.db = db + + def get_features(self): + return { + "group_by": True, + "split_by": True, + "sort": True, + "expressions": True, + "filter_ops": { + "integer": FILTER_OPS, + "float": FILTER_OPS, + "string": FILTER_OPS, + "boolean": FILTER_OPS, + "date": FILTER_OPS, + "datetime": FILTER_OPS, + }, + "aggregates": { + "integer": NUMBER_AGGS, + "float": NUMBER_AGGS, + "string": STRING_AGGS, + "boolean": STRING_AGGS, + "date": STRING_AGGS, + "datetime": STRING_AGGS, + }, + } + + def get_hosted_tables(self): + logger.info("SHOW ALL TABLES") + results = self.db.sql("SHOW ALL TABLES").fetchall() + return [result[2] for result in results] + + def table_schema(self, table_name): + query = f"DESCRIBE {table_name}" + results = run_query(self.db, query) + return { + result[0].split("_")[-1]: duckdb_type_to_psp(result[1]) + for result in results + if not (result[0].startswith("__") and result[0].endswith("__")) + } + + def table_columns_size(self, table_name, config): + # TODO split this into 2 methods + query = f"SELECT COUNT(*) FROM (DESCRIBE {table_name})" + results = run_query(self.db, query) + gs = len(config["group_by"]) + return results[0][0] - ( + 0 if gs == 0 else gs + (1 if len(config["split_by"]) == 0 else 0) + ) + + def table_size(self, table_name): + query = f"SELECT COUNT(*) FROM {table_name}" + results = run_query(self.db, query) + return results[0][0] + + def view_schema(self, view_name, config): + return self.table_schema(view_name) + + def view_size(self, view_name): + return self.table_size(view_name) + + def table_make_view(self, table_name, view_name, config): + columns = config["columns"] + group_by = config["group_by"] + split_by = config["split_by"] + aggregates = config["aggregates"] + sort = config["sort"] + + def col_name(col): + return expr if (expr := config["expressions"].get(col)) else f'"{col}"' + + def select_clause(): + if len(group_by) > 0: + for col in columns: + yield f'{aggregates.get(col)}({col_name(col)}) as "{col}"' + + if len(split_by) == 0: + for idx, group in enumerate(group_by): + yield f"{col_name(group)} as __ROW_PATH_{idx}__" + + groups = ", ".join(col_name(g) for g in group_by) + yield f"GROUPING_ID({groups}) AS __GROUPING_ID__" + elif len(columns) > 0: + for col in columns: + yield f'''{col_name(col)} as "{col.replace('"', '""')}"''' + + def order_by_clause(): + if len(group_by) > 0: + for gidx in range(len(group_by)): + groups = ", ".join(col_name(g) for g in group_by[: (gidx + 1)]) + if len(split_by) == 0: + yield f"""GROUPING_ID({groups}) DESC""" + + for sort_col, sort_dir in sort: + if sort_dir != "none": + agg = aggregates.get(sort_col) + if gidx >= len(group_by) - 1: + yield f"{agg}({col_name(sort_col)}) {sort_dir}" + else: + yield f""" + first({agg}({col_name(sort_col)})) + OVER __WINDOW_{gidx}__ {sort_dir} + """ + + yield f"__ROW_PATH_{gidx}__ ASC" + else: + for sort_col, sort_dir in sort: + if sort_dir is not None: + yield f"{col_name(sort_col)} {sort_dir}" + + def window_clause(): + if len(config["sort"]) == 0: + return + + for gidx in range(len(group_by) - 1): + partition = ", ".join(f"__ROW_PATH_{i}__" for i in range(gidx + 1)) + sub_groups = ", ".join(col_name(g) for g in group_by[: (gidx + 1)]) + groups = ", ".join(col_name(g) for g in group_by) + yield f""" + __WINDOW_{gidx}__ AS ( + PARTITION BY + GROUPING_ID({sub_groups}), + {partition} + ORDER BY + {groups} + )""" + + def where_clause(): + for name, op, value in config["filter"]: + if value is not None: + term_lit = f"'{value}'" if isinstance(value, str) else str(value) + yield f"{col_name(name)} {op} {term_lit}" + + if len(split_by) > 0: + query = "SELECT * FROM {}".format(table_name) + else: + query = "SELECT {} FROM {}".format(", ".join(select_clause()), table_name) + + # else: + # for split in split_by: + # extra_cols_query = f""" + # SELECT DISTINCT {f'"{split}"'} + # FROM {table_name} + # """ + # results = self.db.sql(extra_cols_query).fetchall() + # real_columns = [] + # for result in results: + # for idx, col in enumerate(columns): + # real_columns.append( + # f'"{result[0]}_{col}" AS "{result[0]}|{col}"' + # ) + + if len(where := list(where_clause())) > 0: + query = "{} WHERE {}".format(query, " AND ".join(where)) + + if len(split_by) > 0: + groups = ", ".join(col_name(x) for x in group_by) + group_aliases = ", ".join( + f"{col_name(x)} AS __ROW_PATH_{i}__" for i, x in enumerate(group_by) + ) + + query = f""" + SELECT * EXCLUDE ({groups}), {group_aliases} FROM ( + PIVOT ({query}) + ON {", ".join(f'"{c}"' for c in split_by)} + USING {", ".join(select_clause())} + GROUP BY {groups} + ) + """ + + elif len(group_by) > 0: + groups = ", ".join(col_name(x) for x in group_by) + query = f"{query} GROUP BY ROLLUP({groups})" + + if len(window := list(window_clause())) > 0: + query = f"{query} WINDOW {', '.join(window)}" + + if len(order_by := list(order_by_clause())) > 0: + query = f"{query} ORDER BY {', '.join(order_by)}" + + query = f"CREATE TEMPORARY TABLE {view_name} AS ({query})" + run_query(self.db, query, execute=True) + + def table_validate_expression(self, view_name, expression): + query = f"DESCRIBE (select {expression} from {view_name})" + results = run_query(self.db, query) + return duckdb_type_to_psp(results[0][1]) + + def view_delete(self, view_name): + query = f"DROP TABLE {view_name}" + run_query(self.db, query, execute=True) + + def view_get_data(self, view_name, config, viewport, data): + group_by = config["group_by"] + split_by = config["split_by"] + start_col = viewport.get("start_col") + end_col = viewport.get("end_col") + + limit = "" + if (end_row := viewport.get("end_row")) is not None: + start_row = viewport.get("start_row", 0) + limit = f"LIMIT {end_row - start_row} OFFSET {start_row}" + + col_limit = "" + if end_col is not None: + col_limit = f"LIMIT {end_col - start_col} OFFSET {start_col}" + + group_by_columns = "" + if len(group_by) > 0: + if len(split_by) == 0: + row_paths = ["__GROUPING_ID__"] + else: + row_paths = [] + + row_paths.extend(f"__ROW_PATH_{idx}__" for idx in range(len(group_by))) + group_by_columns = f"{', '.join(row_paths)}," + + query = f""" + SET VARIABLE col_names = ( + SELECT list(column_name) FROM ( + SELECT column_name + FROM (DESCRIBE {view_name}) + WHERE not(starts_with(column_name, '__')) + {col_limit} + ) + ); + + SELECT + {group_by_columns} + COLUMNS(c -> list_contains(getvariable('col_names'), c)) + FROM {view_name} {limit} + """ + + results, columns, dtypes = run_query(self.db, query, columns=True) + for cidx, col in enumerate(columns): + if cidx == 0 and len(group_by) > 0 and len(split_by) == 0: + continue + + group_by_index = None + max_grouping_id = None + if len(prefix := col.split("__ROW_PATH_")) > 1: + group_by_index = int(prefix[1].split("__")[0]) + max_grouping_id = 2 ** (len(group_by) - group_by_index) - 1 + + for ridx, row in enumerate(results): + dtype = duckdb_type_to_psp(dtypes[cidx]) + if ( + len(split_by) > 0 + or max_grouping_id is None + or row[0] < max_grouping_id + ): + data.set_col( + dtype, + col.replace("_", "|"), + ridx, + row[cidx], + group_by_index=group_by_index, + ) + + +################################################################################ +# +# DuckDB Utils + + +def val_to_duckdb_lit(value): + """ + Convert a Python value to a string representation of this values suitable + for SQL injecting. + """ + if isinstance(value, str): + return f"'{value}'" + return str(value) + + +def sort_to_duckdb_sort(sortdir): + if sortdir == "asc": + return "ASC" + if sortdir == "desc": + return "DESC" + return "DESC" + + +def duckdb_type_to_psp(name): + """Convert a DuckDB `dtype` to a Perspective `ColumnType`.""" + if name == "VARCHAR": + return "string" + if name in ("DOUBLE", "BIGINT", "HUGEINT"): + return "float" + if name == "INTEGER": + return "integer" + if name == "DATE": + return "date" + if name == "BOOLEAN": + return "boolean" + if name == "TIMESTAMP": + return "datetime" + + msg = f"Unknown type '{name}'" + raise ValueError(msg) + + +def run_query(db, query, execute=False, columns=False): + query = " ".join(query.split()) + start = datetime.now() + result = None + try: + if execute: + db.execute(query) + else: + req = db.sql(query) + result = req.fetchall() + except (duckdb.ParserException, duckdb.BinderException) as e: + logger.error(e) + logger.error(f"{query}") + raise e + else: + logger.debug(f"{datetime.now() - start} {query}") + if columns: + return (result, req.columns, req.dtypes) + else: + return result diff --git a/rust/perspective-python/src/lib.rs b/rust/perspective-python/src/lib.rs index e119f030a1..0b77950122 100644 --- a/rust/perspective-python/src/lib.rs +++ b/rust/perspective-python/src/lib.rs @@ -71,6 +71,7 @@ fn perspective(py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add("PerspectiveError", py.get_type::())?; m.add_function(wrap_pyfunction!(num_cpus, m)?)?; m.add_function(wrap_pyfunction!(set_num_cpus, m)?)?; diff --git a/rust/perspective-python/src/server/mod.rs b/rust/perspective-python/src/server/mod.rs index 8f1e4e2475..6ae102bf99 100644 --- a/rust/perspective-python/src/server/mod.rs +++ b/rust/perspective-python/src/server/mod.rs @@ -14,6 +14,7 @@ mod server_async; mod server_sync; pub(crate) mod session_async; pub(crate) mod session_sync; +pub(crate) mod virtual_server_sync; pub use server_async::*; pub use server_sync::*; diff --git a/rust/perspective-python/src/server/virtual_server_sync.rs b/rust/perspective-python/src/server/virtual_server_sync.rs new file mode 100644 index 0000000000..615a49e632 --- /dev/null +++ b/rust/perspective-python/src/server/virtual_server_sync.rs @@ -0,0 +1,471 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use chrono::{DateTime, TimeZone, Utc}; +use indexmap::IndexMap; +use perspective_client::proto::{ColumnType, HostedTable}; +use perspective_server::{ + Features, ResultExt, VirtualDataSlice, VirtualServer, VirtualServerHandler, +}; +use pyo3::exceptions::PyValueError; +use pyo3::types::{ + PyAnyMethods, PyBytes, PyDate, PyDict, PyDictMethods, PyList, PyListMethods, PyString, +}; +use pyo3::{IntoPyObject, Py, PyAny, PyErr, PyResult, Python, pyclass, pymethods}; +use serde::Serialize; + +pub struct PyServerHandler(Py); + +impl VirtualServerHandler for PyServerHandler { + type Error = PyErr; + + fn get_features(&self) -> std::result::Result { + Python::with_gil(|py| { + if self + .0 + .getattr(py, pyo3::intern!(py, "get_features")) + .is_ok() + { + Ok(pythonize::depythonize( + self.0.call_method0(py, "get_features")?.bind(py), + )?) + } else { + Ok(Features::default()) + } + }) + } + + fn get_hosted_tables( + &self, + ) -> Result, Self::Error> { + Python::with_gil(|py| { + Ok(self + .0 + .call_method0(py, pyo3::intern!(py, "get_hosted_tables"))? + .downcast_bound::(py)? + .iter() + .flat_map(|x| { + Ok::<_, PyErr>(if x.is_instance_of::() { + HostedTable { + entity_id: x.to_string(), + index: None, + limit: None, + } + } else { + HostedTable { + entity_id: x.get_item("name")?.to_string(), + index: x.get_item("index").ok().and_then(|x| x.extract().ok()), + limit: x.get_item("limit").ok().and_then(|x| x.extract().ok()), + } + }) + }) + .collect::>()) + }) + } + + fn table_schema( + &self, + table_id: &str, + ) -> Result, Self::Error> { + Python::with_gil(|py| { + Ok(self + .0 + .call_method1(py, pyo3::intern!(py, "table_schema"), (table_id,))? + .downcast_bound::(py)? + .items() + .extract::>()? + .into_iter() + .map(|(k, v)| (k, ColumnType::from_str(&v).unwrap())) + .collect()) + }) + } + + fn table_size(&self, table_id: &str) -> Result { + Python::with_gil(|py| { + self.0 + .call_method1(py, pyo3::intern!(py, "table_size"), (table_id,))? + .extract::(py) + }) + } + + fn table_validate_expression( + &self, + table_id: &str, + expression: &str, + ) -> Result { + Python::with_gil(|py| { + let name = pyo3::intern!(py, "table_validate_expression"); + if self.0.getattr(py, name).is_ok() { + Ok(self + .0 + .call_method1(py, name, (table_id, expression))? + .downcast_bound::(py)? + .extract::()?) + .map(|x| ColumnType::from_str(x.as_str()).unwrap()) + } else { + // TODO this should probably be an error. + Ok(ColumnType::Float) + } + }) + } + + fn table_make_view( + &mut self, + table_id: &str, + view_id: &str, + config: &mut perspective_client::config::ViewConfigUpdate, + ) -> Result { + Python::with_gil(|py| { + let _ = self + .0 + .call_method1( + py, + pyo3::intern!(py, "table_make_view"), + (table_id, view_id, pythonize::pythonize(py, &config)?), + )? + .extract::(py); + + Ok(view_id.to_string()) + }) + } + + fn table_columns_size( + &self, + view_id: &str, + config: &perspective_client::config::ViewConfig, + ) -> Result { + Python::with_gil(|py| { + self.0 + .call_method1( + py, + pyo3::intern!(py, "table_columns_size"), + (view_id, pythonize::pythonize(py, &config)?).into_pyobject(py)?, + )? + .extract::(py) + }) + } + + fn view_schema( + &self, + view_id: &str, + config: &perspective_client::config::ViewConfig, + ) -> Result, Self::Error> { + Python::with_gil(|py| { + let has_view_schema = self.0.getattr(py, "view_schema").is_ok(); + let args = if has_view_schema { + (view_id, pythonize::pythonize(py, &config)?).into_pyobject(py)? + } else { + (view_id,).into_pyobject(py)? + }; + + Ok(self + .0 + .call_method1(py, pyo3::intern!(py, "view_schema"), args)? + .downcast_bound::(py)? + .items() + .extract::>()? + .into_iter() + .map(|(k, v)| (k, ColumnType::from_str(&v).unwrap())) + .collect()) + }) + } + + fn view_size(&self, view_id: &str) -> Result { + Python::with_gil(|py| { + self.0 + .call_method1(py, pyo3::intern!(py, "view_size"), (view_id,))? + .extract::(py) + }) + } + + fn view_delete(&self, view_id: &str) -> Result<(), Self::Error> { + Python::with_gil(|py| { + self.0 + .call_method1(py, pyo3::intern!(py, "view_delete"), (view_id,))?; + Ok(()) + }) + } + + fn view_get_data( + &self, + view_id: &str, + config: &perspective_client::config::ViewConfig, + viewport: &perspective_client::proto::ViewPort, + ) -> Result { + let window: PyViewPort = viewport.clone().into(); + Python::with_gil(|py| { + let data = PyVirtualDataSlice::default(); + let _ = self.0.call_method1( + py, + pyo3::intern!(py, "view_get_data"), + ( + view_id, + pythonize::pythonize(py, &config)?, + pythonize::pythonize(py, &window)?, + data.clone(), + ), + )?; + + Ok(Mutex::into_inner(Arc::try_unwrap(data.0).unwrap()).unwrap()) + }) + } +} + +#[derive(Serialize, PartialEq)] +pub struct PyViewPort { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub start_row: ::core::option::Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub start_col: ::core::option::Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub end_row: ::core::option::Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub end_col: ::core::option::Option, +} + +impl From for PyViewPort { + fn from(value: perspective_client::proto::ViewPort) -> Self { + PyViewPort { + start_row: value.start_row, + start_col: value.start_col, + end_row: value.end_row, + end_col: value.end_col, + } + } +} + +#[derive(Clone, Default)] +#[pyclass(name = "VirtualDataSlice")] +pub struct PyVirtualDataSlice(Arc>); + +#[pymethods] +impl PyVirtualDataSlice { + #[pyo3(signature=(dtype, name, index, val, group_by_index = None))] + pub fn set_col( + &self, + dtype: &str, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + match dtype { + "string" => self.set_string_col(name, index, val, group_by_index), + "integer" => self.set_integer_col(name, index, val, group_by_index), + "float" => self.set_float_col(name, index, val, group_by_index), + "date" => self.set_datetime_col(name, index, val, group_by_index), + "datetime" => self.set_datetime_col(name, index, val, group_by_index), + "boolean" => self.set_boolean_col(name, index, val, group_by_index), + _ => Err(PyValueError::new_err("Unknown type")), + } + } + + #[pyo3(signature=(name, index, val, group_by_index = None))] + pub fn set_string_col( + &self, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + if val.is_none(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, None as Option) + .unwrap(); + } else if let Ok(val) = val.downcast_bound::(py) { + self.0 + .lock() + .unwrap() + .set_col( + name, + group_by_index, + index as usize, + val.extract::().ok(), + ) + .unwrap(); + } else { + tracing::error!("Unhandled") + }; + + Ok(()) + }) + } + + #[pyo3(signature=(name, index, val, group_by_index = None))] + pub fn set_integer_col( + &self, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + if val.is_none(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, None as Option) + .unwrap(); + } else if let Ok(val) = val.extract::(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, Some(val)) + .unwrap(); + } else { + tracing::error!("Unhandled") + }; + + Ok(()) + }) + } + + #[pyo3(signature=(name, index, val, group_by_index = None))] + pub fn set_float_col( + &self, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + if val.is_none(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, None as Option) + .unwrap(); + } else if let Ok(val) = val.extract::(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, Some(val)) + .unwrap(); + } else { + tracing::error!("Unhandled") + }; + + Ok(()) + }) + } + + #[pyo3(signature=(name, index, val, group_by_index = None))] + pub fn set_boolean_col( + &self, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + if val.is_none(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, None as Option) + .unwrap(); + } else if let Ok(val) = val.extract::(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, Some(val)) + .unwrap(); + } else { + tracing::error!("Unhandled") + }; + + Ok(()) + }) + } + + #[pyo3(signature=(name, index, val, group_by_index = None))] + pub fn set_datetime_col( + &self, + name: &str, + index: u32, + val: Py, + group_by_index: Option, + ) -> PyResult<()> { + Python::with_gil(|py| { + if val.is_none(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, None as Option) + .unwrap(); + } else if let Ok(val) = val.downcast_bound::(py) { + let dt: DateTime = Utc + .with_ymd_and_hms( + val.getattr("year")?.extract()?, + val.getattr("month")?.extract()?, + val.getattr("day")?.extract()?, + 0, + 0, + 0, + ) + .unwrap(); + let timestamp = dt.timestamp() * 1000; + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, Some(timestamp)) + .unwrap(); + } else if let Ok(val) = val.extract::(py) { + self.0 + .lock() + .unwrap() + .set_col(name, group_by_index, index as usize, Some(val)) + .unwrap(); + } else { + tracing::error!("Unhandled") + }; + + Ok(()) + }) + } +} + +#[pyclass(name = "VirtualServer")] +pub struct PyVirtualServer(VirtualServer); + +#[pymethods] +impl PyVirtualServer { + #[new] + pub fn new(handler: Py) -> PyResult { + Ok(PyVirtualServer(VirtualServer::new(PyServerHandler( + handler, + )))) + } + + pub fn handle_request(&mut self, bytes: Py) -> PyResult> { + Python::with_gil(|py| { + let bytes = self + .0 + .handle_request(bytes::Bytes::from(bytes.as_bytes(py).to_vec())); + + match bytes.get_internal_error() { + Ok(x) => Ok(PyBytes::new(py, &x).unbind()), + Err(Ok(x)) => Err(x), + Err(Err(x)) => Err(PyValueError::new_err(x)), + } + }) + } +} diff --git a/rust/perspective-server/Cargo.toml b/rust/perspective-server/Cargo.toml index 31c13fc537..b86f27c927 100644 --- a/rust/perspective-server/Cargo.toml +++ b/rust/perspective-server/Cargo.toml @@ -45,12 +45,26 @@ shlex = "1.3.0" protobuf-src = { version = "2.0.1" } [dependencies] -link-cplusplus = "1.0.9" perspective-client = { version = "3.8.0" } + +# Key order is frequently implicitly relied upon in dynamic languages, so for +# convenience we try to provide this (as well as explicit metadata calls). +indexmap = { version = "2.2.6", features = ["serde"] } + +# Convenient way to crawl the C++ static archive path +link-cplusplus = "1.0.9" async-lock = "2.5.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0.107", features = ["raw_value"] } tracing = { version = ">=0.1.36" } +thiserror = { version = "1.0.55" } futures = "0.3" +[dependencies.prost] +version = "0.12.3" +default-features = false +features = ["prost-derive", "std"] + [lib] crate-type = ["rlib"] path = "src/lib.rs" diff --git a/rust/perspective-server/src/lib.rs b/rust/perspective-server/src/lib.rs index 104ede64cf..2e4110a322 100644 --- a/rust/perspective-server/src/lib.rs +++ b/rust/perspective-server/src/lib.rs @@ -16,8 +16,13 @@ mod ffi; mod local_client; mod local_session; mod server; +mod virtual_server; pub use ffi::{num_cpus, set_num_cpus}; pub use local_client::LocalClient; pub use local_session::LocalSession; pub use server::{Server, ServerError, ServerResult, SessionHandler}; +pub use virtual_server::{ + Features, ResultExt, VirtualDataColumn, VirtualDataSlice, VirtualServer, VirtualServerError, + VirtualServerHandler, +}; diff --git a/rust/perspective-server/src/virtual_server.rs b/rust/perspective-server/src/virtual_server.rs new file mode 100644 index 0000000000..6ab2b7bb7a --- /dev/null +++ b/rust/perspective-server/src/virtual_server.rs @@ -0,0 +1,550 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +use std::borrow::Cow; +use std::collections::HashMap; +use std::error::Error; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +use ::indexmap::IndexMap; +use perspective_client::config::{Scalar, ViewConfig, ViewConfigUpdate}; +use perspective_client::proto::get_features_resp::{ + AggregateArgs, AggregateOptions, ColumnTypeOptions, +}; +use perspective_client::proto::response::ClientResp; +use perspective_client::proto::table_validate_expr_resp::ExprValidationError; +use perspective_client::proto::{ + ColumnType, GetFeaturesResp, GetHostedTablesResp, HostedTable, Request, Response, + TableMakePortReq, TableMakePortResp, TableMakeViewResp, TableSchemaResp, TableSizeResp, + TableValidateExprResp, ViewColumnPathsResp, ViewDeleteResp, ViewDimensionsResp, + ViewExpressionSchemaResp, ViewGetConfigResp, ViewPort, ViewSchemaResp, ViewToColumnsStringResp, +}; +use prost::bytes::{Bytes, BytesMut}; +use prost::{DecodeError, EncodeError, Message as ProstMessage}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Clone, Error, Debug)] +pub enum VirtualServerError { + #[error("External Error: {0:?}")] + InternalError(#[from] T), + + #[error("{0}")] + DecodeError(DecodeError), + + #[error("{0}")] + EncodeError(EncodeError), + + #[error("Unknown view '{0}'")] + UnknownViewId(String), + + #[error("Invalid JSON'{0}'")] + InvalidJSON(Arc), +} + +pub trait ResultExt { + fn get_internal_error(self) -> Result>; +} + +impl ResultExt for Result> { + fn get_internal_error(self) -> Result> { + match self { + Ok(x) => Ok(x), + Err(VirtualServerError::InternalError(x)) => Err(Ok(x)), + Err(x) => Err(Err(x.to_string())), + } + } +} + +macro_rules! respond { + ($msg:ident, $name:ident { $($rest:tt)* }) => {{ + let mut resp = BytesMut::new(); + let resp2 = ClientResp::$name($name { + $($rest)* + }); + + Response { + msg_id: $msg.msg_id, + entity_id: $msg.entity_id, + client_resp: Some(resp2), + }.encode(&mut resp).map_err(VirtualServerError::EncodeError)?; + + resp.freeze() + }}; +} + +pub trait VirtualServerHandler { + type Error: std::error::Error + Send + Sync + 'static; + + // Required + fn get_hosted_tables(&self) -> Result, Self::Error>; + fn table_schema(&self, table_id: &str) -> Result, Self::Error>; + fn table_size(&self, table_id: &str) -> Result; + fn table_columns_size(&self, table_id: &str, config: &ViewConfig) -> Result; + fn table_make_view( + &mut self, + entity_id: &str, + view_id: &str, + config: &mut ViewConfigUpdate, + ) -> Result; + + fn view_size(&self, view_id: &str) -> Result; + fn view_delete(&self, view_id: &str) -> Result<(), Self::Error>; + fn view_schema( + &self, + entity_id: &str, + config: &ViewConfig, + ) -> Result, Self::Error>; + + fn view_get_data( + &self, + view_id: &str, + config: &ViewConfig, + viewport: &ViewPort, + ) -> Result; + + // Optional + fn table_validate_expression( + &self, + _table_id: &str, + _expression: &str, + ) -> Result { + Ok(ColumnType::Float) + } + + fn get_features(&self) -> Result, Self::Error> { + Ok(Features::default()) + } + + fn table_make_port(&self, _req: &TableMakePortReq) -> Result { + Ok(0) + } +} + +// output format +#[derive(Debug, Serialize)] +#[serde(untagged)] +pub enum VirtualDataColumn { + Boolean(Vec>), + String(Vec>), + Float(Vec>), + Integer(Vec>), + Datetime(Vec>), + IntegerIndex(Vec>>), + RowPath(Vec>), +} + +pub trait SetVirtualDataColumn { + fn write_to(self, col: &mut VirtualDataColumn) -> Result<(), &'static str>; + fn new_column() -> VirtualDataColumn; + fn to_scalar(self) -> Scalar; +} + +macro_rules! template_psp { + ($t:ty, $u:ident, $v:ident, $w:ty) => { + impl SetVirtualDataColumn for Option<$t> { + fn write_to(self, col: &mut VirtualDataColumn) -> Result<(), &'static str> { + if let VirtualDataColumn::$u(x) = col { + x.push(self); + Ok(()) + } else { + Err("Bad type") + } + } + + fn new_column() -> VirtualDataColumn { + VirtualDataColumn::$u(vec![]) + } + + fn to_scalar(self) -> Scalar { + if let Some(x) = self { + Scalar::$v(x as $w) + } else { + Scalar::Null + } + } + } + }; +} + +template_psp!(String, String, String, String); +template_psp!(f64, Float, Float, f64); +template_psp!(i32, Integer, Float, f64); +// template_psp!(i, Integer); +template_psp!(i64, Datetime, Float, f64); +template_psp!(bool, Boolean, Bool, bool); + +#[derive(Debug, Default, Serialize)] +pub struct VirtualDataSlice(IndexMap); + +impl Deref for VirtualDataSlice { + type Target = IndexMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for VirtualDataSlice { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl VirtualDataSlice { + pub fn set_col( + &mut self, + name: &str, + group_by_index: Option, + index: usize, + value: T, + ) -> Result<(), Box> { + if group_by_index.is_some() { + let col = + if let Some(VirtualDataColumn::RowPath(row_path)) = self.get_mut("__ROW_PATH__") { + row_path + } else { + self.insert( + "__ROW_PATH__".to_owned(), + VirtualDataColumn::RowPath(vec![]), + ); + let Some(VirtualDataColumn::RowPath(rp)) = self.get_mut("__ROW_PATH__") else { + panic!("Irrefutable") + }; + + rp + }; + + if let Some(row) = col.get_mut(index) { + let scalar = value.to_scalar(); + row.push(scalar); + } else { + while col.len() < index { + col.push(vec![]) + } + + let scalar = value.to_scalar(); + col.push(vec![scalar]); + } + + Ok(()) + } else { + let col = if let Some(col) = self.get_mut(name) { + col + } else { + self.insert(name.to_owned(), T::new_column()); + self.get_mut(name).unwrap() + }; + + Ok(value.write_to(col)?) + } + } +} + +/// DTO for `GetFeaturesResp` +#[derive(Debug, Default, Deserialize)] +pub struct Features<'a> { + #[serde(default)] + pub group_by: bool, + + #[serde(default)] + pub split_by: bool, + + #[serde(default)] + pub filter_ops: IndexMap>>, + + #[serde(default)] + pub aggregates: IndexMap>>, + + #[serde(default)] + pub sort: bool, + + #[serde(default)] + pub expressions: bool, + + #[serde(default)] + pub on_update: bool, +} + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub enum AggSpec<'a> { + Single(Cow<'a, str>), + Multiple(Cow<'a, str>, Vec), +} + +impl<'a> From> for perspective_client::proto::GetFeaturesResp { + fn from(value: Features<'a>) -> perspective_client::proto::GetFeaturesResp { + GetFeaturesResp { + group_by: value.group_by, + split_by: value.split_by, + expressions: value.expressions, + on_update: value.on_update, + sort: value.sort, + aggregates: value + .aggregates + .iter() + .map(|(dtype, aggs)| { + (*dtype as u32, AggregateOptions { + aggregates: aggs + .iter() + .map(|agg| match agg { + AggSpec::Single(cow) => AggregateArgs { + name: cow.to_string(), + args: vec![], + }, + AggSpec::Multiple(cow, column_types) => AggregateArgs { + name: cow.to_string(), + args: column_types.iter().map(|x| *x as i32).collect(), + }, + }) + .collect(), + }) + }) + .collect(), + filter_ops: value + .filter_ops + .iter() + .map(|(ty, options)| { + (*ty as u32, ColumnTypeOptions { + options: options.iter().map(|x| (*x).to_string()).collect(), + }) + }) + .collect(), + } + } +} + +pub struct VirtualServer { + handler: T, + view_to_table: IndexMap, + view_configs: IndexMap, +} + +impl VirtualServer { + pub fn new(handler: T) -> Self { + Self { + handler, + view_configs: IndexMap::default(), + view_to_table: IndexMap::default(), + } + } + + pub fn handle_request(&mut self, bytes: Bytes) -> Result> { + use perspective_client::proto::request::ClientReq::*; + + let msg = Request::decode(bytes).map_err(VirtualServerError::DecodeError)?; + let resp = match msg.client_req.unwrap() { + GetFeaturesReq(_) => { + tracing::debug!("GetFeaturesReq"); + let features = self.handler.get_features()?; + respond!(msg, GetFeaturesResp { ..features.into() }) + }, + GetHostedTablesReq(_) => { + tracing::debug!("GetHostedTablesReq"); + respond!(msg, GetHostedTablesResp { + table_infos: self.handler.get_hosted_tables()? + }) + }, + TableSchemaReq(_) => { + tracing::debug!("TableSchemaReq"); + respond!(msg, TableSchemaResp { + schema: self + .handler + .table_schema(msg.entity_id.as_str()) + .ok() + .map(|value| perspective_client::proto::Schema { + schema: value + .iter() + .map(|x| perspective_client::proto::schema::KeyTypePair { + name: x.0.to_string(), + r#type: *x.1 as i32, + }) + .collect(), + }) + }) + }, + TableMakePortReq(req) => { + tracing::debug!("TableMakePortReq"); + respond!(msg, TableMakePortResp { + port_id: self.handler.table_make_port(&req)? + }) + }, + TableMakeViewReq(req) => { + tracing::debug!("TableMakeViewReq"); + self.view_to_table + .insert(req.view_id.clone(), msg.entity_id.clone()); + + let mut config: ViewConfigUpdate = req.config.clone().unwrap_or_default().into(); + let bytes = respond!(msg, TableMakeViewResp { + view_id: self.handler.table_make_view( + msg.entity_id.as_str(), + req.view_id.as_str(), + &mut config + )? + }); + + self.view_configs.insert(req.view_id.clone(), config.into()); + bytes + }, + TableSizeReq(_) => { + tracing::debug!("TableSizeReq"); + respond!(msg, TableSizeResp { + size: self.handler.table_size(msg.entity_id.as_str())? + }) + }, + TableValidateExprReq(req) => { + tracing::debug!("TableValidateExprReq"); + let mut expression_schema = HashMap::::default(); + let mut expression_alias = HashMap::::default(); + let mut errors = HashMap::::default(); + for (name, ex) in req.column_to_expr.iter() { + let _ = expression_alias.insert(name.clone(), ex.clone()); + match self + .handler + .table_validate_expression(&msg.entity_id, ex.as_str()) + { + Ok(dtype) => { + let _ = expression_schema.insert(name.clone(), dtype as i32); + }, + Err(e) => { + let _ = errors.insert(name.clone(), ExprValidationError { + error_message: format!("{}", e), + line: 0, + column: 0, + }); + }, + } + } + + respond!(msg, TableValidateExprResp { + expression_schema, + errors, + expression_alias, + }) + }, + ViewSchemaReq(_) => { + tracing::debug!("ViewSchemaReq"); + respond!(msg, ViewSchemaResp { + schema: self + .handler + .view_schema( + msg.entity_id.as_str(), + self.view_configs.get(&msg.entity_id).unwrap() + )? + .into_iter() + .map(|(x, y)| (x, y as i32)) + .collect() + }) + }, + ViewDimensionsReq(_) => { + tracing::debug!("ViewDimensionsReq"); + let view_id = &msg.entity_id; + let table_id = self + .view_to_table + .get(view_id) + .ok_or_else(|| VirtualServerError::UnknownViewId(view_id.to_string()))?; + + let num_table_rows = self.handler.table_size(table_id)?; + let num_table_columns = self.handler.table_schema(table_id)?.len() as u32; + let config = self.view_configs.get(view_id).unwrap(); + let num_view_columns = self.handler.table_columns_size(view_id, config)?; + let num_view_rows = self.handler.view_size(view_id)?; + let resp = ViewDimensionsResp { + num_table_columns, + num_table_rows, + num_view_columns, + num_view_rows, + }; + + respond!(msg, ViewDimensionsResp { ..resp }) + }, + ViewGetConfigReq(_) => { + tracing::debug!("ViewGetConfigReq"); + + respond!(msg, ViewGetConfigResp { + config: Some( + ViewConfigUpdate::from( + self.view_configs.get(&msg.entity_id).unwrap().clone() + ) + .into() + ) + }) + }, + ViewExpressionSchemaReq(_) => { + tracing::debug!("ViewGetConfigReq"); + let mut schema = HashMap::::default(); + let table_id = self.view_to_table.get(&msg.entity_id); + for (name, ex) in self + .view_configs + .get(&msg.entity_id) + .unwrap() + .expressions + .iter() + { + match self + .handler + .table_validate_expression(table_id.unwrap(), ex.as_str()) + { + Ok(dtype) => { + let _ = schema.insert(name.clone(), dtype as i32); + }, + Err(_e) => todo!(), + } + } + + let resp = ViewExpressionSchemaResp { schema }; + respond!(msg, ViewExpressionSchemaResp { ..resp }) + }, + ViewColumnPathsReq(_) => { + tracing::debug!("ViewColumnPathsReq"); + respond!(msg, ViewColumnPathsResp { + paths: self + .handler + .view_schema( + msg.entity_id.as_str(), + self.view_configs.get(&msg.entity_id).unwrap() + )? + .keys() + .cloned() + .collect() + }) + }, + ViewToColumnsStringReq(view_to_columns_string_req) => { + tracing::debug!("ViewToColumnsStringReq"); + let viewport = view_to_columns_string_req.viewport.unwrap(); + let config = self.view_configs.get(&msg.entity_id).unwrap(); + let cols = self + .handler + .view_get_data(msg.entity_id.as_str(), config, &viewport)?; + let json_string = serde_json::to_string(&cols) + .map_err(|e| VirtualServerError::InvalidJSON(Arc::new(e)))?; + respond!(msg, ViewToColumnsStringResp { json_string }) + }, + ViewDeleteReq(_) => { + tracing::debug!("ViewDeleteReq"); + self.handler.view_delete(msg.entity_id.as_str())?; + self.view_to_table.shift_remove(&msg.entity_id); + self.view_configs.shift_remove(&msg.entity_id); + respond!(msg, ViewDeleteResp {}) + }, + + x => { + tracing::error!("Not handled {:?}", x); + Bytes::new() + }, + }; + + Ok(resp) + } +} diff --git a/rust/perspective/Cargo.toml b/rust/perspective/Cargo.toml index eb917ba9fe..cfdb039a5d 100644 --- a/rust/perspective/Cargo.toml +++ b/rust/perspective/Cargo.toml @@ -42,5 +42,14 @@ perspective-client = { version = "3.8.0" } perspective-server = { version = "3.8.0" } tracing = { version = ">=0.1.36" } axum = { version = ">=0.8,<0.9", features = ["ws"], optional = true } +fallible-iterator = "0.3.0" +indexmap = "2.2.6" +serde = { version = "1.0" } +serde_json = { version = "1.0.107" } tokio = { version = "~1", features = ["full"], optional = true } futures = { version = "~0", optional = true } + +[dependencies.prost] +version = "0.12.3" +default-features = false +features = ["prost-derive", "std"] diff --git a/rust/perspective/src/lib.rs b/rust/perspective/src/lib.rs index 965c48bac2..fb270e5161 100644 --- a/rust/perspective/src/lib.rs +++ b/rust/perspective/src/lib.rs @@ -54,5 +54,7 @@ #[cfg(feature = "axum-ws")] pub mod axum; +pub mod virtual_server; +pub use perspective_client::proto; pub use {perspective_client as client, perspective_server as server}; diff --git a/rust/perspective/src/virtual_server.rs b/rust/perspective/src/virtual_server.rs new file mode 100644 index 0000000000..a094c96688 --- /dev/null +++ b/rust/perspective/src/virtual_server.rs @@ -0,0 +1,76 @@ +// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃ +// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃ +// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃ +// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃ +// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +// ┃ Copyright (c) 2017, the Perspective Authors. ┃ +// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃ +// ┃ This file is part of the Perspective library, distributed under the terms ┃ +// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃ +// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +use std::net::SocketAddr; + +use axum::extract::connect_info::ConnectInfo; +use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; +use axum::routing::{MethodRouter, get}; +use perspective_server::{VirtualServer, VirtualServerHandler}; + +/// A local error synonym for this module only. +type PerspectiveWSError = Box; + +pub type PSPError = Box; + +/// The inner message loop handles the full-duplex stream of messages +/// between the [`perspective::Client`] and [`Session`]. When this +/// funciton returns, messages are no longer processed. +async fn process_message_loop( + socket: &mut WebSocket, + handler: impl VirtualServerHandler, +) -> Result<(), PerspectiveWSError> { + use Message::*; + let mut processor = VirtualServer::new(handler); + loop { + match socket.recv().await { + Some(Ok(Binary(msg))) => socket.send(Binary(processor.handle_request(msg)?)).await?, + Some(_) | None => { + tracing::debug!("Unexpected msg"); + break; + }, + }; + } + + Ok(()) +} + +/// This handler is responsible for the beginning-to-end lifecycle of a +/// single WebSocket connection to an [`axum`] server. +/// +/// Messages will come in from the [`axum::extract::ws::WebSocket`] in binary +/// form via [`Message::Binary`], where they'll be routed to +/// [`perspective::Session::handle_request`]. The server may generate +/// one or more responses, which it will then send back to +/// the [`axum::extract::ws::WebSocket::send`] method via its +/// [`SessionHandler`] impl. +pub fn custom_websocket_handler(handler: T) -> MethodRouter +where + T: VirtualServerHandler + Clone + Send + Sync + 'static, + S: Clone + Send + Sync + 'static, +{ + let websocket_handler_internal = async |ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo| + -> axum::response::Response { + tracing::info!("{addr} Connected."); + + ws.on_upgrade(move |mut socket| async move { + if let Err(msg) = process_message_loop(&mut socket, handler).await { + tracing::error!("Internal error {}", msg); + } + + tracing::info!("{addr} Disconnected."); + }) + }; + + get(websocket_handler_internal) +}