diff --git a/src/base/spec.rs b/src/base/spec.rs
index 8666793dd..da5301d6e 100644
--- a/src/base/spec.rs
+++ b/src/base/spec.rs
@@ -163,6 +163,11 @@ pub struct OpSpec {
     pub spec: serde_json::Map<String, serde_json::Value>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ImportOpSpec {
+    pub source: OpSpec,
+}
+
 /// Transform data using a given operator.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TransformOpSpec {
@@ -244,7 +249,7 @@ pub struct FlowInstanceSpec {
     pub name: String,
 
     #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
-    pub source_ops: Vec<NamedSpec<OpSpec>>,
+    pub import_ops: Vec<NamedSpec<ImportOpSpec>>,
 
     #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
     pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
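Aside (illustration, not part of the diff): an import operation is now described by a NamedSpec<ImportOpSpec> whose `source` field carries the operator kind plus its kind-specific options. A minimal construction sketch, assuming `kind` is a plain String; the "LocalFile" kind name and its empty options map are made up for illustration:

    use crate::base::spec::{ImportOpSpec, NamedSpec, OpSpec};

    // Hypothetical example; only the struct shapes come from the diff above.
    let import_op = NamedSpec {
        name: "documents".to_string(),
        spec: ImportOpSpec {
            source: OpSpec {
                kind: "LocalFile".to_string(), // assumed source kind, for illustration
                spec: serde_json::Map::new(),  // kind-specific settings would go here
            },
        },
    };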
diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index 177b6eb61..a97776239 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -591,25 +591,25 @@ fn add_collector(
 }
 
 impl AnalyzerContext<'_> {
-    pub(super) fn analyze_source_op(
+    pub(super) fn analyze_import_op(
         &self,
         scope: &mut DataScopeBuilder,
-        source_op: NamedSpec<OpSpec>,
+        import_op: NamedSpec<ImportOpSpec>,
         metadata: Option<&mut FlowSetupMetadata>,
         existing_source_states: Option<&Vec<&SourceSetupState>>,
-    ) -> Result<impl Future<Output = Result<AnalyzedSourceOp>> + Send> {
-        let factory = self.registry.get(&source_op.spec.kind);
+    ) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send> {
+        let factory = self.registry.get(&import_op.spec.source.kind);
         let source_factory = match factory {
             Some(ExecutorFactory::Source(source_executor)) => source_executor.clone(),
             _ => {
                 return Err(anyhow::anyhow!(
                     "Source executor not found for kind: {}",
-                    source_op.spec.kind
+                    import_op.spec.source.kind
                 ))
             }
         };
         let (output_type, executor) = source_factory.build(
-            serde_json::Value::Object(source_op.spec.spec),
+            serde_json::Value::Object(import_op.spec.source.spec),
             self.flow_ctx.clone(),
         )?;
 
@@ -642,7 +642,7 @@ impl AnalyzerContext<'_> {
                     metadata.last_source_id
                 };
                 metadata.sources.insert(
-                    source_op.name.clone(),
+                    import_op.name.clone(),
                     SourceSetupState {
                         source_id,
                         key_schema: key_schema_no_attrs,
@@ -651,13 +651,13 @@ impl AnalyzerContext<'_> {
             source_id
         });
 
-        let op_name = source_op.name.clone();
-        let output = scope.add_field(source_op.name, &output_type)?;
+        let op_name = import_op.name.clone();
+        let output = scope.add_field(import_op.name, &output_type)?;
         let result_fut = async move {
             trace!("Start building executor for source op `{}`", op_name);
             let executor = executor.await?;
             trace!("Finished building executor for source op `{}`", op_name);
-            Ok(AnalyzedSourceOp {
+            Ok(AnalyzedImportOp {
                 source_id: source_id.unwrap_or_default(),
                 executor,
                 output,
@@ -1100,14 +1100,14 @@ pub fn analyze_flow(
         name: ROOT_SCOPE_NAME,
         data: &mut root_data_scope,
     };
-    let source_ops_futs = flow_inst
-        .source_ops
+    let import_ops_futs = flow_inst
+        .import_ops
         .iter()
-        .map(|source_op| {
-            let existing_source_states = source_states_by_name.get(source_op.name.as_str());
-            analyzer_ctx.analyze_source_op(
+        .map(|import_op| {
+            let existing_source_states = source_states_by_name.get(import_op.name.as_str());
+            analyzer_ctx.analyze_import_op(
                 root_exec_scope.data,
-                source_op.clone(),
+                import_op.clone(),
                 Some(&mut setup_state.metadata),
                 existing_source_states,
             )
@@ -1138,8 +1138,8 @@ pub fn analyze_flow(
             .with(&data_schema)?
             .into_fingerprint();
         let plan_fut = async move {
-            let (source_ops, op_scope, export_ops) = try_join3(
-                try_join_all(source_ops_futs),
+            let (import_ops, op_scope, export_ops) = try_join3(
+                try_join_all(import_ops_futs),
                 op_scope_fut,
                 try_join_all(export_ops_futs),
             )
             .await?;
@@ -1148,7 +1148,7 @@ pub fn analyze_flow(
         Ok(ExecutionPlan {
             tracking_table_setup,
             logic_fingerprint,
-            source_ops,
+            import_ops,
             op_scope,
             export_ops,
         })
diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs
index 6c49ff1b3..d7dd2f5cb 100644
--- a/src/builder/flow_builder.rs
+++ b/src/builder/flow_builder.rs
@@ -329,7 +329,7 @@ pub struct FlowBuilder {
     direct_input_fields: Vec,
     direct_output_value: Option,
 
-    source_ops: Vec<spec::NamedSpec<spec::OpSpec>>,
+    import_ops: Vec<spec::NamedSpec<spec::ImportOpSpec>>,
     export_ops: Vec<spec::NamedSpec<spec::ExportOpSpec>>,
 
     next_generated_op_id: usize,
@@ -365,7 +365,7 @@ impl FlowBuilder {
 
             reactive_ops: vec![],
 
-            source_ops: vec![],
+            import_ops: vec![],
             export_ops: vec![],
 
             direct_input_fields: vec![],
@@ -395,11 +395,13 @@ impl FlowBuilder {
                 ));
             }
         }
-        let source_op = spec::NamedSpec {
+        let import_op = spec::NamedSpec {
             name,
-            spec: spec::OpSpec {
-                kind,
-                spec: op_spec.into_inner(),
+            spec: spec::ImportOpSpec {
+                source: spec::OpSpec {
+                    kind,
+                    spec: op_spec.into_inner(),
+                },
             },
         };
         let analyzer_ctx = AnalyzerContext {
@@ -409,14 +411,14 @@ impl FlowBuilder {
         let mut root_data_scope = self.root_data_scope.lock().unwrap();
         let analyzed = analyzer_ctx
-            .analyze_source_op(&mut root_data_scope, source_op.clone(), None, None)
+            .analyze_import_op(&mut root_data_scope, import_op.clone(), None, None)
             .into_py_result()?;
         std::mem::drop(analyzed);
 
         let result =
             Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone())
                 .into_py_result()?;
-        self.source_ops.push(source_op);
+        self.import_ops.push(import_op);
         Ok(result)
     }
@@ -633,7 +635,7 @@ impl FlowBuilder {
     pub fn build_flow(&self, py: Python<'_>) -> PyResult {
         let spec = spec::FlowInstanceSpec {
             name: self.flow_instance_name.clone(),
-            source_ops: self.source_ops.clone(),
+            import_ops: self.import_ops.clone(),
             reactive_ops: self.reactive_ops.clone(),
             export_ops: self.export_ops.clone(),
         };
@@ -705,7 +707,7 @@ impl FlowBuilder {
 impl std::fmt::Display for FlowBuilder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "Flow instance name: {}\n\n", self.flow_instance_name)?;
-        for op in self.source_ops.iter() {
+        for op in self.import_ops.iter() {
             write!(
                 f,
                 "Source op {}\n{}\n",
diff --git a/src/builder/plan.rs b/src/builder/plan.rs
index 304660276..1d3b95e6f 100644
--- a/src/builder/plan.rs
+++ b/src/builder/plan.rs
@@ -55,7 +55,7 @@ pub struct AnalyzedOpOutput {
     pub field_idx: u32,
 }
 
-pub struct AnalyzedSourceOp {
+pub struct AnalyzedImportOp {
     pub name: String,
     pub source_id: i32,
     pub executor: Box<dyn SourceExecutor>,
@@ -128,7 +128,7 @@ pub struct ExecutionPlan {
     pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState,
     pub logic_fingerprint: Fingerprint,
 
-    pub source_ops: Vec<AnalyzedSourceOp>,
+    pub import_ops: Vec<AnalyzedImportOp>,
     pub op_scope: AnalyzedOpScope,
     pub export_ops: Vec<AnalyzedExportOp>,
 }
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 1fcb3381d..25bac21ed 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -14,7 +14,7 @@ use yaml_rust2::YamlEmitter;
 use super::memoization::EvaluationMemoryOptions;
 use super::row_indexer;
 use crate::base::{schema, value};
-use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
+use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
 use crate::ops::interface::SourceExecutorListOptions;
 use crate::utils::yaml_ser::YamlSerializer;
 
@@ -69,7 +69,7 @@ struct Dumper<'a> {
 impl<'a> Dumper<'a> {
     async fn evaluate_source_entry<'b>(
         &'a self,
-        source_op: &'a AnalyzedSourceOp,
+        import_op: &'a AnalyzedImportOp,
         key: &value::KeyValue,
         collected_values_buffer: &'b mut Vec>,
     ) -> Result>>>
@@ -78,7 +78,7 @@ impl<'a> Dumper<'a> {
     {
         let data_builder = row_indexer::evaluate_source_entry_with_memory(
             self.plan,
-            source_op,
+            import_op,
             self.schema,
             key,
             EvaluationMemoryOptions {
@@ -130,13 +130,13 @@ impl<'a> Dumper<'a> {
 
     async fn evaluate_and_dump_source_entry(
         &self,
-        source_op: &AnalyzedSourceOp,
+        import_op: &AnalyzedImportOp,
         key: value::KeyValue,
         file_path: PathBuf,
     ) -> Result<()> {
         let mut collected_values_buffer = Vec::new();
         let (exports, error) = match self
-            .evaluate_source_entry(source_op, &key, &mut collected_values_buffer)
+            .evaluate_source_entry(import_op, &key, &mut collected_values_buffer)
             .await
         {
             Ok(exports) => (exports, None),
@@ -145,7 +145,7 @@ impl<'a> Dumper<'a> {
         let key_value = value::Value::from(key);
         let file_data = SourceOutputData {
             key: value::TypedValue {
-                t: &source_op.primary_key_type,
+                t: &import_op.primary_key_type,
                 v: &key_value,
             },
             exports,
@@ -166,10 +166,10 @@ impl<'a> Dumper<'a> {
         Ok(())
     }
 
-    async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
+    async fn evaluate_and_dump_for_source(&self, import_op: &AnalyzedImportOp) -> Result<()> {
         let mut keys_by_filename_prefix: IndexMap> = IndexMap::new();
 
-        let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+        let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
             include_ordinal: false,
         });
         while let Some(rows) = rows_stream.next().await {
@@ -181,7 +181,7 @@ impl<'a> Dumper<'a> {
                     .map(|s| urlencoding::encode(&s).into_owned())
                     .join(":");
                 s.truncate(
-                    (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
+                    (0..(FILENAME_PREFIX_MAX_LENGTH - import_op.name.as_str().len()))
                         .rev()
                         .find(|i| s.is_char_boundary(*i))
                         .unwrap_or(0),
@@ -202,9 +202,9 @@ impl<'a> Dumper<'a> {
                     Cow::Borrowed("")
                 };
                 let file_name =
-                    format!("{}@{}{}.yaml", source_op.name, filename_prefix, extra_id);
+                    format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
                 let file_path = output_dir.join(Path::new(&file_name));
-                self.evaluate_and_dump_source_entry(source_op, key, file_path)
+                self.evaluate_and_dump_source_entry(import_op, key, file_path)
             })
         });
         try_join_all(evaluate_futs).await?;
@@ -214,9 +214,9 @@ impl<'a> Dumper<'a> {
     async fn evaluate_and_dump(&self) -> Result<()> {
         try_join_all(
             self.plan
-                .source_ops
+                .import_ops
                 .iter()
-                .map(|source_op| self.evaluate_and_dump_for_source_op(source_op)),
+                .map(|import_op| self.evaluate_and_dump_for_source(import_op)),
         )
         .await?;
         Ok(())
diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs
index e9a9b5909..70eeb55ce 100644
--- a/src/execution/evaluator.rs
+++ b/src/execution/evaluator.rs
@@ -440,7 +440,7 @@ async fn evaluate_op_scope(
 
 pub async fn evaluate_source_entry(
     plan: &ExecutionPlan,
-    source_op: &AnalyzedSourceOp,
+    import_op: &AnalyzedImportOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     source_value: value::FieldValues,
@@ -455,7 +455,7 @@ pub async fn evaluate_source_entry(
         schema: root_schema,
     };
 
-    let collection_schema = match &root_schema.fields[source_op.output.field_idx as usize]
+    let collection_schema = match &root_schema.fields[import_op.output.field_idx as usize]
         .value_type
         .typ
     {
@@ -468,7 +468,7 @@ pub async fn evaluate_source_entry(
     let scope_value =
         ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
     root_scope_entry.define_field_w_builder(
-        &source_op.output,
+        &import_op.output,
         value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
     );
 
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index ef87b2924..d5c48f262 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -441,7 +441,7 @@ async fn commit_source_tracking_info(
 
 pub async fn evaluate_source_entry_with_memory(
     plan: &ExecutionPlan,
-    source_op: &AnalyzedSourceOp,
+    import_op: &AnalyzedImportOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     options: EvaluationMemoryOptions,
@@ -450,7 +450,7 @@ pub async fn evaluate_source_entry_with_memory(
     let stored_info = if options.enable_cache || !options.evaluation_only {
         let source_key_json = serde_json::to_value(key)?;
         let existing_tracking_info = read_source_tracking_info_for_processing(
-            source_op.source_id,
+            import_op.source_id,
             &source_key_json,
             &plan.tracking_table_setup,
             pool,
@@ -463,17 +463,17 @@ pub async fn evaluate_source_entry_with_memory(
         None
     };
     let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
-    let source_value = match source_op.executor.get_value(key).await? {
+    let source_value = match import_op.executor.get_value(key).await? {
         Some(d) => d,
         None => return Ok(None),
     };
-    let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
+    let output = evaluate_source_entry(plan, import_op, schema, key, source_value, &memory).await?;
     Ok(Some(output))
 }
 
 pub async fn update_source_row(
     plan: &ExecutionPlan,
-    source_op: &AnalyzedSourceOp,
+    import_op: &AnalyzedImportOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     source_value: Option,
@@ -486,7 +486,7 @@ pub async fn update_source_row(
 
     // Phase 1: Evaluate with memoization info.
     let existing_tracking_info = read_source_tracking_info_for_processing(
-        source_op.source_id,
+        import_op.source_id,
         &source_key_json,
         &plan.tracking_table_setup,
         pool,
@@ -519,7 +519,7 @@ pub async fn update_source_row(
         );
         let output = evaluate_source_entry(
             plan,
-            source_op,
+            import_op,
             schema,
             key,
             source_value,
@@ -533,7 +533,7 @@ pub async fn update_source_row(
 
     // Phase 2 (precommit): Update with the memoization info and stage target keys.
     let precommit_output = precommit_source_tracking_info(
-        source_op.source_id,
+        import_op.source_id,
         &source_key_json,
         source_version,
         plan.logic_fingerprint,
@@ -572,7 +572,7 @@ pub async fn update_source_row(
 
     // Phase 4: Update the tracking record.
     commit_source_tracking_info(
-        source_op.source_id,
+        import_op.source_id,
         &source_key_json,
         source_version,
         &plan.logic_fingerprint.0,
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index 530fb23ab..664c7aec0 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -43,17 +43,17 @@ impl SourceIndexingContext {
         pool: &PgPool,
     ) -> Result {
         let plan = flow.get_execution_plan().await?;
-        let source_op = &plan.source_ops[source_idx];
+        let import_op = &plan.import_ops[source_idx];
         let mut list_state = db_tracking::ListTrackedSourceKeyMetadataState::new();
         let mut rows = HashMap::new();
         let mut key_metadata_stream =
-            list_state.list(source_op.source_id, &plan.tracking_table_setup, pool);
+            list_state.list(import_op.source_id, &plan.tracking_table_setup, pool);
         let scan_generation = 0;
         while let Some(key_metadata) = key_metadata_stream.next().await {
             let key_metadata = key_metadata?;
             let source_key = value::Value::::from_json(
                 key_metadata.source_key,
-                &source_op.primary_key_type,
+                &import_op.primary_key_type,
             )?
             .into_key()?;
             rows.insert(
@@ -91,7 +91,7 @@ impl SourceIndexingContext {
         let fut = async move {
             let permit = processing_sem.acquire().await?;
             let plan = self.flow.get_execution_plan().await?;
-            let source_op = &plan.source_ops[self.source_idx];
+            let import_op = &plan.import_ops[self.source_idx];
             let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted {
                 None
             } else {
@@ -100,12 +100,12 @@ impl SourceIndexingContext {
                 // also happens for update cases and there's no way to keep them always in sync for many sources.
                 //
                 // We only need source version <= actual version for value.
-                source_op.executor.get_value(&key).await?
+                import_op.executor.get_value(&key).await?
             };
             let schema = &self.flow.data_schema;
             let result = row_indexer::update_source_row(
                 &plan,
-                source_op,
+                import_op,
                 schema,
                 &key,
                 source_value,
@@ -192,8 +192,8 @@ impl SourceIndexingContext {
 
     async fn update_source(self: &Arc, pool: &PgPool) -> Result {
         let plan = self.flow.get_execution_plan().await?;
-        let source_op = &plan.source_ops[self.source_idx];
-        let mut rows_stream = source_op
+        let import_op = &plan.import_ops[self.source_idx];
+        let mut rows_stream = import_op
             .executor
             .list(interface::SourceExecutorListOptions {
                 include_ordinal: true,
@@ -253,7 +253,7 @@ impl SourceIndexingContext {
         }
 
         Ok(stats::SourceUpdateInfo {
-            source_name: source_op.name.clone(),
+            source_name: import_op.name.clone(),
             stats: Arc::unwrap_or_clone(update_stats),
         })
     }
@@ -262,7 +262,7 @@ impl SourceIndexingContext {
 pub async fn update(flow_context: &FlowContext, pool: &PgPool) -> Result {
     let plan = flow_context.flow.get_execution_plan().await?;
     let source_update_stats = try_join_all(
-        (0..plan.source_ops.len())
+        (0..plan.import_ops.len())
             .map(|idx| async move {
                 let source_context = flow_context.get_source_indexing_context(idx, pool).await?;
                 source_context.update_source(pool).await
diff --git a/src/lib_context.rs b/src/lib_context.rs
index becb2a9e4..281da49a9 100644
--- a/src/lib_context.rs
+++ b/src/lib_context.rs
@@ -23,7 +23,7 @@ impl FlowContext {
     pub fn new(flow: Arc) -> Self {
         let mut source_indexing_contexts = Vec::new();
         source_indexing_contexts
-            .resize_with(flow.flow_instance.source_ops.len(), || OnceCell::new());
+            .resize_with(flow.flow_instance.import_ops.len(), || OnceCell::new());
         Self {
             flow,
             source_indexing_contexts,
diff --git a/src/service/flows.rs b/src/service/flows.rs
index db451177a..84ee7ae7a 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -78,8 +78,8 @@ pub async fn get_keys(
     })?;
 
     let execution_plan = flow_ctx.flow.get_execution_plan().await?;
-    let source_op = execution_plan
-        .source_ops
+    let import_op = execution_plan
+        .import_ops
         .iter()
         .find(|op| op.output.field_idx == field_idx as u32)
         .ok_or_else(|| {
@@ -89,7 +89,7 @@ pub async fn get_keys(
             )
         })?;
 
-    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+    let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
         include_ordinal: false,
     });
     let mut keys = Vec::new();
@@ -122,12 +122,12 @@ pub async fn evaluate_data(
     let flow_ctx = lib_context.get_flow_context(&flow_name)?;
     let schema = &flow_ctx.flow.data_schema;
 
-    let source_op_idx = flow_ctx
+    let import_op_idx = flow_ctx
        .flow
        .flow_instance
-        .source_ops
+        .import_ops
        .iter()
-        .position(|source_op| source_op.name == query.field)
+        .position(|op| op.name == query.field)
        .ok_or_else(|| {
            ApiError::new(
                &format!("source field not found: {}", query.field),
@@ -135,8 +135,8 @@ pub async fn evaluate_data(
            )
    })?;
     let plan = flow_ctx.flow.get_execution_plan().await?;
-    let source_op = &plan.source_ops[source_op_idx];
-    let field_schema = &schema.fields[source_op.output.field_idx as usize];
+    let import_op = &plan.import_ops[import_op_idx];
+    let field_schema = &schema.fields[import_op.output.field_idx as usize];
     let collection_schema = match &field_schema.value_type.typ {
         schema::ValueType::Collection(collection) => collection,
         _ => api_bail!("field is not a table: {}", query.field),
@@ -148,7 +148,7 @@ pub async fn evaluate_data(
 
     let value_builder = row_indexer::evaluate_source_entry_with_memory(
         &plan,
-        source_op,
+        import_op,
         schema,
         &key,
         memoization::EvaluationMemoryOptions {
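One property this change relies on, worth calling out: the spec-level and plan-level lists stay index-aligned. `analyze_flow` builds one future per entry of `flow_inst.import_ops` and joins them in order into `ExecutionPlan::import_ops`, which is why lib_context.rs can size `source_indexing_contexts` from the spec list and service/flows.rs can reuse a name-lookup index against the analyzed plan. A condensed sketch of that lookup pattern (error handling collapsed to `expect` for brevity; variable names follow `evaluate_data` above):

    // Find the import op by name in the flow spec, then reuse the same index
    // into the analyzed plan; both vectors are built from the same list in order.
    let idx = flow_ctx
        .flow
        .flow_instance
        .import_ops
        .iter()
        .position(|op| op.name == query.field)
        .expect("source field not found");
    let import_op = &plan.import_ops[idx];
    let field_schema = &schema.fields[import_op.output.field_idx as usize];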