Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl TrackingTableSetupStatusCheck {
state: self.desired_state.clone(),
description: "Tracking Table".to_string(),
status_check: Some(self),
legacy_key: None,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
StorageFactoryBase::describe_resource(self, &key)
}

fn normalize_setup_key(&self, key: serde_json::Value) -> Result<serde_json::Value> {
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
let key: T::Key = serde_json::from_value(key.clone())?;
Ok(serde_json::to_value(key)?)
}
Expand Down Expand Up @@ -478,5 +478,6 @@ fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
})
})
.collect::<Result<_>>()?,
legacy_state_key: existing_states.legacy_state_key,
})
}
2 changes: 1 addition & 1 deletion src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub trait ExportTargetFactory: Send + Sync {

/// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
/// This should always return the canonical serialized form.
fn normalize_setup_key(&self, key: serde_json::Value) -> Result<serde_json::Value>;
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;

fn check_state_compatibility(
&self,
Expand Down
1 change: 1 addition & 0 deletions src/setup/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl<D: Operator> StatusCheck<D> {
}
})
.collect(),
legacy_state_key: existing.legacy_state_key,
};
let mut keys_to_delete = IndexSet::new();
let mut states_to_upsert = vec![];
Expand Down
79 changes: 52 additions & 27 deletions src/setup/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,30 @@ async fn delete_state(
Ok(())
}

pub struct StateUpdateInfo {
pub desired_state: Option<serde_json::Value>,
pub legacy_key: Option<ResourceTypeKey>,
}

impl StateUpdateInfo {
pub fn new(
desired_state: Option<&impl Serialize>,
legacy_key: Option<ResourceTypeKey>,
) -> Result<Self> {
Ok(Self {
desired_state: desired_state
.as_ref()
.map(serde_json::to_value)
.transpose()?,
legacy_key,
})
}
}

