diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 4ab0b4a13..c25c9c6b8 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -1,7 +1,7 @@ """ Cocoindex is a framework for building and running indexing pipelines. """ -from . import functions, query, sources, storages, cli +from . import functions, query, sources, storages, cli, utils from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def, transform_flow from .flow import EvaluateAndDumpOptions, GeneratedField from .flow import update_all_flows_async, FlowLiveUpdater, FlowLiveUpdaterOptions diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 8e166afcf..2c06db491 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -310,9 +310,8 @@ class _FlowBuilderState: engine_flow_builder: _engine.FlowBuilder field_name_builder: _NameBuilder - def __init__(self, /, name: str | None = None): - flow_name = _flow_name_builder.build_name(name, prefix="_flow_") - self.engine_flow_builder = _engine.FlowBuilder(get_full_flow_name(flow_name)) + def __init__(self, full_name: str): + self.engine_flow_builder = _engine.FlowBuilder(full_name) self.field_name_builder = _NameBuilder() def get_data_slice(self, v: Any) -> _engine.DataSlice: @@ -464,9 +463,13 @@ class Flow: """ A flow describes an indexing pipeline. """ + _name: str + _full_name: str _lazy_engine_flow: Callable[[], _engine.Flow] - def __init__(self, engine_flow_creator: Callable[[], _engine.Flow]): + def __init__(self, name: str, full_name: str, engine_flow_creator: Callable[[], _engine.Flow]): + self._name = name + self._full_name = full_name engine_flow = None lock = Lock() def _lazy_engine_flow() -> _engine.Flow: @@ -497,7 +500,7 @@ def build_tree(label: str, lines: list): tree.children.append(section_node) return tree - def _get_spec(self, verbose: bool = False) -> list[tuple[str, str, int]]: + def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec: return self._lazy_engine_flow().get_spec(output_mode="verbose" if verbose else "concise") def _get_schema(self) -> list[tuple[str, str, str]]: @@ -509,12 +512,19 @@ def __str__(self): def __repr__(self): return repr(self._lazy_engine_flow()) + @property + def name(self) -> str: + """ + Get the name of the flow. + """ + return self._name + @property def full_name(self) -> str: """ Get the full name of the flow. """ - return self._lazy_engine_flow().name() + return self._full_name def update(self) -> _engine.IndexUpdateInfo: """ @@ -555,14 +565,16 @@ def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope Create a flow without really building it yet. The flow will be built the first time when it's really needed. """ + flow_name = _flow_name_builder.build_name(name, prefix="_flow_") + flow_full_name = get_full_flow_name(flow_name) def _create_engine_flow() -> _engine.Flow: - flow_builder_state = _FlowBuilderState(name=name) + flow_builder_state = _FlowBuilderState(flow_full_name) root_scope = DataScope( flow_builder_state, flow_builder_state.engine_flow_builder.root_scope()) fl_def(FlowBuilder(flow_builder_state), root_scope) return flow_builder_state.engine_flow_builder.build_flow(execution_context.event_loop) - return Flow(_create_engine_flow) + return Flow(flow_name, flow_full_name, _create_engine_flow) _flows_lock = Lock() @@ -695,7 +707,7 @@ async def _flow_info_async(self) -> TransformFlowInfo: return self._lazy_flow_info async def _build_flow_info_async(self) -> TransformFlowInfo: - flow_builder_state = _FlowBuilderState(name=self._flow_name) + flow_builder_state = _FlowBuilderState(self._flow_name) sig = inspect.signature(self._flow_fn) if len(sig.parameters) != len(self._flow_arg_types): raise ValueError( diff --git a/python/cocoindex/utils.py b/python/cocoindex/utils.py new file mode 100644 index 000000000..e5be399a0 --- /dev/null +++ b/python/cocoindex/utils.py @@ -0,0 +1,9 @@ +from .flow import Flow +from .setting import get_app_namespace + +def get_target_storage_default_name(flow: Flow, target_name: str, delimiter: str = "__") -> str: + """ + Get the default name for a target. + It's used as the underlying storage name (e.g. a table, a collection, etc.) followed by most storage backends, if not explicitly specified. + """ + return get_app_namespace(trailing_delimiter=delimiter) + flow.name + delimiter + target_name