66
77import re
88import inspect
9+ import datetime
910from typing import Any , Callable , Sequence , TypeVar , get_origin
1011from threading import Lock
1112from enum import Enum
@@ -64,10 +65,17 @@ def _spec_kind(spec: Any) -> str:
6465
6566def _dump_engine_object (v : Any ) -> Any :
6667 """Recursively dump an object for engine. Engine side uses `Pythonzized` to catch."""
67- if isinstance (v , type ) or get_origin (v ) is not None :
68+ if v is None :
69+ return None
70+ elif isinstance (v , type ) or get_origin (v ) is not None :
6871 return encode_enriched_type (v )
6972 elif isinstance (v , Enum ):
7073 return v .value
74+ elif isinstance (v , datetime .timedelta ):
75+ total_secs = v .total_seconds ()
76+ secs = int (total_secs )
77+ nanos = int ((total_secs - secs ) * 1e9 )
78+ return {'secs' : secs , 'nanos' : nanos }
7179 elif hasattr (v , '__dict__' ):
7280 return {k : _dump_engine_object (v ) for k , v in v .__dict__ .items ()}
7381 elif isinstance (v , (list , tuple )):
@@ -314,6 +322,13 @@ def get_data_slice(self, v: Any) -> _engine.DataSlice:
314322 return v ._state .engine_data_slice
315323 return self .engine_flow_builder .constant (encode_enriched_type (type (v )), v )
316324
325+ @dataclass
326+ class SourceRefreshOptions :
327+ """
328+ Options for refreshing a source.
329+ """
330+ refresh_interval : datetime .timedelta | None = None
331+
317332class FlowBuilder :
318333 """
319334 A flow builder is used to build a flow.
@@ -329,7 +344,10 @@ def __str__(self):
329344 def __repr__ (self ):
330345 return repr (self ._state .engine_flow_builder )
331346
332- def add_source (self , spec : op .SourceSpec , / , name : str | None = None ) -> DataSlice :
347+ def add_source (self , spec : op .SourceSpec , / , * ,
348+ name : str | None = None ,
349+ refresh_options : SourceRefreshOptions | None = None ,
350+ ) -> DataSlice :
333351 """
334352 Add a source to the flow.
335353 """
@@ -341,6 +359,7 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
341359 target_scope ,
342360 self ._state .field_name_builder .build_name (
343361 name , prefix = _to_snake_case (_spec_kind (spec ))+ '_' ),
362+ _dump_engine_object (refresh_options ),
344363 ),
345364 name
346365 )
0 commit comments