Skip to content

Commit d955251

Browse files
authored
feat(key-aux): return key aux info from source's list API and pass to get (#859)
1 parent de7649a commit d955251

File tree

11 files changed

+81
-27
lines changed

11 files changed

+81
-27
lines changed

src/execution/dumper.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl<'a> Dumper<'a> {
7070
import_op_idx: usize,
7171
import_op: &'a AnalyzedImportOp,
7272
key: &value::KeyValue,
73+
key_aux_info: &serde_json::Value,
7374
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
7475
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
7576
where
@@ -83,6 +84,7 @@ impl<'a> Dumper<'a> {
8384
key,
8485
import_op_idx,
8586
},
87+
key_aux_info,
8688
self.setup_execution_ctx,
8789
EvaluationMemoryOptions {
8890
enable_cache: self.options.use_cache,
@@ -134,6 +136,7 @@ impl<'a> Dumper<'a> {
134136
import_op_idx: usize,
135137
import_op: &AnalyzedImportOp,
136138
key: value::KeyValue,
139+
key_aux_info: serde_json::Value,
137140
file_path: PathBuf,
138141
) -> Result<()> {
139142
let _permit = import_op
@@ -142,7 +145,13 @@ impl<'a> Dumper<'a> {
142145
.await?;
143146
let mut collected_values_buffer = Vec::new();
144147
let (exports, error) = match self
145-
.evaluate_source_entry(import_op_idx, import_op, &key, &mut collected_values_buffer)
148+
.evaluate_source_entry(
149+
import_op_idx,
150+
import_op,
151+
&key,
152+
&key_aux_info,
153+
&mut collected_values_buffer,
154+
)
146155
.await
147156
{
148157
Ok(exports) => (exports, None),
@@ -177,7 +186,10 @@ impl<'a> Dumper<'a> {
177186
import_op_idx: usize,
178187
import_op: &AnalyzedImportOp,
179188
) -> Result<()> {
180-
let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
189+
let mut keys_by_filename_prefix: IndexMap<
190+
String,
191+
Vec<(value::KeyValue, serde_json::Value)>,
192+
> = IndexMap::new();
181193

182194
let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
183195
include_ordinal: false,
@@ -196,7 +208,10 @@ impl<'a> Dumper<'a> {
196208
.find(|i| s.is_char_boundary(*i))
197209
.unwrap_or(0),
198210
);
199-
keys_by_filename_prefix.entry(s).or_default().push(row.key);
211+
keys_by_filename_prefix
212+
.entry(s)
213+
.or_default()
214+
.push((row.key, row.key_aux_info));
200215
}
201216
}
202217
let output_dir = Path::new(&self.options.output_dir);
@@ -205,22 +220,25 @@ impl<'a> Dumper<'a> {
205220
.into_iter()
206221
.flat_map(|(filename_prefix, keys)| {
207222
let num_keys = keys.len();
208-
keys.into_iter().enumerate().map(move |(i, key)| {
209-
let extra_id = if num_keys > 1 {
210-
Cow::Owned(format!(".{i}"))
211-
} else {
212-
Cow::Borrowed("")
213-
};
214-
let file_name =
215-
format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
216-
let file_path = output_dir.join(Path::new(&file_name));
217-
self.evaluate_and_dump_source_entry(
218-
import_op_idx,
219-
import_op,
220-
key,
221-
file_path,
222-
)
223-
})
223+
keys.into_iter()
224+
.enumerate()
225+
.map(move |(i, (key, key_aux_info))| {
226+
let extra_id = if num_keys > 1 {
227+
Cow::Owned(format!(".{i}"))
228+
} else {
229+
Cow::Borrowed("")
230+
};
231+
let file_name =
232+
format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
233+
let file_path = output_dir.join(Path::new(&file_name));
234+
self.evaluate_and_dump_source_entry(
235+
import_op_idx,
236+
import_op,
237+
key,
238+
key_aux_info,
239+
file_path,
240+
)
241+
})
224242
});
225243
try_join_all(evaluate_futs).await?;
226244
Ok(())

src/execution/indexing_status.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct SourceRowIndexingStatus {
2424

2525
pub async fn get_source_row_indexing_status(
2626
src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
27+
key_aux_info: &serde_json::Value,
2728
setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
2829
pool: &sqlx::PgPool,
2930
) -> Result<SourceRowIndexingStatus> {
@@ -36,6 +37,7 @@ pub async fn get_source_row_indexing_status(
3637
);
3738
let current_fut = src_eval_ctx.import_op.executor.get_value(
3839
src_eval_ctx.key,
40+
key_aux_info,
3941
&interface::SourceExecutorGetOptions {
4042
include_value: false,
4143
include_ordinal: true,

src/execution/live_updater.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ impl SourceUpdateTask {
191191
.await?;
192192
tokio::spawn(source_context.clone().process_source_key(
193193
change.key,
194+
Some(change.key_aux_info),
194195
change.data,
195196
update_stats.clone(),
196197
concur_permit,

src/execution/row_indexer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ async fn try_content_hash_optimization(
588588

589589
pub async fn evaluate_source_entry_with_memory(
590590
src_eval_ctx: &SourceRowEvaluationContext<'_>,
591+
key_aux_info: &serde_json::Value,
591592
setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
592593
options: EvaluationMemoryOptions,
593594
pool: &PgPool,
@@ -614,6 +615,7 @@ pub async fn evaluate_source_entry_with_memory(
614615
.executor
615616
.get_value(
616617
src_eval_ctx.key,
618+
key_aux_info,
617619
&SourceExecutorGetOptions {
618620
include_value: true,
619621
include_ordinal: false,

src/execution/source_indexer.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,14 @@ impl SourceIndexingContext {
100100
})
101101
}
102102

103+
/// `key_aux_info` is not available for deletions. It must not be provided if `source_data` is `None`.
103104
pub async fn process_source_key<
104105
AckFut: Future<Output = Result<()>> + Send + 'static,
105106
AckFn: FnOnce() -> AckFut,
106107
>(
107108
self: Arc<Self>,
108109
key: value::KeyValue,
110+
key_aux_info: Option<serde_json::Value>,
109111
source_data: Option<interface::SourceData>,
110112
update_stats: Arc<stats::UpdateStats>,
111113
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
@@ -122,6 +124,11 @@ impl SourceIndexingContext {
122124
.executor
123125
.get_value(
124126
&key,
127+
key_aux_info.as_ref().ok_or_else(|| {
128+
anyhow::anyhow!(
129+
"`key_aux_info` must be provided when there's no `source_data`"
130+
)
131+
})?,
125132
&interface::SourceExecutorGetOptions {
126133
include_value: true,
127134
include_ordinal: true,
@@ -330,6 +337,7 @@ impl SourceIndexingContext {
330337
.await?;
331338
join_set.spawn(self.clone().process_source_key(
332339
row.key,
340+
Some(row.key_aux_info),
333341
None,
334342
update_stats.clone(),
335343
concur_permit,
@@ -357,17 +365,15 @@ impl SourceIndexingContext {
357365
deleted_key_versions
358366
};
359367
for (key, source_ordinal) in deleted_key_versions {
360-
// If the source ordinal is unavailable, call without source ordinal so that another polling will be triggered to avoid out-of-order.
361-
let source_data = source_ordinal
362-
.is_available()
363-
.then(|| interface::SourceData {
364-
value: interface::SourceValue::NonExistence,
365-
ordinal: source_ordinal,
366-
});
368+
let source_data = interface::SourceData {
369+
value: interface::SourceValue::NonExistence,
370+
ordinal: source_ordinal,
371+
};
367372
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
368373
join_set.spawn(self.clone().process_source_key(
369374
key,
370-
source_data,
375+
None,
376+
Some(source_data),
371377
update_stats.clone(),
372378
concur_permit,
373379
NO_ACK,

src/ops/interface.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
5050

5151
pub struct PartialSourceRowMetadata {
5252
pub key: KeyValue,
53+
/// Auxiliary information for the source row, to be used when reading the content.
54+
/// e.g. it can be used to uniquely identify version of the row.
55+
/// Use serde_json::Value::Null to represent no auxiliary information.
56+
pub key_aux_info: serde_json::Value,
57+
5358
pub ordinal: Option<Ordinal>,
5459
}
5560

@@ -86,6 +91,9 @@ pub struct SourceData {
8691

8792
pub struct SourceChange {
8893
pub key: KeyValue,
94+
/// Auxiliary information for the source row, to be used when reading the content.
95+
/// e.g. it can be used to uniquely identify version of the row.
96+
pub key_aux_info: serde_json::Value,
8997

9098
/// If None, the engine will poll to get the latest existence state and value.
9199
pub data: Option<SourceData>,
@@ -139,6 +147,7 @@ pub trait SourceExecutor: Send + Sync {
139147
async fn get_value(
140148
&self,
141149
key: &KeyValue,
150+
key_aux_info: &serde_json::Value,
142151
options: &SourceExecutorGetOptions,
143152
) -> Result<PartialSourceRowData>;
144153

src/ops/sources/amazon_s3.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ impl SourceExecutor for Executor {
111111
if include && !exclude {
112112
batch.push(PartialSourceRowMetadata {
113113
key: KeyValue::Str(key.to_string().into()),
114+
key_aux_info: serde_json::Value::Null,
114115
ordinal: obj.last_modified().map(datetime_to_ordinal),
115116
});
116117
}
@@ -132,6 +133,7 @@ impl SourceExecutor for Executor {
132133
async fn get_value(
133134
&self,
134135
key: &KeyValue,
136+
_key_aux_info: &serde_json::Value,
135137
options: &SourceExecutorGetOptions,
136138
) -> Result<PartialSourceRowData> {
137139
let key_str = key.str_value()?;
@@ -272,6 +274,7 @@ impl Executor {
272274
let decoded_key = decode_form_encoded_url(&s3.object.key)?;
273275
changes.push(SourceChange {
274276
key: KeyValue::Str(decoded_key),
277+
key_aux_info: serde_json::Value::Null,
275278
data: None,
276279
});
277280
}

src/ops/sources/azure_blob.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ impl SourceExecutor for Executor {
9393
let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
9494
batch.push(PartialSourceRowMetadata {
9595
key: KeyValue::Str(key.clone().into()),
96+
key_aux_info: serde_json::Value::Null,
9697
ordinal,
9798
});
9899
}
@@ -114,6 +115,7 @@ impl SourceExecutor for Executor {
114115
async fn get_value(
115116
&self,
116117
key: &KeyValue,
118+
_key_aux_info: &serde_json::Value,
117119
options: &SourceExecutorGetOptions,
118120
) -> Result<PartialSourceRowData> {
119121
let key_str = key.str_value()?;

src/ops/sources/google_drive.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ impl Executor {
135135
} else if is_supported_file_type(&mime_type) {
136136
Some(PartialSourceRowMetadata {
137137
key: KeyValue::Str(id),
138+
key_aux_info: serde_json::Value::Null,
138139
ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
139140
})
140141
} else {
@@ -210,6 +211,7 @@ impl Executor {
210211
if self.is_file_covered(&file_id).await? {
211212
changes.push(SourceChange {
212213
key: KeyValue::Str(Arc::from(file_id)),
214+
key_aux_info: serde_json::Value::Null,
213215
data: None,
214216
});
215217
}
@@ -323,6 +325,7 @@ impl SourceExecutor for Executor {
323325
async fn get_value(
324326
&self,
325327
key: &KeyValue,
328+
_key_aux_info: &serde_json::Value,
326329
options: &SourceExecutorGetOptions,
327330
) -> Result<PartialSourceRowData> {
328331
let file_id = key.str_value()?;

src/ops/sources/local_file.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl SourceExecutor for Executor {
7171
if let Some(relative_path) = relative_path.to_str() {
7272
yield vec![PartialSourceRowMetadata {
7373
key: KeyValue::Str(relative_path.into()),
74+
key_aux_info: serde_json::Value::Null,
7475
ordinal,
7576
}];
7677
} else {
@@ -87,6 +88,7 @@ impl SourceExecutor for Executor {
8788
async fn get_value(
8889
&self,
8990
key: &KeyValue,
91+
_key_aux_info: &serde_json::Value,
9092
options: &SourceExecutorGetOptions,
9193
) -> Result<PartialSourceRowData> {
9294
if !self.is_file_included(key.str_value()?.as_ref()) {

0 commit comments

Comments
 (0)