Skip to content

Commit e0a1277

Browse files
committed
Refactor perspective utilities into common module and add adapter test
Signed-off-by: Pascal Tomecek <[email protected]>
1 parent 8030fcb commit e0a1277

File tree

9 files changed

+148
-59
lines changed

9 files changed

+148
-59
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ jobs:
672672
package:
673673
- "sqlalchemy<2"
674674
- "numpy==1.19.5"
675+
- "pandas<1.5" # Any later does not support numpy==1.19.5
675676
- "perspective-python<3"
676677

677678
runs-on: ${{ matrix.os }}

conda/dev-environment-unix.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies:
2424
- librdkafka
2525
- lz4-c
2626
- mamba
27-
- mdformat>=0.7.17,<0.8
27+
- mdformat=0.7.17
2828
- ninja
2929
- numpy<2
3030
- pandas

conda/dev-environment-win.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dependencies:
2323
- lz4-c
2424
- make
2525
- mamba
26-
- mdformat>=0.7.17,<0.8
26+
- mdformat=0.7.17
2727
- ninja
2828
- numpy<2
2929
- pandas

csp/adapters/perspective.py

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
import threading
22
from datetime import timedelta
3-
from packaging import version
3+
from perspective import Table as Table_, View as View_
44
from typing import Dict, Optional, Union
55

66
import csp
77
from csp import ts
8+
from csp.impl.perspective_common import (
9+
date_to_perspective,
10+
datetime_to_perspective,
11+
is_perspective3,
12+
perspective_type_map,
13+
)
814
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
915

1016
try:
@@ -14,19 +20,12 @@
1420
except ImportError:
1521
raise ImportError("perspective adapter requires tornado package")
1622

17-
try:
18-
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size
19-
20-
if version.parse(__version__) >= version.parse("3"):
21-
_PERSPECTIVE_3 = True
22-
elif version.parse(__version__) >= version.parse("0.6.2"):
23-
from perspective import PerspectiveManager
2423

25-
_PERSPECTIVE_3 = False
26-
else:
27-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
28-
except ImportError:
29-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
24+
_PERSPECTIVE_3 = is_perspective3()
25+
if _PERSPECTIVE_3:
26+
from perspective import Server
27+
else:
28+
from perspective import PerspectiveManager
3029

3130

3231
# Run perspective update in a separate tornado loop
@@ -43,12 +42,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):
4342

4443
with csp.state():
4544
s_buffer = []
45+
s_datetime_cols = set()
46+
s_date_cols = set()
4647

4748
with csp.start():
4849
csp.schedule_alarm(alarm, throttle, True)
50+
if _PERSPECTIVE_3:
51+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
52+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
4953

5054
if csp.ticked(data):
51-
s_buffer.append(dict(data.tickeditems()))
55+
row = dict(data.tickeditems())
56+
if _PERSPECTIVE_3:
57+
for col, value in row.items():
58+
if col in s_datetime_cols:
59+
row[col] = datetime_to_perspective(row[col])
60+
if col in s_date_cols:
61+
row[col] = date_to_perspective(row[col])
62+
63+
s_buffer.append(row)
5264

5365
if csp.ticked(alarm):
5466
if len(s_buffer) > 0:
@@ -66,11 +78,13 @@ def _launch_application(port: int, server: object, stub: ts[object]):
6678
s_iothread = None
6779

