Skip to content

Commit 47ce7dd

Browse files
authored
feat(internal-setup): apply internal-only setup changes automatically (#905)
1 parent e63194a commit 47ce7dd

File tree

4 files changed

+130
-73
lines changed

4 files changed

+130
-73
lines changed

src/builder/flow_builder.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{base::schema::EnrichedValueType, prelude::*, py::Pythonized};
1+
use crate::{
2+
base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange,
3+
};
24

35
use pyo3::{exceptions::PyException, prelude::*};
46
use pyo3_async_runtimes::tokio::future_into_py;
@@ -612,7 +614,7 @@ impl FlowBuilder {
612614
let analyzed_flow =
613615
super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
614616
let persistence_ctx = self.lib_context.require_persistence_ctx()?;
615-
let execution_ctx = {
617+
let flow_ctx = {
616618
let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
617619
FlowContext::new(
618620
Arc::new(analyzed_flow),
@@ -623,7 +625,34 @@ impl FlowBuilder {
623625
)
624626
.await?
625627
};
626-
anyhow::Ok(execution_ctx)
628+
629+
// Apply internal-only changes if any.
630+
{
631+
let mut flow_exec_ctx =
632+
flow_ctx.get_execution_ctx_for_setup().write().await;
633+
if flow_exec_ctx.setup_change.has_internal_changes()
634+
&& !flow_exec_ctx.setup_change.has_external_changes()
635+
{
636+
let mut lib_setup_ctx = persistence_ctx.setup_ctx.write().await;
637+
let mut output_buffer = Vec::<u8>::new();
638+
setup::apply_changes_for_flow_ctx(
639+
setup::FlowSetupChangeAction::Setup,
640+
&flow_ctx,
641+
&mut flow_exec_ctx,
642+
&mut *lib_setup_ctx,
643+
&persistence_ctx.builtin_db_pool,
644+
&mut output_buffer,
645+
)
646+
.await?;
647+
trace!(
648+
"Applied internal-only change for flow {}:\n{}",
649+
self.flow_instance_name,
650+
String::from_utf8_lossy(&output_buffer)
651+
);
652+
}
653+
}
654+
655+
anyhow::Ok(flow_ctx)
627656
})
628657
})
629658
.into_py_result()?;

src/py/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use pyo3::IntoPyObjectExt;
1313
use pyo3::{exceptions::PyException, prelude::*};
1414
use pyo3_async_runtimes::tokio::future_into_py;
1515
use std::fmt::Write;
16+
use std::io::Cursor;
1617
use std::sync::Arc;
1718

1819
mod convert;

src/setup/driver.rs

