Skip to content

Commit 5fd2ee4

Browse files
authored
Incremental processing: propagate ordinal from source and skip processing a row if ordinal+logic has no change (#225)
* Interface update. * Source update to include source ordinal. * Avoid collapsing a request when logic fingerprint changed.
1 parent 4a35a66 commit 5fd2ee4

File tree

9 files changed

+274
-153
lines changed

9 files changed

+274
-153
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,5 @@ http-body-util = "0.1.3"
9595
yaml-rust2 = "0.10.0"
9696
urlencoding = "2.1.3"
9797
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
98+
tokio-stream = "0.1.17"
99+
async-stream = "0.3.6"

src/execution/evaluator.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -443,8 +443,9 @@ pub async fn evaluate_source_entry(
443443
source_op: &AnalyzedSourceOp,
444444
schema: &schema::DataSchema,
445445
key: &value::KeyValue,
446+
source_value: value::FieldValues,
446447
memory: &EvaluationMemory,
447-
) -> Result<Option<ScopeValueBuilder>> {
448+
) -> Result<ScopeValueBuilder> {
448449
let root_schema = &schema.schema;
449450
let root_scope_value =
450451
ScopeValueBuilder::new(root_schema.fields.len(), schema.collectors.len());
@@ -464,26 +465,20 @@ pub async fn evaluate_source_entry(
464465
}
465466
};
466467

467-
let result = match source_op.executor.get_value(key).await? {
468-
Some(val) => {
469-
let scope_value =
470-
ScopeValueBuilder::augmented_from(&value::ScopeValue(val), collection_schema)?;
471-
root_scope_entry.define_field_w_builder(
472-
&source_op.output,
473-
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
474-
);
475-
476-
evaluate_op_scope(
477-
&plan.op_scope,
478-
RefList::Nil.prepend(&root_scope_entry),
479-
memory,
480-
)
481-
.await?;
482-
Some(root_scope_value)
483-
}
484-
None => None,
485-
};
486-
anyhow::Ok(result)
468+
let scope_value =
469+
ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
470+
root_scope_entry.define_field_w_builder(
471+
&source_op.output,
472+
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
473+
);
474+
475+
evaluate_op_scope(
476+
&plan.op_scope,
477+
RefList::Nil.prepend(&root_scope_entry),
478+
memory,
479+
)
480+
.await?;
481+
Ok(root_scope_value)
487482
}
488483

489484
pub async fn evaluate_transient_flow(

src/execution/indexer.rs

Lines changed: 87 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use anyhow::Result;
1+
use crate::prelude::*;
2+
23
use futures::future::{join, join_all, try_join, try_join_all};
34
use itertools::Itertools;
45
use log::error;
@@ -13,7 +14,7 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
1314
use crate::base::schema;
1415
use crate::base::value::{self, FieldValues, KeyValue};
1516
use crate::builder::plan::*;
16-
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry};
17+
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
1718
use crate::utils::db::WriteAction;
1819
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
1920

@@ -93,9 +94,9 @@ pub fn extract_primary_key(
9394
Ok(key)
9495
}
9596

