diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index bc0001b8d..ba6c51421 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -46,6 +46,22 @@ demo_flow = cocoindex.add_flow_def("DemoFlow", demo_flow_def) In both cases, `demo_flow` will be an object with `cocoindex.Flow` class type. See [Flow Running](/docs/core/flow_methods) for more details on it. +Sometimes you no longer want to keep states of the flow in memory. We provide a `cocoindex.remove_flow()` method for this purpose: + +```python +cocoindex.remove_flow(demo_flow) +``` + +After it's called, `demo_flow` becomes an invalid object, and you should not call any methods of it. + +:::note + +This only removes states of the flow from the current process, and it won't affect the persistent states. + +If you w + +::: + diff --git a/docs/docs/core/flow_methods.mdx b/docs/docs/core/flow_methods.mdx index 5f673ccf7..f67425d7d 100644 --- a/docs/docs/core/flow_methods.mdx +++ b/docs/docs/core/flow_methods.mdx @@ -100,6 +100,14 @@ cocoindex.drop_all_flows(report_to_stdout=True) +:::note + +After dropping the flow, the in-memory `cocoindex.Flow` instance is still valid, and you can call setup methods on it again. + +If you want to remove the flow from the current process, you can call `cocoindex.remove_flow(demo_flow)` to do so (see [related doc](/docs/core/flow_def#entry-point)). + +::: + ## Build / update target data The major goal of a flow is to perform the transformations on source data and build / update data in the target. diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 97a911a87..f2730424e 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -11,7 +11,7 @@ from .flow import flow_def from .flow import EvaluateAndDumpOptions, GeneratedField from .flow import FlowLiveUpdater, FlowLiveUpdaterOptions -from .flow import add_flow_def +from .flow import add_flow_def, remove_flow from .flow import update_all_flows_async, setup_all_flows, drop_all_flows from .lib import init, start_server, stop from .llm import LlmSpec, LlmApiType @@ -54,6 +54,7 @@ "FlowLiveUpdater", "FlowLiveUpdaterOptions", "add_flow_def", + "remove_flow", "update_all_flows_async", "setup_all_flows", "drop_all_flows", diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 5ce6a0cb9..d85198703 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -624,7 +624,7 @@ class Flow: _name: str _full_name: str - _lazy_engine_flow: Callable[[], _engine.Flow] + _lazy_engine_flow: Callable[[], _engine.Flow] | None def __init__( self, name: str, full_name: str, engine_flow_creator: Callable[[], _engine.Flow] @@ -664,18 +664,18 @@ def build_tree(label: str, lines: list[Any]) -> Tree: return tree def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec: - return self._lazy_engine_flow().get_spec( + return self.internal_flow().get_spec( output_mode="verbose" if verbose else "concise" ) def _get_schema(self) -> list[tuple[str, str, str]]: - return cast(list[tuple[str, str, str]], self._lazy_engine_flow().get_schema()) + return cast(list[tuple[str, str, str]], self.internal_flow().get_schema()) def __str__(self) -> str: return str(self._get_spec()) def __repr__(self) -> str: - return repr(self._lazy_engine_flow()) + return repr(self.internal_flow()) @property def name(self) -> str: @@ -715,12 +715,14 @@ def evaluate_and_dump( """ Evaluate the flow and dump flow outputs to files. """ - return self._lazy_engine_flow().evaluate_and_dump(dump_engine_object(options)) + return self.internal_flow().evaluate_and_dump(dump_engine_object(options)) def internal_flow(self) -> _engine.Flow: """ Get the engine flow. """ + if self._lazy_engine_flow is None: + raise RuntimeError(f"Flow {self.full_name} is already removed") return self._lazy_engine_flow() async def internal_flow_async(self) -> _engine.Flow: @@ -731,13 +733,13 @@ async def internal_flow_async(self) -> _engine.Flow: def setup(self, report_to_stdout: bool = False) -> None: """ - Setup the flow. + Setup persistent backends of the flow. """ execution_context.run(self.setup_async(report_to_stdout=report_to_stdout)) async def setup_async(self, report_to_stdout: bool = False) -> None: """ - Setup the flow. The async version. + Setup persistent backends of the flow. The async version. """ await make_setup_bundle([self]).describe_and_apply_async( report_to_stdout=report_to_stdout @@ -745,13 +747,18 @@ async def setup_async(self, report_to_stdout: bool = False) -> None: def drop(self, report_to_stdout: bool = False) -> None: """ - Drop the flow. + Drop persistent backends of the flow. + + The current instance is still valid after it's called. + For example, you can still call `setup()` after it, to setup the persistent backends again. + + Call `cocoindex.remove_flow()` if you want to remove the flow from the current process. """ execution_context.run(self.drop_async(report_to_stdout=report_to_stdout)) async def drop_async(self, report_to_stdout: bool = False) -> None: """ - Drop the flow. The async version. + Drop persistent backends of the flow. The async version. """ await make_drop_bundle([self]).describe_and_apply_async( report_to_stdout=report_to_stdout @@ -805,6 +812,19 @@ def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> return fl +def remove_flow(fl: Flow) -> None: + """ + Remove a flow from the current process to free up resources. + After it's called, methods of the flow should no longer be called. + + This will NOT touch the persistent backends of the flow. + """ + _engine.remove_flow_context(fl.full_name) + fl._lazy_engine_flow = None # pylint: disable=protected-access + with _flows_lock: + del _flows[fl.name] + + def flow_def( name: str | None = None, ) -> Callable[[Callable[[FlowBuilder, DataScope], None]], Flow]: diff --git a/src/lib_context.rs b/src/lib_context.rs index 3f5d92a00..59f5e9889 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -214,6 +214,11 @@ impl LibContext { Ok(flow_ctx) } + pub fn remove_flow_context(&self, flow_name: &str) { + let mut flows = self.flows.lock().unwrap(); + flows.remove(flow_name); + } + pub fn require_persistence_ctx(&self) -> Result<&PersistenceContext> { self.persistence_ctx .as_ref() @@ -268,7 +273,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result { }) } -static LIB_CONTEXT: RwLock>> = RwLock::new(None); +pub static LIB_CONTEXT: RwLock>> = RwLock::new(None); pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> { let mut lib_context_locked = LIB_CONTEXT.write().unwrap(); diff --git a/src/py/mod.rs b/src/py/mod.rs index bf6568f24..5e5d60024 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -462,6 +462,14 @@ fn make_drop_bundle(flow_names: Vec) -> PyResult { Ok(SetupChangeBundle(Arc::new(bundle))) } +#[pyfunction] +fn remove_flow_context(flow_name: String) { + let lib_context_locked = crate::lib_context::LIB_CONTEXT.read().unwrap(); + if let Some(lib_context) = lib_context_locked.as_ref() { + lib_context.remove_flow_context(&flow_name) + } +} + #[pyfunction] fn add_auth_entry(key: String, value: Pythonized) -> PyResult<()> { get_auth_registry() @@ -493,6 +501,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(flow_names_with_setup_async, m)?)?; m.add_function(wrap_pyfunction!(make_setup_bundle, m)?)?; m.add_function(wrap_pyfunction!(make_drop_bundle, m)?)?; + m.add_function(wrap_pyfunction!(remove_flow_context, m)?)?; m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?; m.add_class::()?;