@@ -1,6 +1,7 @@
 import pandas as pd
+import pyarrow as pa
 import pytz
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from pandas.compat import set_function_name
 from typing import Optional

@@ -40,7 +41,8 @@ def _apply_updates(
         if throttle > timedelta(0):
             csp.schedule_alarm(alarm, throttle, True)
         s_has_time_col = time_col and time_col not in data.keys()
-        s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
+        s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
+        s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

     with csp.stop():
         try:
@@ -81,14 +83,22 @@ def _apply_updates(
                 row[index_col] = idx
                 if s_has_time_col:
                     if localize:
-                        row[time_col] = pytz.utc.localize(csp.now())
+                        row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
                     else:
-                        row[time_col] = csp.now()
+                        row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
             else:
                 row = new_rows[idx]

-            if localize and col in s_datetime_cols and value.tzinfo is None:
-                row[col] = pytz.utc.localize(value)
+            if col in s_date_cols:
+                row[col] = int(
+                    datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
+                )
+
+            elif localize and col in s_datetime_cols:
+                if value.tzinfo is None:
+                    row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
+                else:
+                    row[col] = int(value.timestamp() * 1000)  # already tz-aware
             else:
                 row[col] = value

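
For reference, the conversion used in the added lines above can be exercised on its own. A minimal sketch with illustrative values (not part of the commit); it assumes dates map to midnight UTC and naive datetimes are treated as UTC:

    from datetime import date, datetime, timezone

    import pytz

    # A date becomes midnight UTC, expressed as integer epoch milliseconds.
    d = date(2024, 1, 15)
    date_ms = int(datetime(d.year, d.month, d.day, tzinfo=timezone.utc).timestamp() * 1000)

    # A naive datetime is localized to UTC first; an aware one converts directly.
    naive = datetime(2024, 1, 15, 9, 30)
    aware = datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc)
    naive_ms = int(pytz.utc.localize(naive).timestamp() * 1000)
    aware_ms = int(aware.timestamp() * 1000)

    assert date_ms == 1705276800000
    assert naive_ms == aware_ms == 1705311000000
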
@@ -160,28 +170,41 @@ def __init__(
         self._limit = limit
         self._localize = localize

+        # TODO: we do not want 1 server per table, make a Client param?
+        self._psp_server = perspective.Server()
+        self._psp_client = self._psp_server.new_local_client()
+
         self._basket = _frame_to_basket(data)
         self._static_frame = data.csp.static_frame()
-        self._static_table = perspective.Table(self._static_frame)
+        self._static_table = self._psp_client.table(self._static_frame)
         static_schema = self._static_table.schema()
         # Since the index will be accounted for separately, remove the index from the static table schema,
         # and re-enter it under index_col
         raw_index_name = self._static_frame.index.name or "index"
         index_type = static_schema.pop(raw_index_name)
         schema = {index_col: index_type}
+        perspective_type_map = {
+            str: "string",
+            float: "float",
+            int: "integer",
+            date: "date",
+            datetime: "datetime",
+            bool: "boolean",
+        }
+
         if time_col:
-            schema[time_col] = datetime
+            schema[time_col] = "datetime"
         for col, series in data.items():
             if is_csp_type(series):
-                schema[col] = series.dtype.subtype
+                schema[col] = perspective_type_map[series.dtype.subtype]
             else:
                 schema[col] = static_schema[col]

         if self._keep_history:
-            self._table = perspective.Table(schema, index=None, limit=limit)
+            self._table = self._psp_client.table(schema, index=None, limit=limit)
             self._static_records = self._static_frame.to_dict(orient="index")
         else:
-            self._table = perspective.Table(schema, index=self._index_col)
+            self._table = self._psp_client.table(schema, index=self._index_col)
             self._static_frame.index = self._static_frame.index.rename(self._index_col)
             self._table.update(self._static_frame)
             self._static_records = None  # No need to update dynamically
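
The constructor changes above target the Perspective 3.x embedded-server API, in which tables are created from a client and schemas are declared with string type names. A minimal sketch of that flow, with an illustrative schema (not part of the commit):

    import perspective

    server = perspective.Server()
    client = server.new_local_client()

    # Schemas use string type names ("string", "float", "integer", "date",
    # "datetime", "boolean") rather than Python types.
    table = client.table(
        {"index": "string", "time": "datetime", "price": "float"},
        index=None,
        limit=100,
    )

    # Datetime columns are fed as integer epoch milliseconds, as in _apply_updates.
    table.update([{"index": "A", "time": 1705311000000, "price": 101.5}])

    print(table.schema())   # e.g. {'index': 'string', 'time': 'datetime', 'price': 'float'}
    print(table.columns())  # e.g. ['index', 'time', 'price']
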
@@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
         index = self._index_col
         if self._limit:
             df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
-        return perspective.Table(df.to_dict("series"), index=index)
+        return self._psp_client.table(df, index=index)

     def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
         """Run a graph that sends data to the table on the current thread.
@@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
                 "sort": [[self._time_col, "desc"]],
             }
         else:
-            kwargs = {"columns": list(self._table.schema())}
+            kwargs = {"columns": list(self._table.columns())}
         kwargs.update(override_kwargs)
         return perspective.PerspectiveWidget(self._table, **kwargs)

@@ -294,14 +317,30 @@ def _method(self, **options):

     @classmethod
     def _add_view_methods(cls):
-        cls.to_df = cls._create_view_method(perspective.View.to_df)
-        cls.to_dict = cls._create_view_method(perspective.View.to_dict)
         cls.to_json = cls._create_view_method(perspective.View.to_json)
         cls.to_csv = cls._create_view_method(perspective.View.to_csv)
-        cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
         cls.to_columns = cls._create_view_method(perspective.View.to_columns)
         cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)

+    def to_df(self, **kwargs):
+        ipc_bytes = self.to_arrow()
+        table = pa.ipc.open_stream(ipc_bytes).read_all()
+        df = pd.DataFrame(table.to_pandas(**kwargs))
+
+        # DAVIS: `pyarrow` does not force alphabetical order on categories, so
+        # we correct this here to make assertions pass. We can enforce this in
+        # Perspective at a performance hit/API complexity.
+        for column in df:
+            if df[column].dtype == "datetime64[ms]":
+                df[column] = df[column].astype("datetime64[ns]")
+            elif df[column].dtype == "category":
+                df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
+
+        if df.index.dtype == "category":
+            df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
+
+        return df
+

 CspPerspectiveTable._add_view_methods()
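
The new to_df above materializes the table by exporting a view to Arrow IPC and reading it back with pyarrow. A minimal sketch of that round-trip, reusing the hypothetical table from the earlier sketch (not part of the commit):

    import pyarrow as pa

    view = table.view()
    ipc_bytes = view.to_arrow()  # Arrow IPC stream bytes
    arrow_table = pa.ipc.open_stream(ipc_bytes).read_all()
    df = arrow_table.to_pandas()

    # Depending on the pyarrow/pandas versions, timestamp columns may come back
    # with millisecond resolution; to_df() normalizes them to datetime64[ns].
    df["time"] = df["time"].astype("datetime64[ns]")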