Skip to content

Commit cc032b1

Browse files
committed
feat(setup): provide internal logic for in-place setup
1 parent 8daa081 commit cc032b1

File tree

9 files changed

+627
-340
lines changed

9 files changed

+627
-340
lines changed

python/cocoindex/cli.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from rich.table import Table
1818

1919
from . import flow, lib, setting
20-
from .setup import apply_setup_changes, drop_setup, flow_names_with_setup, sync_setup
20+
from .setup import make_setup_bundle, make_drop_bundle, flow_names_with_setup
2121

2222
# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
2323
COCOINDEX_HOST = "https://cocoindex.io"
@@ -146,7 +146,7 @@ def cli(env_file: str | None = None) -> None:
146146

147147
if load_dotenv(dotenv_path=dotenv_path):
148148
loaded_env_path = os.path.abspath(dotenv_path)
149-
click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True)
149+
click.echo(f"Loaded environment variables from: {loaded_env_path}\n", err=True)
150150

151151
try:
152152
_initialize_cocoindex_in_process()
@@ -262,9 +262,10 @@ def setup(app_target: str, force: bool) -> None:
262262
app_ref = _get_app_ref_from_specifier(app_target)
263263
_load_user_app(app_ref)
264264

265-
setup_status = sync_setup()
266-
click.echo(setup_status)
267-
if setup_status.is_up_to_date():
265+
setup_bundle = make_setup_bundle(flow.flow_names())
266+
description, is_up_to_date = setup_bundle.describe()
267+
click.echo(description)
268+
if is_up_to_date:
268269
click.echo("No changes need to be pushed.")
269270
return
270271
if not force and not click.confirm(
@@ -273,7 +274,7 @@ def setup(app_target: str, force: bool) -> None:
273274
show_default=False,
274275
):
275276
return
276-
apply_setup_changes(setup_status)
277+
setup_bundle.apply(write_to_stdout=True)
277278

278279

279280
@cli.command("drop")
@@ -348,9 +349,10 @@ def drop(
348349
click.echo("No flows identified for the drop operation.")
349350
return
350351

351-
setup_status = drop_setup(flow_names)
352-
click.echo(setup_status)
353-
if setup_status.is_up_to_date():
352+
setup_bundle = make_drop_bundle(flow_names)
353+
description, is_up_to_date = setup_bundle.describe()
354+
click.echo(description)
355+
if is_up_to_date:
354356
click.echo("No flows need to be dropped.")
355357
return
356358
if not force and not click.confirm(
@@ -360,7 +362,7 @@ def drop(
360362
):
361363
click.echo("Drop operation aborted by user.")
362364
return
363-
apply_setup_changes(setup_status)
365+
setup_bundle.apply(write_to_stdout=True)
364366

365367

366368
@cli.command()

python/cocoindex/setup.py

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,100 @@
1+
"""
2+
This module provides APIs to manage the setup of flows.
3+
"""
4+
15
from . import flow
26
from . import setting
37
from . import _engine # type: ignore
8+
from .runtime import execution_context
9+
10+
11+
class SetupChangeBundle:
12+
"""
13+
This class represents a bundle of setup changes.
14+
"""
15+
16+
_engine_bundle: _engine.SetupChangeBundle
17+
18+
def __init__(self, _engine_bundle: _engine.SetupChangeBundle):
19+
self._engine_bundle = _engine_bundle
20+
21+
def __str__(self) -> str:
22+
desc, _ = execution_context.run(self._engine_bundle.describe_async())
23+
return desc # type: ignore
24+
25+
def __repr__(self) -> str:
26+
return self.__str__()
27+
28+
def apply(self, write_to_stdout: bool = False) -> None:
29+
"""
30+
Apply the setup changes.
31+
"""
32+
execution_context.run(self.apply_async(write_to_stdout=write_to_stdout))
33+
34+
async def apply_async(self, write_to_stdout: bool = False) -> None:
35+
"""
36+
Apply the setup changes. Async version of `apply`.
37+
"""
38+
await self._engine_bundle.apply_async(write_to_stdout=write_to_stdout)
39+
40+
def describe(self) -> tuple[str, bool]:
41+
"""
42+
Describe the setup changes.
43+
"""
44+
return execution_context.run(self.describe_async()) # type: ignore
445

46+
async def describe_async(self) -> tuple[str, bool]:
47+
"""
48+
Describe the setup changes. Async version of `describe`.
49+
"""
50+
return await self._engine_bundle.describe_async() # type: ignore
551

6-
def sync_setup() -> _engine.SetupStatus:
52+
53+
def make_setup_bundle(flow_names: list[str]) -> SetupChangeBundle:
54+
"""
55+
Make a bundle to setup flows with the given names.
56+
"""
757
flow.ensure_all_flows_built()
8-
return _engine.sync_setup()
58+
return SetupChangeBundle(_engine.make_setup_bundle(flow_names))
959

1060

11-
def drop_setup(flow_names: list[str]) -> _engine.SetupStatus:
61+
def make_drop_bundle(flow_names: list[str]) -> SetupChangeBundle:
62+
"""
63+
Make a bundle to drop flows with the given names.
64+
"""
1265
flow.ensure_all_flows_built()
13-
return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names])
66+
return SetupChangeBundle(_engine.make_drop_bundle(flow_names))
67+
68+
69+
def setup_all_flows(write_to_stdout: bool = False) -> None:
70+
"""
71+
Setup all flows registered in the current process.
72+
"""
73+
make_setup_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)
74+
75+
76+
def drop_all_flows(write_to_stdout: bool = False) -> None:
77+
"""
78+
Drop all flows registered in the current process.
79+
"""
80+
make_drop_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)
1481

