Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Cocoindex is a framework for building and running indexing pipelines.
"""
from . import functions, query, sources, storages, cli
from . import functions, query, sources, storages, cli, utils
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def, transform_flow
from .flow import EvaluateAndDumpOptions, GeneratedField
from .flow import update_all_flows_async, FlowLiveUpdater, FlowLiveUpdaterOptions
Expand Down
30 changes: 21 additions & 9 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,8 @@ class _FlowBuilderState:
engine_flow_builder: _engine.FlowBuilder
field_name_builder: _NameBuilder

def __init__(self, /, name: str | None = None):
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
self.engine_flow_builder = _engine.FlowBuilder(get_full_flow_name(flow_name))
def __init__(self, full_name: str):
self.engine_flow_builder = _engine.FlowBuilder(full_name)
self.field_name_builder = _NameBuilder()

def get_data_slice(self, v: Any) -> _engine.DataSlice:
Expand Down Expand Up @@ -464,9 +463,13 @@ class Flow:
"""
A flow describes an indexing pipeline.
"""
_name: str
_full_name: str
_lazy_engine_flow: Callable[[], _engine.Flow]

def __init__(self, engine_flow_creator: Callable[[], _engine.Flow]):
def __init__(self, name: str, full_name: str, engine_flow_creator: Callable[[], _engine.Flow]):
self._name = name
self._full_name = full_name
engine_flow = None
lock = Lock()
def _lazy_engine_flow() -> _engine.Flow:
Expand Down Expand Up @@ -497,7 +500,7 @@ def build_tree(label: str, lines: list):
tree.children.append(section_node)
return tree

def _get_spec(self, verbose: bool = False) -> list[tuple[str, str, int]]:
def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec:
return self._lazy_engine_flow().get_spec(output_mode="verbose" if verbose else "concise")

def _get_schema(self) -> list[tuple[str, str, str]]:
Expand All @@ -509,12 +512,19 @@ def __str__(self):
def __repr__(self):
return repr(self._lazy_engine_flow())

@property
def name(self) -> str:
"""
Get the name of the flow.
"""
return self._name

@property
def full_name(self) -> str:
"""
Get the full name of the flow.
"""
return self._lazy_engine_flow().name()
return self._full_name

def update(self) -> _engine.IndexUpdateInfo:
"""
Expand Down Expand Up @@ -555,14 +565,16 @@ def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope
Create a flow without really building it yet.
The flow will be built the first time when it's really needed.
"""
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
flow_full_name = get_full_flow_name(flow_name)
def _create_engine_flow() -> _engine.Flow:
flow_builder_state = _FlowBuilderState(name=name)
flow_builder_state = _FlowBuilderState(flow_full_name)
root_scope = DataScope(
flow_builder_state, flow_builder_state.engine_flow_builder.root_scope())
fl_def(FlowBuilder(flow_builder_state), root_scope)
return flow_builder_state.engine_flow_builder.build_flow(execution_context.event_loop)

return Flow(_create_engine_flow)
return Flow(flow_name, flow_full_name, _create_engine_flow)


_flows_lock = Lock()
Expand Down Expand Up @@ -695,7 +707,7 @@ async def _flow_info_async(self) -> TransformFlowInfo:
return self._lazy_flow_info

async def _build_flow_info_async(self) -> TransformFlowInfo:
flow_builder_state = _FlowBuilderState(name=self._flow_name)
flow_builder_state = _FlowBuilderState(self._flow_name)
sig = inspect.signature(self._flow_fn)
if len(sig.parameters) != len(self._flow_arg_types):
raise ValueError(
Expand Down
9 changes: 9 additions & 0 deletions python/cocoindex/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .flow import Flow
from .setting import get_app_namespace

def get_target_storage_default_name(flow: Flow, target_name: str, delimiter: str = "__") -> str:
    """
    Build the default name for a target of the given flow.

    The result is the app namespace prefix (requested with `delimiter` as its
    trailing delimiter), followed by the flow's name, the delimiter, and the
    target name. Most storage backends use this as the name of the underlying
    storage (e.g. a table or a collection) when no name is explicitly specified.
    """
    # Resolve the namespace prefix first, before touching the flow.
    namespace_prefix = get_app_namespace(trailing_delimiter=delimiter)
    return "".join((namespace_prefix, flow.name, delimiter, target_name))
Loading