Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{base::schema::EnrichedValueType, prelude::*, py::Pythonized};
use crate::{
base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange,
};

use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::tokio::future_into_py;
Expand Down Expand Up @@ -612,7 +614,7 @@ impl FlowBuilder {
let analyzed_flow =
super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
let persistence_ctx = self.lib_context.require_persistence_ctx()?;
let execution_ctx = {
let flow_ctx = {
let flow_setup_ctx = persistence_ctx.setup_ctx.read().await;
FlowContext::new(
Arc::new(analyzed_flow),
Expand All @@ -623,7 +625,34 @@ impl FlowBuilder {
)
.await?
};
anyhow::Ok(execution_ctx)

// Apply internal-only changes if any.
{
let mut flow_exec_ctx =
flow_ctx.get_execution_ctx_for_setup().write().await;
if flow_exec_ctx.setup_change.has_internal_changes()
&& !flow_exec_ctx.setup_change.has_external_changes()
{
let mut lib_setup_ctx = persistence_ctx.setup_ctx.write().await;
let mut output_buffer = Vec::<u8>::new();
setup::apply_changes_for_flow_ctx(
setup::FlowSetupChangeAction::Setup,
&flow_ctx,
&mut flow_exec_ctx,
&mut *lib_setup_ctx,
&persistence_ctx.builtin_db_pool,
&mut output_buffer,
)
.await?;
trace!(
"Applied internal-only change for flow {}:\n{}",
self.flow_instance_name,
String::from_utf8_lossy(&output_buffer)
);
}
}

anyhow::Ok(flow_ctx)
})
})
.into_py_result()?;
Expand Down
1 change: 1 addition & 0 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use pyo3::IntoPyObjectExt;
use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::tokio::future_into_py;
use std::fmt::Write;
use std::io::Cursor;
use std::sync::Arc;

