diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index deea01d1c..31bc5b49d 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -1,4 +1,6 @@ -use crate::{base::schema::EnrichedValueType, prelude::*, py::Pythonized}; +use crate::{ + base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange, +}; use pyo3::{exceptions::PyException, prelude::*}; use pyo3_async_runtimes::tokio::future_into_py; @@ -612,7 +614,7 @@ impl FlowBuilder { let analyzed_flow = super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?; let persistence_ctx = self.lib_context.require_persistence_ctx()?; - let execution_ctx = { + let flow_ctx = { let flow_setup_ctx = persistence_ctx.setup_ctx.read().await; FlowContext::new( Arc::new(analyzed_flow), @@ -623,7 +625,34 @@ impl FlowBuilder { ) .await? }; - anyhow::Ok(execution_ctx) + + // Apply internal-only changes if any. + { + let mut flow_exec_ctx = + flow_ctx.get_execution_ctx_for_setup().write().await; + if flow_exec_ctx.setup_change.has_internal_changes() + && !flow_exec_ctx.setup_change.has_external_changes() + { + let mut lib_setup_ctx = persistence_ctx.setup_ctx.write().await; + let mut output_buffer = Vec::::new(); + setup::apply_changes_for_flow_ctx( + setup::FlowSetupChangeAction::Setup, + &flow_ctx, + &mut flow_exec_ctx, + &mut *lib_setup_ctx, + &persistence_ctx.builtin_db_pool, + &mut output_buffer, + ) + .await?; + trace!( + "Applied internal-only change for flow {}:\n{}", + self.flow_instance_name, + String::from_utf8_lossy(&output_buffer) + ); + } + } + + anyhow::Ok(flow_ctx) }) }) .into_py_result()?; diff --git a/src/py/mod.rs b/src/py/mod.rs index 818619ed6..538f1c185 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -13,6 +13,7 @@ use pyo3::IntoPyObjectExt; use pyo3::{exceptions::PyException, prelude::*}; use pyo3_async_runtimes::tokio::future_into_py; use std::fmt::Write; +use std::io::Cursor; use std::sync::Arc; mod convert; diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 2b50f69cc..5fa0a68ad 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -410,11 +410,11 @@ async fn maybe_update_resource_setup< async fn apply_changes_for_flow( write: &mut (dyn std::io::Write + Send), flow_ctx: &FlowContext, - flow_status: &FlowSetupChange, + flow_setup_change: &FlowSetupChange, existing_setup_state: &mut Option>, pool: &PgPool, ) -> Result<()> { - let Some(status) = flow_status.status else { + let Some(status) = flow_setup_change.status else { return Ok(()); }; let verb = match status { @@ -428,7 +428,7 @@ async fn apply_changes_for_flow( let mut update_info = HashMap::::new(); - if let Some(metadata_change) = &flow_status.metadata_change { + if let Some(metadata_change) = &flow_setup_change.metadata_change { update_info.insert( db_metadata::ResourceTypeKey::new( MetadataRecordType::FlowMetadata.to_string(), @@ -437,7 +437,7 @@ async fn apply_changes_for_flow( db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?, ); } - if let Some(tracking_table) = &flow_status.tracking_table { + if let Some(tracking_table) = &flow_setup_change.tracking_table { if tracking_table .setup_change .as_ref() @@ -454,7 +454,7 @@ async fn apply_changes_for_flow( } } - for target_resource in &flow_status.target_resources { + for target_resource in &flow_setup_change.target_resources { update_info.insert( db_metadata::ResourceTypeKey::new( MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(), @@ -474,13 +474,13 @@ async fn apply_changes_for_flow( let new_version_id = db_metadata::stage_changes_for_flow( flow_ctx.flow_name(), - flow_status.seen_flow_metadata_version, + flow_setup_change.seen_flow_metadata_version, &update_info, pool, ) .await?; - if let Some(tracking_table) = &flow_status.tracking_table { + if let Some(tracking_table) = &flow_setup_change.tracking_table { maybe_update_resource_setup( "tracking table", write, @@ -491,7 +491,7 @@ async fn apply_changes_for_flow( } let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new(); - for target_resource in &flow_status.target_resources { + for target_resource in &flow_setup_change.target_resources { setup_change_by_target_kind .entry(target_resource.key.target_kind.as_str()) .or_default() @@ -543,21 +543,21 @@ async fn apply_changes_for_flow( }; let metadata = CombinedState::from_change( existing_metadata, - flow_status + flow_setup_change .metadata_change .as_ref() .map(|v| v.desired_state()), ); let tracking_table = CombinedState::from_change( existing_tracking_table, - flow_status.tracking_table.as_ref().map(|c| { + flow_setup_change.tracking_table.as_ref().map(|c| { c.setup_change .as_ref() .and_then(|c| c.desired_state.as_ref()) }), ); let mut targets = existing_targets; - for target_resource in &flow_status.target_resources { + for target_resource in &flow_setup_change.target_resources { match &target_resource.state { Some(state) => { targets.insert( @@ -618,26 +618,6 @@ pub struct SetupChangeBundle { } impl SetupChangeBundle { - async fn get_flow_setup_change<'a>( - setup_ctx: &LibSetupContext, - flow_ctx: &'a FlowContext, - flow_exec_ctx: &'a FlowExecutionContext, - action: &FlowSetupChangeAction, - buffer: &'a mut Option, - ) -> Result<&'a FlowSetupChange> { - let result = match action { - FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change, - FlowSetupChangeAction::Drop => { - let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name()); - buffer.insert( - diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx) - .await?, - ) - } - }; - Ok(result) - } - pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> { let mut text = String::new(); let mut is_up_to_date = true; @@ -665,7 +645,7 @@ impl SetupChangeBundle { let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await; let mut setup_change_buffer = None; - let setup_change = Self::get_flow_setup_change( + let setup_change = get_flow_setup_change( setup_ctx, &flow_ctx, &flow_exec_ctx, @@ -715,40 +695,75 @@ impl SetupChangeBundle { .clone() }; let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await; - - let mut setup_change_buffer = None; - let setup_change = Self::get_flow_setup_change( - setup_ctx, + apply_changes_for_flow_ctx( + self.action, &flow_ctx, - &flow_exec_ctx, - &self.action, - &mut setup_change_buffer, - ) - .await?; - if setup_change.is_up_to_date() { - continue; - } - - let mut flow_states = setup_ctx.all_setup_states.flows.remove(flow_name); - apply_changes_for_flow( - write, - &flow_ctx, - setup_change, - &mut flow_states, + &mut flow_exec_ctx, + setup_ctx, &persistence_ctx.builtin_db_pool, + write, ) .await?; - - flow_exec_ctx - .update_setup_state(&flow_ctx.flow, flow_states.as_ref()) - .await?; - if let Some(flow_states) = flow_states { - setup_ctx - .all_setup_states - .flows - .insert(flow_name.to_string(), flow_states); - } } Ok(()) } } + +async fn get_flow_setup_change<'a>( + setup_ctx: &LibSetupContext, + flow_ctx: &'a FlowContext, + flow_exec_ctx: &'a FlowExecutionContext, + action: &FlowSetupChangeAction, + buffer: &'a mut Option, +) -> Result<&'a FlowSetupChange> { + let result = match action { + FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change, + FlowSetupChangeAction::Drop => { + let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name()); + buffer.insert( + diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx) + .await?, + ) + } + }; + Ok(result) +} + +pub(crate) async fn apply_changes_for_flow_ctx( + action: FlowSetupChangeAction, + flow_ctx: &FlowContext, + flow_exec_ctx: &mut FlowExecutionContext, + setup_ctx: &mut LibSetupContext, + db_pool: &PgPool, + write: &mut (dyn std::io::Write + Send), +) -> Result<()> { + let mut setup_change_buffer = None; + let setup_change = get_flow_setup_change( + setup_ctx, + &flow_ctx, + &flow_exec_ctx, + &action, + &mut setup_change_buffer, + ) + .await?; + if setup_change.is_up_to_date() { + return Ok(()); + } + + let mut flow_states = setup_ctx + .all_setup_states + .flows + .remove(flow_ctx.flow_name()); + apply_changes_for_flow(write, &flow_ctx, setup_change, &mut flow_states, db_pool).await?; + + flow_exec_ctx + .update_setup_state(&flow_ctx.flow, flow_states.as_ref()) + .await?; + if let Some(flow_states) = flow_states { + setup_ctx + .all_setup_states + .flows + .insert(flow_ctx.flow_name().to_string(), flow_states); + } + Ok(()) +} diff --git a/src/setup/states.rs b/src/setup/states.rs index 340f6aff2..579159d8d 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -353,7 +353,16 @@ pub enum ObjectStatus { pub trait ObjectSetupChange { fn status(&self) -> Option; - fn is_up_to_date(&self) -> bool; + + /// Returns true if it has internal changes, i.e. changes that don't need user intervention. + fn has_internal_changes(&self) -> bool; + + /// Returns true if it has external changes, i.e. changes that should notify users. + fn has_external_changes(&self) -> bool; + + fn is_up_to_date(&self) -> bool { + return !self.has_internal_changes() && !self.has_external_changes(); + } } #[derive(Debug)] @@ -376,16 +385,19 @@ impl ObjectSetupChange for FlowSetupChange { self.status } - fn is_up_to_date(&self) -> bool { - self.metadata_change.is_none() - && self - .tracking_table - .as_ref() - .is_none_or(|t| t.is_up_to_date()) - && self + fn has_internal_changes(&self) -> bool { + return self.metadata_change.is_some(); + } + + fn has_external_changes(&self) -> bool { + return self + .tracking_table + .as_ref() + .is_some_and(|t| !t.is_up_to_date()) + || self .target_resources .iter() - .all(|target| target.is_up_to_date()) + .any(|target| !target.is_up_to_date()); } }