Skip to content

Commit 834a3df

Browse files
authored
Make setup management more flexible and straightforward. (#261)
- Explicit support for setup deletions, regardless of whether the flow is present in the current process.
- Clearly show persisted setup in `ls`, even for flows that are not in the current process.
1 parent bd84f71 commit 834a3df

File tree

6 files changed

+124
-47
lines changed

6 files changed

+124
-47
lines changed

python/cocoindex/cli.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import datetime
44

55
from . import flow, lib
6-
from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
6+
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes
77

88
@click.group()
99
def cli():
@@ -12,12 +12,42 @@ def cli():
1212
"""
1313

1414
@cli.command()
15-
def ls():
15+
@click.option(
16+
"-a", "--all", "show_all", is_flag=True, show_default=True, default=False,
17+
help="Also show all flows with persisted setup, even if not defined in the current process.")
18+
def ls(show_all: bool):
1619
"""
17-
List all available flows.
20+
List all flows.
1821
"""
19-
for name in flow.flow_names():
20-
click.echo(name)
22+
current_flow_names = [fl.name for fl in flow.flows()]
23+
persisted_flow_names = flow_names_with_setup()
24+
remaining_persisted_flow_names = set(persisted_flow_names)
25+
26+
has_missing_setup = False
27+
has_extra_setup = False
28+
29+
for name in current_flow_names:
30+
if name in remaining_persisted_flow_names:
31+
remaining_persisted_flow_names.remove(name)
32+
suffix = ''
33+
else:
34+
suffix = ' [+]'
35+
has_missing_setup = True
36+
click.echo(f'{name}{suffix}')
37+
38+
if show_all:
39+
for name in persisted_flow_names:
40+
if name in remaining_persisted_flow_names:
41+
click.echo(f'{name} [?]')
42+
has_extra_setup = True
43+
44+
if has_missing_setup or has_extra_setup:
45+
click.echo('')
46+
click.echo('Notes:')
47+
if has_missing_setup:
48+
click.echo(' [+]: Flows present in the current process, but missing setup.')
49+
if has_extra_setup:
50+
click.echo(' [?]: Flows with persisted setup, but not in the current process.')
2151

2252
@cli.command()
2353
@click.argument("flow_name", type=str, required=False)
@@ -28,17 +58,41 @@ def show(flow_name: str | None):
2858
click.echo(str(_flow_by_name(flow_name)))
2959

3060
@cli.command()
61+
def setup():
62+
"""
63+
Check and apply backend setup changes for flows, including the internal and target storage
64+
(to export).
65+
"""
66+
status_check = sync_setup()
67+
click.echo(status_check)
68+
if status_check.is_up_to_date():
69+
click.echo("No changes need to be pushed.")
70+
return
71+
if not click.confirm(
72+
"Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False):
73+
return
74+
apply_setup_changes(status_check)
75+
76+
@cli.command()
77+
@click.argument("flow_name", type=str, nargs=-1)
3178
@click.option(
32-
"-D", "--delete_legacy_flows", is_flag=True, show_default=True, default=False,
33-
help="Also check / delete flows existing before but no longer exist.")
34-
def setup(delete_legacy_flows):
79+
"-a", "--all", "drop_all", is_flag=True, show_default=True, default=False,
80+
help="Drop all flows with persisted setup, even if not defined in the current process.")
81+
def drop(flow_name: tuple[str, ...], drop_all: bool):
3582
"""
36-
Check and apply backend setup changes for flows, including the internal and target storage (to export).
83+
Drop the backend for specified flows.
84+
If no flow is specified, all flows defined in the current process will be dropped.
3785
"""
38-
options = CheckSetupStatusOptions(delete_legacy_flows=delete_legacy_flows)
39-
status_check = check_setup_status(options)
40-
print(status_check)
86+
if drop_all:
87+
flow_names = flow_names_with_setup()
88+
elif len(flow_name) == 0:
89+
flow_names = [fl.name for fl in flow.flows()]
90+
else:
91+
flow_names = list(flow_name)
92+
status_check = drop_setup(flow_names)
93+
click.echo(status_check)
4194
if status_check.is_up_to_date():
95+
click.echo("No flows need to be dropped.")
4296
return
4397
if not click.confirm(
4498
"Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False):

python/cocoindex/setup.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
from dataclasses import dataclass
2-
31
from . import flow
42
from . import _engine
53

6-
@dataclass
7-
class CheckSetupStatusOptions:
8-
delete_legacy_flows: bool
4+
def sync_setup() -> _engine.SetupStatusCheck:
5+
flow.ensure_all_flows_built()
6+
return _engine.sync_setup()
97

10-
def check_setup_status(options: CheckSetupStatusOptions) -> _engine.SetupStatusCheck:
8+
def drop_setup(flow_names: list[str]) -> _engine.SetupStatusCheck:
119
flow.ensure_all_flows_built()
12-
return _engine.check_setup_status(vars(options))
10+
return _engine.drop_setup(flow_names)
11+
12+
def flow_names_with_setup() -> list[str]:
13+
return _engine.flow_names_with_setup()
1314

1415
def apply_setup_changes(status_check: _engine.SetupStatusCheck):
1516
_engine.apply_setup_changes(status_check)

src/builder/flow_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ impl FlowBuilder {
340340
pub fn new(name: &str) -> PyResult<Self> {
341341
let lib_context = get_lib_context().into_py_result()?;
342342
let existing_flow_ss = lib_context
343-
.combined_setup_states
343+
.all_setup_states
344344
.read()
345345
.unwrap()
346346
.flows

src/lib_context.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap
6363
pub struct LibContext {
6464
pub pool: PgPool,
6565
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
66-
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
66+
pub all_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
6767
}
6868

6969
impl LibContext {
@@ -94,14 +94,14 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
9494
pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap();
9595
});
9696

97-
let (pool, all_css) = get_runtime().block_on(async {
97+
let (pool, all_setup_states) = get_runtime().block_on(async {
9898
let pool = PgPool::connect(&settings.database_url).await?;
9999
let existing_ss = setup::get_existing_setup_state(&pool).await?;
100100
anyhow::Ok((pool, existing_ss))
101101
})?;
102102
Ok(LibContext {
103103
pool,
104-
combined_setup_states: RwLock::new(all_css),
104+
all_setup_states: RwLock::new(all_setup_states),
105105
flows: Mutex::new(BTreeMap::new()),
106106
})
107107
}

src/py/mod.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,17 +278,30 @@ impl SetupStatusCheck {
278278
}
279279

280280
#[pyfunction]
281-
fn check_setup_status(
282-
options: Pythonized<setup::CheckSetupStatusOptions>,
283-
) -> PyResult<SetupStatusCheck> {
281+
fn sync_setup() -> PyResult<SetupStatusCheck> {
284282
let lib_context = get_lib_context().into_py_result()?;
285283
let flows = lib_context.flows.lock().unwrap();
286-
let all_css = lib_context.combined_setup_states.read().unwrap();
287-
let setup_status =
288-
setup::check_setup_status(&flows, &all_css, options.into_inner()).into_py_result()?;
284+
let all_setup_states = lib_context.all_setup_states.read().unwrap();
285+
let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?;
289286
Ok(SetupStatusCheck(setup_status))
290287
}
291288

289+
#[pyfunction]
290+
fn drop_setup(flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
291+
let lib_context = get_lib_context().into_py_result()?;
292+
let all_setup_states = lib_context.all_setup_states.read().unwrap();
293+
let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?;
294+
Ok(SetupStatusCheck(setup_status))
295+
}
296+
297+
#[pyfunction]
298+
fn flow_names_with_setup() -> PyResult<Vec<String>> {
299+
let lib_context = get_lib_context().into_py_result()?;
300+
let all_setup_states = lib_context.all_setup_states.read().unwrap();
301+
let flow_names = all_setup_states.flows.keys().cloned().collect();
302+
Ok(flow_names)
303+
}
304+
292305
#[pyfunction]
293306
fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyResult<()> {
294307
py.allow_threads(|| {
@@ -314,8 +327,10 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
314327
m.add_function(wrap_pyfunction!(start_server, m)?)?;
315328
m.add_function(wrap_pyfunction!(stop, m)?)?;
316329
m.add_function(wrap_pyfunction!(register_function_factory, m)?)?;
317-
m.add_function(wrap_pyfunction!(check_setup_status, m)?)?;
330+
m.add_function(wrap_pyfunction!(sync_setup, m)?)?;
331+
m.add_function(wrap_pyfunction!(drop_setup, m)?)?;
318332
m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?;
333+
m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?;
319334

320335
m.add_class::<builder::flow_builder::FlowBuilder>()?;
321336
m.add_class::<builder::flow_builder::DataCollector>()?;

src/setup/driver.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -285,16 +285,9 @@ pub fn check_flow_setup_status(
285285
})
286286
}
287287

288-
#[derive(Debug, Deserialize, Default)]
289-
pub struct CheckSetupStatusOptions {
290-
/// If true, also check / clean up flows existing before but no longer exist.
291-
pub delete_legacy_flows: bool,
292-
}
293-
294-
pub fn check_setup_status(
288+
pub fn sync_setup(
295289
flows: &BTreeMap<String, Arc<FlowContext>>,
296290
all_setup_state: &AllSetupState<ExistingMode>,
297-
options: CheckSetupStatusOptions,
298291
) -> Result<AllSetupStatusCheck> {
299292
let mut flow_status_checks = BTreeMap::new();
300293
for (flow_name, flow_context) in flows {
@@ -304,19 +297,33 @@ pub fn check_setup_status(
304297
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?,
305298
);
306299
}
307-
if options.delete_legacy_flows {
308-
for (flow_name, existing_state) in all_setup_state.flows.iter() {
309-
if !flows.contains_key(flow_name) {
310-
flow_status_checks.insert(
311-
flow_name.clone(),
312-
check_flow_setup_status(None, Some(existing_state))?,
313-
);
314-
}
300+
Ok(AllSetupStatusCheck {
301+
metadata_table: db_metadata::MetadataTableSetup {
302+
metadata_table_missing: !all_setup_state.has_metadata_table,
303+
},
304+
flows: flow_status_checks,
305+
})
306+
}
307+
308+
pub fn drop_setup(
309+
flow_names: impl IntoIterator<Item = String>,
310+
all_setup_state: &AllSetupState<ExistingMode>,
311+
) -> Result<AllSetupStatusCheck> {
312+
if !all_setup_state.has_metadata_table {
313+
api_bail!("CocoIndex metadata table is missing.");
314+
}
315+
let mut flow_status_checks = BTreeMap::new();
316+
for flow_name in flow_names.into_iter() {
317+
if let Some(existing_state) = all_setup_state.flows.get(&flow_name) {
318+
flow_status_checks.insert(
319+
flow_name,
320+
check_flow_setup_status(None, Some(existing_state))?,
321+
);
315322
}
316323
}
317324
Ok(AllSetupStatusCheck {
318325
metadata_table: db_metadata::MetadataTableSetup {
319-
metadata_table_missing: !all_setup_state.has_metadata_table,
326+
metadata_table_missing: false,
320327
},
321328
flows: flow_status_checks,
322329
})

0 commit comments

Comments
 (0)