Skip to content

Commit 16f2dca

Browse files
committed
feat: implemented transient arguement for the coindex.flow_def decorator
1 parent 661f8d7 commit 16f2dca

File tree

1 file changed

+201
-4
lines changed

1 file changed

+201
-4
lines changed

python/cocoindex/flow.py

Lines changed: 201 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -909,15 +909,193 @@ def _inner(handler: Callable[[str], Any]) -> Callable[[str], Any]:
909909
return _inner
910910

911911

912+
class TransientFlowWrapper(Flow):
913+
"""
914+
A wrapper for transient flows that doesn't support persistence operations.
915+
"""
916+
917+
def __init__(self, name: str, fl_def: Callable[[FlowBuilder, DataScope], None]):
918+
self._fl_def = fl_def
919+
# For transient flows, we don't create a regular engine flow creator
920+
super().__init__(name, self._create_transient_engine_flow)
921+
922+
def _create_transient_engine_flow(self) -> _engine.Flow:
923+
"""
924+
Create a transient engine flow. This should not be called directly.
925+
"""
926+
raise NotImplementedError(
927+
"Transient flows don't support this operation. Use evaluate() instead."
928+
)
929+
930+
def setup(self, report_to_stdout: bool = False) -> None:
931+
"""
932+
Setup is not supported for transient flows.
933+
"""
934+
raise NotImplementedError("Setup is not supported for transient flows")
935+
936+
async def setup_async(self, report_to_stdout: bool = False) -> None:
937+
"""
938+
Setup is not supported for transient flows.
939+
"""
940+
raise NotImplementedError("Setup is not supported for transient flows")
941+
942+
def drop(self, report_to_stdout: bool = False) -> None:
943+
"""
944+
Drop is not supported for transient flows.
945+
"""
946+
raise NotImplementedError("Drop is not supported for transient flows")
947+
948+
async def drop_async(self, report_to_stdout: bool = False) -> None:
949+
"""
950+
Drop is not supported for transient flows.
951+
"""
952+
raise NotImplementedError("Drop is not supported for transient flows")
953+
954+
def update(self, /, *, reexport_targets: bool = False) -> _engine.IndexUpdateInfo:
955+
"""
956+
Update is not supported for transient flows.
957+
"""
958+
raise NotImplementedError("Live updates are not supported for transient flows")
959+
960+
async def update_async(
961+
self, /, *, reexport_targets: bool = False
962+
) -> _engine.IndexUpdateInfo:
963+
"""
964+
Update is not supported for transient flows.
965+
"""
966+
raise NotImplementedError("Live updates are not supported for transient flows")
967+
968+
def evaluate_and_dump(
969+
self, options: EvaluateAndDumpOptions
970+
) -> _engine.IndexUpdateInfo:
971+
"""
972+
Evaluate and dump is not supported for transient flows.
973+
"""
974+
raise NotImplementedError(
975+
"Evaluate and dump is not supported for transient flows"
976+
)
977+
978+
def internal_flow(self) -> _engine.Flow:
979+
"""
980+
Internal flow is not supported for transient flows.
981+
"""
982+
raise NotImplementedError(
983+
"Internal flow access is not supported for transient flows"
984+
)
985+
986+
async def internal_flow_async(self) -> _engine.Flow:
987+
"""
988+
Internal flow is not supported for transient flows.
989+
"""
990+
raise NotImplementedError(
991+
"Internal flow access is not supported for transient flows"
992+
)
993+
994+
def add_query_handler(
995+
self,
996+
name: str,
997+
handler: Callable[[str], Any],
998+
/,
999+
*,
1000+
result_fields: QueryHandlerResultFields | None = None,
1001+
) -> None:
1002+
"""
1003+
Query handlers are not supported for transient flows.
1004+
"""
1005+
raise NotImplementedError(
1006+
"Query handlers are not supported for transient flows"
1007+
)
1008+
1009+
def evaluate(self, **input_values: Any) -> Any:
1010+
"""
1011+
Evaluate the transient flow with the given input values.
1012+
1013+
Args:
1014+
**input_values: Input values as keyword arguments.
1015+
1016+
Returns:
1017+
The result of evaluating the transient flow.
1018+
"""
1019+
return execution_context.run(self.evaluate_async(**input_values))
1020+
1021+
async def evaluate_async(self, **input_values: Any) -> Any:
1022+
"""
1023+
Evaluate the transient flow with the given input values asynchronously.
1024+
1025+
Args:
1026+
**input_values: Input values as keyword arguments.
1027+
1028+
Returns:
1029+
The result of evaluating the transient flow.
1030+
"""
1031+
flow_full_name = get_flow_full_name(self._name)
1032+
validate_full_flow_name(flow_full_name)
1033+
flow_builder_state = _FlowBuilderState(flow_full_name)
1034+
1035+
# Add direct inputs for each input value and collect the data slices
1036+
input_data_slices = {}
1037+
for key, value in input_values.items():
1038+
encoded_type = encode_enriched_type(type(value))
1039+
if encoded_type is None:
1040+
raise ValueError(
1041+
f"Input value `{key}` has unsupported type {type(value)}"
1042+
)
1043+
1044+
# Add the direct input to the flow builder
1045+
data_slice = flow_builder_state.engine_flow_builder.add_direct_input(
1046+
key, dump_engine_object(encoded_type)
1047+
)
1048+
input_data_slices[key] = DataSlice(
1049+
_DataSliceState(flow_builder_state, data_slice)
1050+
)
1051+
1052+
# Create a root scope for the flow definition
1053+
root_scope = DataScope(
1054+
flow_builder_state, flow_builder_state.engine_flow_builder.root_scope()
1055+
)
1056+
1057+
# Add input data slices to the root scope so the flow definition can access them
1058+
for key, data_slice in input_data_slices.items():
1059+
root_scope[key] = data_slice
1060+
1061+
# Build the flow definition - this should set the output
1062+
self._fl_def(FlowBuilder(flow_builder_state), root_scope)
1063+
1064+
# Build the transient flow
1065+
transient_engine_flow = (
1066+
await flow_builder_state.engine_flow_builder.build_transient_flow_async(
1067+
execution_context.event_loop
1068+
)
1069+
)
1070+
1071+
# Convert input values to the format expected by the engine
1072+
input_args = [input_values[key] for key in input_values.keys()]
1073+
1074+
# Evaluate the transient flow
1075+
result = await transient_engine_flow.evaluate_async(input_args)
1076+
return result
1077+
1078+
9121079
def _create_lazy_flow(
913-
name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]
1080+
name: str | None,
1081+
fl_def: Callable[[FlowBuilder, DataScope], None],
1082+
*,
1083+
transient: bool = False,
9141084
) -> Flow:
9151085
"""
9161086
Create a flow without really building it yet.
9171087
The flow will be built the first time when it's really needed.
1088+
1089+
Args:
1090+
name: The name of the flow.
1091+
fl_def: The flow definition function.
1092+
transient: If True, creates a transient flow that doesn't maintain state.
9181093
"""
9191094
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
9201095