1582

1683
def flow_names_with_setup() -> list[str]:
84+
"""
85+
Get the names of all flows that have been setup.
86+
"""
87+
return execution_context.run(flow_names_with_setup_async()) # type: ignore
88+
89+
90+
async def flow_names_with_setup_async() -> list[str]:
91+
"""
92+
Get the names of all flows that have been setup. Async version of `flow_names_with_setup`.
93+
"""
1794
result = []
18-
for name in _engine.flow_names_with_setup():
95+
all_flow_names = await _engine.flow_names_with_setup_async()
96+
for name in all_flow_names:
1997
app_namespace, name = setting.split_app_namespace(name, ".")
2098
if app_namespace == setting.get_app_namespace():
2199
result.append(name)
22100
return result
23-
24-
25-
def apply_setup_changes(setup_status: _engine.SetupStatus) -> None:
26-
_engine.apply_setup_changes(setup_status)

src/builder/flow_builder.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use crate::{
1616
lib_context::LibContext,
1717
ops::interface::FlowInstanceContext,
1818
py::IntoPyResult,
19-
setup,
2019
};
2120
use crate::{lib_context::FlowContext, py};
2221

@@ -237,7 +236,6 @@ impl std::fmt::Display for DataCollector {
237236
pub struct FlowBuilder {
238237
lib_context: Arc<LibContext>,
239238
flow_inst_context: Arc<FlowInstanceContext>,
240-
existing_flow_ss: Option<setup::FlowSetupState<setup::ExistingMode>>,
241239

242240
root_op_scope: Arc<OpScope>,
243241
flow_instance_name: String,
@@ -259,14 +257,6 @@ impl FlowBuilder {
259257
#[new]
260258
pub fn new(name: &str) -> PyResult<Self> {
261259
let lib_context = get_lib_context().into_py_result()?;
262-
let existing_flow_ss = lib_context.persistence_ctx.as_ref().and_then(|ctx| {
263-
ctx.all_setup_states
264-
.read()
265-
.unwrap()
266-
.flows
267-
.get(name)
268-
.cloned()
269-
});
270260
let root_op_scope = OpScope::new(
271261
spec::ROOT_SCOPE_NAME.to_string(),
272262
None,
@@ -276,7 +266,6 @@ impl FlowBuilder {
276266
let result = Self {
277267
lib_context,
278268
flow_inst_context,
279-
existing_flow_ss,
280269
root_op_scope,
281270
flow_instance_name: name.to_string(),
282271

@@ -578,9 +567,18 @@ impl FlowBuilder {
578567
get_runtime().block_on(async move {
579568
let analyzed_flow =
580569
super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
581-
let execution_ctx =
582-
FlowContext::new(Arc::new(analyzed_flow), self.existing_flow_ss.as_ref())
583-
.await?;
570+
let persistence_ctx = self.lib_context.require_persistence_ctx()?;
571+
let execution_ctx = {
572+
let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
573+
FlowContext::new(
574+
Arc::new(analyzed_flow),
575+
flow_setup_ctx
576+
.all_setup_states
577+
.flows
578+
.get(&self.flow_instance_name),
579+
)
580+
.await?
581+
};
584582
anyhow::Ok(execution_ctx)
585583
})
586584
})

src/execution/live_updater.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl SharedAckFn {
6262
async fn update_source(
6363
flow: Arc<builder::AnalyzedFlow>,
6464
plan: Arc<plan::ExecutionPlan>,
65-
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::ExecutionContext>>,
65+
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
6666
source_update_stats: Arc<stats::UpdateStats>,
6767
source_idx: usize,
6868
pool: PgPool,

0 commit comments

Comments
 (0)