src/execution/dumper.rs: 37 additions & 19 deletions
@@ -70,6 +70,7 @@ impl<'a> Dumper<'a> {
import_op_idx: usize,
import_op: &'a AnalyzedImportOp,
key: &value::KeyValue,
key_aux_info: &serde_json::Value,
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
where
@@ -83,6 +84,7 @@ impl<'a> Dumper<'a> {
key,
import_op_idx,
},
key_aux_info,
self.setup_execution_ctx,
EvaluationMemoryOptions {
enable_cache: self.options.use_cache,
@@ -134,6 +136,7 @@ impl<'a> Dumper<'a> {
import_op_idx: usize,
import_op: &AnalyzedImportOp,
key: value::KeyValue,
key_aux_info: serde_json::Value,
file_path: PathBuf,
) -> Result<()> {
let _permit = import_op
@@ -142,7 +145,13 @@
.await?;
let mut collected_values_buffer = Vec::new();
let (exports, error) = match self
- .evaluate_source_entry(import_op_idx, import_op, &key, &mut collected_values_buffer)
+ .evaluate_source_entry(
+ import_op_idx,
+ import_op,
+ &key,
+ &key_aux_info,
+ &mut collected_values_buffer,
+ )
.await
{
Ok(exports) => (exports, None),
@@ -177,7 +186,10 @@
import_op_idx: usize,
import_op: &AnalyzedImportOp,
) -> Result<()> {
- let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
+ let mut keys_by_filename_prefix: IndexMap<
+ String,
+ Vec<(value::KeyValue, serde_json::Value)>,
+ > = IndexMap::new();

let mut rows_stream = import_op.executor.list(&SourceExecutorListOptions {
include_ordinal: false,
@@ -196,7 +208,10 @@
.find(|i| s.is_char_boundary(*i))
.unwrap_or(0),
);
- keys_by_filename_prefix.entry(s).or_default().push(row.key);
+ keys_by_filename_prefix
+ .entry(s)
+ .or_default()
+ .push((row.key, row.key_aux_info));
}
}
let output_dir = Path::new(&self.options.output_dir);
@@ -205,22 +220,25 @@
.into_iter()
.flat_map(|(filename_prefix, keys)| {
let num_keys = keys.len();
- keys.into_iter().enumerate().map(move |(i, key)| {
- let extra_id = if num_keys > 1 {
- Cow::Owned(format!(".{i}"))
- } else {
- Cow::Borrowed("")
- };
- let file_name =
- format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
- let file_path = output_dir.join(Path::new(&file_name));
- self.evaluate_and_dump_source_entry(
- import_op_idx,
- import_op,
- key,
- file_path,
- )
- })
+ keys.into_iter()
+ .enumerate()
+ .map(move |(i, (key, key_aux_info))| {
+ let extra_id = if num_keys > 1 {
+ Cow::Owned(format!(".{i}"))
+ } else {
+ Cow::Borrowed("")
+ };
+ let file_name =
+ format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
+ let file_path = output_dir.join(Path::new(&file_name));
+ self.evaluate_and_dump_source_entry(
+ import_op_idx,
+ import_op,
+ key,
+ key_aux_info,
+ file_path,
+ )
+ })
});
try_join_all(evaluate_futs).await?;
Ok(())
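A note on the naming scheme above: the `extra_id` suffix only appears when several keys sanitize to the same filename prefix. A minimal standalone sketch of that rule (the `dump_file_names` helper is hypothetical, not part of this PR):

use std::borrow::Cow;

// Hypothetical helper mirroring the dumper's collision handling: keys that
// share a filename prefix get an `.{i}` suffix; a unique key gets none.
fn dump_file_names(op_name: &str, prefix: &str, num_keys: usize) -> Vec<String> {
    (0..num_keys)
        .map(|i| {
            let extra_id = if num_keys > 1 {
                Cow::Owned(format!(".{i}"))
            } else {
                Cow::Borrowed("")
            };
            format!("{op_name}@{prefix}{extra_id}.yaml")
        })
        .collect()
}

fn main() {
    assert_eq!(dump_file_names("docs", "a_md", 1), ["docs@a_md.yaml"]);
    assert_eq!(
        dump_file_names("docs", "a_md", 2),
        ["docs@a_md.0.yaml", "docs@a_md.1.yaml"]
    );
}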
src/execution/indexing_status.rs: 2 additions & 0 deletions
@@ -24,6 +24,7 @@ pub struct SourceRowIndexingStatus {

pub async fn get_source_row_indexing_status(
src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
key_aux_info: &serde_json::Value,
setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
pool: &sqlx::PgPool,
) -> Result<SourceRowIndexingStatus> {
@@ -36,6 +37,7 @@
);
let current_fut = src_eval_ctx.import_op.executor.get_value(
src_eval_ctx.key,
key_aux_info,
&interface::SourceExecutorGetOptions {
include_value: false,
include_ordinal: true,
src/execution/live_updater.rs: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ impl SourceUpdateTask {
.await?;
tokio::spawn(source_context.clone().process_source_key(
change.key,
Some(change.key_aux_info),
change.data,
update_stats.clone(),
concur_permit,
src/execution/row_indexer.rs: 2 additions & 0 deletions
@@ -588,6 +588,7 @@ async fn try_content_hash_optimization(

pub async fn evaluate_source_entry_with_memory(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
key_aux_info: &serde_json::Value,
setup_execution_ctx: &exec_ctx::FlowSetupExecutionContext,
options: EvaluationMemoryOptions,
pool: &PgPool,
@@ -614,6 +615,7 @@
.executor
.get_value(
src_eval_ctx.key,
key_aux_info,
&SourceExecutorGetOptions {
include_value: true,
include_ordinal: false,
src/execution/source_indexer.rs: 14 additions & 8 deletions
@@ -100,12 +100,14 @@ impl SourceIndexingContext {
})
}

/// `key_aux_info` is not available for deletions. It must be provided when `source_data` is `None`, since the executor needs it to re-read the row.
pub async fn process_source_key<
AckFut: Future<Output = Result<()>> + Send + 'static,
AckFn: FnOnce() -> AckFut,
>(
self: Arc<Self>,
key: value::KeyValue,
key_aux_info: Option<serde_json::Value>,
source_data: Option<interface::SourceData>,
update_stats: Arc<stats::UpdateStats>,
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
@@ -122,6 +124,11 @@ impl SourceIndexingContext {
.executor
.get_value(
&key,
key_aux_info.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"`key_aux_info` must be provided when there's no `source_data`"
)
})?,
&interface::SourceExecutorGetOptions {
include_value: true,
include_ordinal: true,
@@ -330,6 +337,7 @@ impl SourceIndexingContext {
.await?;
join_set.spawn(self.clone().process_source_key(
row.key,
Some(row.key_aux_info),
None,
update_stats.clone(),
concur_permit,
@@ -357,17 +365,15 @@ impl SourceIndexingContext {
deleted_key_versions
};
for (key, source_ordinal) in deleted_key_versions {
- // If the source ordinal is unavailable, call without source ordinal so that another polling will be triggered to avoid out-of-order.
- let source_data = source_ordinal
- .is_available()
- .then(|| interface::SourceData {
- value: interface::SourceValue::NonExistence,
- ordinal: source_ordinal,
- });
+ let source_data = interface::SourceData {
+ value: interface::SourceValue::NonExistence,
+ ordinal: source_ordinal,
+ };
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
join_set.spawn(self.clone().process_source_key(
key,
- source_data,
+ None,
+ Some(source_data),
update_stats.clone(),
concur_permit,
NO_ACK,
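The two spawn sites above encode an invariant worth spelling out: the refresh path passes `Some(row.key_aux_info)` with no `source_data` (the executor must re-read the row), while the deletion path passes `None` with a ready-made `SourceData`. A compressed sketch of that contract, using stand-in types rather than the crate's own:

use anyhow::anyhow;

// Stand-in for interface::SourceData; only the shape matters here.
struct SourceData;

// Mirrors the guard inside `process_source_key`: when there is no
// `source_data`, the row must be re-fetched, which requires `key_aux_info`.
fn validate(
    key_aux_info: Option<&serde_json::Value>,
    source_data: Option<&SourceData>,
) -> anyhow::Result<()> {
    if source_data.is_none() {
        key_aux_info.ok_or_else(|| {
            anyhow!("`key_aux_info` must be provided when there's no `source_data`")
        })?;
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    // Refresh/poll call site: aux info present, data absent.
    validate(Some(&serde_json::Value::Null), None)?;
    // Deletion call site: aux info absent, data (NonExistence) present.
    validate(None, Some(&SourceData))?;
    Ok(())
}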
src/ops/interface.rs: 9 additions & 0 deletions
@@ -50,6 +50,11 @@ impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {

pub struct PartialSourceRowMetadata {
pub key: KeyValue,
/// Auxiliary information for the source row, to be used when reading the content.
/// e.g. it can be used to uniquely identify the version of the row.
/// Use serde_json::Value::Null to represent no auxiliary information.
pub key_aux_info: serde_json::Value,

pub ordinal: Option<Ordinal>,
}

@@ -86,6 +91,9 @@ pub struct SourceData {

pub struct SourceChange {
pub key: KeyValue,
/// Auxiliary information for the source row, to be used when reading the content.
/// e.g. it can be used to uniquely identify the version of the row.
pub key_aux_info: serde_json::Value,

/// If None, the engine will poll to get the latest existence state and value.
pub data: Option<SourceData>,
@@ -139,6 +147,7 @@ pub trait SourceExecutor: Send + Sync {
async fn get_value(
&self,
key: &KeyValue,
key_aux_info: &serde_json::Value,
options: &SourceExecutorGetOptions,
) -> Result<PartialSourceRowData>;

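The intent behind `key_aux_info` is easiest to see with a versioned store. A sketch under assumed types (the `VersionedSource` below is illustrative only; the built-in sources in this PR all pass `serde_json::Value::Null`, since a key alone identifies their rows):

use serde_json::{json, Value};

// Hypothetical source where listing discovers a version id per key, packed
// into `key_aux_info`; the engine later hands it back to `get_value`
// verbatim, so exactly the listed version can be read.
struct VersionedSource;

impl VersionedSource {
    fn list_row(&self, key: &str, version: u64) -> (String, Value) {
        (key.to_string(), json!({ "version": version }))
    }

    fn get_value(&self, key: &str, key_aux_info: &Value) -> String {
        match key_aux_info.get("version").and_then(Value::as_u64) {
            Some(v) => format!("{key}@v{v}"),
            // `Value::Null` is the "no auxiliary information" convention:
            // fall back to the latest version.
            None => format!("{key}@latest"),
        }
    }
}

fn main() {
    let src = VersionedSource;
    let (key, aux) = src.list_row("docs/a.md", 7);
    assert_eq!(src.get_value(&key, &aux), "docs/a.md@v7");
    assert_eq!(src.get_value(&key, &Value::Null), "docs/a.md@latest");
}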
src/ops/sources/amazon_s3.rs: 3 additions & 0 deletions
@@ -111,6 +111,7 @@ impl SourceExecutor for Executor {
if include && !exclude {
batch.push(PartialSourceRowMetadata {
key: KeyValue::Str(key.to_string().into()),
key_aux_info: serde_json::Value::Null,
ordinal: obj.last_modified().map(datetime_to_ordinal),
});
}
@@ -132,6 +133,7 @@
async fn get_value(
&self,
key: &KeyValue,
_key_aux_info: &serde_json::Value,
options: &SourceExecutorGetOptions,
) -> Result<PartialSourceRowData> {
let key_str = key.str_value()?;
@@ -272,6 +274,7 @@ impl Executor {
let decoded_key = decode_form_encoded_url(&s3.object.key)?;
changes.push(SourceChange {
key: KeyValue::Str(decoded_key),
key_aux_info: serde_json::Value::Null,
data: None,
});
}
src/ops/sources/azure_blob.rs: 2 additions & 0 deletions
@@ -93,6 +93,7 @@ impl SourceExecutor for Executor {
let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
batch.push(PartialSourceRowMetadata {
key: KeyValue::Str(key.clone().into()),
key_aux_info: serde_json::Value::Null,
ordinal,
});
}
@@ -114,6 +115,7 @@
async fn get_value(
&self,
key: &KeyValue,
_key_aux_info: &serde_json::Value,
options: &SourceExecutorGetOptions,
) -> Result<PartialSourceRowData> {
let key_str = key.str_value()?;
src/ops/sources/google_drive.rs: 3 additions & 0 deletions
@@ -135,6 +135,7 @@ impl Executor {
} else if is_supported_file_type(&mime_type) {
Some(PartialSourceRowMetadata {
key: KeyValue::Str(id),
key_aux_info: serde_json::Value::Null,
ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
})
} else {
@@ -210,6 +211,7 @@ impl Executor {
if self.is_file_covered(&file_id).await? {
changes.push(SourceChange {
key: KeyValue::Str(Arc::from(file_id)),
key_aux_info: serde_json::Value::Null,
data: None,
});
}
@@ -323,6 +325,7 @@ impl SourceExecutor for Executor {
async fn get_value(
&self,
key: &KeyValue,
_key_aux_info: &serde_json::Value,
options: &SourceExecutorGetOptions,
) -> Result<PartialSourceRowData> {
let file_id = key.str_value()?;
src/ops/sources/local_file.rs: 2 additions & 0 deletions
@@ -71,6 +71,7 @@ impl SourceExecutor for Executor {
if let Some(relative_path) = relative_path.to_str() {
yield vec![PartialSourceRowMetadata {
key: KeyValue::Str(relative_path.into()),
key_aux_info: serde_json::Value::Null,
ordinal,
}];
} else {
@@ -87,6 +88,7 @@
async fn get_value(
&self,
key: &KeyValue,
_key_aux_info: &serde_json::Value,
options: &SourceExecutorGetOptions,
) -> Result<PartialSourceRowData> {
if !self.is_file_included(key.str_value()?.as_ref()) {
src/service/flows.rs: 6 additions & 0 deletions
@@ -122,6 +122,7 @@ pub async fn get_keys(
pub struct SourceRowKeyParams {
field: String,
key: Vec<String>,
key_aux: Option<serde_json::Value>,
}

#[derive(Serialize)]
@@ -135,6 +136,7 @@ struct SourceRowKeyContextHolder<'a> {
import_op_idx: usize,
schema: &'a FlowSchema,
key: value::KeyValue,
key_aux_info: serde_json::Value,
}

impl<'a> SourceRowKeyContextHolder<'a> {
@@ -163,11 +165,13 @@ impl<'a> SourceRowKeyContextHolder<'a> {
.key_field()
.ok_or_else(|| api_error!("field {} does not have a key", source_row_key.field))?;
let key = value::KeyValue::from_strs(source_row_key.key, &key_field.value_type.typ)?;
let key_aux_info = source_row_key.key_aux.unwrap_or_default();
Ok(Self {
plan,
import_op_idx,
schema,
key,
key_aux_info,
})
}

@@ -192,6 +196,7 @@
let execution_ctx = flow_ctx.use_execution_ctx().await?;
let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
&source_row_key_ctx.as_context(),
&source_row_key_ctx.key_aux_info,
&execution_ctx.setup_execution_context,
memoization::EvaluationMemoryOptions {
enable_cache: true,
@@ -242,6 +247,7 @@
let execution_ctx = flow_ctx.use_execution_ctx().await?;
let indexing_status = indexing_status::get_source_row_indexing_status(
&source_row_key_ctx.as_context(),
&source_row_key_ctx.key_aux_info,
&execution_ctx.setup_execution_context,
lib_context.require_builtin_db_pool()?,
)
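On the service side, `key_aux` is optional in the request; `unwrap_or_default()` turns an omitted field into `serde_json::Value::Null`, matching the interface convention. A self-contained sketch of just that deserialization step (the struct is redeclared here for illustration; the real one lives in src/service/flows.rs):

use serde::Deserialize;

#[derive(Deserialize)]
struct SourceRowKeyParams {
    field: String,
    key: Vec<String>,
    key_aux: Option<serde_json::Value>,
}

fn main() -> serde_json::Result<()> {
    // No `key_aux` in the request...
    let params: SourceRowKeyParams =
        serde_json::from_str(r#"{"field":"documents","key":["docs/a.md"]}"#)?;
    assert_eq!(params.field, "documents");
    assert_eq!(params.key, ["docs/a.md"]);
    // ...so the context holder falls back to Value::Null ("no auxiliary info").
    let key_aux_info = params.key_aux.unwrap_or_default();
    assert!(key_aux_info.is_null());
    Ok(())
}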