Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/execution/indexing_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use futures::try_join;

#[derive(Debug, Serialize)]
pub struct SourceRowLastProcessedInfo {
pub source_ordinal: Option<interface::Ordinal>,
pub source_ordinal: interface::Ordinal,
pub processing_time: Option<chrono::DateTime<chrono::Utc>>,
pub is_logic_current: bool,
}

#[derive(Debug, Serialize)]
pub struct SourceRowInfo {
pub ordinal: Option<interface::Ordinal>,
pub ordinal: interface::Ordinal,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -43,17 +43,21 @@ pub async fn get_source_row_indexing_status(
let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;

let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo {
source_ordinal: l.processed_source_ordinal.map(interface::Ordinal),
source_ordinal: interface::Ordinal(l.processed_source_ordinal),
processing_time: l
.process_time_micros
.map(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
.flatten(),
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
== l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
});
let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal });
let current = SourceRowInfo {
ordinal: current
.ordinal
.ok_or(anyhow::anyhow!("Ordinal is unavailable for the source"))?,
};
Ok(SourceRowIndexingStatus {
last_processed,
current,
current: Some(current),
})
}
9 changes: 6 additions & 3 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ async fn update_source(
async move {
let mut change_stream = change_stream;
while let Some(change) = change_stream.next().await {
source_context
.process_change(change, &pool, &source_update_stats)
.map(tokio::spawn);
tokio::spawn(source_context.clone().process_source_key(
change.key,
change.data,
source_update_stats.clone(),
pool.clone(),
));
}
Ok(())
}
Expand Down
51 changes: 28 additions & 23 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ pub fn extract_primary_key(
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum SourceVersionKind {
#[default]
NonExistent,
UnknownLogic,
DifferentLogic,
CurrentLogic,
Deleted,
NonExistence,
}

#[derive(Debug, Clone, Default)]
pub struct SourceVersion {
pub ordinal: Option<Ordinal>,
pub ordinal: Ordinal,
pub kind: SourceVersionKind,
}

Expand All @@ -62,7 +62,7 @@ impl SourceVersion {
curr_fp: Fingerprint,
) -> Self {
Self {
ordinal: stored_ordinal.map(Ordinal),
ordinal: Ordinal(stored_ordinal),
kind: match &stored_fp {
Some(stored_fp) => {
if stored_fp.as_slice() == curr_fp.0.as_slice() {
Expand All @@ -71,22 +71,26 @@ impl SourceVersion {
SourceVersionKind::DifferentLogic
}
}
None => SourceVersionKind::NonExistent,
None => SourceVersionKind::UnknownLogic,
},
}
}

pub fn from_current(ordinal: Option<Ordinal>) -> Self {
pub fn from_current(ordinal: Ordinal) -> Self {
Self {
ordinal,
kind: SourceVersionKind::CurrentLogic,
}
}

pub fn for_deletion(&self) -> Self {
pub fn from_current_data(data: &interface::SourceData) -> Self {
let kind = match &data.value {
interface::SourceValue::Existence(_) => SourceVersionKind::CurrentLogic,
interface::SourceValue::NonExistence => SourceVersionKind::NonExistence,
};
Self {
ordinal: self.ordinal,
kind: SourceVersionKind::Deleted,
ordinal: data.ordinal,
kind,
}
}

Expand All @@ -95,7 +99,7 @@ impl SourceVersion {
target: &SourceVersion,
update_stats: Option<&stats::UpdateStats>,
) -> bool {
let should_skip = match (self.ordinal, target.ordinal) {
let should_skip = match (self.ordinal.0, target.ordinal.0) {
(Some(orginal), Some(target_ordinal)) => {
orginal > target_ordinal || (orginal == target_ordinal && self.kind >= target.kind)
}
Expand Down Expand Up @@ -418,7 +422,7 @@ async fn commit_source_tracking_info(
source_id,
source_key_json,
cleaned_staging_target_keys,
source_version.ordinal.map(|o| o.into()),
source_version.ordinal.into(),
logic_fingerprint,
precommit_metadata.process_ordinal,
process_timestamp.timestamp_micros(),
Expand Down Expand Up @@ -460,7 +464,7 @@ pub async fn evaluate_source_entry_with_memory(
None
};
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
let source_value = match src_eval_ctx
let source_value = src_eval_ctx
.import_op
.executor
.get_value(
Expand All @@ -471,19 +475,20 @@ pub async fn evaluate_source_entry_with_memory(
},
)
.await?
{
Some(d) => d
.value
.ok_or_else(|| anyhow::anyhow!("value not returned"))?,
None => return Ok(None),
.value
.ok_or_else(|| anyhow::anyhow!("value not returned"))?;
let output = match source_value {
interface::SourceValue::Existence(source_value) => {
Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?)
}
interface::SourceValue::NonExistence => None,
};
let output = evaluate_source_entry(src_eval_ctx, source_value, &memory).await?;
Ok(Some(output))
Ok(output)
}

pub async fn update_source_row(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
source_value: Option<FieldValues>,
source_value: interface::SourceValue,
source_version: &SourceVersion,
pool: &PgPool,
update_stats: &stats::UpdateStats,
Expand Down Expand Up @@ -517,7 +522,7 @@ pub async fn update_source_row(
None => Default::default(),
};
let (output, stored_mem_info) = match source_value {
Some(source_value) => {
interface::SourceValue::Existence(source_value) => {
let evaluation_memory = EvaluationMemory::new(
process_time,
memoization_info,
Expand All @@ -530,7 +535,7 @@ pub async fn update_source_row(
evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
(Some(output), evaluation_memory.into_stored()?)
}
None => Default::default(),
interface::SourceValue::NonExistence => Default::default(),
};

// Phase 2 (precommit): Update with the memoization info and stage target keys.
Expand Down Expand Up @@ -601,7 +606,7 @@ pub async fn update_source_row(

if let Some(existing_version) = existing_version {
if output.is_some() {
if source_version.ordinal.is_none()
if !source_version.ordinal.is_available()
|| source_version.ordinal != existing_version.ordinal
{
update_stats.num_updates.inc(1);
Expand Down
Loading