pub async fn stage_changes_for_flow(
flow_name: &str,
seen_metadata_version: Option<u64>,
state_updates: &HashMap<ResourceTypeKey, Option<serde_json::Value>>,
resource_update_info: &HashMap<ResourceTypeKey, StateUpdateInfo>,
pool: &PgPool,
) -> Result<u64> {
let mut txn = pool.begin().await?;
Expand Down Expand Up @@ -210,39 +230,43 @@ pub async fn stage_changes_for_flow(
)
.await?;

for (type_id, desired_state) in state_updates {
for (type_id, update_info) in resource_update_info {
let existing = existing_records.remove(type_id);
let change = match desired_state {
let change = match &update_info.desired_state {
Some(desired_state) => StateChange::Upsert(desired_state.clone()),
None => StateChange::Delete,
};
match existing {
let mut new_staging_changes = vec![];
if let Some(legacy_key) = &update_info.legacy_key {
if let Some(legacy_record) = existing_records.remove(&legacy_key) {
new_staging_changes.extend(legacy_record.staging_changes.0);
delete_state(flow_name, legacy_key, &mut *txn).await?;
}
}
let (action, existing_staging_changes) = match existing {
Some(existing) => {
if existing.staging_changes.0.iter().all(|c| c != &change) {
let mut staging_changes = existing.staging_changes.0;
staging_changes.push(change);
upsert_staging_changes(
flow_name,
type_id,
staging_changes,
&mut *txn,
WriteAction::Update,
)
.await?;
let existing_staging_changes = existing.staging_changes.0;
if existing_staging_changes.iter().all(|c| c != &change) {
new_staging_changes.push(change);
}
(WriteAction::Update, existing_staging_changes)
}
None => {
if desired_state.is_some() {
upsert_staging_changes(
flow_name,
type_id,
vec![change],
&mut *txn,
WriteAction::Insert,
)
.await?;
if update_info.desired_state.is_some() {
new_staging_changes.push(change);
}
(WriteAction::Insert, vec![])
}
};
if !new_staging_changes.is_empty() {
upsert_staging_changes(
flow_name,
type_id,
[existing_staging_changes, new_staging_changes].concat(),
&mut *txn,
action,
)
.await?;
}
}
txn.commit().await?;
Expand All @@ -252,7 +276,7 @@ pub async fn stage_changes_for_flow(
pub async fn commit_changes_for_flow(
flow_name: &str,
curr_metadata_version: u64,
state_updates: HashMap<ResourceTypeKey, Option<serde_json::Value>>,
state_updates: HashMap<ResourceTypeKey, StateUpdateInfo>,
delete_version: bool,
pool: &PgPool,
) -> Result<()> {
Expand All @@ -265,8 +289,8 @@ pub async fn commit_changes_for_flow(
StatusCode::CONFLICT,
))?;
}
for (type_id, desired_state) in state_updates {
match desired_state {
for (type_id, update_info) in state_updates {
match update_info.desired_state {
Some(desired_state) => {
upsert_state(
flow_name,
Expand Down Expand Up @@ -301,6 +325,7 @@ impl MetadataTableSetup {
state: None,
description: "CocoIndex Metadata Table".to_string(),
status_check: Some(self),
legacy_key: None,
}
}
}
Expand Down
82 changes: 56 additions & 26 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl std::str::FromStr for MetadataRecordType {
fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
state: Option<serde_json::Value>,
staging_changes: sqlx::types::Json<Vec<StateChange<serde_json::Value>>>,
legacy_state_key: Option<serde_json::Value>,
) -> Result<CombinedState<S>> {
let current: Option<S> = state.map(serde_json::from_value).transpose()?;
let staging: Vec<StateChange<S>> = (staging_changes.0.into_iter())
Expand All @@ -67,7 +68,11 @@ fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
})
})
.collect::<Result<_>>()?;
Ok(CombinedState { current, staging })
Ok(CombinedState {
current,
staging,
legacy_state_key,
})
}

pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupState<ExistingMode>> {
Expand Down Expand Up @@ -103,27 +108,33 @@ pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupState<Exi
db_metadata::parse_flow_version(&state);
}
MetadataRecordType::FlowMetadata => {
flow_ss.metadata = from_metadata_record(state, staging_changes)?;
flow_ss.metadata = from_metadata_record(state, staging_changes, None)?;
}
MetadataRecordType::TrackingTable => {
flow_ss.tracking_table = from_metadata_record(state, staging_changes)?;
flow_ss.tracking_table =
from_metadata_record(state, staging_changes, None)?;
}
MetadataRecordType::Target(target_type) => {
let normalized_key = {
let registry = executor_factory_registry();
match registry.get(&target_type) {
Some(ExecutorFactory::ExportTarget(factory)) => {
factory.normalize_setup_key(metadata_record.key)?
factory.normalize_setup_key(&metadata_record.key)?
}
_ => metadata_record.key.clone(),
}
};
let combined_state = from_metadata_record(
state,
staging_changes,
(normalized_key != metadata_record.key).then_some(metadata_record.key),
)?;
flow_ss.targets.insert(
super::ResourceIdentifier {
key: normalized_key,
target_kind: target_type,
},
from_metadata_record(state, staging_changes)?,
combined_state,
);
}
}
Expand Down Expand Up @@ -203,6 +214,20 @@ fn group_resource_states<'a>(
if let Some(current) = &state.current {
entry.existing.current = Some(current.clone());
}
if let Some(legacy_state_key) = &state.legacy_state_key {
if !entry
.existing
.legacy_state_key
.as_ref()
.map_or(false, |v| v == legacy_state_key)
{
warn!(
"inconsistent legacy key: {:?}, {:?}",
key, entry.existing.legacy_state_key
);
}
entry.existing.legacy_state_key = Some(legacy_state_key.clone());
}
for s in state.staging.iter() {
match s {
StateChange::Upsert(v) => {
Expand Down Expand Up @@ -288,6 +313,7 @@ pub async fn check_flow_setup_status(
StateChange::Delete => Some(StateChange::Delete),
})
.collect(),
legacy_state_key: v.existing.legacy_state_key.clone(),
};
let never_setup_by_sys = target_state.is_none()
&& existing_without_setup_by_user.current.is_none()
Expand All @@ -311,6 +337,13 @@ pub async fn check_flow_setup_status(
state,
description: factory.describe_resource(&resource_id.key)?,
status_check,
legacy_key: v
.existing
.legacy_state_key
.map(|legacy_state_key| ResourceIdentifier {
target_kind: resource_id.target_kind.clone(),
key: legacy_state_key,
}),
});
}
Ok(FlowSetupStatusCheck {
Expand Down Expand Up @@ -406,19 +439,16 @@ pub async fn apply_changes(
};
write!(write, "\n{verb} flow {flow_name}:\n")?;

let mut state_updates =
HashMap::<db_metadata::ResourceTypeKey, Option<serde_json::Value>>::new();
let mut update_info =
HashMap::<db_metadata::ResourceTypeKey, db_metadata::StateUpdateInfo>::new();

if let Some(metadata_change) = &flow_status.metadata_change {
state_updates.insert(
update_info.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::FlowMetadata.to_string(),
serde_json::Value::Null,
),
metadata_change
.desired_state()
.map(serde_json::to_value)
.transpose()?,
db_metadata::StateUpdateInfo::new(metadata_change.desired_state(), None)?,
);
}
if let Some(tracking_table) = &flow_status.tracking_table {
Expand All @@ -428,37 +458,37 @@ pub async fn apply_changes(
.map(|c| c.change_type() != SetupChangeType::NoChange)
.unwrap_or_default()
{
state_updates.insert(
update_info.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::TrackingTable.to_string(),
serde_json::Value::Null,
),
tracking_table
.state
.as_ref()
.map(serde_json::to_value)
.transpose()?,
db_metadata::StateUpdateInfo::new(tracking_table.state.as_ref(), None)?,
);
}
}
for target_resource in &flow_status.target_resources {
state_updates.insert(
update_info.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::Target(target_resource.key.target_kind.clone()).to_string(),
target_resource.key.key.clone(),
),
target_resource
.state
.as_ref()
.map(serde_json::to_value)
.transpose()?,
db_metadata::StateUpdateInfo::new(
target_resource.state.as_ref(),
target_resource.legacy_key.as_ref().map(|k| {
db_metadata::ResourceTypeKey::new(
MetadataRecordType::Target(k.target_kind.clone()).to_string(),
k.key.clone(),
)
}),
)?,
);
}

let new_version_id = db_metadata::stage_changes_for_flow(
flow_name,
flow_status.seen_flow_metadata_version,
&state_updates,
&update_info,
pool,
)
.await?;
Expand All @@ -474,7 +504,7 @@ pub async fn apply_changes(
db_metadata::commit_changes_for_flow(
flow_name,
new_version_id,
state_updates,
update_info,
is_deletion,
pool,
)
Expand Down
4 changes: 4 additions & 0 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl StateMode for DesiredMode {
pub struct CombinedState<T> {
pub current: Option<T>,
pub staging: Vec<StateChange<T>>,
pub legacy_state_key: Option<serde_json::Value>,
}

impl<T> CombinedState<T> {
Expand Down Expand Up @@ -73,6 +74,7 @@ impl<T: Debug + Clone> Default for CombinedState<T> {
Self {
current: None,
staging: vec![],
legacy_state_key: None,
}
}
}
Expand Down Expand Up @@ -263,6 +265,8 @@ pub struct ResourceSetupInfo<K, S, C: ResourceSetupStatusCheck> {

/// If `None`, the resource is managed by users.
pub status_check: Option<C>,

pub legacy_key: Option<K>,
}

impl<K, S, C: ResourceSetupStatusCheck> std::fmt::Display for ResourceSetupInfo<K, S, C> {
Expand Down