Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions src/builder/exec_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ fn build_import_op_exec_ctx(
fn build_target_id(
analyzed_target_ss: &AnalyzedTargetSetupState,
existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>,
flow_setup_state: &mut setup::FlowSetupState<setup::DesiredMode>,
metadata: &mut setup::FlowSetupMetadata,
target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
) -> Result<i32> {
let interface::ExecutorFactory::ExportTarget(target_factory) =
get_executor_factory(&analyzed_target_ss.target_kind)?
Expand Down Expand Up @@ -125,8 +126,8 @@ fn build_target_id(
None
};
let target_id = target_id.unwrap_or_else(|| {
flow_setup_state.metadata.last_target_id += 1;
flow_setup_state.metadata.last_target_id
metadata.last_target_id += 1;
metadata.last_target_id
});
let max_schema_version_id = existing_target_states
.iter()
Expand All @@ -143,7 +144,7 @@ fn build_target_id(
} else {
max_schema_version_id + 1
};
match flow_setup_state.targets.entry(resource_id) {
match target_states.entry(resource_id) {
indexmap::map::Entry::Occupied(entry) => {
api_bail!(
"Target resource already exists: kind = {}, key = {}",
Expand Down Expand Up @@ -199,34 +200,27 @@ pub fn build_flow_setup_execution_context(
}
}

let mut setup_state = setup::FlowSetupState::<setup::DesiredMode> {
seen_flow_metadata_version: existing_flow_ss
.and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
metadata: setup::FlowSetupMetadata {
last_source_id: existing_metadata_versions()
.map(|metadata| metadata.last_source_id)
.max()
.unwrap_or(0),
last_target_id: existing_metadata_versions()
.map(|metadata| metadata.last_target_id)
.max()
.unwrap_or(0),
sources: BTreeMap::new(),
},
tracking_table: db_tracking_setup::TrackingTableSetupState {
table_name: existing_flow_ss
.and_then(|flow_ss| {
flow_ss
.tracking_table
.current
.as_ref()
.map(|v| v.table_name.clone())
})
.unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)),
version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
},
targets: IndexMap::new(),
let mut metadata = setup::FlowSetupMetadata {
last_source_id: existing_metadata_versions()
.map(|metadata| metadata.last_source_id)
.max()
.unwrap_or(0),
last_target_id: existing_metadata_versions()
.map(|metadata| metadata.last_target_id)
.max()
.unwrap_or(0),
sources: BTreeMap::new(),
features: existing_flow_ss
.map(|m| {
m.metadata
.possible_versions()
.flat_map(|v| v.features.iter())
.cloned()
.collect::<BTreeSet<_>>()
})
.unwrap_or_else(setup::flow_features::default_features),
};
let mut target_states = IndexMap::new();

let import_op_exec_ctx = flow_inst
.import_ops
Expand All @@ -241,7 +235,7 @@ pub fn build_flow_setup_execution_context(
&import_op.name,
output_type,
source_states_by_name.get(&import_op.name.as_str()),
&mut setup_state.metadata,
&mut metadata,
)
})
.collect::<Result<Vec<_>>>()?;
Expand All @@ -253,7 +247,8 @@ pub fn build_flow_setup_execution_context(
let target_id = build_target_id(
analyzed_target_ss,
&target_states_by_name_type,
&mut setup_state,
&mut metadata,
&mut target_states,
)?;
Ok(ExportOpExecutionContext { target_id })
})
Expand All @@ -263,10 +258,40 @@ pub fn build_flow_setup_execution_context(
build_target_id(
analyzed_target_ss,
&target_states_by_name_type,
&mut setup_state,
&mut metadata,
&mut target_states,
)?;
}

let setup_state = setup::FlowSetupState::<setup::DesiredMode> {
seen_flow_metadata_version: existing_flow_ss
.and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
tracking_table: db_tracking_setup::TrackingTableSetupState {
table_name: existing_flow_ss
.and_then(|flow_ss| {
flow_ss
.tracking_table
.current
.as_ref()
.map(|v| v.table_name.clone())
})
.unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)),
version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
source_state_table_name: metadata
.features
.contains(setup::flow_features::SOURCE_STATE_TABLE)
.then(|| {
existing_flow_ss
.and_then(|flow_ss| flow_ss.tracking_table.current.as_ref())
.and_then(|v| v.source_state_table_name.clone())
.unwrap_or_else(|| {
db_tracking_setup::default_source_state_table_name(&flow_inst.name)
})
}),
},
targets: target_states,
metadata,
};
Ok(FlowSetupExecutionContext {
setup_state,
import_ops: import_op_exec_ctx,
Expand Down
120 changes: 106 additions & 14 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub fn default_tracking_table_name(flow_name: &str) -> String {
)
}

/// Returns the default name of the source state table for the flow `flow_name`.
///
/// The flow name is passed through `utils::db::sanitize_identifier` and the
/// fixed suffix `__cocoindex_srcstate` is appended to the result.
pub fn default_source_state_table_name(flow_name: &str) -> String {
    let sanitized_flow_name = utils::db::sanitize_identifier(flow_name);
    format!("{sanitized_flow_name}__cocoindex_srcstate")
}

pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;

async fn upgrade_tracking_table(
Expand Down Expand Up @@ -46,19 +53,38 @@ async fn upgrade_tracking_table(
Ok(())
}

/// Creates the source state table named `table_name` if it does not exist yet.
///
/// The table has three `NOT NULL` columns: `source_id` (`INTEGER`), `key`
/// (`JSONB`) and `value` (`JSONB`), with `(source_id, key)` as the primary key.
///
/// `table_name` is interpolated directly into the statement text, so callers
/// must pass an identifier that is already safe to embed in SQL.
///
/// # Errors
///
/// Propagates any error returned while executing the statement on `pool`.
async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<()> {
// `IF NOT EXISTS` makes the statement a no-op when the table is already present.
let query = format!(
"CREATE TABLE IF NOT EXISTS {table_name} (
source_id INTEGER NOT NULL,
key JSONB NOT NULL,
value JSONB NOT NULL,

PRIMARY KEY (source_id, key)
)"
);
sqlx::query(&query).execute(pool).await?;
Ok(())
}

/// Serializable setup state describing a flow's internal tracking storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TrackingTableSetupState {
/// Name of the tracking table.
pub table_name: String,
/// Version id of the tracking table.
pub version_id: i32,
/// Name of the source state table, or `None` when there is none.
///
/// `#[serde(default)]` lets state that was serialized without this field
/// deserialize with `None` here.
#[serde(default)]
pub source_state_table_name: Option<String>,
}

