Skip to content

Commit 25c2b67

Browse files
authored
Upgrade csp to perspective 3.x (#392)
* Upgrade csp to perspective 3.x Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com> Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
1 parent 8274827 commit 25c2b67

File tree

10 files changed

+352
-109
lines changed

10 files changed

+352
-109
lines changed

.github/workflows/build.yml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -670,9 +670,9 @@ jobs:
670670
python-version:
671671
- 3.9
672672
package:
673-
- "sqlalchemy>=2"
674673
- "sqlalchemy<2"
675-
- "numpy==1.19.5"
674+
- "numpy==1.22.4" # Min supported version of pandas 2.2
675+
- "perspective-python<3"
676676

677677
runs-on: ${{ matrix.os }}
678678

@@ -709,11 +709,15 @@ jobs:
709709

710710
- name: Python Test Steps
711711
run: make test TEST_ARGS="-k TestDBReader"
712-
if: ${{ contains( 'sqlalchemy', matrix.package )}}
712+
if: ${{ contains( matrix.package, 'sqlalchemy' )}}
713713

714714
- name: Python Test Steps
715715
run: make test
716-
if: ${{ contains( 'numpy', matrix.package )}}
716+
if: ${{ contains( matrix.package, 'numpy' )}}
717+
718+
- name: Python Test Steps
719+
run: make test TEST_ARGS="-k Perspective"
720+
if: ${{ contains( matrix.package, 'perspective' )}}
717721

718722
###########################################################################################################
719723
#.........................................................................................................#

conda/dev-environment-unix.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies:
2424
- librdkafka
2525
- lz4-c
2626
- mamba
27-
- mdformat>=0.7.17,<0.8
27+
- mdformat=0.7.17
2828
- ninja
2929
- numpy<2
3030
- pandas

conda/dev-environment-win.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dependencies:
2323
- lz4-c
2424
- make
2525
- mamba
26-
- mdformat>=0.7.17,<0.8
26+
- mdformat=0.7.17
2727
- ninja
2828
- numpy<2
2929
- pandas

csp/adapters/perspective.py

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
import threading
22
from datetime import timedelta
3+
from perspective import Table as Table_, View as View_
34
from typing import Dict, Optional, Union
45

56
import csp
67
from csp import ts
8+
from csp.impl.perspective_common import (
9+
date_to_perspective,
10+
datetime_to_perspective,
11+
is_perspective3,
12+
perspective_type_map,
13+
)
714
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
815

916
try:
@@ -14,20 +21,17 @@
1421
raise ImportError("perspective adapter requires tornado package")
1522

1623

17-
try:
18-
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
19-
20-
MAJOR, MINOR, PATCH = map(int, __version__.split("."))
21-
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
22-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
23-
except ImportError:
24-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
24+
_PERSPECTIVE_3 = is_perspective3()
25+
if _PERSPECTIVE_3:
26+
from perspective import Server
27+
else:
28+
from perspective import PerspectiveManager
2529

2630

2731
# Run perspective update in a separate tornado loop
28-
def perspective_thread(manager):
32+
def perspective_thread(client):
2933
loop = tornado.ioloop.IOLoop()
30-
manager.set_loop_callback(loop.add_callback)
34+
client.set_loop_callback(loop.add_callback)
3135
loop.start()
3236

3337

@@ -38,12 +42,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):
3842

3943
with csp.state():
4044
s_buffer = []
45+
s_datetime_cols = set()
46+
s_date_cols = set()
4147

4248
with csp.start():
4349
csp.schedule_alarm(alarm, throttle, True)
50+
if _PERSPECTIVE_3:
51+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
52+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
4453

4554
if csp.ticked(data):
46-
s_buffer.append(dict(data.tickeditems()))
55+
row = dict(data.tickeditems())
56+
if _PERSPECTIVE_3:
57+
for col, value in row.items():
58+
if col in s_datetime_cols:
59+
row[col] = datetime_to_perspective(row[col])
60+
if col in s_date_cols:
61+
row[col] = date_to_perspective(row[col])
62+
63+
s_buffer.append(row)
4764

4865
if csp.ticked(alarm):
4966
if len(s_buffer) > 0:
@@ -54,19 +71,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):
5471

5572

5673
@csp.node
57-
def _launch_application(port: int, manager: object, stub: ts[object]):
74+
def _launch_application(port: int, server: object, stub: ts[object]):
5875
with csp.state():
5976
s_app = None
6077
s_ioloop = None
6178
s_iothread = None
6279

6380
with csp.start():
64-
from perspective import PerspectiveTornadoHandler
81+
if _PERSPECTIVE_3:
82+
from perspective.handlers.tornado import PerspectiveTornadoHandler
6583

