Skip to content

Commit d0641ea

Browse files
committed
Incorporate PR feedback
Signed-off-by: Pascal Tomecek <[email protected]>
1 parent d86de08 commit d0641ea

File tree

5 files changed

+131
-92
lines changed

5 files changed

+131
-92
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ jobs:
672672
package:
673673
- "sqlalchemy<2"
674674
- "numpy==1.19.5"
675-
- "pandas<2"
675+
- "pandas<1.5" # Any later does not support numpy==1.19.5
676676
- "perspective-python<3"
677677

678678
runs-on: ${{ matrix.os }}

csp/adapters/perspective.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55

66
import csp
77
from csp import ts
8-
from csp.impl.perspective_common import apply_updates, is_perspective3, perspective_type_map
8+
from csp.impl.perspective_common import (
9+
date_to_perspective,
10+
datetime_to_perspective,
11+
is_perspective3,
12+
perspective_type_map,
13+
)
914
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
1015

1116
try:
@@ -30,6 +35,41 @@ def perspective_thread(client):
3035
loop.start()
3136

3237

38+
@csp.node
39+
def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):
40+
with csp.alarms():
41+
alarm = csp.alarm(bool)
42+
43+
with csp.state():
44+
s_buffer = []
45+
s_datetime_cols = set()
46+
s_date_cols = set()
47+
48+
with csp.start():
49+
csp.schedule_alarm(alarm, throttle, True)
50+
if _PERSPECTIVE_3:
51+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
52+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
53+
54+
if csp.ticked(data):
55+
row = dict(data.tickeditems())
56+
if _PERSPECTIVE_3:
57+
for col, value in row.items():
58+
if col in s_datetime_cols:
59+
row[col] = datetime_to_perspective(row[col])
60+
if col in s_date_cols:
61+
row[col] = date_to_perspective(row[col])
62+
63+
s_buffer.append(row)
64+
65+
if csp.ticked(alarm):
66+
if len(s_buffer) > 0:
67+
table.update(s_buffer)
68+
s_buffer = []
69+
70+
csp.schedule_alarm(alarm, throttle, True)
71+
72+
3373
@csp.node
3474
def _launch_application(port: int, server: object, stub: ts[object]):
3575
with csp.state():
@@ -204,8 +244,7 @@ def _instantiate(self):
204244
ptable = Table(schema, limit=table.limit, index=table.index)
205245
manager.host_table(table_name, ptable)
206246

207-
data = {(idx, col): value for idx, (col, value) in enumerate(table.columns.items())}
208-
apply_updates(ptable, data, index_col=None, time_col=None, throttle=self._throttle, localize=False)
247+
_apply_updates(ptable, table.columns, self._throttle)
209248

210249
if _PERSPECTIVE_3:
211250
_launch_application(self._port, server, csp.const("stub"))

