Skip to content

Commit e5a1809

Browse files
authored
Bug fix: make sure dangling precommit states properly handled. (#193)
1 parent ea6fe1e commit e5a1809

File tree

1 file changed

+15
-18
lines changed

1 file changed

+15
-18
lines changed

src/execution/indexer.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct TrackingInfoForTarget<'a> {
116116
mutation: ExportTargetMutation,
117117
}
118118

119+
#[derive(Debug)]
119120
struct PrecommitData<'a> {
120121
scope_value: &'a ScopeValueBuilder,
121122
memoization_info: &'a MemoizationInfo,
@@ -174,18 +175,19 @@ async fn precommit_source_tracking_info(
174175
.or_default()
175176
.export_op = Some(export_op);
176177
}
178+
179+
// Collect `tracking_info_for_targets` from existing tracking info.
177180
if let Some(info) = tracking_info {
178181
let sqlx::types::Json(staging_target_keys) = info.staging_target_keys;
179182
for (target_id, keys_info) in staging_target_keys.into_iter() {
180-
let mut target_info = TrackingInfoForTarget::default();
183+
let target_info = tracking_info_for_targets.entry(target_id).or_default();
181184
for key_info in keys_info.into_iter() {
182185
target_info
183186
.existing_staging_keys_info
184187
.entry(key_info.0)
185188
.or_default()
186189
.push((key_info.1, key_info.2));
187190
}
188-
tracking_info_for_targets.insert(target_id, target_info);
189191
}
190192

191193
if let Some(sqlx::types::Json(target_keys)) = info.target_keys {
@@ -378,7 +380,6 @@ async fn commit_source_tracking_info(
378380
.collect::<Vec<_>>()
379381
})
380382
.unwrap_or_default();
381-
382383
if !precommit_metadata.source_entry_exists && cleaned_staging_target_keys.is_empty() {
383384
// TODO: When we support distributed execution in the future, we'll need to leave a tombstone for a while
384385
// to prevent an earlier update causing the record reappear because of out-of-order processing.
@@ -532,21 +533,17 @@ pub async fn update_source_entry(
532533

533534
// Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
534535
let mut target_mutations = precommit_output.target_mutations;
535-
let apply_futs = plan
536-
.export_ops
537-
.iter()
538-
.filter_map(|export_op| {
539-
target_mutations
540-
.remove(&export_op.target_id)
541-
.and_then(|mutation| {
542-
if !mutation.is_empty() {
543-
Some(export_op.executor.apply_mutation(mutation))
544-
} else {
545-
None
546-
}
547-
})
548-
})
549-
.collect::<Vec<_>>();
536+
let apply_futs = plan.export_ops.iter().filter_map(|export_op| {
537+
target_mutations
538+
.remove(&export_op.target_id)
539+
.and_then(|mutation| {
540+
if !mutation.is_empty() {
541+
Some(export_op.executor.apply_mutation(mutation))
542+
} else {
543+
None
544+
}
545+
})
546+
});
550547

551548
// TODO: Handle errors.
552549
try_join_all(apply_futs).await?;

0 commit comments

Comments
 (0)