84+
handler_args = {"perspective_server": server, "check_origin": True}
85+
else:
86+
from perspective import PerspectiveTornadoHandler
87+
88+
handler_args = {"manager": server, "check_origin": True}
6689
s_app = tornado.web.Application(
6790
[
6891
# create a websocket endpoint that the client Javascript can access
69-
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
92+
(r"/websocket", PerspectiveTornadoHandler, handler_args)
7093
],
7194
websocket_ping_interval=15,
7295
)
@@ -196,21 +219,34 @@ def create_table(self, name, limit=None, index=None):
196219
return table
197220

198221
def _instantiate(self):
199-
set_threadpool_size(self._threadpool_size)
200-
201-
manager = PerspectiveManager()
222+
if _PERSPECTIVE_3:
223+
server = Server()
224+
client = server.new_local_client()
225+
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
226+
else:
227+
from perspective import set_threadpool_size
202228

203-
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
229+
set_threadpool_size(self._threadpool_size)
230+
manager = PerspectiveManager()
231+
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
204232
thread.daemon = True
205233
thread.start()
206234

207235
for table_name, table in self._tables.items():
208236
schema = {
209237
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
210238
}
211-
ptable = Table(schema, limit=table.limit, index=table.index)
212-
manager.host_table(table_name, ptable)
239+
if _PERSPECTIVE_3:
240+
psp_type_map = perspective_type_map()
241+
schema = {col: psp_type_map.get(typ, typ) for col, typ in schema.items()}
242+
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
243+
else:
244+
ptable = Table(schema, limit=table.limit, index=table.index)
245+
manager.host_table(table_name, ptable)
213246

214247
_apply_updates(ptable, table.columns, self._throttle)
215248

216-
_launch_application(self._port, manager, csp.const("stub"))
249+
if _PERSPECTIVE_3:
250+
_launch_application(self._port, server, csp.const("stub"))
251+
else:
252+
_launch_application(self._port, manager, csp.const("stub"))

csp/dataframe.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from datetime import datetime, timedelta
1+
from datetime import date, datetime, timedelta
2+
from packaging import version
23
from typing import Dict, Optional
34

45
import csp.baselib
@@ -12,6 +13,7 @@ class DataFrame:
1213
def __init__(self, data: Optional[Dict] = None):
1314
self._data = data or {}
1415
self._columns = list(self._data.keys())
16+
self._psp_client = None
1517

1618
@property
1719
def columns(self):
@@ -204,10 +206,17 @@ def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime
204206
try:
205207
import perspective
206208

209+
if version.parse(perspective.__version__) >= version.parse("3"):
210+
_PERSPECTIVE_3 = True
211+
from perspective.widget import PerspectiveWidget
212+
else:
213+
_PERSPECTIVE_3 = False
214+
from perspective import PerspectiveWidget
215+
207216
global RealtimePerspectiveWidget
208217
if RealtimePerspectiveWidget is None:
209218

210-
class RealtimePerspectiveWidget(perspective.PerspectiveWidget):
219+
class RealtimePerspectiveWidget(PerspectiveWidget):
211220
def __init__(self, engine_runner, *args, **kwargs):
212221
super().__init__(*args, **kwargs)
213222
self._runner = engine_runner
@@ -222,14 +231,14 @@ def join(self):
222231
self._runner.join()
223232

224233
except ImportError:
225-
raise ImportError("eval_perspective requires perspective-python installed")
234+
raise ImportError("to_perspective requires perspective-python installed")
226235

227236
if not realtime:
228237
df = self.to_pandas(starttime, endtime)
229-
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
238+
return PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
230239

231240
@csp.node
232-
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
241+
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
233242
with csp.alarms():
234243
alarm = csp.alarm(bool)
235244
with csp.state():
@@ -240,7 +249,10 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
240249

241250
if csp.ticked(data):
242251
s_buffer.append(dict(data.tickeditems()))
243-
s_buffer[-1][timecol] = csp.now()
252+
if _PERSPECTIVE_3:
253+
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
254+
else:
255+
s_buffer[-1][timecol] = csp.now()
244256

245257
if csp.ticked(alarm):
246258
if len(s_buffer) > 0:
@@ -252,7 +264,21 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
252264
timecol = "time"
253265
schema = {k: v.tstype.typ for k, v in self._data.items()}
254266
schema[timecol] = datetime
255-
table = perspective.Table(schema)
267+
if _PERSPECTIVE_3:
268+
perspective_type_map = {
269+
str: "string",
270+
float: "float",
271+
int: "integer",
272+
date: "date",
273+
datetime: "datetime",
274+
bool: "boolean",
275+
}
276+
schema = {col: perspective_type_map[typ] for col, typ in schema.items()}
277+
if self._psp_client is None:
278+
self._psp_client = perspective.Server().new_local_client()
279+
table = self._psp_client.table(schema)
280+
else:
281+
table = perspective.Table(schema)
256282
runner = csp.run_on_thread(
257283
apply_updates,
258284
table,

0 commit comments

Comments
 (0)