Skip to content

Commit 1e5e684

Browse files
authored
refactor(source-api): make ordinal/value more clear in various APIs (#479)
1 parent 125d409 commit 1e5e684

File tree

8 files changed

+206
-171
lines changed

8 files changed

+206
-171
lines changed

src/execution/indexing_status.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use futures::try_join;
66

77
#[derive(Debug, Serialize)]
88
pub struct SourceRowLastProcessedInfo {
9-
pub source_ordinal: Option<interface::Ordinal>,
9+
pub source_ordinal: interface::Ordinal,
1010
pub processing_time: Option<chrono::DateTime<chrono::Utc>>,
1111
pub is_logic_current: bool,
1212
}
1313

1414
#[derive(Debug, Serialize)]
1515
pub struct SourceRowInfo {
16-
pub ordinal: Option<interface::Ordinal>,
16+
pub ordinal: interface::Ordinal,
1717
}
1818

1919
#[derive(Debug, Serialize)]
@@ -43,17 +43,21 @@ pub async fn get_source_row_indexing_status(
4343
let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;
4444

4545
let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo {
46-
source_ordinal: l.processed_source_ordinal.map(interface::Ordinal),
46+
source_ordinal: interface::Ordinal(l.processed_source_ordinal),
4747
processing_time: l
4848
.process_time_micros
4949
.map(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
5050
.flatten(),
5151
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
5252
== l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
5353
});
54-
let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal });
54+
let current = SourceRowInfo {
55+
ordinal: current
56+
.ordinal
57+
.ok_or(anyhow::anyhow!("Ordinal is unavailable for the source"))?,
58+
};
5559
Ok(SourceRowIndexingStatus {
5660
last_processed,
57-
current,
61+
current: Some(current),
5862
})
5963
}

