Skip to content

Commit aaecf2c

Browse files
authored
feat: plumbing FlowInstanceContext to apply_setup_changes() (#805)
1 parent 2926811 commit aaecf2c

File tree

9 files changed

+30
-17
lines changed

9 files changed

+30
-17
lines changed

src/builder/analyzed_flow.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub struct AnalyzedFlow {
88
pub data_schema: schema::FlowSchema,
99
pub setup_state: exec_ctx::AnalyzedSetupState,
1010

11+
pub flow_instance_ctx: Arc<FlowInstanceContext>,
12+
1113
/// It's None if the flow is not up to date
1214
pub execution_plan: Shared<BoxFuture<'static, Result<Arc<plan::ExecutionPlan>, SharedError>>>,
1315
}
@@ -18,7 +20,7 @@ impl AnalyzedFlow {
1820
flow_instance_ctx: Arc<FlowInstanceContext>,
1921
) -> Result<Self> {
2022
let (data_schema, setup_state, execution_plan_fut) =
21-
analyzer::analyze_flow(&flow_instance, flow_instance_ctx).await?;
23+
analyzer::analyze_flow(&flow_instance, flow_instance_ctx.clone()).await?;
2224
let execution_plan = async move {
2325
shared_ok(Arc::new(
2426
execution_plan_fut.await.map_err(SharedError::new)?,
@@ -30,6 +32,7 @@ impl AnalyzedFlow {
3032
flow_instance,
3133
data_schema,
3234
setup_state,
35+
flow_instance_ctx,
3336
execution_plan,
3437
};
3538
Ok(result)

src/lib_context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ async fn build_setup_context(
3333
let setup_status = setup::check_flow_setup_status(
3434
Some(&setup_execution_context.setup_state),
3535
existing_flow_ss,
36+
&analyzed_flow.flow_instance_ctx,
3637
)
3738
.await?;
3839

src/ops/factory_bases.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
320320
key: Self::Key,
321321
desired_state: Option<Self::SetupState>,
322322
existing_states: setup::CombinedState<Self::SetupState>,
323-
auth_registry: &Arc<AuthRegistry>,
323+
flow_instance_ctx: Arc<FlowInstanceContext>,
324324
) -> Result<Self::SetupStatus>;
325325

326326
fn check_state_compatibility(
@@ -420,7 +420,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
420420
key: &serde_json::Value,
421421
desired_state: Option<serde_json::Value>,
422422
existing_states: setup::CombinedState<serde_json::Value>,
423-
auth_registry: &Arc<AuthRegistry>,
423+
flow_instance_ctx: Arc<FlowInstanceContext>,
424424
) -> Result<Box<dyn setup::ResourceSetupStatus>> {
425425
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
426426
let desired_state: Option<T::SetupState> = desired_state
@@ -432,7 +432,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
432432
key,
433433
desired_state,
434434
existing_states,
435-
auth_registry,
435+
flow_instance_ctx,
436436
)
437437
.await?;
438438
Ok(Box::new(setup_status))

src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ pub trait ExportTargetFactory: Send + Sync {
272272
key: &serde_json::Value,
273273
desired_state: Option<serde_json::Value>,
274274
existing_states: setup::CombinedState<serde_json::Value>,
275-
auth_registry: &Arc<AuthRegistry>,
275+
context: Arc<FlowInstanceContext>,
276276
) -> Result<Box<dyn setup::ResourceSetupStatus>>;
277277

278278
/// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.

src/ops/targets/kuzu.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ impl StorageFactoryBase for Factory {
847847
_key: KuzuGraphElement,
848848
desired: Option<SetupState>,
849849
existing: CombinedState<SetupState>,
850-
_auth_registry: &Arc<AuthRegistry>,
850+
_flow_instance_ctx: Arc<FlowInstanceContext>,
851851
) -> Result<Self::SetupStatus> {
852852
let existing_invalidated = desired.as_ref().is_some_and(|desired| {
853853
existing

src/ops/targets/neo4j.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,9 +1009,11 @@ impl StorageFactoryBase for Factory {
10091009
key: Neo4jGraphElement,
10101010
desired: Option<SetupState>,
10111011
existing: CombinedState<SetupState>,
1012-
auth_registry: &Arc<AuthRegistry>,
1012+
flow_instance_ctx: Arc<FlowInstanceContext>,
10131013
) -> Result<Self::SetupStatus> {
1014-
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
1014+
let conn_spec = flow_instance_ctx
1015+
.auth_registry
1016+
.get::<ConnectionSpec>(&key.connection)?;
10151017
let data_status = GraphElementDataSetupStatus::new(desired.as_ref(), &existing);
10161018
let components = components::SetupStatus::create(
10171019
SetupComponentOperator {

src/ops/targets/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ impl StorageFactoryBase for Factory {
719719
_key: TableId,
720720
desired: Option<SetupState>,
721721
existing: setup::CombinedState<SetupState>,
722-
_auth_registry: &Arc<AuthRegistry>,
722+
_flow_instance_ctx: Arc<FlowInstanceContext>,
723723
) -> Result<SetupStatus> {
724724
Ok(SetupStatus::new(desired, existing))
725725
}

src/ops/targets/qdrant.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ impl StorageFactoryBase for Factory {
489489
_key: CollectionKey,
490490
desired: Option<SetupState>,
491491
existing: setup::CombinedState<SetupState>,
492-
_auth_registry: &Arc<AuthRegistry>,
492+
_flow_instance_ctx: Arc<FlowInstanceContext>,
493493
) -> Result<Self::SetupStatus> {
494494
let desired_exists = desired.is_some();
495495
let add_collection = desired.filter(|state| {

src/setup/driver.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use crate::{
22
lib_context::{FlowExecutionContext, LibSetupContext, get_auth_registry},
3-
ops::{get_optional_executor_factory, interface::ExportTargetFactory},
3+
ops::{
4+
get_optional_executor_factory,
5+
interface::{ExportTargetFactory, FlowInstanceContext},
6+
},
47
prelude::*,
58
};
69

@@ -252,6 +255,7 @@ fn group_resource_states<'a>(
252255
pub async fn check_flow_setup_status(
253256
desired_state: Option<&FlowSetupState<DesiredMode>>,
254257
existing_state: Option<&FlowSetupState<ExistingMode>>,
258+
flow_instance_ctx: &Arc<FlowInstanceContext>,
255259
) -> Result<FlowSetupStatus> {
256260
let metadata_change = diff_state(
257261
existing_state.map(|e| &e.metadata),
@@ -331,7 +335,7 @@ pub async fn check_flow_setup_status(
331335
&resource_id.key,
332336
target_state,
333337
existing_without_setup_by_user,
334-
get_auth_registry(),
338+
flow_instance_ctx.clone(),
335339
)
336340
.await?,
337341
)
@@ -618,16 +622,19 @@ pub struct SetupChangeBundle {
618622
impl SetupChangeBundle {
619623
async fn get_flow_setup_status<'a>(
620624
setup_ctx: &LibSetupContext,
621-
flow_name: &str,
625+
flow_ctx: &'a FlowContext,
622626
flow_exec_ctx: &'a FlowExecutionContext,
623627
action: &FlowSetupChangeAction,
624628
buffer: &'a mut Option<FlowSetupStatus>,
625629
) -> Result<&'a FlowSetupStatus> {
626630
let result = match action {
627631
FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_status,
628632
FlowSetupChangeAction::Drop => {
629-
let existing_state = setup_ctx.all_setup_states.flows.get(flow_name);
630-
buffer.insert(check_flow_setup_status(None, existing_state).await?)
633+
let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
634+
buffer.insert(
635+
check_flow_setup_status(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
636+
.await?,
637+
)
631638
}
632639
};
633640
Ok(result)
@@ -662,7 +669,7 @@ impl SetupChangeBundle {
662669
let mut setup_status_buffer = None;
663670
let setup_status = Self::get_flow_setup_status(
664671
setup_ctx,
665-
flow_name,
672+
&flow_ctx,
666673
&flow_exec_ctx,
667674
&self.action,
668675
&mut setup_status_buffer,
@@ -714,7 +721,7 @@ impl SetupChangeBundle {
714721
let mut setup_status_buffer = None;
715722
let setup_status = Self::get_flow_setup_status(
716723
setup_ctx,
717-
flow_name,
724+
&flow_ctx,
718725
&flow_exec_ctx,
719726
&self.action,
720727
&mut setup_status_buffer,

0 commit comments

Comments
 (0)