#[derive(Debug)]
pub struct TrackingTableSetupStatus {
pub desired_state: Option<TrackingTableSetupState>,

pub legacy_table_names: Vec<String>,

pub min_existing_version_id: Option<i32>,
pub legacy_tracking_table_names: BTreeSet<String>,

pub source_state_table_always_exists: bool,
pub legacy_source_state_table_names: BTreeSet<String>,

pub source_ids_to_delete: Vec<i32>,
}

Expand All @@ -68,19 +94,29 @@ impl TrackingTableSetupStatus {
existing: &CombinedState<TrackingTableSetupState>,
source_ids_to_delete: Vec<i32>,
) -> Option<Self> {
let legacy_table_names = existing
let legacy_tracking_table_names = existing
.legacy_values(desired, |v| &v.table_name)
.into_iter()
.cloned()
.collect();
.collect::<BTreeSet<_>>();
let legacy_source_state_table_names = existing
.legacy_values(desired, |v| &v.source_state_table_name)
.into_iter()
.filter_map(|v| v.clone())
.collect::<BTreeSet<_>>();
let min_existing_version_id = existing
.always_exists()
.then(|| existing.possible_versions().map(|v| v.version_id).min())
.flatten();
if desired.is_some() || min_existing_version_id.is_some() {
Some(Self {
desired_state: desired.cloned(),
legacy_table_names,
legacy_tracking_table_names,
source_state_table_always_exists: existing.always_exists()
&& existing
.possible_versions()
.all(|v| v.source_state_table_name.is_some()),
legacy_source_state_table_names,
min_existing_version_id,
source_ids_to_delete,
})
Expand All @@ -95,7 +131,7 @@ impl TrackingTableSetupStatus {
ResourceSetupInfo {
key: (),
state: self.desired_state.clone(),
description: "Tracking Table".to_string(),
description: "Internal Storage for Tracking".to_string(),
setup_status: Some(self),
legacy_key: None,
}
Expand All @@ -105,10 +141,10 @@ impl TrackingTableSetupStatus {
impl ResourceSetupStatus for TrackingTableSetupStatus {
fn describe_changes(&self) -> Vec<setup::ChangeDescription> {
let mut changes: Vec<setup::ChangeDescription> = vec![];
if self.desired_state.is_some() && !self.legacy_table_names.is_empty() {
if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() {
changes.push(setup::ChangeDescription::Action(format!(
"Rename legacy tracking tables: {}. ",
self.legacy_table_names.join(", ")
self.legacy_tracking_table_names.iter().join(", ")
)));
}
match (self.min_existing_version_id, &self.desired_state) {
Expand All @@ -127,10 +163,37 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
}
(Some(_), None) => changes.push(setup::ChangeDescription::Action(format!(
"Drop existing tracking table: {}. ",
self.legacy_table_names.join(", ")
self.legacy_tracking_table_names.iter().join(", ")
))),
(None, None) => (),
}

let source_state_table_name = self
.desired_state
.as_ref()
.and_then(|v| v.source_state_table_name.as_ref());
if let Some(source_state_table_name) = source_state_table_name {
if !self.legacy_source_state_table_names.is_empty() {
changes.push(setup::ChangeDescription::Action(format!(
"Rename legacy source state tables: {}. ",
self.legacy_source_state_table_names.iter().join(", ")
)));
}
if !self.source_state_table_always_exists {
changes.push(setup::ChangeDescription::Action(format!(
"Create the source state table: {}. ",
source_state_table_name
)));
}
} else if !self.source_state_table_always_exists
&& !self.legacy_source_state_table_names.is_empty()
{
changes.push(setup::ChangeDescription::Action(format!(
"Drop existing source state table: {}. ",
self.legacy_source_state_table_names.iter().join(", ")
)));
}

if !self.source_ids_to_delete.is_empty() {
changes.push(setup::ChangeDescription::Action(format!(
"Delete source IDs: {}. ",
Expand All @@ -145,12 +208,21 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
}

fn change_type(&self) -> SetupChangeType {
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
&& (self.source_state_table_always_exists
|| self
.desired_state
.as_ref()
.map_or(true, |v| v.source_state_table_name.is_none()));
match (self.min_existing_version_id, &self.desired_state) {
(None, Some(_)) => SetupChangeType::Create,
(Some(min_version_id), Some(desired)) => {
if min_version_id == desired.version_id && self.legacy_table_names.is_empty() {
if min_version_id == desired.version_id
&& self.legacy_tracking_table_names.is_empty()
&& source_state_table_up_to_date
{
SetupChangeType::NoChange
} else if min_version_id < desired.version_id {
} else if min_version_id < desired.version_id || !source_state_table_up_to_date {
SetupChangeType::Update
} else {
SetupChangeType::Invalid
Expand All @@ -167,7 +239,7 @@ impl TrackingTableSetupStatus {
let lib_context = get_lib_context()?;
let pool = lib_context.require_builtin_db_pool()?;
if let Some(desired) = &self.desired_state {
for lagacy_name in self.legacy_table_names.iter() {
for lagacy_name in self.legacy_tracking_table_names.iter() {
let query = format!(
"ALTER TABLE IF EXISTS {} RENAME TO {}",
lagacy_name, desired.table_name
Expand All @@ -185,11 +257,31 @@ impl TrackingTableSetupStatus {
.await?;
}
} else {
for lagacy_name in self.legacy_table_names.iter() {
for lagacy_name in self.legacy_tracking_table_names.iter() {
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
sqlx::query(&query).execute(pool).await?;
}
}

let source_state_table_name = self
.desired_state
.as_ref()
.and_then(|v| v.source_state_table_name.as_ref());
if let Some(source_state_table_name) = source_state_table_name {
for lagacy_name in self.legacy_source_state_table_names.iter() {
let query = format!(
"ALTER TABLE IF EXISTS {lagacy_name} RENAME TO {source_state_table_name}"
);
sqlx::query(&query).execute(pool).await?;
}
if !self.source_state_table_always_exists {
create_source_state_table(pool, source_state_table_name).await?;
}
} else {
for lagacy_name in self.legacy_source_state_table_names.iter() {
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
sqlx::query(&query).execute(pool).await?;
}
return Ok(());
}
Ok(())
}
Expand Down
7 changes: 7 additions & 0 deletions src/setup/flow_features.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::prelude::*;

/// Feature name indicating that a flow uses a source state table.
pub const SOURCE_STATE_TABLE: &str = "source_state_table";

/// Returns the default feature set, which contains only [`SOURCE_STATE_TABLE`].
pub fn default_features() -> BTreeSet<String> {
    BTreeSet::from([SOURCE_STATE_TABLE.to_owned()])
}
1 change: 1 addition & 0 deletions src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod driver;
mod states;

pub mod components;
pub mod flow_features;

pub use auth_registry::AuthRegistry;
pub use driver::*;
Expand Down
2 changes: 2 additions & 0 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub struct FlowSetupMetadata {
pub last_source_id: i32,
pub last_target_id: i32,
pub sources: BTreeMap<String, SourceSetupState>,
#[serde(default)]
pub features: BTreeSet<String>,
}

#[derive(Debug, Clone)]
Expand Down
Loading