6880
with csp.start():
69-
from perspective import PerspectiveTornadoHandler
70-
7181
if _PERSPECTIVE_3:
82+
from perspective.handlers.tornado import PerspectiveTornadoHandler
83+
7284
handler_args = {"perspective_server": server, "check_origin": True}
7385
else:
86+
from perspective import PerspectiveTornadoHandler
87+
7488
handler_args = {"manager": server, "check_origin": True}
7589
s_app = tornado.web.Application(
7690
[
@@ -205,12 +219,14 @@ def create_table(self, name, limit=None, index=None):
205219
return table
206220

207221
def _instantiate(self):
208-
set_threadpool_size(self._threadpool_size)
209222
if _PERSPECTIVE_3:
210223
server = Server()
211224
client = server.new_local_client()
212225
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
213226
else:
227+
from perspective import set_threadpool_size
228+
229+
set_threadpool_size(self._threadpool_size)
214230
manager = PerspectiveManager()
215231
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
216232
thread.daemon = True
@@ -221,10 +237,12 @@ def _instantiate(self):
221237
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
222238
}
223239
if _PERSPECTIVE_3:
240+
psp_type_map = perspective_type_map()
241+
schema = {col: psp_type_map.get(typ, typ) for col, typ in schema.items()}
242+
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
243+
else:
224244
ptable = Table(schema, limit=table.limit, index=table.index)
225245
manager.host_table(table_name, ptable)
226-
else:
227-
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
228246

229247
_apply_updates(ptable, table.columns, self._throttle)
230248

csp/impl/pandas_perspective.py

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,24 @@
11
import pandas as pd
22
import pytz
3-
from datetime import date, datetime, timedelta
4-
from packaging import version
3+
from datetime import datetime, timedelta
54
from pandas.compat import set_function_name
65
from typing import Optional
76

87
import csp
98
import csp.impl.pandas_accessor # To ensure that the csp accessors are registered
109
from csp.impl.pandas_ext_type import is_csp_type
10+
from csp.impl.perspective_common import (
11+
PerspectiveWidget,
12+
date_to_perspective,
13+
datetime_to_perspective,
14+
is_perspective3,
15+
perspective,
16+
perspective_type_map,
17+
)
1118

1219
_ = csp.impl.pandas_accessor
1320

14-
try:
15-
import perspective
16-
17-
if version.parse(perspective.__version__) >= version.parse("3"):
18-
_PERSPECTIVE_3 = True
19-
from perspective.widget import PerspectiveWidget
20-
else:
21-
_PERSPECTIVE_3 = False
22-
from perspective import PerspectiveWidget
23-
except ImportError:
24-
raise ImportError(
25-
"perspective must be installed to use this module. " "To install, run 'pip install perspective-python'."
26-
)
21+
_PERSPECTIVE_3 = is_perspective3()
2722

2823

2924
@csp.node
@@ -43,6 +38,7 @@ def _apply_updates(
4338
s_buffer = []
4439
s_has_time_col = False
4540
s_datetime_cols = set()
41+
s_date_cols = set()
4642

4743
with csp.start():
4844
if throttle > timedelta(0):
@@ -67,26 +63,25 @@ def _apply_updates(
6763
if index_col:
6864
row[index_col] = idx
6965
if s_has_time_col:
70-
if localize or _PERSPECTIVE_3:
66+
if localize:
7167
row[time_col] = pytz.utc.localize(csp.now())
7268
else:
7369
row[time_col] = csp.now()
7470
if _PERSPECTIVE_3:
75-
row[time_col] = int(row[time_col].timestamp() * 1000)
71+
row[time_col] = datetime_to_perspective(row[time_col])
7672
else:
7773
row = new_rows[idx]
7874

79-
if (localize or _PERSPECTIVE_3) and col in s_datetime_cols and value.tzinfo is None:
75+
if localize and col in s_datetime_cols and value.tzinfo is None:
8076
row[col] = pytz.utc.localize(value)
8177
else:
8278
row[col] = value
8379

8480
if _PERSPECTIVE_3:
8581
if col in s_datetime_cols:
86-
row[col] = int(row[col].timestamp() * 1000)
82+
row[col] = datetime_to_perspective(row[col])
8783
if col in s_date_cols:
88-
d = row[col]
89-
row[col] = int(datetime(year=d.year, month=d.month, day=d.day, tzinfo=pytz.UTC).timestamp() * 1000)
84+
row[col] = date_to_perspective(row[col])
9085

9186
if static_records:
9287
for idx, row in new_rows.items():
@@ -181,15 +176,8 @@ def __init__(
181176
else:
182177
schema[col] = static_schema[col]
183178
if _PERSPECTIVE_3:
184-
perspective_type_map = {
185-
str: "string",
186-
float: "float",
187-
int: "integer",
188-
date: "date",
189-
datetime: "datetime",
190-
bool: "boolean",
191-
}
192-
schema = {col: perspective_type_map.get(typ, typ) for col, typ in schema.items()}
179+
psp_type_map = perspective_type_map()
180+
schema = {col: psp_type_map.get(typ, typ) for col, typ in schema.items()}
193181

194182
if self._keep_history:
195183
if _PERSPECTIVE_3:

csp/impl/perspective_common.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import pytz
2+
from datetime import date, datetime
3+
from packaging import version
4+
5+
try:
6+
import perspective
7+
8+
if version.parse(perspective.__version__) >= version.parse("3"):
9+
_PERSPECTIVE_3 = True
10+
from perspective.widget import PerspectiveWidget
11+
12+
elif version.parse(perspective.__version__) >= version.parse("0.6.2"):
13+
from perspective import PerspectiveManager, PerspectiveWidget # noqa F401
14+
15+
_PERSPECTIVE_3 = False
16+
else:
17+
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
18+
19+
except ImportError:
20+
raise ImportError(
21+
"perspective must be installed to use this module. " "To install, run 'pip install perspective-python'."
22+
)
23+
24+
25+
def is_perspective3():
26+
"""Whether the perspective version is >= 3"""
27+
return _PERSPECTIVE_3
28+
29+
30+
def perspective_type_map():
31+
"""Return the mapping of standard python types to perspective types"""
32+
return {
33+
str: "string",
34+
float: "float",
35+
int: "integer",
36+
date: "date",
37+
datetime: "datetime",
38+
bool: "boolean",
39+
}
40+
41+
42+
def datetime_to_perspective(dt: datetime) -> int:
43+
"""Convert a python datetime to an integer number of milliseconds for perspective >= 3"""
44+
if dt.tzinfo is None:
45+
dt = pytz.utc.localize(dt)
46+
return int(dt.timestamp() * 1000)
47+
48+
49+
def date_to_perspective(d: date) -> int:
50+
"""Convert a python date to an integer number of milliseconds for perspective >= 3"""
51+
return int(datetime(year=d.year, month=d.month, day=d.day, tzinfo=pytz.UTC).timestamp() * 1000)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import unittest
2+
from datetime import date, datetime, timedelta
3+
4+
import csp
5+
6+
try:
7+
from csp.adapters.perspective import PerspectiveAdapter
8+
from csp.impl.pandas_perspective import CspPerspectiveMultiTable, CspPerspectiveTable
9+
from csp.impl.perspective_common import PerspectiveWidget, is_perspective3
10+
except ImportError:
11+
raise unittest.SkipTest("skipping perspective tests")
12+
13+
14+
class MyStruct(csp.Struct):
15+
my_str: str
16+
my_float: float
17+
my_bool: bool
18+
my_date: date
19+
my_datetime: datetime
20+
21+
22+
def my_graph(output={}):
23+
adapter = PerspectiveAdapter(8000)
24+
table = adapter.create_table("Test")
25+
data = MyStruct(
26+
my_str="foo", my_float=1.0, my_bool=False, my_date=date(2020, 1, 1), my_datetime=datetime(2020, 1, 1)
27+
)
28+
table.publish(csp.unroll(csp.const([data, data])))
29+
output["table"] = table
30+
31+
32+
class TestPerspectiveAdapter(unittest.TestCase):
33+
def test_adapter(self):
34+
output = {}
35+
csp.run(my_graph, output, starttime=datetime.utcnow(), endtime=timedelta(seconds=1))

csp/tests/impl/test_pandas_perspective.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,10 @@
1313
import ipywidgets
1414
import perspective
1515

16-
if version.parse(perspective.__version__) >= version.parse("3"):
17-
_PERSPECTIVE_3 = True
18-
from perspective.widget import PerspectiveWidget
19-
else:
20-
_PERSPECTIVE_3 = False
21-
from perspective import PerspectiveWidget
22-
2316
from csp.impl.pandas_perspective import CspPerspectiveMultiTable, CspPerspectiveTable
17+
from csp.impl.perspective_common import PerspectiveWidget, is_perspective3
18+
19+
_PERSPECTIVE_3 = is_perspective3()
2420
except ImportError:
2521
raise unittest.SkipTest("skipping perspective tests")
2622

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ develop = [
6767
# lint
6868
"codespell>=2.2.6,<2.3",
6969
"isort>=5,<6",
70-
"mdformat>=0.7.17,<0.8",
70+
"mdformat==0.7.17", # >0.7.17 doesnot support python 3.8
7171
"ruff>=0.3,<0.4",
7272
# test
7373
"pytest",

0 commit comments

Comments
 (0)