Skip to content

Commit 9e3a51c

Browse files
authored
feat(attachment): update tracked states even if no update action needed (#1142)
1 parent a812d93 commit 9e3a51c

File tree

4 files changed

+42
-4
lines changed

4 files changed

+42
-4
lines changed

src/execution/db_tracking_setup.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ pub struct TrackingTableSetupChange {
103103
pub legacy_source_state_table_names: BTreeSet<String>,
104104

105105
pub source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
106+
107+
has_state_change: bool,
106108
}
107109

108110
impl TrackingTableSetupChange {
@@ -136,6 +138,7 @@ impl TrackingTableSetupChange {
136138
legacy_source_state_table_names,
137139
min_existing_version_id,
138140
source_names_need_state_cleanup,
141+
has_state_change: existing.has_state_diff(desired, |v| v),
139142
})
140143
} else {
141144
None
@@ -148,6 +151,7 @@ impl TrackingTableSetupChange {
148151
ResourceSetupInfo {
149152
key: (),
150153
state: self.desired_state.clone(),
154+
has_tracked_state_change: self.has_state_change,
151155
description: "Internal Storage for Tracking".to_string(),
152156
setup_change: Some(self),
153157
legacy_key: None,

src/setup/db_metadata.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ impl MetadataTableSetup {
323323
ResourceSetupInfo {
324324
key: (),
325325
state: None,
326+
has_tracked_state_change: self.metadata_table_missing,
326327
description: "CocoIndex Metadata Table".to_string(),
327328
setup_change: Some(self),
328329
legacy_key: None,

src/setup/driver.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,13 @@ async fn collect_attachments_setup_change(
309309

310310
let mut attachments_change = AttachmentsSetupChange::default();
311311
for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() {
312+
let has_diff = setup_state
313+
.existing
314+
.has_state_diff(setup_state.desired.as_ref(), |s| s);
315+
if !has_diff {
316+
continue;
317+
}
318+
attachments_change.has_tracked_state_change = true;
312319
let factory = get_attachment_factory(&kind)?;
313320
let is_upsertion = setup_state.desired.is_some();
314321
if let Some(action) = factory
@@ -428,9 +435,13 @@ pub async fn diff_flow_setup_states(
428435
.await?;
429436

430437
let desired_state = target_states_group.desired.clone();
431-
let target_state = target_states_group
438+
let desired_target_state = target_states_group
432439
.desired
433440
.and_then(|state| (!state.common.setup_by_user).then_some(state.state));
441+
let has_tracked_state_change = target_states_group
442+
.existing
443+
.has_state_diff(desired_target_state.as_ref(), |s| &s.state)
444+
|| attachments_change.has_tracked_state_change;
434445
let existing_without_setup_by_user = CombinedState {
435446
current: target_states_group
436447
.existing
@@ -449,7 +460,7 @@ pub async fn diff_flow_setup_states(
449460
.collect(),
450461
legacy_state_key: target_states_group.existing.legacy_state_key.clone(),
451462
};
452-
let never_setup_by_sys = target_state.is_none()
463+
let never_setup_by_sys = desired_target_state.is_none()
453464
&& existing_without_setup_by_user.current.is_none()
454465
&& existing_without_setup_by_user.staging.is_empty();
455466
let setup_change = if never_setup_by_sys {
@@ -459,7 +470,7 @@ pub async fn diff_flow_setup_states(
459470
target_change: factory
460471
.diff_setup_states(
461472
&resource_id.key,
462-
target_state,
473+
desired_target_state,
463474
existing_without_setup_by_user,
464475
flow_instance_ctx.clone(),
465476
)
@@ -471,6 +482,7 @@ pub async fn diff_flow_setup_states(
471482
target_resources.push(ResourceSetupInfo {
472483
key: resource_id.clone(),
473484
state: desired_state,
485+
has_tracked_state_change,
474486
description: factory.describe_resource(&resource_id.key)?,
475487
setup_change,
476488
legacy_key: target_states_group

src/setup/states.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,17 @@ impl<T> CombinedState<T> {
106106
.filter(|v| Some(*v) != desired_value)
107107
.collect()
108108
}
109+
110+
pub fn has_state_diff<S>(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool
111+
where
112+
S: PartialEq,
113+
{
114+
if let Some(state) = state {
115+
!self.always_exists_and(|s| map_fn(s) == state)
116+
} else {
117+
self.possible_versions().next().is_some()
118+
}
119+
}
109120
}
110121

111122
impl<T: Debug + Clone> Default for CombinedState<T> {
@@ -320,6 +331,7 @@ impl ResourceSetupChange for std::convert::Infallible {
320331
pub struct ResourceSetupInfo<K, S, C: ResourceSetupChange> {
321332
pub key: K,
322333
pub state: Option<S>,
334+
pub has_tracked_state_change: bool,
323335
pub description: String,
324336

325337
/// If `None`, the resource is managed by users.
@@ -406,6 +418,7 @@ pub trait ObjectSetupChange {
406418

407419
#[derive(Default)]
408420
pub struct AttachmentsSetupChange {
421+
pub has_tracked_state_change: bool,
409422
pub deletes: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
410423
pub upserts: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
411424
}
@@ -472,7 +485,15 @@ impl ObjectSetupChange for FlowSetupChange {
472485
}
473486

474487
fn has_internal_changes(&self) -> bool {
475-
return self.metadata_change.is_some();
488+
self.metadata_change.is_some()
489+
|| self
490+
.tracking_table
491+
.as_ref()
492+
.map_or(false, |t| t.has_tracked_state_change)
493+
|| self
494+
.target_resources
495+
.iter()
496+
.any(|target| target.has_tracked_state_change)
476497
}
477498

478499
fn has_external_changes(&self) -> bool {

0 commit comments

Comments
 (0)