Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/execution/dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::memoization::EvaluationMemoryOptions;
use super::row_indexer;
use crate::base::{schema, value};
use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
use crate::ops::interface::SourceExecutorListOptions;
use crate::ops::interface::SourceExecutorReadOptions;
use crate::utils::yaml_ser::YamlSerializer;

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -193,9 +193,10 @@ impl<'a> Dumper<'a> {

let mut rows_stream = import_op
.executor
.list(&SourceExecutorListOptions {
.list(&SourceExecutorReadOptions {
include_ordinal: false,
include_content_version_fp: false,
include_value: false,
})
.await?;
while let Some(rows) = rows_stream.next().await {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/indexing_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn get_source_row_indexing_status(
let current_fut = src_eval_ctx.import_op.executor.get_value(
src_eval_ctx.key,
key_aux_info,
&interface::SourceExecutorGetOptions {
&interface::SourceExecutorReadOptions {
include_value: false,
include_ordinal: true,
include_content_version_fp: false,
Expand Down
22 changes: 13 additions & 9 deletions src/execution/live_updater.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
execution::{source_indexer::ProcessSourceKeyInput, stats::UpdateStats},
execution::{source_indexer::ProcessSourceRowInput, stats::UpdateStats},
prelude::*,
};

Expand Down Expand Up @@ -192,18 +192,18 @@ impl SourceUpdateTask {
.concurrency_controller
.acquire(concur_control::BYTES_UNKNOWN_YET)
.await?;
tokio::spawn(source_context.clone().process_source_key(
change.key,
tokio::spawn(source_context.clone().process_source_row(
ProcessSourceRowInput {
key: change.key,
key_aux_info: Some(change.key_aux_info),
data: change.data,
},
update_stats.clone(),
concur_permit,
Some(move || async move {
SharedAckFn::ack(&shared_ack_fn).await
}),
pool.clone(),
ProcessSourceKeyInput {
key_aux_info: Some(change.key_aux_info),
data: change.data,
},
));
}
}
Expand Down Expand Up @@ -242,7 +242,9 @@ impl SourceUpdateTask {
let live_mode = self.options.live_mode;
async move {
let update_stats = Arc::new(stats::UpdateStats::default());
source_context.update(&pool, &update_stats).await?;
source_context
.update(&pool, &update_stats, /*expect_little_diff=*/ false)
.await?;
if update_stats.has_any_change() {
status_tx.send_modify(|update| {
update.source_updates_num[source_idx] += 1;
Expand All @@ -260,7 +262,9 @@ impl SourceUpdateTask {
interval.tick().await;

let update_stats = Arc::new(stats::UpdateStats::default());
source_context.update(&pool, &update_stats).await?;
source_context
.update(&pool, &update_stats, /*expect_little_diff=*/ true)
.await?;
if update_stats.has_any_change() {
status_tx.send_modify(|update| {
update.source_updates_num[source_idx] += 1;
Expand Down
4 changes: 2 additions & 2 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::stats;
use crate::base::value::{self, FieldValues, KeyValue};
use crate::builder::plan::*;
use crate::ops::interface::{
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorGetOptions,
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorReadOptions,
};
use crate::utils::db::WriteAction;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
Expand Down Expand Up @@ -841,7 +841,7 @@ pub async fn evaluate_source_entry_with_memory(
.get_value(
src_eval_ctx.key,
key_aux_info,
&SourceExecutorGetOptions {
&SourceExecutorReadOptions {
include_value: true,
include_ordinal: false,
include_content_version_fp: false,
Expand Down
86 changes: 44 additions & 42 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ impl<'a> LocalSourceRowStateOperator<'a> {
}
}

pub struct ProcessSourceKeyInput {
pub struct ProcessSourceRowInput {
pub key: value::FullKeyValue,
/// `key_aux_info` is not available for deletions. It must be provided if `data.value` is `None`.
pub key_aux_info: Option<serde_json::Value>,
pub data: interface::PartialSourceRowData,
Expand Down Expand Up @@ -224,17 +225,16 @@ impl SourceIndexingContext {
})
}

pub async fn process_source_key<
pub async fn process_source_row<
AckFut: Future<Output = Result<()>> + Send + 'static,
AckFn: FnOnce() -> AckFut,
>(
self: Arc<Self>,
key: value::FullKeyValue,
row_input: ProcessSourceRowInput,
update_stats: Arc<stats::UpdateStats>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
inputs: ProcessSourceKeyInput,
) {
let process = async {
let plan = self.flow.get_execution_plan().await?;
Expand All @@ -245,7 +245,7 @@ impl SourceIndexingContext {
plan: &plan,
import_op,
schema,
key: &key,
key: &row_input.key,
import_op_idx: self.source_idx,
};
let mut row_indexer = row_indexer::RowIndexer::new(
Expand All @@ -256,9 +256,9 @@ impl SourceIndexingContext {
)?;

let mut row_state_operator =
LocalSourceRowStateOperator::new(&key, &self.state, &update_stats);
LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);

let source_data = inputs.data;
let source_data = row_input.data;
if let Some(ordinal) = source_data.ordinal
&& let Some(content_version_fp) = &source_data.content_version_fp
{
Expand Down Expand Up @@ -295,22 +295,22 @@ impl SourceIndexingContext {
}
}

let (ordinal, value, content_version_fp) =
let (ordinal, content_version_fp, value) =
match (source_data.ordinal, source_data.value) {
(Some(ordinal), Some(value)) => {
(ordinal, value, source_data.content_version_fp)
(ordinal, source_data.content_version_fp, value)
}
_ => {
let data = import_op
.executor
.get_value(
&key,
inputs.key_aux_info.as_ref().ok_or_else(|| {
&row_input.key,
row_input.key_aux_info.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"`key_aux_info` must be provided when there's no `source_data`"
)
})?,
&interface::SourceExecutorGetOptions {
&interface::SourceExecutorReadOptions {
include_value: true,
include_ordinal: true,
include_content_version_fp: true,
Expand All @@ -320,9 +320,9 @@ impl SourceIndexingContext {
(
data.ordinal
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
data.content_version_fp,
data.value
.ok_or_else(|| anyhow::anyhow!("value is not available"))?,
data.content_version_fp,
)
}
};
Expand Down Expand Up @@ -356,7 +356,8 @@ impl SourceIndexingContext {
"{:?}",
e.context(format!(
"Error in processing row from source `{source}` with key: {key}",
source = self.flow.flow_instance.import_ops[self.source_idx].name
source = self.flow.flow_instance.import_ops[self.source_idx].name,
key = row_input.key,
))
);
}
Expand All @@ -366,6 +367,7 @@ impl SourceIndexingContext {
self: &Arc<Self>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
expect_little_diff: bool,
) -> Result<()> {
let pending_update_fut = {
let mut pending_update = self.pending_update.lock().unwrap();
Expand All @@ -382,7 +384,8 @@ impl SourceIndexingContext {
let mut pending_update = slf.pending_update.lock().unwrap();
*pending_update = None;
}
slf.update_once(&pool, &update_stats).await?;
slf.update_once(&pool, &update_stats, expect_little_diff)
.await?;
}
anyhow::Ok(())
});
Expand All @@ -405,24 +408,26 @@ impl SourceIndexingContext {
self: &Arc<Self>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
expect_little_diff: bool,
) -> Result<()> {
let plan = self.flow.get_execution_plan().await?;
let import_op = &plan.import_ops[self.source_idx];
let rows_stream = import_op
.executor
.list(&interface::SourceExecutorListOptions {
include_ordinal: true,
include_content_version_fp: true,
})
.await?;
let read_options = interface::SourceExecutorReadOptions {
include_ordinal: true,
include_content_version_fp: true,
// When only a little diff is expected and the source provides ordinal, we don't fetch values during `list()` by default,
// as there's a high chance that we don't need the values at all
include_value: !(expect_little_diff && import_op.executor.provides_ordinal()),
};
let rows_stream = import_op.executor.list(&read_options).await?;
self.update_with_stream(import_op, rows_stream, pool, update_stats)
.await
}

async fn update_with_stream(
self: &Arc<Self>,
import_op: &plan::AnalyzedImportOp,
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
pool: &PgPool,
update_stats: &Arc<stats::UpdateStats>,
) -> Result<()> {
Expand All @@ -435,7 +440,8 @@ impl SourceIndexingContext {
while let Some(row) = rows_stream.next().await {
for row in row? {
let source_version = SourceVersion::from_current_with_ordinal(
row.ordinal
row.data
.ordinal
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
);
{
Expand All @@ -454,20 +460,16 @@ impl SourceIndexingContext {
.concurrency_controller
.acquire(concur_control::BYTES_UNKNOWN_YET)
.await?;
join_set.spawn(self.clone().process_source_key(
row.key,
join_set.spawn(self.clone().process_source_row(
ProcessSourceRowInput {
key: row.key,
key_aux_info: Some(row.key_aux_info),
data: row.data,
},
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
ProcessSourceKeyInput {
key_aux_info: Some(row.key_aux_info),
data: interface::PartialSourceRowData {
value: None,
ordinal: Some(source_version.ordinal),
content_version_fp: row.content_version_fp,
},
},
));
}
}
Expand All @@ -491,20 +493,20 @@ impl SourceIndexingContext {
};
for (key, source_ordinal) in deleted_key_versions {
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
join_set.spawn(self.clone().process_source_key(
key,
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
ProcessSourceKeyInput {
join_set.spawn(self.clone().process_source_row(
ProcessSourceRowInput {
key,
key_aux_info: None,
data: interface::PartialSourceRowData {
value: Some(interface::SourceValue::NonExistence),
ordinal: Some(source_ordinal),
content_version_fp: None,
value: Some(interface::SourceValue::NonExistence),
},
},
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
));
}
while let Some(result) = join_set.join_next().await {
Expand Down
Loading
Loading