mod convert;
Expand Down
137 changes: 76 additions & 61 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ async fn maybe_update_resource_setup<
async fn apply_changes_for_flow(
write: &mut (dyn std::io::Write + Send),
flow_ctx: &FlowContext,
flow_status: &FlowSetupChange,
flow_setup_change: &FlowSetupChange,
existing_setup_state: &mut Option<setup::FlowSetupState<setup::ExistingMode>>,
pool: &PgPool,
) -> Result<()> {
let Some(status) = flow_status.status else {
let Some(status) = flow_setup_change.status else {
return Ok(());
};
let verb = match status {
Expand All @@ -428,7 +428,7 @@ async fn apply_changes_for_flow(
let mut update_info =
HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();

if let Some(metadata_change) = &flow_status.metadata_change {
if let Some(metadata_change) = &flow_setup_change.metadata_change {
update_info.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::FlowMetadata.to_string(),
Expand All @@ -437,7 +437,7 @@ async fn apply_changes_for_flow(
db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
);
}
if let Some(tracking_table) = &flow_status.tracking_table {
if let Some(tracking_table) = &flow_setup_change.tracking_table {
if tracking_table
.setup_change
.as_ref()
Expand All @@ -454,7 +454,7 @@ async fn apply_changes_for_flow(
}
}

for target_resource in &flow_status.target_resources {
for target_resource in &flow_setup_change.target_resources {
update_info.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
Expand All @@ -474,13 +474,13 @@ async fn apply_changes_for_flow(

let new_version_id = db_metadata::stage_changes_for_flow(
flow_ctx.flow_name(),
flow_status.seen_flow_metadata_version,
flow_setup_change.seen_flow_metadata_version,
&update_info,
pool,
)
.await?;

if let Some(tracking_table) = &flow_status.tracking_table {
if let Some(tracking_table) = &flow_setup_change.tracking_table {
maybe_update_resource_setup(
"tracking table",
write,
Expand All @@ -491,7 +491,7 @@ async fn apply_changes_for_flow(
}

let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new();
for target_resource in &flow_status.target_resources {
for target_resource in &flow_setup_change.target_resources {
setup_change_by_target_kind
.entry(target_resource.key.target_kind.as_str())
.or_default()
Expand Down Expand Up @@ -543,21 +543,21 @@ async fn apply_changes_for_flow(
};
let metadata = CombinedState::from_change(
existing_metadata,
flow_status
flow_setup_change
.metadata_change
.as_ref()
.map(|v| v.desired_state()),
);
let tracking_table = CombinedState::from_change(
existing_tracking_table,
flow_status.tracking_table.as_ref().map(|c| {
flow_setup_change.tracking_table.as_ref().map(|c| {
c.setup_change
.as_ref()
.and_then(|c| c.desired_state.as_ref())
}),
);
let mut targets = existing_targets;
for target_resource in &flow_status.target_resources {
for target_resource in &flow_setup_change.target_resources {
match &target_resource.state {
Some(state) => {
targets.insert(
Expand Down Expand Up @@ -618,26 +618,6 @@ pub struct SetupChangeBundle {
}

impl SetupChangeBundle {
async fn get_flow_setup_change<'a>(
setup_ctx: &LibSetupContext,
flow_ctx: &'a FlowContext,
flow_exec_ctx: &'a FlowExecutionContext,
action: &FlowSetupChangeAction,
buffer: &'a mut Option<FlowSetupChange>,
) -> Result<&'a FlowSetupChange> {
let result = match action {
FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
FlowSetupChangeAction::Drop => {
let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
buffer.insert(
diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
.await?,
)
}
};
Ok(result)
}

pub async fn describe(&self, lib_context: &LibContext) -> Result<(String, bool)> {
let mut text = String::new();
let mut is_up_to_date = true;
Expand Down Expand Up @@ -665,7 +645,7 @@ impl SetupChangeBundle {
let flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().read().await;

let mut setup_change_buffer = None;
let setup_change = Self::get_flow_setup_change(
let setup_change = get_flow_setup_change(
setup_ctx,
&flow_ctx,
&flow_exec_ctx,
Expand Down Expand Up @@ -715,40 +695,75 @@ impl SetupChangeBundle {
.clone()
};
let mut flow_exec_ctx = flow_ctx.get_execution_ctx_for_setup().write().await;

let mut setup_change_buffer = None;
let setup_change = Self::get_flow_setup_change(
setup_ctx,
apply_changes_for_flow_ctx(
self.action,
&flow_ctx,
&flow_exec_ctx,
&self.action,
&mut setup_change_buffer,
)
.await?;
if setup_change.is_up_to_date() {
continue;
}

let mut flow_states = setup_ctx.all_setup_states.flows.remove(flow_name);
apply_changes_for_flow(
write,
&flow_ctx,
setup_change,
&mut flow_states,
&mut flow_exec_ctx,
setup_ctx,
&persistence_ctx.builtin_db_pool,
write,
)
.await?;

flow_exec_ctx
.update_setup_state(&flow_ctx.flow, flow_states.as_ref())
.await?;
if let Some(flow_states) = flow_states {
setup_ctx
.all_setup_states
.flows
.insert(flow_name.to_string(), flow_states);
}
}
Ok(())
}
}

async fn get_flow_setup_change<'a>(
setup_ctx: &LibSetupContext,
flow_ctx: &'a FlowContext,
flow_exec_ctx: &'a FlowExecutionContext,
action: &FlowSetupChangeAction,
buffer: &'a mut Option<FlowSetupChange>,
) -> Result<&'a FlowSetupChange> {
let result = match action {
FlowSetupChangeAction::Setup => &flow_exec_ctx.setup_change,
FlowSetupChangeAction::Drop => {
let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
buffer.insert(
diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
.await?,
)
}
};
Ok(result)
}

pub(crate) async fn apply_changes_for_flow_ctx(
action: FlowSetupChangeAction,
flow_ctx: &FlowContext,
flow_exec_ctx: &mut FlowExecutionContext,
setup_ctx: &mut LibSetupContext,
db_pool: &PgPool,
write: &mut (dyn std::io::Write + Send),
) -> Result<()> {
let mut setup_change_buffer = None;
let setup_change = get_flow_setup_change(
setup_ctx,
&flow_ctx,
&flow_exec_ctx,
&action,
&mut setup_change_buffer,
)
.await?;
if setup_change.is_up_to_date() {
return Ok(());
}

let mut flow_states = setup_ctx
.all_setup_states
.flows
.remove(flow_ctx.flow_name());
apply_changes_for_flow(write, &flow_ctx, setup_change, &mut flow_states, db_pool).await?;

flow_exec_ctx
.update_setup_state(&flow_ctx.flow, flow_states.as_ref())
.await?;
if let Some(flow_states) = flow_states {
setup_ctx
.all_setup_states
.flows
.insert(flow_ctx.flow_name().to_string(), flow_states);
}
Ok(())
}
30 changes: 21 additions & 9 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,16 @@ pub enum ObjectStatus {

pub trait ObjectSetupChange {
fn status(&self) -> Option<ObjectStatus>;
fn is_up_to_date(&self) -> bool;

/// Returns true if it has internal changes, i.e. changes that don't need user intervention.
fn has_internal_changes(&self) -> bool;

/// Returns true if it has external changes, i.e. changes that should notify users.
fn has_external_changes(&self) -> bool;

fn is_up_to_date(&self) -> bool {
return !self.has_internal_changes() && !self.has_external_changes();
}
}

#[derive(Debug)]
Expand All @@ -376,16 +385,19 @@ impl ObjectSetupChange for FlowSetupChange {
self.status
}

fn is_up_to_date(&self) -> bool {
self.metadata_change.is_none()
&& self
.tracking_table
.as_ref()
.is_none_or(|t| t.is_up_to_date())
&& self
fn has_internal_changes(&self) -> bool {
return self.metadata_change.is_some();
}

fn has_external_changes(&self) -> bool {
return self
.tracking_table
.as_ref()
.is_some_and(|t| !t.is_up_to_date())
|| self
.target_resources
.iter()
.all(|target| target.is_up_to_date())
.any(|target| !target.is_up_to_date());
}
}

Expand Down
Loading