diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 8079af45a..ee41b32fb 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -70,6 +70,7 @@ impl<'a> Dumper<'a> {
         import_op_idx: usize,
         import_op: &'a AnalyzedImportOp,
         key: &value::KeyValue,
+        key_aux_info: &serde_json::Value,
         collected_values_buffer: &'b mut Vec>,
     ) -> Result>>>
     where
@@ -83,6 +84,7 @@ impl<'a> Dumper<'a> {
                 key,
                 import_op_idx,
             },
+            key_aux_info,
            self.setup_execution_ctx,
            EvaluationMemoryOptions {
                enable_cache: self.options.use_cache,
@@ -134,6 +136,7 @@ impl<'a> Dumper<'a> {
         import_op_idx: usize,
         import_op: &AnalyzedImportOp,
         key: value::KeyValue,
+        key_aux_info: serde_json::Value,
         file_path: PathBuf,
     ) -> Result<()> {
         let _permit = import_op
@@ -142,7 +145,13 @@ impl<'a> Dumper<'a> {
             .await?;
         let mut collected_values_buffer = Vec::new();
         let (exports, error) = match self
-            .evaluate_source_entry(import_op_idx, import_op, &key, &mut collected_values_buffer)
+            .evaluate_source_entry(
+                import_op_idx,
+                import_op,
+                &key,
+                &key_aux_info,
+                &mut collected_values_buffer,
+            )
             .await
         {
             Ok(exports) => (exports, None),
@@ -177,7 +186,10 @@ impl<'a> Dumper<'a> {
         import_op_idx: usize,
         import_op: &AnalyzedImportOp,
     ) -> Result<()> {
-        let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
+        let mut keys_by_filename_prefix: IndexMap<
+            String,
+            Vec<(value::KeyValue, serde_json::Value)>,
+        > = IndexMap::new();
 
         let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
             include_ordinal: false,
@@ -196,7 +208,10 @@ impl<'a> Dumper<'a> {
                         .find(|i| s.is_char_boundary(*i))
                         .unwrap_or(0),
                 );
-                keys_by_filename_prefix.entry(s).or_default().push(row.key);
+                keys_by_filename_prefix
+                    .entry(s)
+                    .or_default()
+                    .push((row.key, row.key_aux_info));
             }
         }
         let output_dir = Path::new(&self.options.output_dir);
@@ -205,22 +220,25 @@ impl<'a> Dumper<'a> {
            .into_iter()
            .flat_map(|(filename_prefix, keys)| {
                let num_keys = keys.len();
-                keys.into_iter().enumerate().map(move |(i, key)| {
-                    let extra_id = if num_keys > 1 {
-                        Cow::Owned(format!(".{i}"))
-                    } else {
-                        Cow::Borrowed("")
-                    };
-                    let file_name =
-                        format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
-                    let file_path = output_dir.join(Path::new(&file_name));
-                    self.evaluate_and_dump_source_entry(
-                        import_op_idx,
-                        import_op,
-                        key,
-                        file_path,
-                    )
-                })
+                keys.into_iter()
+                    .enumerate()
+                    .map(move |(i, (key, key_aux_info))| {
+                        let extra_id = if num_keys > 1 {
+                            Cow::Owned(format!(".{i}"))
+                        } else {
+                            Cow::Borrowed("")
+                        };
+                        let file_name =
+                            format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
+                        let file_path = output_dir.join(Path::new(&file_name));
+                        self.evaluate_and_dump_source_entry(
+                            import_op_idx,
+                            import_op,
+                            key,
+                            key_aux_info,
+                            file_path,
+                        )
+                    })
            });
        try_join_all(evaluate_futs).await?;
        Ok(())
diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs
index 2fd42efd1..6702cb30f 100644
--- a/src/execution/indexing_status.rs
+++ b/src/execution/indexing_status.rs
@@ -24,6 +24,7 @@ pub struct SourceRowIndexingStatus {
 
 pub async fn get_source_row_indexing_status(
     src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
+    key_aux_info: &serde_json::Value,
     setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
     pool: &sqlx::PgPool,
 ) -> Result<SourceRowIndexingStatus> {
@@ -36,6 +37,7 @@ pub async fn get_source_row_indexing_status(
     );
     let current_fut = src_eval_ctx.import_op.executor.get_value(
         src_eval_ctx.key,
+        key_aux_info,
        &interface::SourceExecutorGetOptions {
            include_value: false,
            include_ordinal: true,
diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index 4cf4aad6b..c40326b6a 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -191,6 +191,7 @@ impl SourceUpdateTask {
                     .await?;
                 tokio::spawn(source_context.clone().process_source_key(
                     change.key,
+                    Some(change.key_aux_info),
                     change.data,
                     update_stats.clone(),
                     concur_permit,
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index e8dad9edf..6ee6ed2c1 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -588,6 +588,7 @@ async fn try_content_hash_optimization(
 
 pub async fn evaluate_source_entry_with_memory(
     src_eval_ctx: &SourceRowEvaluationContext<'_>,
+    key_aux_info: &serde_json::Value,
     setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
     options: EvaluationMemoryOptions,
     pool: &PgPool,
@@ -614,6 +615,7 @@ pub async fn evaluate_source_entry_with_memory(
         .executor
         .get_value(
             src_eval_ctx.key,
+            key_aux_info,
             &SourceExecutorGetOptions {
                 include_value: true,
                 include_ordinal: false,
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 8f85c747c..3b4abb2b9 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -100,12 +100,14 @@ impl SourceIndexingContext {
         })
     }
 
+    /// `key_aux_info` is not available for deletions. It must be provided if `source_data` is `None`.
     pub async fn process_source_key<
         AckFut: Future<Output = Result<()>> + Send + 'static,
         AckFn: FnOnce() -> AckFut,
     >(
         self: Arc<Self>,
         key: value::KeyValue,
+        key_aux_info: Option<serde_json::Value>,
         source_data: Option<interface::SourceData>,
         update_stats: Arc,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
@@ -122,6 +124,11 @@ impl SourceIndexingContext {
                 .executor
                 .get_value(
                     &key,
+                    key_aux_info.as_ref().ok_or_else(|| {
+                        anyhow::anyhow!(
+                            "`key_aux_info` must be provided when there's no `source_data`"
+                        )
+                    })?,
                     &interface::SourceExecutorGetOptions {
                         include_value: true,
                         include_ordinal: true,
@@ -330,6 +337,7 @@ impl SourceIndexingContext {
                 .await?;
             join_set.spawn(self.clone().process_source_key(
                 row.key,
+                Some(row.key_aux_info),
                 None,
                 update_stats.clone(),
                 concur_permit,
@@ -357,17 +365,15 @@ impl SourceIndexingContext {
             deleted_key_versions
         };
         for (key, source_ordinal) in deleted_key_versions {
-            // If the source ordinal is unavailable, call without source ordinal so that another polling will be triggered to avoid out-of-order.
-            let source_data = source_ordinal
-                .is_available()
-                .then(|| interface::SourceData {
-                    value: interface::SourceValue::NonExistence,
-                    ordinal: source_ordinal,
-                });
+            let source_data = interface::SourceData {
+                value: interface::SourceValue::NonExistence,
+                ordinal: source_ordinal,
+            };
             let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
             join_set.spawn(self.clone().process_source_key(
                 key,
-                source_data,
+                None,
+                Some(source_data),
                 update_stats.clone(),
                 concur_permit,
                 NO_ACK,
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 171d667be..d896b60bf 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -50,6 +50,11 @@ impl TryFrom> for Ordinal {
 
 pub struct PartialSourceRowMetadata {
     pub key: KeyValue,
+    /// Auxiliary information for the source row, to be used when reading the content.
+    /// e.g. it can be used to uniquely identify the version of the row.
+    /// Use serde_json::Value::Null to represent no auxiliary information.
+    pub key_aux_info: serde_json::Value,
+
     pub ordinal: Option<Ordinal>,
 }
@@ -86,6 +91,9 @@ pub struct SourceData {
 
 pub struct SourceChange {
     pub key: KeyValue,
+    /// Auxiliary information for the source row, to be used when reading the content.
+    /// e.g. it can be used to uniquely identify the version of the row.
+    pub key_aux_info: serde_json::Value,
 
     /// If None, the engine will poll to get the latest existence state and value.
     pub data: Option<SourceData>,
@@ -139,6 +147,7 @@ pub trait SourceExecutor: Send + Sync {
     async fn get_value(
         &self,
         key: &KeyValue,
+        key_aux_info: &serde_json::Value,
         options: &SourceExecutorGetOptions,
     ) -> Result;
 
diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs
index 260546c1f..029c0df93 100644
--- a/src/ops/sources/amazon_s3.rs
+++ b/src/ops/sources/amazon_s3.rs
@@ -111,6 +111,7 @@ impl SourceExecutor for Executor {
                     if include && !exclude {
                         batch.push(PartialSourceRowMetadata {
                             key: KeyValue::Str(key.to_string().into()),
+                            key_aux_info: serde_json::Value::Null,
                             ordinal: obj.last_modified().map(datetime_to_ordinal),
                         });
                     }
@@ -132,6 +133,7 @@ impl SourceExecutor for Executor {
     async fn get_value(
         &self,
         key: &KeyValue,
+        _key_aux_info: &serde_json::Value,
         options: &SourceExecutorGetOptions,
     ) -> Result {
         let key_str = key.str_value()?;
@@ -272,6 +274,7 @@ impl Executor {
                 let decoded_key = decode_form_encoded_url(&s3.object.key)?;
                 changes.push(SourceChange {
                     key: KeyValue::Str(decoded_key),
+                    key_aux_info: serde_json::Value::Null,
                     data: None,
                 });
             }
diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs
index c2ae29c32..19cdf0819 100644
--- a/src/ops/sources/azure_blob.rs
+++ b/src/ops/sources/azure_blob.rs
@@ -93,6 +93,7 @@ impl SourceExecutor for Executor {
                 let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
                 batch.push(PartialSourceRowMetadata {
                     key: KeyValue::Str(key.clone().into()),
+                    key_aux_info: serde_json::Value::Null,
                     ordinal,
                 });
             }
@@ -114,6 +115,7 @@ impl SourceExecutor for Executor {
     async fn get_value(
         &self,
         key: &KeyValue,
+        _key_aux_info: &serde_json::Value,
         options: &SourceExecutorGetOptions,
     ) -> Result {
         let key_str = key.str_value()?;
diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs
index e9fe55e57..5d99eae3e 100644
--- a/src/ops/sources/google_drive.rs
+++ b/src/ops/sources/google_drive.rs
@@ -135,6 +135,7 @@ impl Executor {
             } else if is_supported_file_type(&mime_type) {
                 Some(PartialSourceRowMetadata {
                     key: KeyValue::Str(id),
+                    key_aux_info: serde_json::Value::Null,
                     ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
                 })
             } else {
@@ -210,6 +211,7 @@ impl Executor {
                 if self.is_file_covered(&file_id).await? {
                     changes.push(SourceChange {
                         key: KeyValue::Str(Arc::from(file_id)),
+                        key_aux_info: serde_json::Value::Null,
                         data: None,
                     });
                 }
@@ -323,6 +325,7 @@ impl SourceExecutor for Executor {
     async fn get_value(
         &self,
         key: &KeyValue,
+        _key_aux_info: &serde_json::Value,
         options: &SourceExecutorGetOptions,
     ) -> Result {
         let file_id = key.str_value()?;
diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index a1616199a..9620eebaa 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -71,6 +71,7 @@ impl SourceExecutor for Executor {
             if let Some(relative_path) = relative_path.to_str() {
                 yield vec![PartialSourceRowMetadata {
                     key: KeyValue::Str(relative_path.into()),
+                    key_aux_info: serde_json::Value::Null,
                     ordinal,
                 }];
             } else {
@@ -87,6 +88,7 @@ impl SourceExecutor for Executor {
     async fn get_value(
         &self,
         key: &KeyValue,
+        _key_aux_info: &serde_json::Value,
         options: &SourceExecutorGetOptions,
     ) -> Result {
         if !self.is_file_included(key.str_value()?.as_ref()) {
diff --git a/src/service/flows.rs b/src/service/flows.rs
index 789389229..2726956a0 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -122,6 +122,7 @@ pub async fn get_keys(
 pub struct SourceRowKeyParams {
     field: String,
     key: Vec<String>,
+    key_aux: Option<serde_json::Value>,
 }
 
 #[derive(Serialize)]
@@ -135,6 +136,7 @@ struct SourceRowKeyContextHolder<'a> {
     import_op_idx: usize,
     schema: &'a FlowSchema,
     key: value::KeyValue,
+    key_aux_info: serde_json::Value,
 }
 
 impl<'a> SourceRowKeyContextHolder<'a> {
@@ -163,11 +165,13 @@ impl<'a> SourceRowKeyContextHolder<'a> {
             .key_field()
             .ok_or_else(|| api_error!("field {} does not have a key", source_row_key.field))?;
         let key = value::KeyValue::from_strs(source_row_key.key, &key_field.value_type.typ)?;
+        let key_aux_info = source_row_key.key_aux.unwrap_or_default();
         Ok(Self {
             plan,
             import_op_idx,
             schema,
             key,
+            key_aux_info,
         })
     }
 
@@ -192,6 +196,7 @@ pub async fn evaluate_data(
     let execution_ctx = flow_ctx.use_execution_ctx().await?;
     let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
         &source_row_key_ctx.as_context(),
+        &source_row_key_ctx.key_aux_info,
         &execution_ctx.setup_execution_context,
         memoization::EvaluationMemoryOptions {
             enable_cache: true,
@@ -242,6 +247,7 @@ pub async fn get_row_indexing_status(
     let execution_ctx = flow_ctx.use_execution_ctx().await?;
     let indexing_status = indexing_status::get_source_row_indexing_status(
         &source_row_key_ctx.as_context(),
+        &source_row_key_ctx.key_aux_info,
         &execution_ctx.setup_execution_context,
         lib_context.require_builtin_db_pool()?,
     )
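
Note on the new `key_aux_info` field: below is a minimal, self-contained sketch of the pattern this change enables, not cocoindex code. `Change`, `emit_change`, and `get_value` are hypothetical stand-ins for `interface::SourceChange` and `SourceExecutor::get_value`, and only `serde_json` is assumed as a dependency. A source that knows a per-row version token (an etag, a generation id) can carry it in `key_aux_info` when listing rows or emitting changes, then use it to read back exactly that version; sources without such a token pass `serde_json::Value::Null`, as every built-in source in this diff does.

```rust
// Hypothetical stand-ins; the real types live in src/ops/interface.rs.
use serde_json::{json, Value};

struct Change {
    key: String,
    // `Value::Null` means "no auxiliary information", per the doc comment in this diff.
    key_aux_info: Value,
}

// A versioned source records an opaque token alongside the key...
fn emit_change(key: &str, version: Option<&str>) -> Change {
    Change {
        key: key.to_string(),
        key_aux_info: version
            .map(|v| json!({ "version": v }))
            .unwrap_or(Value::Null),
    }
}

// ...and the read path uses it to fetch exactly that version instead of "latest".
fn get_value(key: &str, key_aux_info: &Value) -> String {
    match key_aux_info.get("version").and_then(Value::as_str) {
        Some(v) => format!("GET {key} @ version {v}"),
        None => format!("GET {key} @ latest"),
    }
}

fn main() {
    let a = emit_change("docs/a.md", Some("etag-42"));
    let b = emit_change("docs/b.md", None);
    println!("{}", get_value(&a.key, &a.key_aux_info)); // GET docs/a.md @ version etag-42
    println!("{}", get_value(&b.key, &b.key_aux_info)); // GET docs/b.md @ latest
}
```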
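Similarly, a tiny sketch of the `Some`/`None` contract that `process_source_key` now enforces (simplified stand-in types; `anyhow` and `serde_json` assumed as dependencies): the poll path must supply `key_aux_info` because the engine has to call `get_value`, while the deletion path supplies `source_data` (a `NonExistence` value plus its ordinal) and passes `None` for the aux info.

```rust
// Stand-ins for the real types; only the Some/None contract matters here.
use anyhow::{anyhow, Result};
use serde_json::Value;

struct SourceData; // stand-in for `interface::SourceData`

fn check_process_source_key_inputs(
    key_aux_info: Option<&Value>,
    source_data: Option<&SourceData>,
) -> Result<()> {
    // Mirrors the `ok_or_else` guard added in `process_source_key`: with no
    // `source_data`, the engine must call `get_value`, which now needs aux info.
    if source_data.is_none() {
        key_aux_info.ok_or_else(|| {
            anyhow!("`key_aux_info` must be provided when there's no `source_data`")
        })?;
    }
    Ok(())
}

fn main() -> Result<()> {
    // Poll path (live updater / listing): aux info arrives with the key.
    check_process_source_key_inputs(Some(&Value::Null), None)?;
    // Deletion path: data (NonExistence + ordinal) is supplied, aux info is not.
    check_process_source_key_inputs(None, Some(&SourceData))?;
    Ok(())
}
```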