88import re
99import inspect
1010import datetime
11- from typing import Any , Callable , Sequence , TypeVar , get_origin
11+
12+ from typing import Any , Callable , Sequence , TypeVar
1213from threading import Lock
1314from enum import Enum
1415from dataclasses import dataclass
1516
1617from . import _engine
1718from . import vector
1819from . import op
20+ from .convert import dump_engine_object
1921from .typing import encode_enriched_type
2022
2123class _NameBuilder :
@@ -64,27 +66,6 @@ def _create_data_slice(
6466def _spec_kind (spec : Any ) -> str :
6567 return spec .__class__ .__name__
6668
67- def _dump_engine_object (v : Any ) -> Any :
68- """Recursively dump an object for engine. Engine side uses `Pythonzized` to catch."""
69- if v is None :
70- return None
71- elif isinstance (v , type ) or get_origin (v ) is not None :
72- return encode_enriched_type (v )
73- elif isinstance (v , Enum ):
74- return v .value
75- elif isinstance (v , datetime .timedelta ):
76- total_secs = v .total_seconds ()
77- secs = int (total_secs )
78- nanos = int ((total_secs - secs ) * 1e9 )
79- return {'secs' : secs , 'nanos' : nanos }
80- elif hasattr (v , '__dict__' ):
81- return {k : _dump_engine_object (v ) for k , v in v .__dict__ .items ()}
82- elif isinstance (v , (list , tuple )):
83- return [_dump_engine_object (item ) for item in v ]
84- elif isinstance (v , dict ):
85- return {k : _dump_engine_object (v ) for k , v in v .items ()}
86- return v
87-
8869T = TypeVar ('T' )
8970
9071class _DataSliceState :
@@ -176,6 +157,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
176157 """
177158 Apply a function to the data slice.
178159 """
160+ transform_args : list [tuple [Any , str | None ]]
179161 transform_args = [(self ._state .engine_data_slice , None )]
180162 transform_args += [(self ._state .flow_builder_state .get_data_slice (v ), None ) for v in args ]
181163 transform_args += [(self ._state .flow_builder_state .get_data_slice (v ), k )
@@ -187,7 +169,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
187169 lambda target_scope , name :
188170 flow_builder_state .engine_flow_builder .transform (
189171 _spec_kind (fn_spec ),
190- _dump_engine_object (fn_spec ),
172+ dump_engine_object (fn_spec ),
191173 transform_args ,
192174 target_scope ,
193175 flow_builder_state .field_name_builder .build_name (
@@ -298,7 +280,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
298280 {"field_name" : field_name , "metric" : metric .value }
299281 for field_name , metric in vector_index ]
300282 self ._flow_builder_state .engine_flow_builder .export (
301- name , _spec_kind (target_spec ), _dump_engine_object (target_spec ),
283+ name , _spec_kind (target_spec ), dump_engine_object (target_spec ),
302284 index_options , self ._engine_data_collector , setup_by_user )
303285
304286
@@ -357,11 +339,11 @@ def add_source(self, spec: op.SourceSpec, /, *,
357339 self ._state ,
358340 lambda target_scope , name : self ._state .engine_flow_builder .add_source (
359341 _spec_kind (spec ),
360- _dump_engine_object (spec ),
342+ dump_engine_object (spec ),
361343 target_scope ,
362344 self ._state .field_name_builder .build_name (
363345 name , prefix = _to_snake_case (_spec_kind (spec ))+ '_' ),
364- _dump_engine_object (_SourceRefreshOptions (refresh_interval = refresh_interval )),
346+ dump_engine_object (_SourceRefreshOptions (refresh_interval = refresh_interval )),
365347 ),
366348 name
367349 )
@@ -382,7 +364,7 @@ class FlowLiveUpdater:
382364
383365 def __init__ (self , fl : Flow , options : FlowLiveUpdaterOptions | None = None ):
384366 self ._engine_live_updater = _engine .FlowLiveUpdater (
385- fl ._lazy_engine_flow (), _dump_engine_object (options or FlowLiveUpdaterOptions ()))
367+ fl ._lazy_engine_flow (), dump_engine_object (options or FlowLiveUpdaterOptions ()))
386368
387369 def __enter__ (self ) -> FlowLiveUpdater :
388370 return self
@@ -469,7 +451,7 @@ def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
469451 """
470452 Evaluate the flow and dump flow outputs to files.
471453 """
472- return self ._lazy_engine_flow ().evaluate_and_dump (_dump_engine_object (options ))
454+ return self ._lazy_engine_flow ().evaluate_and_dump (dump_engine_object (options ))
473455
474456 def internal_flow (self ) -> _engine .Flow :
475457 """
0 commit comments