Skip to content

Commit 725e022

Browse files
authored
feat(remove): add a remove_flow() method (#726)
* feat(remove): add a `remove_flow()` method * docs(remove): add docs for `remove_flow()`
1 parent 25353f6 commit 725e022

File tree

6 files changed

+70
-11
lines changed

6 files changed

+70
-11
lines changed

docs/docs/core/flow_def.mdx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ demo_flow = cocoindex.add_flow_def("DemoFlow", demo_flow_def)
4646
In both cases, `demo_flow` will be an object with `cocoindex.Flow` class type.
4747
See [Flow Running](/docs/core/flow_methods) for more details on it.
4848

49+
Sometimes you no longer want to keep the state of a flow in memory. We provide a `cocoindex.remove_flow()` method for this purpose:
50+
51+
```python
52+
cocoindex.remove_flow(demo_flow)
53+
```
54+
55+
After it's called, `demo_flow` becomes an invalid object, and you should not call any of its methods.
56+
57+
:::note
58+
59+
This only removes states of the flow from the current process, and it won't affect the persistent states.
60+
61+
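If you want to drop the persistent states as well, call `demo_flow.drop()` before removing the flow (see [Flow Running](/docs/core/flow_methods)).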
62+
63+
:::
64+
4965
</TabItem>
5066
</Tabs>
5167

docs/docs/core/flow_methods.mdx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ cocoindex.drop_all_flows(report_to_stdout=True)
100100
</TabItem>
101101
</Tabs>
102102

103+
:::note
104+
105+
After dropping the flow, the in-memory `cocoindex.Flow` instance is still valid, and you can call setup methods on it again.
106+
107+
If you want to remove the flow from the current process, you can call `cocoindex.remove_flow(demo_flow)` to do so (see [related doc](/docs/core/flow_def#entry-point)).
108+
109+
:::
110+
103111
## Build / update target data
104112

105113
The major goal of a flow is to perform the transformations on source data and build / update data in the target.

python/cocoindex/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from .flow import flow_def
1212
from .flow import EvaluateAndDumpOptions, GeneratedField
1313
from .flow import FlowLiveUpdater, FlowLiveUpdaterOptions
14-
from .flow import add_flow_def
14+
from .flow import add_flow_def, remove_flow
1515
from .flow import update_all_flows_async, setup_all_flows, drop_all_flows
1616
from .lib import init, start_server, stop
1717
from .llm import LlmSpec, LlmApiType
@@ -54,6 +54,7 @@
5454
"FlowLiveUpdater",
5555
"FlowLiveUpdaterOptions",
5656
"add_flow_def",
57+
"remove_flow",
5758
"update_all_flows_async",
5859
"setup_all_flows",
5960
"drop_all_flows",

python/cocoindex/flow.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ class Flow:
624624

625625
_name: str
626626
_full_name: str
627-
_lazy_engine_flow: Callable[[], _engine.Flow]
627+
_lazy_engine_flow: Callable[[], _engine.Flow] | None
628628

629629
def __init__(
630630
self, name: str, full_name: str, engine_flow_creator: Callable[[], _engine.Flow]
@@ -664,18 +664,18 @@ def build_tree(label: str, lines: list[Any]) -> Tree:
664664
return tree
665665

666666
def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec:
667-
return self._lazy_engine_flow().get_spec(
667+
return self.internal_flow().get_spec(
668668
output_mode="verbose" if verbose else "concise"
669669
)
670670

671671
def _get_schema(self) -> list[tuple[str, str, str]]:
672-
return cast(list[tuple[str, str, str]], self._lazy_engine_flow().get_schema())
672+
return cast(list[tuple[str, str, str]], self.internal_flow().get_schema())
673673

674674
def __str__(self) -> str:
675675
return str(self._get_spec())
676676

677677
def __repr__(self) -> str:
678-
return repr(self._lazy_engine_flow())
678+
return repr(self.internal_flow())
679679

680680
@property
681681
def name(self) -> str:
@@ -715,12 +715,14 @@ def evaluate_and_dump(
715715
"""
716716
Evaluate the flow and dump flow outputs to files.
717717
"""
718-
return self._lazy_engine_flow().evaluate_and_dump(dump_engine_object(options))
718+
return self.internal_flow().evaluate_and_dump(dump_engine_object(options))
719719

720720
def internal_flow(self) -> _engine.Flow:
    """
    Return the engine flow backing this flow.

    Raises:
        RuntimeError: if the lazy engine-flow callable has been cleared,
            i.e. the flow was already removed.
    """
    engine_flow_creator = self._lazy_engine_flow
    if engine_flow_creator is None:
        raise RuntimeError(f"Flow {self.full_name} is already removed")
    return engine_flow_creator()
725727

726728
async def internal_flow_async(self) -> _engine.Flow:
@@ -731,27 +733,32 @@ async def internal_flow_async(self) -> _engine.Flow:
731733

732734
def setup(self, report_to_stdout: bool = False) -> None:
733735
"""
734-
Setup the flow.
736+
Setup persistent backends of the flow.
735737
"""
736738
execution_context.run(self.setup_async(report_to_stdout=report_to_stdout))
737739

738740
async def setup_async(self, report_to_stdout: bool = False) -> None:
739741
"""
740-
Setup the flow. The async version.
742+
Setup persistent backends of the flow. The async version.
741743
"""
742744
await make_setup_bundle([self]).describe_and_apply_async(
743745
report_to_stdout=report_to_stdout
744746
)
745747

746748
def drop(self, report_to_stdout: bool = False) -> None:
747749
"""
748-
Drop the flow.
750+
Drop persistent backends of the flow.
751+
752+
The current instance is still valid after it's called.
753+
For example, you can still call `setup()` after it, to setup the persistent backends again.
754+
755+
Call `cocoindex.remove_flow()` if you want to remove the flow from the current process.
749756
"""
750757
execution_context.run(self.drop_async(report_to_stdout=report_to_stdout))
751758

752759
async def drop_async(self, report_to_stdout: bool = False) -> None:
753760
"""
754-
Drop the flow. The async version.
761+
Drop persistent backends of the flow. The async version.
755762
"""
756763
await make_drop_bundle([self]).describe_and_apply_async(
757764
report_to_stdout=report_to_stdout
@@ -805,6 +812,19 @@ def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) ->
805812
return fl
806813

807814

815+
def remove_flow(fl: Flow) -> None:
    """
    Remove a flow from the current process to free up resources.
    After it's called, methods of the flow should no longer be called.

    This will NOT touch the persistent backends of the flow.

    Calling this again on a flow that was already removed is a no-op.
    """
    # `_lazy_engine_flow` is reset to None below, so None marks a flow that
    # has already been removed; bail out instead of failing on the registry.
    if fl._lazy_engine_flow is None:  # pylint: disable=protected-access
        return
    _engine.remove_flow_context(fl.full_name)
    fl._lazy_engine_flow = None  # pylint: disable=protected-access
    with _flows_lock:
        # pop() rather than `del`: a missing registry entry must not raise
        # KeyError after the engine-side context has already been dropped.
        _flows.pop(fl.name, None)
826+
827+
808828
def flow_def(
809829
name: str | None = None,
810830
) -> Callable[[Callable[[FlowBuilder, DataScope], None]], Flow]:

src/lib_context.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ impl LibContext {
214214
Ok(flow_ctx)
215215
}
216216

217+
pub fn remove_flow_context(&self, flow_name: &str) {
218+
let mut flows = self.flows.lock().unwrap();
219+
flows.remove(flow_name);
220+
}
221+
217222
pub fn require_persistence_ctx(&self) -> Result<&PersistenceContext> {
218223
self.persistence_ctx
219224
.as_ref()
@@ -268,7 +273,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
268273
})
269274
}
270275

271-
static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);
276+
/// Process-wide slot for the shared [`LibContext`]; starts out as `None`.
///
/// `init_lib_context` acquires the write lock on it; readers should treat
/// `None` as "no context available".
// NOTE(review): `pub` looks wider than needed if all users live in this crate —
// confirm whether `pub(crate)` would suffice.
pub static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);
272277

273278
pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> {
274279
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();

src/py/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,14 @@ fn make_drop_bundle(flow_names: Vec<String>) -> PyResult<SetupChangeBundle> {
462462
Ok(SetupChangeBundle(Arc::new(bundle)))
463463
}
464464

465+
#[pyfunction]
466+
fn remove_flow_context(flow_name: String) {
467+
let lib_context_locked = crate::lib_context::LIB_CONTEXT.read().unwrap();
468+
if let Some(lib_context) = lib_context_locked.as_ref() {
469+
lib_context.remove_flow_context(&flow_name)
470+
}
471+
}
472+
465473
#[pyfunction]
466474
fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult<()> {
467475
get_auth_registry()
@@ -493,6 +501,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
493501
m.add_function(wrap_pyfunction!(flow_names_with_setup_async, m)?)?;
494502
m.add_function(wrap_pyfunction!(make_setup_bundle, m)?)?;
495503
m.add_function(wrap_pyfunction!(make_drop_bundle, m)?)?;
504+
m.add_function(wrap_pyfunction!(remove_flow_context, m)?)?;
496505
m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?;
497506

498507
m.add_class::<builder::flow_builder::FlowBuilder>()?;

0 commit comments

Comments
 (0)