Lines changed: 76 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -410,11 +410,11 @@ async fn maybe_update_resource_setup<
410410
async fn apply_changes_for_flow(
411411
write: &mut (dyn std::io::Write + Send),
412412
flow_ctx: &FlowContext,
413-
flow_status: &FlowSetupChange,
413+
flow_setup_change: &FlowSetupChange,
414414
existing_setup_state: &mut Option<setup::FlowSetupState<setup::ExistingMode>>,
415415
pool: &PgPool,
416416
) -> Result<()> {
417-
let Some(status) = flow_status.status else {
417+
let Some(status) = flow_setup_change.status else {
418418
return Ok(());
419419
};
420420
let verb = match status {
@@ -428,7 +428,7 @@ async fn apply_changes_for_flow(
428428
let mut update_info =
429429
HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();
430430

431-
if let Some(metadata_change) = &flow_status.metadata_change {
431+
if let Some(metadata_change) = &flow_setup_change.metadata_change {
432432
update_info.insert(
433433
db_metadata::ResourceTypeKey::new(
434434
MetadataRecordType::FlowMetadata.to_string(),
@@ -437,7 +437,7 @@ async fn apply_changes_for_flow(
437437
db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
438438
);
439439
}
440-
if let Some(tracking_table) = &flow_status.tracking_table {
440+
if let Some(tracking_table) = &flow_setup_change.tracking_table {
441441
if tracking_table
442442
.setup_change
443443
.as_ref()
@@ -454,7 +454,7 @@ async fn apply_changes_for_flow(
454454
}
455455
}
456456

457-
for target_resource in &flow_status.target_resources {
457+
for target_resource in &flow_setup_change.target_resources {
458458
update_info.insert(
459459
db_metadata::ResourceTypeKey::new(
460460
MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
@@ -474,13 +474,13 @@ async fn apply_changes_for_flow(
474474

475475
let new_version_id = db_metadata::stage_changes_for_flow(
476476
flow_ctx.flow_name(),
477-
flow_status.seen_flow_metadata_version,
477+
flow_setup_change.seen_flow_metadata_version,
478478
&update_info,
479479
pool,
480480
)
481481
.await?;
482482

483-
if let Some(tracking_table) = &flow_status.tracking_table {
483+
if let Some(tracking_table) = &flow_setup_change.tracking_table {
484484
maybe_update_resource_setup(
485485
"tracking table",
486486
write,
@@ -491,7 +491,7 @@ async fn apply_changes_for_flow(
491491
}
492492

493493
let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new();
494-
for target_resource in &flow_status.target_resources {
494+
for target_resource in &flow_setup_change.target_resources {
495495
setup_change_by_target_kind
496496
.entry(target_resource.key.target_kind.as_str())
497497
.or_default()
@@ -543,21 +543,21 @@ async fn apply_changes_for_flow(
543543
};
544544
let metadata = CombinedState::from_change(
545545
existing_metadata,
546-
flow_status
546+
flow_setup_change
547547
.metadata_change
548548
.as_ref()
549549
.map(|v| v.desired_state()),
550550
);
551551
let tracking_table = CombinedState::from_change(
552552
existing_tracking_table,
553-
flow_status.tracking_table.as_ref().map(|c| {
553+
flow_setup_change.tracking_table.as_ref().map(|c| {
554554
c.setup_change
555555
.as_ref()
556556
.and_then(|c| c.desired_state.as_ref())
557557
}),
558558
);
559559
let mut targets = existing_targets;
560-
for target_resource in &flow_status.target_resources {
560+
for target_resource in &flow_setup_change.target_resources {
561561
match &target_resource.state {
562562
Some(state) => {
563563
targets.insert(
@@ -618,26 +618,6 @@ pub struct SetupChangeBundle {
618618
}
619619

620620
impl SetupChangeBundle {
621-
async fn get_flow_setup_change<'a>(
622-
setup_ctx: &LibSetupContext,
623-
flow_ctx: &'a FlowContext,
624-
flow_exec_ctx: &'a FlowExecutionContext,
625-
action: &FlowSetupChangeAction,
626-
buffer: &'a mut Option<FlowSetupChange>,
627-
) -> Result<&'a FlowSetupChange> {
628-
let result = match action {
629-
FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
630-
FlowSetupChangeAction::Drop => {
631-
let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
632-
buffer.insert(
633-
diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
634-
.await?,
635-
)
636-
}
637-
};
638-
Ok(result)
639-
}
640-
641621
pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> {
642622
let mut text = String::new();
643623
let mut is_up_to_date = true;
@@ -665,7 +645,7 @@ impl SetupChangeBundle {
665645
let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await;
666646

667647
let mut setup_change_buffer = None;
668-
let setup_change = Self::get_flow_setup_change(
648+
let setup_change = get_flow_setup_change(
669649
setup_ctx,
670650
&flow_ctx,
671651
&flow_exec_ctx,
@@ -715,40 +695,75 @@ impl SetupChangeBundle {
715695
.clone()
716696
};
717697
let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;
718-
719-
let mut setup_change_buffer = None;
720-
let setup_change = Self::get_flow_setup_change(
721-
setup_ctx,
698+
apply_changes_for_flow_ctx(
699+
self.action,
722700
&flow_ctx,
723-
&flow_exec_ctx,
724-
&self.action,
725-
&mut setup_change_buffer,
726-
)
727-
.await?;
728-
if setup_change.is_up_to_date() {
729-
continue;
730-
}
731-
732-
let mut flow_states = setup_ctx.all_setup_states.flows.remove(flow_name);
733-
apply_changes_for_flow(
734-
write,
735-
&flow_ctx,
736-
setup_change,
737-
&mut flow_states,
701+
&mut flow_exec_ctx,
702+
setup_ctx,
738703
&persistence_ctx.builtin_db_pool,
704+
write,
739705
)
740706
.await?;
741-
742-
flow_exec_ctx
743-
.update_setup_state(&flow_ctx.flow, flow_states.as_ref())
744-
.await?;
745-
if let Some(flow_states) = flow_states {
746-
setup_ctx
747-
.all_setup_states
748-
.flows
749-
.insert(flow_name.to_string(), flow_states);
750-
}
751707
}
752708
Ok(())
753709
}
754710
}
711+
712+
async fn get_flow_setup_change<'a>(
713+
setup_ctx: &LibSetupContext,
714+
flow_ctx: &'a FlowContext,
715+
flow_exec_ctx: &'a FlowExecutionContext,
716+
action: &FlowSetupChangeAction,
717+
buffer: &'a mut Option<FlowSetupChange>,
718+
) -> Result<&'a FlowSetupChange> {
719+
let result = match action {
720+
FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
721+
FlowSetupChangeAction::Drop => {
722+
let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
723+
buffer.insert(
724+
diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
725+
.await?,
726+
)
727+
}
728+
};
729+
Ok(result)
730+
}
731+
732+
pub(crate) async fn apply_changes_for_flow_ctx(
733+
action: FlowSetupChangeAction,
734+
flow_ctx: &FlowContext,
735+
flow_exec_ctx: &mut FlowExecutionContext,
736+
setup_ctx: &mut LibSetupContext,
737+
db_pool: &PgPool,
738+
write: &mut (dyn std::io::Write + Send),
739+
) -> Result<()> {
740+
let mut setup_change_buffer = None;
741+
let setup_change = get_flow_setup_change(
742+
setup_ctx,
743+
&flow_ctx,
744+
&flow_exec_ctx,
745+
&action,
746+
&mut setup_change_buffer,
747+
)
748+
.await?;
749+
if setup_change.is_up_to_date() {
750+
return Ok(());
751+
}
752+
753+
let mut flow_states = setup_ctx
754+
.all_setup_states
755+
.flows
756+
.remove(flow_ctx.flow_name());
757+
apply_changes_for_flow(write, &flow_ctx, setup_change, &mut flow_states, db_pool).await?;
758+
759+
flow_exec_ctx
760+
.update_setup_state(&flow_ctx.flow, flow_states.as_ref())
761+
.await?;
762+
if let Some(flow_states) = flow_states {
763+
setup_ctx
764+
.all_setup_states
765+
.flows
766+
.insert(flow_ctx.flow_name().to_string(), flow_states);
767+
}
768+
Ok(())
769+
}

src/setup/states.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,16 @@ pub enum ObjectStatus {
353353

354354
pub trait ObjectSetupChange {
355355
fn status(&self) -> Option<ObjectStatus>;
356-
fn is_up_to_date(&self) -> bool;
356+
357+
/// Returns true if it has internal changes, i.e. changes that don't need user intervention.
358+
fn has_internal_changes(&self) -> bool;
359+
360+
/// Returns true if it has external changes, i.e. changes that should notify users.
361+
fn has_external_changes(&self) -> bool;
362+
363+
fn is_up_to_date(&self) -> bool {
364+
return !self.has_internal_changes() && !self.has_external_changes();
365+
}
357366
}
358367

359368
#[derive(Debug)]
@@ -376,16 +385,19 @@ impl ObjectSetupChange for FlowSetupChange {
376385
self.status
377386
}
378387

379-
fn is_up_to_date(&self) -> bool {
380-
self.metadata_change.is_none()
381-
&& self
382-
.tracking_table
383-
.as_ref()
384-
.is_none_or(|t| t.is_up_to_date())
385-
&& self
388+
fn has_internal_changes(&self) -> bool {
389+
return self.metadata_change.is_some();
390+
}
391+
392+
fn has_external_changes(&self) -> bool {
393+
return self
394+
.tracking_table
395+
.as_ref()
396+
.is_some_and(|t| !t.is_up_to_date())
397+
|| self
386398
.target_resources
387399
.iter()
388-
.all(|target| target.is_up_to_date())
400+
.any(|target| !target.is_up_to_date());
389401
}
390402
}
391403

0 commit comments

Comments
 (0)