Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/docs/core/flow_def.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ demo_flow = cocoindex.add_flow_def("DemoFlow", demo_flow_def)
In both cases, `demo_flow` will be an object with `cocoindex.Flow` class type.
See [Flow Running](/docs/core/flow_methods) for more details on it.

Sometimes you no longer want to keep states of the flow in memory. We provide a `cocoindex.remove_flow()` method for this purpose:

```python
cocoindex.remove_flow(demo_flow)
```

After it's called, `demo_flow` becomes an invalid object, and you should not call any methods of it.

:::note

This only removes states of the flow from the current process, and it won't affect the persistent states.

If you w

:::

</TabItem>
</Tabs>

Expand Down
8 changes: 8 additions & 0 deletions docs/docs/core/flow_methods.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ cocoindex.drop_all_flows(report_to_stdout=True)
</TabItem>
</Tabs>

:::note

After dropping the flow, the in-memory `cocoindex.Flow` instance is still valid, and you can call setup methods on it again.

If you want to remove the flow from the current process, you can call `cocoindex.remove_flow(demo_flow)` to do so (see [related doc](/docs/core/flow_def#entry-point)).

:::

## Build / update target data

The major goal of a flow is to perform the transformations on source data and build / update data in the target.
Expand Down
3 changes: 2 additions & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .flow import flow_def
from .flow import EvaluateAndDumpOptions, GeneratedField
from .flow import FlowLiveUpdater, FlowLiveUpdaterOptions
from .flow import add_flow_def
from .flow import add_flow_def, remove_flow
from .flow import update_all_flows_async, setup_all_flows, drop_all_flows
from .lib import init, start_server, stop
from .llm import LlmSpec, LlmApiType
Expand Down Expand Up @@ -54,6 +54,7 @@
"FlowLiveUpdater",
"FlowLiveUpdaterOptions",
"add_flow_def",
"remove_flow",
"update_all_flows_async",
"setup_all_flows",
"drop_all_flows",
Expand Down
38 changes: 29 additions & 9 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ class Flow:

_name: str
_full_name: str
_lazy_engine_flow: Callable[[], _engine.Flow]
_lazy_engine_flow: Callable[[], _engine.Flow] | None

def __init__(
self, name: str, full_name: str, engine_flow_creator: Callable[[], _engine.Flow]
Expand Down Expand Up @@ -664,18 +664,18 @@ def build_tree(label: str, lines: list[Any]) -> Tree:
return tree

def _get_spec(self, verbose: bool = False) -> _engine.RenderedSpec:
return self._lazy_engine_flow().get_spec(
return self.internal_flow().get_spec(
output_mode="verbose" if verbose else "concise"
)

def _get_schema(self) -> list[tuple[str, str, str]]:
return cast(list[tuple[str, str, str]], self._lazy_engine_flow().get_schema())
return cast(list[tuple[str, str, str]], self.internal_flow().get_schema())

def __str__(self) -> str:
return str(self._get_spec())

def __repr__(self) -> str:
return repr(self._lazy_engine_flow())
return repr(self.internal_flow())

@property
def name(self) -> str:
Expand Down Expand Up @@ -715,12 +715,14 @@ def evaluate_and_dump(
"""
Evaluate the flow and dump flow outputs to files.
"""
return self._lazy_engine_flow().evaluate_and_dump(dump_engine_object(options))
return self.internal_flow().evaluate_and_dump(dump_engine_object(options))

def internal_flow(self) -> _engine.Flow:
"""
Get the engine flow.
"""
if self._lazy_engine_flow is None:
raise RuntimeError(f"Flow {self.full_name} is already removed")
return self._lazy_engine_flow()

async def internal_flow_async(self) -> _engine.Flow:
Expand All @@ -731,27 +733,32 @@ async def internal_flow_async(self) -> _engine.Flow:

def setup(self, report_to_stdout: bool = False) -> None:
"""
Setup the flow.
Setup persistent backends of the flow.
"""
execution_context.run(self.setup_async(report_to_stdout=report_to_stdout))

async def setup_async(self, report_to_stdout: bool = False) -> None:
"""
Setup the flow. The async version.
Setup persistent backends of the flow. The async version.
"""
await make_setup_bundle([self]).describe_and_apply_async(
report_to_stdout=report_to_stdout
)

def drop(self, report_to_stdout: bool = False) -> None:
"""
Drop the flow.
Drop persistent backends of the flow.

The current instance is still valid after it's called.
For example, you can still call `setup()` after it, to setup the persistent backends again.

Call `cocoindex.remove_flow()` if you want to remove the flow from the current process.
"""
execution_context.run(self.drop_async(report_to_stdout=report_to_stdout))

async def drop_async(self, report_to_stdout: bool = False) -> None:
"""
Drop the flow. The async version.
Drop persistent backends of the flow. The async version.
"""
await make_drop_bundle([self]).describe_and_apply_async(
report_to_stdout=report_to_stdout
Expand Down Expand Up @@ -805,6 +812,19 @@ def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) ->
return fl


def remove_flow(fl: Flow) -> None:
"""
Remove a flow from the current process to free up resources.
After it's called, methods of the flow should no longer be called.

This will NOT touch the persistent backends of the flow.
"""
_engine.remove_flow_context(fl.full_name)
fl._lazy_engine_flow = None # pylint: disable=protected-access
with _flows_lock:
del _flows[fl.name]


def flow_def(
name: str | None = None,
) -> Callable[[Callable[[FlowBuilder, DataScope], None]], Flow]:
Expand Down
7 changes: 6 additions & 1 deletion src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ impl LibContext {
Ok(flow_ctx)
}

pub fn remove_flow_context(&self, flow_name: &str) {
let mut flows = self.flows.lock().unwrap();
flows.remove(flow_name);
}

pub fn require_persistence_ctx(&self) -> Result<&PersistenceContext> {
self.persistence_ctx
.as_ref()
Expand Down Expand Up @@ -268,7 +273,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
})
}

static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);
pub static LIB_CONTEXT: RwLock<Option<Arc<LibContext>>> = RwLock::new(None);

pub(crate) fn init_lib_context(settings: settings::Settings) -> Result<()> {
let mut lib_context_locked = LIB_CONTEXT.write().unwrap();
Expand Down
9 changes: 9 additions & 0 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,14 @@ fn make_drop_bundle(flow_names: Vec<String>) -> PyResult<SetupChangeBundle> {
Ok(SetupChangeBundle(Arc::new(bundle)))
}

#[pyfunction]
fn remove_flow_context(flow_name: String) {
let lib_context_locked = crate::lib_context::LIB_CONTEXT.read().unwrap();
if let Some(lib_context) = lib_context_locked.as_ref() {
lib_context.remove_flow_context(&flow_name)
}
}

#[pyfunction]
fn add_auth_entry(key: String, value: Pythonized<serde_json::Value>) -> PyResult<()> {
get_auth_registry()
Expand Down Expand Up @@ -493,6 +501,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(flow_names_with_setup_async, m)?)?;
m.add_function(wrap_pyfunction!(make_setup_bundle, m)?)?;
m.add_function(wrap_pyfunction!(make_drop_bundle, m)?)?;
m.add_function(wrap_pyfunction!(remove_flow_context, m)?)?;
m.add_function(wrap_pyfunction!(add_auth_entry, m)?)?;

m.add_class::<builder::flow_builder::FlowBuilder>()?;
Expand Down