99from typing import Any , Callable , Sequence , TypeVar , get_origin
1010from threading import Lock
1111from enum import Enum
12+ from dataclasses import dataclass
1213
1314from . import _engine
1415from . import vector
@@ -61,18 +62,18 @@ def _create_data_slice(
6162def _spec_kind (spec : Any ) -> str :
6263 return spec .__class__ .__name__
6364
64- def _spec_value_dump (v : Any ) -> Any :
65- """Recursively dump a spec object and its nested attributes to a dictionary ."""
65+ def _dump_engine_object (v : Any ) -> Any :
66+ """Recursively dump an object for engine. Engine side uses `Pythonzized` to catch ."""
6667 if isinstance (v , type ) or get_origin (v ) is not None :
6768 return encode_enriched_type (v )
6869 elif isinstance (v , Enum ):
6970 return v .value
7071 elif hasattr (v , '__dict__' ):
71- return {k : _spec_value_dump (v ) for k , v in v .__dict__ .items ()}
72+ return {k : _dump_engine_object (v ) for k , v in v .__dict__ .items ()}
7273 elif isinstance (v , (list , tuple )):
73- return [_spec_value_dump (item ) for item in v ]
74+ return [_dump_engine_object (item ) for item in v ]
7475 elif isinstance (v , dict ):
75- return {k : _spec_value_dump (v ) for k , v in v .items ()}
76+ return {k : _dump_engine_object (v ) for k , v in v .items ()}
7677 return v
7778
7879T = TypeVar ('T' )
@@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
177178 lambda target_scope , name :
178179 flow_builder_state .engine_flow_builder .transform (
179180 _spec_kind (fn_spec ),
180- _spec_value_dump (fn_spec ),
181+ _dump_engine_object (fn_spec ),
181182 transform_args ,
182183 target_scope ,
183184 flow_builder_state .field_name_builder .build_name (
@@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
267268 {"field_name" : field_name , "metric" : metric .value }
268269 for field_name , metric in vector_index ]
269270 self ._flow_builder_state .engine_flow_builder .export (
270- name , _spec_kind (target_spec ), _spec_value_dump (target_spec ),
271+ name , _spec_kind (target_spec ), _dump_engine_object (target_spec ),
271272 index_options , self ._engine_data_collector )
272273
273274
@@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
316317 self ._state ,
317318 lambda target_scope , name : self ._state .engine_flow_builder .add_source (
318319 _spec_kind (spec ),
319- _spec_value_dump (spec ),
320+ _dump_engine_object (spec ),
320321 target_scope ,
321322 self ._state .field_name_builder .build_name (
322323 name , prefix = _to_snake_case (_spec_kind (spec ))+ '_' ),
323324 ),
324325 name
325326 )
327+ @dataclass
328+ class EvaluateAndDumpOptions :
329+ """
330+ Options for evaluating and dumping a flow.
331+ """
332+ output_dir : str
333+ use_cache : bool = True
326334
327335class Flow :
328336 """
@@ -348,20 +356,32 @@ def __str__(self):
348356 def __repr__ (self ):
349357 return repr (self ._lazy_engine_flow ())
350358
359+ @property
360+ def name (self ) -> str :
361+ """
362+ Get the name of the flow.
363+ """
364+ return self ._lazy_engine_flow ().name ()
365+
351366 def update (self ):
352367 """
353368 Update the index defined by the flow.
354369 Once the function returns, the indice is fresh up to the moment when the function is called.
355370 """
356371 return self ._lazy_engine_flow ().update ()
357372
373+ def evaluate_and_dump (self , options : EvaluateAndDumpOptions ):
374+ """
375+ Evaluate and dump the flow.
376+ """
377+ return self ._lazy_engine_flow ().evaluate_and_dump (_dump_engine_object (options ))
378+
358379 def internal_flow (self ) -> _engine .Flow :
359380 """
360381 Get the engine flow.
361382 """
362383 return self ._lazy_engine_flow ()
363384
364-
365385def _create_lazy_flow (name : str | None , fl_def : Callable [[FlowBuilder , DataScope ], None ]) -> Flow :
366386 """
367387 Create a flow without really building it yet.
0 commit comments