Skip to content

Commit a77abab

Browse files
committed
feat: allow to include values in source's list() calls
1 parent 99eddad commit a77abab

File tree

12 files changed

+128
-106
lines changed

12 files changed

+128
-106
lines changed

src/execution/dumper.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::memoization::EvaluationMemoryOptions;
1212
use super::row_indexer;
1313
use crate::base::{schema, value};
1414
use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
15-
use crate::ops::interface::SourceExecutorListOptions;
15+
use crate::ops::interface::SourceExecutorReadOptions;
1616
use crate::utils::yaml_ser::YamlSerializer;
1717

1818
#[derive(Debug, Clone, Deserialize)]
@@ -193,9 +193,10 @@ impl<'a> Dumper<'a> {
193193

194194
let mut rows_stream = import_op
195195
.executor
196-
.list(&SourceExecutorListOptions {
196+
.list(&SourceExecutorReadOptions {
197197
include_ordinal: false,
198198
include_content_version_fp: false,
199+
include_value: false,
199200
})
200201
.await?;
201202
while let Some(rows) = rows_stream.next().await {

src/execution/indexing_status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub async fn get_source_row_indexing_status(
3838
let current_fut = src_eval_ctx.import_op.executor.get_value(
3939
src_eval_ctx.key,
4040
key_aux_info,
41-
&interface::SourceExecutorGetOptions {
41+
&interface::SourceExecutorReadOptions {
4242
include_value: false,
4343
include_ordinal: true,
4444
include_content_version_fp: false,

src/execution/live_updater.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
execution::{source_indexer::ProcessSourceKeyInput, stats::UpdateStats},
2+
execution::{source_indexer::ProcessSourceRowInput, stats::UpdateStats},
33
prelude::*,
44
};
55

@@ -192,18 +192,18 @@ impl SourceUpdateTask {
192192
.concurrency_controller
193193
.acquire(concur_control::BYTES_UNKNOWN_YET)
194194
.await?;
195-
tokio::spawn(source_context.clone().process_source_key(
196-
change.key,
195+
tokio::spawn(source_context.clone().process_source_row(
196+
ProcessSourceRowInput {
197+
key: change.key,
198+
key_aux_info: Some(change.key_aux_info),
199+
data: change.data,
200+
},
197201
update_stats.clone(),
198202
concur_permit,
199203
Some(move || async move {
200204
SharedAckFn::ack(&shared_ack_fn).await
201205
}),
202206
pool.clone(),
203-
ProcessSourceKeyInput {
204-
key_aux_info: Some(change.key_aux_info),
205-
data: change.data,
206-
},
207207
));
208208
}
209209
}

src/execution/row_indexer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use super::stats;
1616
use crate::base::value::{self, FieldValues, KeyValue};
1717
use crate::builder::plan::*;
1818
use crate::ops::interface::{
19-
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorGetOptions,
19+
ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorReadOptions,
2020
};
2121
use crate::utils::db::WriteAction;
2222
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
@@ -841,7 +841,7 @@ pub async fn evaluate_source_entry_with_memory(
841841
.get_value(
842842
src_eval_ctx.key,
843843
key_aux_info,
844-
&SourceExecutorGetOptions {
844+
&SourceExecutorReadOptions {
845845
include_value: true,
846846
include_ordinal: false,
847847
include_content_version_fp: false,

src/execution/source_indexer.rs

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ impl<'a> LocalSourceRowStateOperator<'a> {
166166
}
167167
}
168168

169-
pub struct ProcessSourceKeyInput {
169+
pub struct ProcessSourceRowInput {
170+
pub key: value::FullKeyValue,
170171
/// `key_aux_info` is not available for deletions. It must be provided if `data.value` is `None`.
171172
pub key_aux_info: Option<serde_json::Value>,
172173
pub data: interface::PartialSourceRowData,
@@ -224,17 +225,16 @@ impl SourceIndexingContext {
224225
})
225226
}
226227

227-
pub async fn process_source_key<
228+
pub async fn process_source_row<
228229
AckFut: Future<Output = Result<()>> + Send + 'static,
229230
AckFn: FnOnce() -> AckFut,
230231
>(
231232
self: Arc<Self>,
232-
key: value::FullKeyValue,
233+
row_input: ProcessSourceRowInput,
233234
update_stats: Arc<stats::UpdateStats>,
234235
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
235236
ack_fn: Option<AckFn>,
236237
pool: PgPool,
237-
inputs: ProcessSourceKeyInput,
238238
) {
239239
let process = async {
240240
let plan = self.flow.get_execution_plan().await?;
@@ -245,7 +245,7 @@ impl SourceIndexingContext {
245245
plan: &plan,
246246
import_op,
247247
schema,
248-
key: &key,
248+
key: &row_input.key,
249249
import_op_idx: self.source_idx,
250250
};
251251
let mut row_indexer = row_indexer::RowIndexer::new(
@@ -256,9 +256,9 @@ impl SourceIndexingContext {
256256
)?;
257257

258258
let mut row_state_operator =
259-
LocalSourceRowStateOperator::new(&key, &self.state, &update_stats);
259+
LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
260260

261-
let source_data = inputs.data;
261+
let source_data = row_input.data;
262262
if let Some(ordinal) = source_data.ordinal
263263
&& let Some(content_version_fp) = &source_data.content_version_fp
264264
{
@@ -295,22 +295,22 @@ impl SourceIndexingContext {
295295
}
296296
}
297297

298-
let (ordinal, value, content_version_fp) =
298+
let (ordinal, content_version_fp, value) =
299299
match (source_data.ordinal, source_data.value) {
300300
(Some(ordinal), Some(value)) => {
301-
(ordinal, value, source_data.content_version_fp)
301+
(ordinal, source_data.content_version_fp, value)
302302
}
303303
_ => {
304304
let data = import_op
305305
.executor
306306
.get_value(
307-
&key,
308-
inputs.key_aux_info.as_ref().ok_or_else(|| {
307+
&row_input.key,
308+
row_input.key_aux_info.as_ref().ok_or_else(|| {
309309
anyhow::anyhow!(
310310
"`key_aux_info` must be provided when there's no `source_data`"
311311
)
312312
})?,
313-
&interface::SourceExecutorGetOptions {
313+
&interface::SourceExecutorReadOptions {
314314
include_value: true,
315315
include_ordinal: true,
316316
include_content_version_fp: true,
@@ -320,9 +320,9 @@ impl SourceIndexingContext {
320320
(
321321
data.ordinal
322322
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
323+
data.content_version_fp,
323324
data.value
324325
.ok_or_else(|| anyhow::anyhow!("value is not available"))?,
325-
data.content_version_fp,
326326
)
327327
}
328328
};
@@ -356,7 +356,8 @@ impl SourceIndexingContext {
356356
"{:?}",
357357
e.context(format!(
358358
"Error in processing row from source `{source}` with key: {key}",
359-
source = self.flow.flow_instance.import_ops[self.source_idx].name
359+
source = self.flow.flow_instance.import_ops[self.source_idx].name,
360+
key = row_input.key,
360361
))
361362
);
362363
}
@@ -410,9 +411,10 @@ impl SourceIndexingContext {
410411
let import_op = &plan.import_ops[self.source_idx];
411412
let rows_stream = import_op
412413
.executor
413-
.list(&interface::SourceExecutorListOptions {
414+
.list(&interface::SourceExecutorReadOptions {
414415
include_ordinal: true,
415416
include_content_version_fp: true,
417+
include_value: true,
416418
})
417419
.await?;
418420
self.update_with_stream(import_op, rows_stream, pool, update_stats)
@@ -422,7 +424,7 @@ impl SourceIndexingContext {
422424
async fn update_with_stream(
423425
self: &Arc<Self>,
424426
import_op: &plan::AnalyzedImportOp,
425-
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
427+
mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
426428
pool: &PgPool,
427429
update_stats: &Arc<stats::UpdateStats>,
428430
) -> Result<()> {
@@ -435,7 +437,8 @@ impl SourceIndexingContext {
435437
while let Some(row) = rows_stream.next().await {
436438
for row in row? {
437439
let source_version = SourceVersion::from_current_with_ordinal(
438-
row.ordinal
440+
row.data
441+
.ordinal
439442
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
440443
);
441444
{
@@ -454,20 +457,16 @@ impl SourceIndexingContext {
454457
.concurrency_controller
455458
.acquire(concur_control::BYTES_UNKNOWN_YET)
456459
.await?;
457-
join_set.spawn(self.clone().process_source_key(
458-
row.key,
460+
join_set.spawn(self.clone().process_source_row(
461+
ProcessSourceRowInput {
462+
key: row.key,
463+
key_aux_info: Some(row.key_aux_info),
464+
data: row.data,
465+
},
459466
update_stats.clone(),
460467
concur_permit,
461468
NO_ACK,
462469
pool.clone(),
463-
ProcessSourceKeyInput {
464-
key_aux_info: Some(row.key_aux_info),
465-
data: interface::PartialSourceRowData {
466-
value: None,
467-
ordinal: Some(source_version.ordinal),
468-
content_version_fp: row.content_version_fp,
469-
},
470-
},
471470
));
472471
}
473472
}
@@ -491,20 +490,20 @@ impl SourceIndexingContext {
491490
};
492491
for (key, source_ordinal) in deleted_key_versions {
493492
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
494-
join_set.spawn(self.clone().process_source_key(
495-
key,
496-
update_stats.clone(),
497-
concur_permit,
498-
NO_ACK,
499-
pool.clone(),
500-
ProcessSourceKeyInput {
493+
join_set.spawn(self.clone().process_source_row(
494+
ProcessSourceRowInput {
495+
key,
501496
key_aux_info: None,
502497
data: interface::PartialSourceRowData {
503-
value: Some(interface::SourceValue::NonExistence),
504498
ordinal: Some(source_ordinal),
505499
content_version_fp: None,
500+
value: Some(interface::SourceValue::NonExistence),
506501
},
507502
},
503+
update_stats.clone(),
504+
concur_permit,
505+
NO_ACK,
506+
pool.clone(),
508507
));
509508
}
510509
while let Some(result) = join_set.join_next().await {

src/ops/interface.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,14 @@ impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
4848
}
4949
}
5050

51-
pub struct PartialSourceRowMetadata {
52-
pub key: FullKeyValue,
53-
/// Auxiliary information for the source row, to be used when reading the content.
54-
/// e.g. it can be used to uniquely identify version of the row.
55-
/// Use serde_json::Value::Null to represent no auxiliary information.
56-
pub key_aux_info: serde_json::Value,
51+
#[derive(Debug)]
52+
pub enum SourceValue {
53+
Existence(FieldValues),
54+
NonExistence,
55+
}
5756

57+
#[derive(Debug, Default)]
58+
pub struct PartialSourceRowData {
5859
pub ordinal: Option<Ordinal>,
5960

6061
/// A content version fingerprint can be anything that changes when the content of the row changes.
@@ -64,12 +65,18 @@ pub struct PartialSourceRowMetadata {
6465
/// It's optional. The source shouldn't use generic way to compute it, e.g. computing a hash of the content.
6566
/// The framework will do so. If there's no fast way to get it from the source, leave it as `None`.
6667
pub content_version_fp: Option<Vec<u8>>,
68+
69+
pub value: Option<SourceValue>,
6770
}
6871

69-
#[derive(Debug)]
70-
pub enum SourceValue {
71-
Existence(FieldValues),
72-
NonExistence,
72+
pub struct PartialSourceRow {
73+
pub key: FullKeyValue,
74+
/// Auxiliary information for the source row, to be used when reading the content.
75+
/// e.g. it can be used to uniquely identify version of the row.
76+
/// Use serde_json::Value::Null to represent no auxiliary information.
77+
pub key_aux_info: serde_json::Value,
78+
79+
pub data: PartialSourceRowData,
7380
}
7481

7582
impl SourceValue {
@@ -108,39 +115,38 @@ pub struct SourceChangeMessage {
108115
}
109116

110117
#[derive(Debug, Default)]
111-
pub struct SourceExecutorListOptions {
118+
pub struct SourceExecutorReadOptions {
119+
/// When set to true, the implementation must return a non-None `ordinal`.
112120
pub include_ordinal: bool,
113-
pub include_content_version_fp: bool,
114-
}
115121

116-
#[derive(Debug, Default)]
117-
pub struct SourceExecutorGetOptions {
118-
pub include_ordinal: bool,
119-
pub include_value: bool,
122+
/// When set to true, the implementation has the discretion to decide whether or not to return a non-None `content_version_fp`.
123+
/// The guideline is to return it only if it's very efficient to get it.
124+
/// If it's returned in `list()`, it must be returned in `get_value()`.
120125
pub include_content_version_fp: bool,
121-
}
122126

123-
#[derive(Debug, Default)]
124-
pub struct PartialSourceRowData {
125-
pub value: Option<SourceValue>,
126-
pub ordinal: Option<Ordinal>,
127-
pub content_version_fp: Option<Vec<u8>>,
127+
/// For get calls, when set to true, the implementation must return a non-None `value`.
128+
///
129+
/// For list calls, when set to true, the implementation has the discretion to decide whether or not to include it.
130+
/// The guideline is to only include it if a single "list() with content" call is significantly more efficient than "list() without content + series of get_value()" calls.
131+
///
132+
/// Even if `list()` already returns `value` when it's true, `get_value()` must still return `value` when it's true.
133+
pub include_value: bool,
128134
}
129135

130136
#[async_trait]
131137
pub trait SourceExecutor: Send + Sync {
132138
/// Get the list of keys for the source.
133139
async fn list(
134140
&self,
135-
options: &SourceExecutorListOptions,
136-
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>>;
141+
options: &SourceExecutorReadOptions,
142+
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>>;
137143

138144
// Get the value for the given key.
139145
async fn get_value(
140146
&self,
141147
key: &FullKeyValue,
142148
key_aux_info: &serde_json::Value,
143-
options: &SourceExecutorGetOptions,
149+
options: &SourceExecutorReadOptions,
144150
) -> Result<PartialSourceRowData>;
145151

146152
async fn change_stream(

0 commit comments

Comments
 (0)