1096+
if transient:
1097+
return TransientFlowWrapper(flow_name, fl_def)
1098+
9211099
def _create_engine_flow() -> _engine.Flow:
9221100
flow_full_name = get_flow_full_name(flow_name)
9231101
validate_full_flow_name(flow_full_name)
@@ -944,14 +1122,25 @@ def get_flow_full_name(name: str) -> str:
9441122
return f"{setting.get_app_namespace(trailing_delimiter='.')}{name}"
9451123

9461124

947-
def open_flow(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
1125+
def open_flow(
1126+
name: str,
1127+
fl_def: Callable[[FlowBuilder, DataScope], None],
1128+
*,
1129+
transient: bool = False,
1130+
) -> Flow:
9481131
"""
9491132
Open a flow, with the given name and definition.
1133+
1134+
Args:
1135+
name: The name of the flow.
1136+
fl_def: The flow definition function.
1137+
transient: If True, creates a transient flow that doesn't maintain state
1138+
and doesn't support live updates. Default is False.
9501139
"""
9511140
with _flows_lock:
9521141
if name in _flows:
9531142
raise KeyError(f"Flow with name {name} already exists")
954-
fl = _flows[name] = _create_lazy_flow(name, fl_def)
1143+
fl = _flows[name] = _create_lazy_flow(name, fl_def, transient=transient)
9551144
return fl
9561145

9571146

@@ -971,11 +1160,19 @@ def remove_flow(fl: Flow) -> None:
9711160

9721161
def flow_def(
9731162
name: str | None = None,
1163+
transient: bool = False,
9741164
) -> Callable[[Callable[[FlowBuilder, DataScope], None]], Flow]:
9751165
"""
9761166
A decorator to wrap the flow definition.
1167+
1168+
Args:
1169+
name: The name of the flow. If None, uses the function name.
1170+
transient: If True, creates a transient flow that doesn't maintain state
1171+
and doesn't support live updates. Default is False.
9771172
"""
978-
return lambda fl_def: open_flow(name or fl_def.__name__, fl_def)
1173+
return lambda fl_def: open_flow(
1174+
name or fl_def.__name__, fl_def, transient=transient
1175+
)
9791176

9801177

9811178
def flow_names() -> list[str]:

0 commit comments

Comments
 (0)