Skip to content

Commit 00e6e31

Browse files
committed
feat: plumbing FlowInstanceContext to more setup methods
1 parent a2bbb06 commit 00e6e31

File tree

7 files changed

+23
-23
lines changed

7 files changed

+23
-23
lines changed

src/ops/factory_bases.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
358358
async fn apply_setup_changes(
359359
&self,
360360
setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
361-
auth_registry: &Arc<AuthRegistry>,
361+
context: Arc<FlowInstanceContext>,
362362
) -> Result<()>;
363363
}
364364

@@ -499,7 +499,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
499499
async fn apply_setup_changes(
500500
&self,
501501
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
502-
auth_registry: &Arc<AuthRegistry>,
502+
context: Arc<FlowInstanceContext>,
503503
) -> Result<()> {
504504
StorageFactoryBase::apply_setup_changes(
505505
self,
@@ -514,7 +514,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
514514
})
515515
})
516516
.collect::<Result<Vec<_>>>()?,
517-
auth_registry,
517+
context,
518518
)
519519
.await
520520
}

src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ pub trait ExportTargetFactory: Send + Sync {
302302
async fn apply_setup_changes(
303303
&self,
304304
setup_status: Vec<ResourceSetupChangeItem<'async_trait>>,
305-
auth_registry: &Arc<AuthRegistry>,
305+
context: Arc<FlowInstanceContext>,
306306
) -> Result<()>;
307307
}
308308