src/execution/live_updater.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,12 @@ async fn update_source(
9393
async move {
9494
let mut change_stream = change_stream;
9595
while let Some(change) = change_stream.next().await {
96-
source_context
97-
.process_change(change, &pool, &source_update_stats)
98-
.map(tokio::spawn);
96+
tokio::spawn(source_context.clone().process_source_key(
97+
change.key,
98+
change.data,
99+
source_update_stats.clone(),
100+
pool.clone(),
101+
));
99102
}
100103
Ok(())
101104
}

src/execution/row_indexer.rs

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ pub fn extract_primary_key(
4343
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
4444
pub enum SourceVersionKind {
4545
#[default]
46-
NonExistent,
46+
UnknownLogic,
4747
DifferentLogic,
4848
CurrentLogic,
49-
Deleted,
49+
NonExistence,
5050
}
5151

5252
#[derive(Debug, Clone, Default)]
5353
pub struct SourceVersion {
54-
pub ordinal: Option<Ordinal>,
54+
pub ordinal: Ordinal,
5555
pub kind: SourceVersionKind,
5656
}
5757

@@ -62,7 +62,7 @@ impl SourceVersion {
6262
curr_fp: Fingerprint,
6363
) -> Self {
6464
Self {
65-
ordinal: stored_ordinal.map(Ordinal),
65+
ordinal: Ordinal(stored_ordinal),
6666
kind: match &stored_fp {
6767
Some(stored_fp) => {
6868
if stored_fp.as_slice() == curr_fp.0.as_slice() {
@@ -71,22 +71,26 @@ impl SourceVersion {
7171
SourceVersionKind::DifferentLogic
7272
}
7373
}
74-
None => SourceVersionKind::NonExistent,
74+
None => SourceVersionKind::UnknownLogic,
7575
},
7676
}
7777
}
7878

79-
pub fn from_current(ordinal: Option<Ordinal>) -> Self {
79+
pub fn from_current(ordinal: Ordinal) -> Self {
8080
Self {
8181
ordinal,
8282
kind: SourceVersionKind::CurrentLogic,
8383
}
8484
}
8585

86-
pub fn for_deletion(&self) -> Self {
86+
pub fn from_current_data(data: &interface::SourceData) -> Self {
87+
let kind = match &data.value {
88+
interface::SourceValue::Existence(_) => SourceVersionKind::CurrentLogic,
89+
interface::SourceValue::NonExistence => SourceVersionKind::NonExistence,
90+
};
8791
Self {
88-
ordinal: self.ordinal,
89-
kind: SourceVersionKind::Deleted,
92+
ordinal: data.ordinal,
93+
kind,
9094
}
9195
}
9296

@@ -95,7 +99,7 @@ impl SourceVersion {
9599
target: &SourceVersion,
96100
update_stats: Option<&stats::UpdateStats>,
97101
) -> bool {
98-
let should_skip = match (self.ordinal, target.ordinal) {
102+
let should_skip = match (self.ordinal.0, target.ordinal.0) {
99103
(Some(orginal), Some(target_ordinal)) => {
100104
orginal > target_ordinal || (orginal == target_ordinal && self.kind >= target.kind)
101105
}
@@ -418,7 +422,7 @@ async fn commit_source_tracking_info(
418422
source_id,
419423
source_key_json,
420424
cleaned_staging_target_keys,
421-
source_version.ordinal.map(|o| o.into()),
425+
source_version.ordinal.into(),
422426
logic_fingerprint,
423427
precommit_metadata.process_ordinal,
424428
process_timestamp.timestamp_micros(),
@@ -460,7 +464,7 @@ pub async fn evaluate_source_entry_with_memory(
460464
None
461465
};
462466
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
463-
let source_value = match src_eval_ctx
467+
let source_value = src_eval_ctx
464468
.import_op
465469
.executor
466470
.get_value(
@@ -471,19 +475,20 @@ pub async fn evaluate_source_entry_with_memory(
471475
},
472476
)
473477
.await?
474-
{
475-
Some(d) => d
476-
.value
477-
.ok_or_else(|| anyhow::anyhow!("value not returned"))?,
478-
None => return Ok(None),
478+
.value
479+
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
480+
let output = match source_value {
481+
interface::SourceValue::Existence(source_value) => {
482+
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
483+
}
484+
interface::SourceValue::NonExistence => None,
479485
};
480-
let output = evaluate_source_entry(src_eval_ctx, source_value, &memory).await?;
481-
Ok(Some(output))
486+
Ok(output)
482487
}
483488

484489
pub async fn update_source_row(
485490
src_eval_ctx: &SourceRowEvaluationContext<'_>,
486-
source_value: Option<FieldValues>,
491+
source_value: interface::SourceValue,
487492
source_version: &SourceVersion,
488493
pool: &PgPool,
489494
update_stats: &stats::UpdateStats,
@@ -517,7 +522,7 @@ pub async fn update_source_row(
517522
None => Default::default(),
518523
};
519524
let (output, stored_mem_info) = match source_value {
520-
Some(source_value) => {
525+
interface::SourceValue::Existence(source_value) => {
521526
let evaluation_memory = EvaluationMemory::new(
522527
process_time,
523528
memoization_info,
@@ -530,7 +535,7 @@ pub async fn update_source_row(
530535
evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
531536
(Some(output), evaluation_memory.into_stored()?)
532537
}
533-
None => Default::default(),
538+
interface::SourceValue::NonExistence => Default::default(),
534539
};
535540

536541
// Phase 2 (precommit): Update with the memoization info and stage target keys.
@@ -601,7 +606,7 @@ pub async fn update_source_row(
601606

602607
if let Some(existing_version) = existing_version {
603608
if output.is_some() {
604-
if source_version.ordinal.is_none()
609+
if !source_version.ordinal.is_available()
605610
|| source_version.ordinal != existing_version.ordinal
606611
{
607612
update_stats.num_updates.inc(1);

0 commit comments

Comments
 (0)