96-
enum WithApplyStatus<T = ()> {
97+
pub enum UnchangedOr<T> {
9798
Normal(T),
98-
Collapsed,
99+
Unchanged,
99100
}
100101

101102
#[derive(Default)]
@@ -140,7 +141,7 @@ async fn precommit_source_tracking_info(
140141
db_setup: &db_tracking_setup::TrackingTableSetupState,
141142
export_ops: &[AnalyzedExportOp],
142143
pool: &PgPool,
143-
) -> Result<WithApplyStatus<PrecommitOutput>> {
144+
) -> Result<UnchangedOr<PrecommitOutput>> {
144145
let mut txn = pool.begin().await?;
145146

146147
let tracking_info = db_tracking::read_source_tracking_info_for_precommit(
@@ -157,7 +158,7 @@ async fn precommit_source_tracking_info(
157158
.and_then(|info| info.processed_source_ordinal)
158159
> source_ordinal
159160
{
160-
return Ok(WithApplyStatus::Collapsed);
161+
return Ok(UnchangedOr::Unchanged);
161162
}
162163
let process_ordinal = (tracking_info
163164
.as_ref()
@@ -323,7 +324,7 @@ async fn precommit_source_tracking_info(
323324

324325
txn.commit().await?;
325326

326-
Ok(WithApplyStatus::Normal(PrecommitOutput {
327+
Ok(UnchangedOr::Normal(PrecommitOutput {
327328
metadata: PrecommitMetadata {
328329
source_entry_exists: data.is_some(),
329330
process_ordinal,
@@ -343,7 +344,7 @@ async fn commit_source_tracking_info(
343344
process_timestamp: &chrono::DateTime<chrono::Utc>,
344345
db_setup: &db_tracking_setup::TrackingTableSetupState,
345346
pool: &PgPool,
346-
) -> Result<WithApplyStatus<()>> {
347+
) -> Result<UnchangedOr<()>> {
347348
let mut txn = pool.begin().await?;
348349

349350
let tracking_info = db_tracking::read_source_tracking_info_for_commit(
@@ -357,7 +358,7 @@ async fn commit_source_tracking_info(
357358
if tracking_info.as_ref().and_then(|info| info.process_ordinal)
358359
>= Some(precommit_metadata.process_ordinal)
359360
{
360-
return Ok(WithApplyStatus::Collapsed);
361+
return Ok(UnchangedOr::Unchanged);
361362
}
362363

363364
let cleaned_staging_target_keys = tracking_info
@@ -417,7 +418,7 @@ async fn commit_source_tracking_info(
417418

418419
txn.commit().await?;
419420

420-
Ok(WithApplyStatus::Normal(()))
421+
Ok(UnchangedOr::Normal(()))
421422
}
422423

423424
pub async fn evaluate_source_entry_with_memory(
@@ -444,8 +445,16 @@ pub async fn evaluate_source_entry_with_memory(
444445
None
445446
};
446447
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
447-
let data_builder = evaluate_source_entry(plan, source_op, schema, key, &memory).await?;
448-
Ok(data_builder)
448+
let source_data = match source_op.executor.get_value(key).await? {
449+
Some(d) => d,
450+
None => return Ok(None),
451+
};
452+
let source_value = match source_data.value.await? {
453+
Some(value) => value,
454+
None => return Ok(None),
455+
};
456+
let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
457+
Ok(Some(output))
449458
}
450459

451460
pub async fn update_source_entry(
@@ -461,8 +470,6 @@ pub async fn update_source_entry(
461470
let process_timestamp = chrono::Utc::now();
462471

463472
// Phase 1: Evaluate with memoization info.
464-
465-
// TODO: Skip if the source is not newer and the processing logic is not changed.
466473
let existing_tracking_info = read_source_tracking_info(
467474
source_op.source_id,
468475
&source_key_json,
@@ -471,66 +478,94 @@ pub async fn update_source_entry(
471478
)
472479
.await?;
473480
let already_exists = existing_tracking_info.is_some();
474-
let memoization_info = existing_tracking_info
475-
.and_then(|info| info.memoization_info.map(|info| info.0))
476-
.flatten();
477-
let evaluation_memory = EvaluationMemory::new(
478-
process_timestamp,
479-
memoization_info,
480-
EvaluationMemoryOptions {
481-
enable_cache: true,
482-
evaluation_only: false,
483-
},
484-
);
485-
let value_builder = if !only_for_deletion {
486-
evaluate_source_entry(plan, source_op, schema, key, &evaluation_memory).await?
481+
let (existing_source_ordinal, existing_logic_fingerprint, memoization_info) =
482+
match existing_tracking_info {
483+
Some(info) => (
484+
info.processed_source_ordinal.map(Ordinal),
485+
info.process_logic_fingerprint,
486+
info.memoization_info.map(|info| info.0).flatten(),
487+
),
488+
None => Default::default(),
489+
};
490+
let (source_ordinal, output, stored_mem_info) = if !only_for_deletion {
491+
let source_data = source_op.executor.get_value(key).await?;
492+
let source_ordinal = source_data.as_ref().and_then(|d| d.ordinal);
493+
match (source_ordinal, existing_source_ordinal) {
494+
// TODO: Collapse if the source is not newer and the processing logic is not changed.
495+
(Some(source_ordinal), Some(existing_source_ordinal)) => {
496+
if source_ordinal < existing_source_ordinal
497+
|| (source_ordinal == existing_source_ordinal
498+
&& existing_logic_fingerprint == source_op.)
499+
{
500+
return Ok(());
501+
}
502+
}
503+
_ => {}
504+
}
505+
let source_value = match source_data {
506+
Some(d) => d.value.await?,
507+
None => None,
508+
};
509+
match source_value {
510+
Some(source_value) => {
511+
let evaluation_memory = EvaluationMemory::new(
512+
process_timestamp,
513+
memoization_info,
514+
EvaluationMemoryOptions {
515+
enable_cache: true,
516+
evaluation_only: false,
517+
},
518+
);
519+
let output = evaluate_source_entry(
520+
plan,
521+
source_op,
522+
schema,
523+
key,
524+
source_value,
525+
&evaluation_memory,
526+
)
527+
.await?;
528+
(
529+
source_ordinal,
530+
Some(output),
531+
evaluation_memory.into_stored()?,
532+
)
533+
}
534+
None => Default::default(),
535+
}
487536
} else {
488-
None
537+
Default::default()
489538
};
490-
let exists = value_builder.is_some();
491-
492539
if already_exists {
493-
if exists {
540+
if output.is_some() {
494541
stats.num_already_exists.fetch_add(1, Relaxed);
495542
} else {
496543
stats.num_deletions.fetch_add(1, Relaxed);
497544
}
498-
} else if exists {
545+
} else if output.is_some() {
499546
stats.num_insertions.fetch_add(1, Relaxed);
500547
} else {
501548
return Ok(());
502549
}
503550

504-
let memoization_info = evaluation_memory.into_stored()?;
505-
let (source_ordinal, precommit_data) = match &value_builder {
506-
Some(scope_value) => {
507-
(
508-
// TODO: Generate the actual source ordinal.
509-
Some(1),
510-
Some(PrecommitData {
511-
scope_value,
512-
memoization_info: &memoization_info,
513-
}),
514-
)
515-
}
516-
None => (None, None),
517-
};
518-
519551
// Phase 2 (precommit): Update with the memoization info and stage target keys.
520552
let precommit_output = precommit_source_tracking_info(
521553
source_op.source_id,
522554
&source_key_json,
523-
source_ordinal,
524-
precommit_data,
555+
source_ordinal.map(|o| o.into()),
556+
output.as_ref().map(|scope_value| PrecommitData {
557+
scope_value,
558+
memoization_info: &stored_mem_info,
559+
}),
525560
&process_timestamp,
526561
&plan.tracking_table_setup,
527562
&plan.export_ops,
528563
pool,
529564
)
530565
.await?;
531566
let precommit_output = match precommit_output {
532-
WithApplyStatus::Normal(output) => output,
533-
WithApplyStatus::Collapsed => return Ok(()),
567+
UnchangedOr::Normal(output) => output,
568+
UnchangedOr::Unchanged => return Ok(()),
534569
};
535570

536571
// Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
@@ -554,7 +589,7 @@ pub async fn update_source_entry(
554589
commit_source_tracking_info(
555590
source_op.source_id,
556591
&source_key_json,
557-
source_ordinal,
592+
source_ordinal.map(|o| o.into()),
558593
&plan.logic_fingerprint,
559594
precommit_output.metadata,
560595
&process_timestamp,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod execution;
44
mod lib_context;
55
mod llm;
66
mod ops;
7+
mod prelude;
78
mod py;
89
mod server;
910
mod service;

src/ops/interface.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,71 @@
1+
use std::time::SystemTime;
2+
13
use crate::base::{
24
schema::*,
35
spec::{IndexOptions, VectorSimilarityMetric},
46
value::*,
57
};
8+
use crate::prelude::*;
69
use crate::setup;
7-
use anyhow::Result;
8-
use async_trait::async_trait;
9-
use futures::future::BoxFuture;
10+
use chrono::TimeZone;
1011
use serde::Serialize;
11-
use std::sync::Arc;
1212

1313
pub struct FlowInstanceContext {
1414
pub flow_instance_name: String,
1515
}
1616

17+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
18+
pub struct Ordinal(pub i64);
19+
20+
impl Into<i64> for Ordinal {
21+
fn into(self) -> i64 {
22+
self.0
23+
}
24+
}
25+
26+
impl TryFrom<SystemTime> for Ordinal {
27+
type Error = anyhow::Error;
28+
29+
fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
30+
let duration = time.duration_since(std::time::UNIX_EPOCH)?;
31+
Ok(duration.as_micros().try_into().map(Ordinal)?)
32+
}
33+
}
34+
35+
impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
36+
type Error = anyhow::Error;
37+
38+
fn try_from(time: chrono::DateTime<TZ>) -> Result<Self, Self::Error> {
39+
Ok(Ordinal(time.timestamp_micros()))
40+
}
41+
}
42+
43+
pub struct SourceData<'a> {
44+
/// Value that must increase monotonically after change. E.g. can be from the update time.
45+
pub ordinal: Option<Ordinal>,
46+
/// None means the item is gone when polling.
47+
pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
48+
}
49+
50+
pub struct SourceChange<'a> {
51+
/// Last update/deletion ordinal. None means unavailable.
52+
pub ordinal: Option<Ordinal>,
53+
pub key: KeyValue,
54+
/// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
55+
pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
56+
}
57+
1758
#[async_trait]
1859
pub trait SourceExecutor: Send + Sync {
1960
/// Get the list of keys for the source.
2061
async fn list_keys(&self) -> Result<Vec<KeyValue>>;
2162

2263
// Get the value for the given key.
23-
async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
64+
async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;
65+
66+
fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
67+
None
68+
}
2469
}
2570

2671
pub trait SourceFactory {

0 commit comments

Comments
 (0)