csp/impl/pandas_perspective.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pandas as pd
2+
import pytz
23
from datetime import datetime, timedelta
34
from pandas.compat import set_function_name
45
from typing import Optional
@@ -8,7 +9,8 @@
89
from csp.impl.pandas_ext_type import is_csp_type
910
from csp.impl.perspective_common import (
1011
PerspectiveWidget,
11-
apply_updates,
12+
date_to_perspective,
13+
datetime_to_perspective,
1214
is_perspective3,
1315
perspective,
1416
perspective_type_map,
@@ -19,6 +21,85 @@
1921
_PERSPECTIVE_3 = is_perspective3()
2022

2123

24+
@csp.node
25+
def _apply_updates(
26+
table: perspective.Table,
27+
data: {object: csp.ts[object]},
28+
index_col: str,
29+
time_col: str,
30+
throttle: timedelta,
31+
localize: bool,
32+
static_records: dict = None,
33+
):
34+
with csp.alarms():
35+
alarm = csp.alarm(bool)
36+
37+
with csp.state():
38+
s_buffer = []
39+
s_has_time_col = False
40+
s_datetime_cols = set()
41+
s_date_cols = set()
42+
43+
with csp.start():
44+
if throttle > timedelta(0):
45+
csp.schedule_alarm(alarm, throttle, True)
46+
s_has_time_col = time_col and time_col not in data.keys()
47+
if _PERSPECTIVE_3:
48+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
49+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
50+
else:
51+
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
52+
53+
with csp.stop():
54+
if len(s_buffer) > 0:
55+
table.update(s_buffer)
56+
57+
if csp.ticked(data):
58+
new_rows = {}
59+
for (idx, col), value in data.tickeditems():
60+
if idx not in new_rows:
61+
row = {}
62+
new_rows[idx] = row
63+
if index_col:
64+
row[index_col] = idx
65+
if s_has_time_col:
66+
if localize:
67+
row[time_col] = pytz.utc.localize(csp.now())
68+
else:
69+
row[time_col] = csp.now()
70+
if _PERSPECTIVE_3:
71+
row[time_col] = datetime_to_perspective(row[time_col])
72+
else:
73+
row = new_rows[idx]
74+
75+
if localize and col in s_datetime_cols and value.tzinfo is None:
76+
row[col] = pytz.utc.localize(value)
77+
else:
78+
row[col] = value
79+
80+
if _PERSPECTIVE_3:
81+
if col in s_datetime_cols:
82+
row[col] = datetime_to_perspective(row[col])
83+
if col in s_date_cols:
84+
row[col] = date_to_perspective(row[col])
85+
86+
if static_records:
87+
for idx, row in new_rows.items():
88+
row.update(static_records[idx])
89+
90+
if throttle == timedelta(0):
91+
table.update(list(new_rows.values()))
92+
else:
93+
s_buffer.extend(new_rows.values())
94+
95+
if csp.ticked(alarm):
96+
if len(s_buffer) > 0:
97+
table.update(s_buffer)
98+
s_buffer = []
99+
100+
csp.schedule_alarm(alarm, throttle, True)
101+
102+
22103
def _frame_to_basket(df):
23104
df = df.csp.ts_frame()
24105
basket = {}
@@ -124,7 +205,7 @@ def clear(self):
124205
def graph(self):
125206
"""The csp graph that populates the table with ticking data"""
126207
if self._basket:
127-
apply_updates(
208+
_apply_updates(
128209
self._table,
129210
self._basket,
130211
self._index_col,

csp/impl/perspective_common.py

Lines changed: 5 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import pytz
2-
from datetime import date, datetime, timedelta
2+
from datetime import date, datetime
33
from packaging import version
44

5-
import csp
6-
75
try:
86
import perspective
97

@@ -42,89 +40,12 @@ def perspective_type_map():
4240

4341

4442
def datetime_to_perspective(dt: datetime) -> int:
45-
"""Convert a python datetime to an integer number of milliseconds for perspective"""
43+
"""Convert a python datetime to an integer number of milliseconds for perspective >= 3"""
44+
if dt.tzinfo is None:
45+
dt = pytz.utc.localize(dt)
4646
return int(dt.timestamp() * 1000)
4747

4848

4949
def date_to_perspective(d: date) -> int:
50-
"""Convert a python date to an integer number of milliseconds for perspective"""
50+
"""Convert a python date to an integer number of milliseconds for perspective >= 3"""
5151
return int(datetime(year=d.year, month=d.month, day=d.day, tzinfo=pytz.UTC).timestamp() * 1000)
52-
53-
54-
@csp.node
55-
def apply_updates(
56-
table: perspective.Table,
57-
data: {object: csp.ts[object]},
58-
index_col: str,
59-
time_col: str,
60-
throttle: timedelta,
61-
localize: bool,
62-
static_records: dict = None,
63-
):
64-
with csp.alarms():
65-
alarm = csp.alarm(bool)
66-
67-
with csp.state():
68-
s_buffer = []
69-
s_has_time_col = False
70-
s_datetime_cols = set()
71-
s_date_cols = set()
72-
73-
with csp.start():
74-
if throttle > timedelta(0):
75-
csp.schedule_alarm(alarm, throttle, True)
76-
s_has_time_col = time_col and time_col not in data.keys()
77-
if _PERSPECTIVE_3:
78-
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
79-
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
80-
else:
81-
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
82-
83-
with csp.stop():
84-
if len(s_buffer) > 0:
85-
table.update(s_buffer)
86-
87-
if csp.ticked(data):
88-
new_rows = {}
89-
for (idx, col), value in data.tickeditems():
90-
if idx not in new_rows:
91-
row = {}
92-
new_rows[idx] = row
93-
if index_col:
94-
row[index_col] = idx
95-
if s_has_time_col:
96-
if localize or _PERSPECTIVE_3:
97-
row[time_col] = pytz.utc.localize(csp.now())
98-
else:
99-
row[time_col] = csp.now()
100-
if _PERSPECTIVE_3:
101-
row[time_col] = datetime_to_perspective(row[time_col])
102-
else:
103-
row = new_rows[idx]
104-
105-
if (localize or _PERSPECTIVE_3) and col in s_datetime_cols and value.tzinfo is None:
106-
row[col] = pytz.utc.localize(value)
107-
else:
108-
row[col] = value
109-
110-
if _PERSPECTIVE_3:
111-
if col in s_datetime_cols:
112-
row[col] = datetime_to_perspective(row[col])
113-
if col in s_date_cols:
114-
row[col] = date_to_perspective(row[col])
115-
116-
if static_records:
117-
for idx, row in new_rows.items():
118-
row.update(static_records[idx])
119-
120-
if throttle == timedelta(0):
121-
table.update(list(new_rows.values()))
122-
else:
123-
s_buffer.extend(new_rows.values())
124-
125-
if csp.ticked(alarm):
126-
if len(s_buffer) > 0:
127-
table.update(s_buffer)
128-
s_buffer = []
129-
130-
csp.schedule_alarm(alarm, throttle, True)

csp/tests/adapters/test_perspective.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
except ImportError:
1111
raise unittest.SkipTest("skipping perspective tests")
1212

13-
_PERSPECTIVE_3 = is_perspective3()
14-
1513

1614
class MyStruct(csp.Struct):
1715
my_str: str

0 commit comments

Comments
 (0)