src/ops/targets/kuzu.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,7 +1040,7 @@ impl StorageFactoryBase for Factory {
10401040
async fn apply_setup_changes(
10411041
&self,
10421042
changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
1043-
auth_registry: &Arc<AuthRegistry>,
1043+
context: Arc<FlowInstanceContext>,
10441044
) -> Result<()> {
10451045
let mut changes_by_conn = IndexMap::new();
10461046
for change in changes.into_iter() {
@@ -1050,7 +1050,7 @@ impl StorageFactoryBase for Factory {
10501050
.push(change);
10511051
}
10521052
for (conn, changes) in changes_by_conn.into_iter() {
1053-
let conn_spec = auth_registry.get::<ConnectionSpec>(&conn)?;
1053+
let conn_spec = context.auth_registry.get::<ConnectionSpec>(&conn)?;
10541054
let kuzu_client = KuzuThinClient::new(&conn_spec, self.reqwest_client.clone());
10551055

10561056
let (node_changes, rel_changes): (Vec<_>, Vec<_>) =

src/ops/targets/neo4j.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,7 @@ impl StorageFactoryBase for Factory {
10841084
async fn apply_setup_changes(
10851085
&self,
10861086
changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
1087-
auth_registry: &Arc<AuthRegistry>,
1087+
context: Arc<FlowInstanceContext>,
10881088
) -> Result<()> {
10891089
// Relationships first, then nodes, as relationships need to be deleted before nodes they referenced.
10901090
let mut relationship_types = IndexSet::<&Neo4jGraphElement>::new();
@@ -1116,15 +1116,15 @@ impl StorageFactoryBase for Factory {
11161116
for rel_type in relationship_types.into_iter() {
11171117
let graph = self
11181118
.graph_pool
1119-
.get_graph_for_key(rel_type, auth_registry)
1119+
.get_graph_for_key(rel_type, &context.auth_registry)
11201120
.await?;
11211121
clear_graph_element_data(&graph, rel_type, true).await?;
11221122
}
11231123
// Clear standalone nodes, which is simpler than dependent nodes.
11241124
for node_label in node_labels.iter() {
11251125
let graph = self
11261126
.graph_pool
1127-
.get_graph_for_key(node_label, auth_registry)
1127+
.get_graph_for_key(node_label, &context.auth_registry)
11281128
.await?;
11291129
clear_graph_element_data(&graph, node_label, true).await?;
11301130
}
@@ -1133,7 +1133,7 @@ impl StorageFactoryBase for Factory {
11331133
if !node_labels.contains(node_label) {
11341134
let graph = self
11351135
.graph_pool
1136-
.get_graph_for_key(node_label, auth_registry)
1136+
.get_graph_for_key(node_label, &context.auth_registry)
11371137
.await?;
11381138
clear_graph_element_data(&graph, node_label, false).await?;
11391139
}

src/ops/targets/postgres.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -777,10 +777,10 @@ impl StorageFactoryBase for Factory {
777777
async fn apply_setup_changes(
778778
&self,
779779
changes: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
780-
auth_registry: &Arc<AuthRegistry>,
780+
context: Arc<FlowInstanceContext>,
781781
) -> Result<()> {
782782
for change in changes.iter() {
783-
let db_pool = get_db_pool(change.key.database.as_ref(), auth_registry).await?;
783+
let db_pool = get_db_pool(change.key.database.as_ref(), &context.auth_registry).await?;
784784
change
785785
.setup_status
786786
.apply_change(&db_pool, &change.key.table_name)

src/ops/targets/qdrant.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -544,19 +544,19 @@ impl StorageFactoryBase for Factory {
544544
async fn apply_setup_changes(
545545
&self,
546546
setup_status: Vec<TypedResourceSetupChangeItem<'async_trait, Self>>,
547-
auth_registry: &Arc<AuthRegistry>,
547+
context: Arc<FlowInstanceContext>,
548548
) -> Result<()> {
549549
for setup_change in setup_status.iter() {
550550
let qdrant_client =
551-
self.get_qdrant_client(&setup_change.key.connection, auth_registry)?;
551+
self.get_qdrant_client(&setup_change.key.connection, &context.auth_registry)?;
552552
setup_change
553553
.setup_status
554554
.apply_delete(&setup_change.key.collection_name, &qdrant_client)
555555
.await?;
556556
}
557557
for setup_change in setup_status.iter() {
558558
let qdrant_client =
559-
self.get_qdrant_client(&setup_change.key.connection, auth_registry)?;
559+
self.get_qdrant_client(&setup_change.key.connection, &context.auth_registry)?;
560560
setup_change
561561
.setup_status
562562
.apply_create(&setup_change.key.collection_name, &qdrant_client)

src/setup/driver.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
lib_context::{FlowExecutionContext, LibSetupContext, get_auth_registry},
2+
lib_context::{FlowContext, FlowExecutionContext, LibSetupContext},
33
ops::{
44
get_optional_executor_factory,
55
interface::{ExportTargetFactory, FlowInstanceContext},
@@ -411,7 +411,7 @@ async fn maybe_update_resource_setup<
411411

412412
async fn apply_changes_for_flow(
413413
write: &mut (dyn std::io::Write + Send),
414-
flow_name: &str,
414+
flow_ctx: &FlowContext,
415415
flow_status: &FlowSetupStatus,
416416
existing_setup_state: &mut Option<setup::FlowSetupState<setup::ExistingMode>>,
417417
pool: &PgPool,
@@ -425,7 +425,7 @@ async fn apply_changes_for_flow(
425425
ObjectStatus::Existing => "Updating resources for ",
426426
_ => bail!("invalid flow status"),
427427
};
428-
write!(write, "\n{verb} flow {flow_name}:\n")?;
428+
write!(write, "\n{verb} flow {}:\n", flow_ctx.flow_name())?;
429429

430430
let mut update_info =
431431
HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();
@@ -475,7 +475,7 @@ async fn apply_changes_for_flow(
475475
}
476476

477477
let new_version_id = db_metadata::stage_changes_for_flow(
478-
flow_name,
478+
flow_ctx.flow_name(),
479479
flow_status.seen_flow_metadata_version,
480480
&update_info,
481481
pool,
@@ -517,7 +517,7 @@ async fn apply_changes_for_flow(
517517
setup_status: s.setup_status.as_ref(),
518518
})
519519
.collect(),
520-
get_auth_registry(),
520+
flow_ctx.flow.flow_instance_ctx.clone(),
521521
)
522522
.await?;
523523
Ok(())
@@ -528,7 +528,7 @@ async fn apply_changes_for_flow(
528528

529529
let is_deletion = status == ObjectStatus::Deleted;
530530
db_metadata::commit_changes_for_flow(
531-
flow_name,
531+
flow_ctx.flow_name(),
532532
new_version_id,
533533
&update_info,
534534
is_deletion,
@@ -580,7 +580,7 @@ async fn apply_changes_for_flow(
580580
});
581581
}
582582

583-
writeln!(write, "Done for flow {flow_name}")?;
583+
writeln!(write, "Done for flow {}", flow_ctx.flow_name())?;
584584
Ok(())
585585
}
586586

@@ -734,7 +734,7 @@ impl SetupChangeBundle {
734734
let mut flow_states = setup_ctx.all_setup_states.flows.remove(flow_name);
735735
apply_changes_for_flow(
736736
write,
737-
flow_name,
737+
&flow_ctx,
738738
setup_status,
739739
&mut flow_states,
740740
&persistence_ctx.builtin_db_pool,

0 commit comments

Comments
 (0)