Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from rich.table import Table

from . import flow, lib, setting
from .setup import apply_setup_changes, drop_setup, flow_names_with_setup, sync_setup
from .setup import make_setup_bundle, make_drop_bundle, flow_names_with_setup

# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc.
COCOINDEX_HOST = "https://cocoindex.io"
Expand Down Expand Up @@ -146,7 +146,7 @@ def cli(env_file: str | None = None) -> None:

if load_dotenv(dotenv_path=dotenv_path):
loaded_env_path = os.path.abspath(dotenv_path)
click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True)
click.echo(f"Loaded environment variables from: {loaded_env_path}\n", err=True)

try:
_initialize_cocoindex_in_process()
Expand Down Expand Up @@ -262,9 +262,10 @@ def setup(app_target: str, force: bool) -> None:
app_ref = _get_app_ref_from_specifier(app_target)
_load_user_app(app_ref)

setup_status = sync_setup()
click.echo(setup_status)
if setup_status.is_up_to_date():
setup_bundle = make_setup_bundle(flow.flow_names())
description, is_up_to_date = setup_bundle.describe()
click.echo(description)
if is_up_to_date:
click.echo("No changes need to be pushed.")
return
if not force and not click.confirm(
Expand All @@ -273,7 +274,7 @@ def setup(app_target: str, force: bool) -> None:
show_default=False,
):
return
apply_setup_changes(setup_status)
setup_bundle.apply(write_to_stdout=True)


@cli.command("drop")
Expand Down Expand Up @@ -348,9 +349,10 @@ def drop(
click.echo("No flows identified for the drop operation.")
return

setup_status = drop_setup(flow_names)
click.echo(setup_status)
if setup_status.is_up_to_date():
setup_bundle = make_drop_bundle(flow_names)
description, is_up_to_date = setup_bundle.describe()
click.echo(description)
if is_up_to_date:
click.echo("No flows need to be dropped.")
return
if not force and not click.confirm(
Expand All @@ -360,7 +362,7 @@ def drop(
):
click.echo("Drop operation aborted by user.")
return
apply_setup_changes(setup_status)
setup_bundle.apply(write_to_stdout=True)


@cli.command()
Expand Down
92 changes: 83 additions & 9 deletions python/cocoindex/setup.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,100 @@
"""
This module provides APIs to manage the setup of flows.
"""

from . import flow
from . import setting
from . import _engine # type: ignore
from .runtime import execution_context


class SetupChangeBundle:
"""
This class represents a bundle of setup changes.
"""

_engine_bundle: _engine.SetupChangeBundle

def __init__(self, _engine_bundle: _engine.SetupChangeBundle):
self._engine_bundle = _engine_bundle

def __str__(self) -> str:
desc, _ = execution_context.run(self._engine_bundle.describe_async())
return desc # type: ignore

def __repr__(self) -> str:
return self.__str__()

def apply(self, write_to_stdout: bool = False) -> None:
"""
Apply the setup changes.
"""
execution_context.run(self.apply_async(write_to_stdout=write_to_stdout))

async def apply_async(self, write_to_stdout: bool = False) -> None:
"""
Apply the setup changes. Async version of `apply`.
"""
await self._engine_bundle.apply_async(write_to_stdout=write_to_stdout)

def describe(self) -> tuple[str, bool]:
"""
Describe the setup changes.
"""
return execution_context.run(self.describe_async()) # type: ignore

async def describe_async(self) -> tuple[str, bool]:
"""
Describe the setup changes. Async version of `describe`.
"""
return await self._engine_bundle.describe_async() # type: ignore

def sync_setup() -> _engine.SetupStatus:

def make_setup_bundle(flow_names: list[str]) -> SetupChangeBundle:
"""
Make a bundle to setup flows with the given names.
"""
flow.ensure_all_flows_built()
return _engine.sync_setup()
return SetupChangeBundle(_engine.make_setup_bundle(flow_names))


def drop_setup(flow_names: list[str]) -> _engine.SetupStatus:
def make_drop_bundle(flow_names: list[str]) -> SetupChangeBundle:
"""
Make a bundle to drop flows with the given names.
"""
flow.ensure_all_flows_built()
return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names])
return SetupChangeBundle(_engine.make_drop_bundle(flow_names))


def setup_all_flows(write_to_stdout: bool = False) -> None:
"""
Setup all flows registered in the current process.
"""
make_setup_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)


def drop_all_flows(write_to_stdout: bool = False) -> None:
"""
Drop all flows registered in the current process.
"""
make_drop_bundle(flow.flow_names()).apply(write_to_stdout=write_to_stdout)


def flow_names_with_setup() -> list[str]:
"""
Get the names of all flows that have been setup.
"""
return execution_context.run(flow_names_with_setup_async()) # type: ignore


async def flow_names_with_setup_async() -> list[str]:
"""
Get the names of all flows that have been setup. Async version of `flow_names_with_setup`.
"""
result = []
for name in _engine.flow_names_with_setup():
all_flow_names = await _engine.flow_names_with_setup_async()
for name in all_flow_names:
app_namespace, name = setting.split_app_namespace(name, ".")
if app_namespace == setting.get_app_namespace():
result.append(name)
return result


def apply_setup_changes(setup_status: _engine.SetupStatus) -> None:
_engine.apply_setup_changes(setup_status)
26 changes: 12 additions & 14 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::{
lib_context::LibContext,
ops::interface::FlowInstanceContext,
py::IntoPyResult,
setup,
};
use crate::{lib_context::FlowContext, py};

Expand Down Expand Up @@ -237,7 +236,6 @@ impl std::fmt::Display for DataCollector {
pub struct FlowBuilder {
lib_context: Arc<LibContext>,
flow_inst_context: Arc<FlowInstanceContext>,
existing_flow_ss: Option<setup::FlowSetupState<setup::ExistingMode>>,

root_op_scope: Arc<OpScope>,
flow_instance_name: String,
Expand All @@ -259,14 +257,6 @@ impl FlowBuilder {
#[new]
pub fn new(name: &str) -> PyResult<Self> {
let lib_context = get_lib_context().into_py_result()?;
let existing_flow_ss = lib_context.persistence_ctx.as_ref().and_then(|ctx| {
ctx.all_setup_states
.read()
.unwrap()
.flows
.get(name)
.cloned()
});
let root_op_scope = OpScope::new(
spec::ROOT_SCOPE_NAME.to_string(),
None,
Expand All @@ -276,7 +266,6 @@ impl FlowBuilder {
let result = Self {
lib_context,
flow_inst_context,
existing_flow_ss,
root_op_scope,
flow_instance_name: name.to_string(),

Expand Down Expand Up @@ -578,9 +567,18 @@ impl FlowBuilder {
get_runtime().block_on(async move {
let analyzed_flow =
super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
let execution_ctx =
FlowContext::new(Arc::new(analyzed_flow), self.existing_flow_ss.as_ref())
.await?;
let persistence_ctx = self.lib_context.require_persistence_ctx()?;
let execution_ctx = {
let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
FlowContext::new(
Arc::new(analyzed_flow),
flow_setup_ctx
.all_setup_states
.flows
.get(&self.flow_instance_name),
)
.await?
};
anyhow::Ok(execution_ctx)
})
})
Expand Down
2 changes: 1 addition & 1 deletion src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl SharedAckFn {
async fn update_source(
flow: Arc<builder::AnalyzedFlow>,
plan: Arc<plan::ExecutionPlan>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::ExecutionContext>>,
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::FlowExecutionContext>>,
source_update_stats: Arc<stats::UpdateStats>,
source_idx: usize,
pool: PgPool,
Expand Down
Loading