diff --git a/src/base/schema.rs b/src/base/schema.rs
index 4954d75f3..69fae78e7 100644
--- a/src/base/schema.rs
+++ b/src/base/schema.rs
@@ -1,9 +1,7 @@
-use crate::builder::plan::AnalyzedValueMapping;
+use crate::prelude::*;
 
 use super::spec::*;
-use anyhow::Result;
-use serde::{Deserialize, Serialize};
-use std::{collections::BTreeMap, ops::Deref, sync::Arc};
+use crate::builder::plan::AnalyzedValueMapping;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct VectorTypeSchema {
@@ -141,9 +139,6 @@ impl std::fmt::Display for TableKind {
 pub struct TableSchema {
     pub kind: TableKind,
     pub row: StructSchema,
-
-    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
-    pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
 }
 
 impl TableSchema {
@@ -170,36 +165,19 @@ impl TableSchema {
         Self {
             kind: self.kind,
             row: self.row.without_attrs(),
-            collectors: self
-                .collectors
-                .iter()
-                .map(|c| NamedSpec {
-                    name: c.name.clone(),
-                    spec: Arc::from(c.spec.without_attrs()),
-                })
-                .collect(),
         }
     }
 }
 
 impl std::fmt::Display for TableSchema {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}({}", self.kind, self.row)?;
-        for collector in self.collectors.iter() {
-            write!(f, "; COLLECTOR {} ({})", collector.name, collector.spec)?;
-        }
-        write!(f, ")")?;
-        Ok(())
+        write!(f, "{}({})", self.kind, self.row)
     }
 }
 
 impl TableSchema {
     pub fn new(kind: TableKind, row: StructSchema) -> Self {
-        Self {
-            kind,
-            row,
-            collectors: Default::default(),
-        }
+        Self { kind, row }
     }
 
     pub fn key_field(&self) -> Option<&FieldSchema> {
@@ -409,16 +387,27 @@ impl CollectorSchema {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct OpScopeSchema {
+    /// Output schema for transform ops.
+    pub op_output_types: HashMap<FieldName, EnrichedValueType>,
+
+    /// Child op scope for foreach ops.
+    pub op_scopes: HashMap<ScopeName, Arc<OpScopeSchema>>,
+
+    /// Collectors for the current scope.
+    pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
+}
+
 /// Top-level schema for a flow instance.
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct DataSchema {
+pub struct FlowSchema {
     pub schema: StructSchema,
 
-    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
-    pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
+    pub root_op_scope: OpScopeSchema,
 }
 
-impl Deref for DataSchema {
+impl std::ops::Deref for FlowSchema {
     type Target = StructSchema;
 
     fn deref(&self) -> &Self::Target {
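Collectors no longer hang off `TableSchema`; they are tracked per op scope, and every `foreach` scope contributes a child `OpScopeSchema` keyed by its scope name. A minimal, self-contained sketch of that nesting, using simplified stand-in types (plain `String`s in place of the real `EnrichedValueType`/`CollectorSchema`; all names below are illustrative):

use std::{collections::HashMap, sync::Arc};

// Illustrative stand-ins; not the real cocoindex types.
type FieldName = String;
type ScopeName = String;

#[derive(Debug, Default)]
struct OpScopeSchemaSketch {
    // Transform op outputs visible in this scope (type names abbreviated).
    op_output_types: HashMap<FieldName, String>,
    // One child schema per foreach scope, keyed by scope name.
    op_scopes: HashMap<ScopeName, Arc<OpScopeSchemaSketch>>,
    // Collectors declared in this scope: name plus collected field names.
    collectors: Vec<(FieldName, Vec<FieldName>)>,
}

fn main() {
    // documents -> foreach chunk -> embed each chunk, collect at the root.
    let chunk_scope = OpScopeSchemaSketch {
        op_output_types: HashMap::from([("embedding".to_string(), "Vector".to_string())]),
        ..Default::default()
    };
    let mut root = OpScopeSchemaSketch::default();
    root.op_output_types
        .insert("documents".to_string(), "Table".to_string());
    root.op_scopes
        .insert("chunk".to_string(), Arc::new(chunk_scope));
    root.collectors.push((
        "chunk_embeddings".to_string(),
        vec!["id".to_string(), "embedding".to_string()],
    ));
    println!("{root:#?}");
}

The real `FlowSchema` carries the same kind of tree in `root_op_scope`, which is how consumers such as the dumper look up a collector's schema by index within its scope.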
diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs
index 548962f88..b08122e2e 100644
--- a/src/builder/analyzed_flow.rs
+++ b/src/builder/analyzed_flow.rs
@@ -9,7 +9,7 @@ use crate::{
 
 pub struct AnalyzedFlow {
     pub flow_instance: spec::FlowInstanceSpec,
-    pub data_schema: schema::DataSchema,
+    pub data_schema: schema::FlowSchema,
     pub desired_state: setup::FlowSetupState<setup::DesiredMode>,
     /// It's None if the flow is not up to date
     pub execution_plan:
@@ -67,7 +67,7 @@ impl AnalyzedFlow {
 
 pub struct AnalyzedTransientFlow {
     pub transient_flow_instance: spec::TransientFlowSpec,
-    pub data_schema: schema::DataSchema,
+    pub data_schema: schema::FlowSchema,
     pub execution_plan: plan::TransientExecutionPlan,
     pub output_type: schema::EnrichedValueType,
 }
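The `Deref` to `StructSchema` is what lets the `DataSchema` to `FlowSchema` rename stay mechanical across the codebase: call sites that only read fields keep compiling unchanged, and only code that touched the removed `collectors` field has to migrate to `root_op_scope`. A compilable sketch of the pattern with placeholder types:

use std::ops::Deref;

// Placeholder types; the real ones carry full field schemas.
#[derive(Debug, Default)]
struct StructSchemaSketch {
    fields: Vec<String>,
}

#[derive(Debug, Default)]
struct FlowSchemaSketch {
    schema: StructSchemaSketch,
    // root_op_scope omitted in this sketch.
}

impl Deref for FlowSchemaSketch {
    type Target = StructSchemaSketch;
    fn deref(&self) -> &Self::Target {
        &self.schema
    }
}

fn field_count(s: &StructSchemaSketch) -> usize {
    s.fields.len()
}

fn main() {
    let flow = FlowSchemaSketch::default();
    // Deref coercion: &FlowSchemaSketch works wherever &StructSchemaSketch is expected.
    assert_eq!(field_count(&flow), flow.fields.len());
}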
diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index d9d3289a0..834727643 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -11,7 +11,6 @@ use crate::utils::fingerprint::Fingerprinter;
 use crate::{
     base::{schema::*, spec::*},
     ops::{interface::*, registry::*},
-    utils::immutable::RefList,
 };
 use futures::future::{try_join3, BoxFuture};
 use futures::{future::try_join_all, FutureExt};
@@ -122,13 +121,6 @@ impl TryFrom<&TableSchema> for TableSchemaBuilder {
             kind: schema.kind,
             sub_scope: Arc::new(Mutex::new(DataScopeBuilder {
                 data: (&schema.row).try_into()?,
-                collectors: Mutex::new(
-                    schema
-                        .collectors
-                        .iter()
-                        .map(|c| (c.name.clone(), CollectorBuilder::new(c.spec.clone())))
-                        .collect(),
-                ),
             })),
         })
     }
@@ -140,20 +132,9 @@ impl TryInto<TableSchema> for &TableSchemaBuilder {
     fn try_into(self) -> Result<TableSchema> {
         let sub_scope = self.sub_scope.lock().unwrap();
         let row = (&sub_scope.data).try_into()?;
-        let collectors = sub_scope
-            .collectors
-            .lock()
-            .unwrap()
-            .iter()
-            .map(|(name, schema)| NamedSpec {
-                name: name.clone(),
-                spec: schema.schema.clone(),
-            })
-            .collect();
         Ok(TableSchema {
             kind: self.kind,
             row,
-            collectors,
         })
     }
 }
@@ -182,38 +163,9 @@ fn try_make_common_value_type(
             );
         }
         let row = try_merge_struct_schemas(&table_type1.row, &table_type2.row)?;
-
-        if table_type1.collectors.len() != table_type2.collectors.len() {
-            api_bail!(
-                "Collection types are not compatible as they have different collectors count: {} vs {}",
-                table_type1,
-                table_type2
-            );
-        }
-        let collectors = table_type1
-            .collectors
-            .iter()
-            .zip(table_type2.collectors.iter())
-            .map(|(c1, c2)| -> Result<_> {
-                if c1.name != c2.name {
-                    api_bail!(
-                        "Collection types are not compatible as they have different collectors names: {} vs {}",
-                        c1.name,
-                        c2.name
-                    );
-                }
-                let collector = NamedSpec {
-                    name: c1.name.clone(),
-                    spec: Arc::new(try_merge_collector_schemas(&c1.spec, &c2.spec)?),
-                };
-                Ok(collector)
-            })
-            .collect::<Result<Vec<_>>>()?;
-
         ValueType::Table(TableSchema {
             kind: table_type1.kind,
             row,
-            collectors,
         })
     }
     (t1 @ (ValueType::Basic(_) | ValueType::Struct(_) | ValueType::Table(_)), t2) => {
@@ -346,14 +298,12 @@ impl CollectorBuilder {
 #[derive(Debug)]
 pub(super) struct DataScopeBuilder {
     pub data: StructSchemaBuilder,
-    pub collectors: Mutex<IndexMap<FieldName, CollectorBuilder>>,
 }
 
 impl DataScopeBuilder {
     pub fn new() -> Self {
         Self {
             data: Default::default(),
-            collectors: Default::default(),
         }
     }
@@ -411,31 +361,28 @@ impl DataScopeBuilder {
             value_type,
         ))
     }
+}
 
-    pub fn consume_collector(
-        &self,
-        collector_name: &FieldName,
-    ) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
-        let mut collectors = self.collectors.lock().unwrap();
-        let (collector_idx, _, collector) = collectors
-            .get_full_mut(collector_name)
-            .ok_or_else(|| api_error!("Collector not found: {}", collector_name))?;
-        Ok((
-            AnalyzedLocalCollectorReference {
-                collector_idx: collector_idx as u32,
-            },
-            collector.use_schema(),
-        ))
-    }
+pub(super) struct AnalyzerContext<'a> {
+    pub registry: &'a ExecutorFactoryRegistry,
+    pub flow_ctx: &'a Arc<FlowInstanceContext>,
+}
 
+#[derive(Debug, Default)]
+pub(super) struct OpScopeStates {
+    pub op_output_types: HashMap<FieldName, EnrichedValueType>,
+    pub collectors: IndexMap<FieldName, CollectorBuilder>,
+    pub sub_scopes: HashMap<ScopeName, Arc<OpScopeSchema>>,
+}
+
+impl OpScopeStates {
     pub fn add_collector(
-        &self,
+        &mut self,
         collector_name: FieldName,
         schema: CollectorSchema,
     ) -> Result<AnalyzedLocalCollectorReference> {
-        let mut collectors = self.collectors.lock().unwrap();
-        let existing_len = collectors.len();
-        let idx = match collectors.entry(collector_name) {
+        let existing_len = self.collectors.len();
+        let idx = match self.collectors.entry(collector_name) {
             indexmap::map::Entry::Occupied(mut entry) => {
                 entry.get_mut().merge_schema(&schema)?;
                 entry.index()
@@ -450,53 +397,170 @@ impl DataScopeBuilder {
         })
     }
 
-    pub fn into_data_schema(self) -> Result<DataSchema> {
-        Ok(DataSchema {
-            schema: (&self.data).try_into()?,
+    pub fn consume_collector(
+        &mut self,
+        collector_name: &FieldName,
+    ) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
+        let (collector_idx, _, collector) = self
+            .collectors
+            .get_full_mut(collector_name)
+            .ok_or_else(|| api_error!("Collector not found: {}", collector_name))?;
+        Ok((
+            AnalyzedLocalCollectorReference {
+                collector_idx: collector_idx as u32,
+            },
+            collector.use_schema(),
+        ))
+    }
+
+    fn build_op_scope_schema(&self) -> OpScopeSchema {
+        OpScopeSchema {
+            op_output_types: self
+                .op_output_types
+                .iter()
+                .map(|(name, value_type)| (name.clone(), value_type.without_attrs()))
+                .collect(),
             collectors: self
                 .collectors
-                .into_inner()
-                .unwrap()
-                .into_iter()
+                .iter()
                 .map(|(name, schema)| NamedSpec {
-                    name,
-                    spec: schema.schema,
+                    name: name.clone(),
+                    spec: schema.schema.clone(),
                 })
                 .collect(),
+            op_scopes: self.sub_scopes.clone(),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct OpScope {
+    pub name: String,
+    pub parent: Option<(Arc<OpScope>, spec::FieldPath)>,
+    pub(super) data: Arc<Mutex<DataScopeBuilder>>,
+    pub(super) states: Mutex<OpScopeStates>,
+}
+
+struct Iter<'a>(Option<&'a OpScope>);
+
+impl<'a> Iterator for Iter<'a> {
+    type Item = &'a OpScope;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.0 {
+            Some(scope) => {
+                self.0 = scope.parent.as_ref().map(|(parent, _)| parent.as_ref());
+                Some(scope)
+            }
+            None => None,
+        }
+    }
+}
+
+impl OpScope {
+    pub(super) fn new(
+        name: String,
+        parent: Option<(Arc<OpScope>, spec::FieldPath)>,
+        data: Arc<Mutex<DataScopeBuilder>>,
+    ) -> Arc<Self> {
+        Arc::new(Self {
+            name,
+            parent,
+            data,
+            states: Mutex::default(),
         })
     }
+
+    fn add_op_output(
+        &self,
+        name: FieldName,
+        value_type: EnrichedValueType,
+    ) -> Result<AnalyzedOpOutput> {
+        let op_output = self
+            .data
+            .lock()
+            .unwrap()
+            .add_field(name.clone(), &value_type)?;
+        self.states
+            .lock()
+            .unwrap()
+            .op_output_types
+            .insert(name, value_type);
+        Ok(op_output)
+    }
+
+    pub fn ancestors(&self) -> impl Iterator<Item = &OpScope> {
+        Iter(Some(self))
+    }
+
+    pub fn is_op_scope_descendant(&self, other: &Self) -> bool {
+        if self == other {
+            return true;
+        }
+        match &self.parent {
+            Some((parent, _)) => parent.is_op_scope_descendant(other),
+            None => false,
+        }
+    }
+
+    pub(super) fn new_foreach_op_scope(
+        self: &Arc<Self>,
+        scope_name: String,
+        field_path: &FieldPath,
+    ) -> Result<(AnalyzedLocalFieldReference, Arc<OpScope>)> {
+        let (local_field_ref, sub_data_scope) = {
+            let data_scope = self.data.lock().unwrap();
+            let (local_field_ref, value_type) = data_scope.analyze_field_path(field_path)?;
+            let sub_data_scope = match &value_type.typ {
+                ValueTypeBuilder::Table(table_type) => table_type.sub_scope.clone(),
+                _ => api_bail!("ForEach only works on collection, field {field_path} is not"),
+            };
+            (local_field_ref, sub_data_scope)
+        };
+        let sub_op_scope = OpScope::new(
+            scope_name,
+            Some((self.clone(), field_path.clone())),
+            sub_data_scope,
+        );
+        Ok((local_field_ref, sub_op_scope))
+    }
 }
 
-pub(super) struct AnalyzerContext<'a> {
-    pub registry: &'a ExecutorFactoryRegistry,
-    pub flow_ctx: &'a Arc<FlowInstanceContext>,
+impl std::fmt::Display for OpScope {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if let Some((scope, field_path)) = &self.parent {
+            write!(f, "{} [{} AS {}]", scope, field_path, self.name)?;
+        } else {
+            write!(f, "[{}]", self.name)?;
+        }
+        Ok(())
+    }
 }
 
-pub(super) struct ExecutionScope<'a> {
-    pub name: &'a str,
-    pub data: &'a mut DataScopeBuilder,
+impl PartialEq for OpScope {
+    fn eq(&self, other: &Self) -> bool {
+        std::ptr::eq(self, other)
+    }
 }
+impl Eq for OpScope {}
 
-fn find_scope<'a>(
-    scope_name: &ScopeName,
-    scopes: RefList<'a, &'a ExecutionScope<'a>>,
-) -> Result<(u32, &'a ExecutionScope<'a>)> {
-    let (up_level, scope) = scopes
-        .iter()
+fn find_scope<'a>(scope_name: &ScopeName, op_scope: &'a OpScope) -> Result<(u32, &'a OpScope)> {
+    let (up_level, scope) = op_scope
+        .ancestors()
         .enumerate()
-        .find(|(_, s)| s.name == scope_name)
+        .find(|(_, s)| &s.name == scope_name)
         .ok_or_else(|| api_error!("Scope not found: {}", scope_name))?;
     Ok((up_level as u32, scope))
 }
 
 fn analyze_struct_mapping(
     mapping: &StructMapping,
-    scopes: RefList<'_, &'_ ExecutionScope<'_>>,
+    op_scope: &OpScope,
 ) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>)> {
     let mut field_mappings = Vec::with_capacity(mapping.fields.len());
     let mut field_schemas = Vec::with_capacity(mapping.fields.len());
     for field in mapping.fields.iter() {
-        let (field_mapping, value_type) = analyze_value_mapping(&field.spec, scopes)?;
+        let (field_mapping, value_type) = analyze_value_mapping(&field.spec, op_scope)?;
         field_mappings.push(field_mapping);
         field_schemas.push(FieldSchema {
             name: field.name.clone(),
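`OpScope` is a parent-pointer tree, and `ancestors()` walks innermost-out, which is what `find_scope` above relies on to turn a scope name into an up-level count. A self-contained sketch of the same iterator pattern (simplified scope type with no data or states fields; names are illustrative):

use std::sync::Arc;

struct ScopeSketch {
    name: String,
    parent: Option<Arc<ScopeSketch>>,
}

struct Ancestors<'a>(Option<&'a ScopeSketch>);

impl<'a> Iterator for Ancestors<'a> {
    type Item = &'a ScopeSketch;
    fn next(&mut self) -> Option<Self::Item> {
        let cur = self.0?;
        // Step to the parent before yielding the current scope.
        self.0 = cur.parent.as_deref();
        Some(cur)
    }
}

fn main() {
    let root = Arc::new(ScopeSketch { name: "_root".into(), parent: None });
    let child = Arc::new(ScopeSketch { name: "chunk".into(), parent: Some(root) });
    // Innermost-first: ["chunk", "_root"], so position() yields the up-level count.
    let up_level = Ancestors(Some(child.as_ref())).position(|s| s.name == "_root");
    assert_eq!(up_level, Some(1));
}

Pointer-based `PartialEq` keeps `is_op_scope_descendant` an identity walk up the chain, matching how the flow builder compares scopes against the root.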
@@ -513,7 +577,7 @@ fn analyze_struct_mapping(
 
 fn analyze_value_mapping(
     value_mapping: &ValueMapping,
-    scopes: RefList<'_, &'_ ExecutionScope<'_>>,
+    op_scope: &OpScope,
 ) -> Result<(AnalyzedValueMapping, EnrichedValueType)> {
     let result = match value_mapping {
         ValueMapping::Constant(v) => {
@@ -522,12 +586,12 @@
         }
 
         ValueMapping::Field(v) => {
-            let (scope_up_level, exec_scope) = match &v.scope {
-                Some(scope) => find_scope(scope, scopes)?,
-                None => (0, *scopes.head().ok_or_else(|| anyhow!("Scope not found"))?),
+            let (scope_up_level, op_scope) = match &v.scope {
+                Some(scope_name) => find_scope(scope_name, op_scope)?,
+                None => (0, op_scope),
             };
-            let (local_field_ref, value_type) =
-                exec_scope.data.analyze_field_path(&v.field_path)?;
+            let data_scope = op_scope.data.lock().unwrap();
+            let (local_field_ref, value_type) = data_scope.analyze_field_path(&v.field_path)?;
             (
                 AnalyzedValueMapping::Field(AnalyzedFieldReference {
                     local: local_field_ref,
@@ -538,7 +602,7 @@
         }
 
         ValueMapping::Struct(v) => {
-            let (struct_mapping, field_schemas) = analyze_struct_mapping(v, scopes)?;
+            let (struct_mapping, field_schemas) = analyze_struct_mapping(v, op_scope)?;
             (
                 AnalyzedValueMapping::Struct(struct_mapping),
                 EnrichedValueType {
@@ -557,11 +621,11 @@
 
 fn analyze_input_fields(
     arg_bindings: &[OpArgBinding],
-    scopes: RefList<'_, &'_ ExecutionScope<'_>>,
+    op_scope: &OpScope,
 ) -> Result<Vec<OpArgSchema>> {
     let mut input_field_schemas = Vec::with_capacity(arg_bindings.len());
     for arg_binding in arg_bindings.iter() {
-        let (analyzed_value, value_type) = analyze_value_mapping(&arg_binding.value, scopes)?;
+        let (analyzed_value, value_type) = analyze_value_mapping(&arg_binding.value, op_scope)?;
         input_field_schemas.push(OpArgSchema {
             name: arg_binding.arg_name.clone(),
             value_type,
@@ -575,10 +639,14 @@ fn add_collector(
     scope_name: &ScopeName,
     collector_name: FieldName,
     schema: CollectorSchema,
-    scopes: RefList<'_, &'_ ExecutionScope<'_>>,
+    op_scope: &OpScope,
 ) -> Result<AnalyzedCollectorReference> {
-    let (scope_up_level, scope) = find_scope(scope_name, scopes)?;
-    let local_ref = scope.data.add_collector(collector_name, schema)?;
+    let (scope_up_level, scope) = find_scope(scope_name, op_scope)?;
+    let local_ref = scope
+        .states
+        .lock()
+        .unwrap()
+        .add_collector(collector_name, schema)?;
     Ok(AnalyzedCollectorReference {
         local: local_ref,
         scope_up_level,
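The `scope_up_level` recorded in `AnalyzedFieldReference` and `AnalyzedCollectorReference` is a de Bruijn-style index: 0 is the scope the op runs in, N means N ancestors up. At evaluation time it is resolved against the stack of live scope entries (cf. `scoped_entries.headn(...)` in the evaluator). A sketch of that resolution, assuming a plain slice-backed stack with the innermost scope last:

// Innermost scope is the last element; `up_level` counts outward from it.
fn resolve_scope<'a, T>(stack: &'a [T], up_level: usize) -> Option<&'a T> {
    stack.len().checked_sub(up_level + 1).map(|i| &stack[i])
}

fn main() {
    let stack = vec!["_root", "documents_row", "chunk_row"];
    assert_eq!(resolve_scope(&stack, 0), Some(&"chunk_row"));
    assert_eq!(resolve_scope(&stack, 2), Some(&"_root"));
    // Out of bounds resolves to None, like the new headn error path below.
    assert_eq!(resolve_scope(&stack, 3), None);
}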
@@ -596,7 +664,7 @@ struct ExportDataFieldsInfo {
 impl AnalyzerContext<'_> {
     pub(super) fn analyze_import_op(
         &self,
-        scope: &mut DataScopeBuilder,
+        op_scope: &Arc<OpScope>,
         import_op: NamedSpec<ImportOpSpec>,
         metadata: Option<&mut FlowSetupMetadata>,
         existing_source_states: Option<&Vec<&SourceSetupState>>,
@@ -655,7 +723,13 @@ impl AnalyzerContext<'_> {
         });
 
         let op_name = import_op.name.clone();
-        let output = scope.add_field(import_op.name, &output_type)?;
+        let primary_key_type = output_type
+            .typ
+            .key_type()
+            .ok_or_else(|| api_error!("Source must produce a type with key: {op_name}"))?
+            .typ
+            .clone();
+        let output = op_scope.add_op_output(import_op.name, output_type)?;
         let result_fut = async move {
             trace!("Start building executor for source op `{}`", op_name);
             let executor = executor.await?;
@@ -664,12 +738,7 @@ impl AnalyzerContext<'_> {
                 source_id: source_id.unwrap_or_default(),
                 executor,
                 output,
-                primary_key_type: output_type
-                    .typ
-                    .key_type()
-                    .ok_or_else(|| api_error!("Source must produce a type with key: {op_name}"))?
- .typ - .clone(), + primary_key_type, name: op_name, refresh_options: import_op.spec.refresh_options, }) @@ -679,21 +748,18 @@ impl AnalyzerContext<'_> { pub(super) fn analyze_reactive_op( &self, - scope: &mut ExecutionScope<'_>, + op_scope: &Arc, reactive_op: &NamedSpec, - parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>, ) -> Result>> { let result_fut = match &reactive_op.spec { ReactiveOpSpec::Transform(op) => { let input_field_schemas = - analyze_input_fields(&op.inputs, parent_scopes.prepend(scope)).with_context( - || { - format!( - "Failed to analyze inputs for transform op: {}", - reactive_op.name - ) - }, - )?; + analyze_input_fields(&op.inputs, op_scope).with_context(|| { + format!( + "Failed to analyze inputs for transform op: {}", + reactive_op.name + ) + })?; let spec = serde_json::Value::Object(op.op.spec.clone()); let factory = self.registry.get(&op.op.kind); @@ -703,43 +769,42 @@ impl AnalyzerContext<'_> { .iter() .map(|field| field.analyzed_value.clone()) .collect(); - let (output_type, executor) = fn_executor.clone().build( + let (output_enriched_type, executor) = fn_executor.clone().build( spec, input_field_schemas, self.flow_ctx.clone(), )?; - let output = scope - .data - .add_field(reactive_op.name.clone(), &output_type)?; - let reactive_op = reactive_op.clone(); let logic_fingerprinter = Fingerprinter::default() .with(&op.op)? - .with(&output_type.without_attrs())?; + .with(&output_enriched_type.without_attrs())?; + let output_type = output_enriched_type.typ.clone(); + let output = op_scope + .add_op_output(reactive_op.name.clone(), output_enriched_type)?; + let op_name = reactive_op.name.clone(); async move { - trace!("Start building executor for transform op `{}`", reactive_op.name); + trace!("Start building executor for transform op `{op_name}`"); let executor = executor.await.with_context(|| { - format!("Failed to build executor for transform op: {}", reactive_op.name) + format!("Failed to build executor for transform op: {op_name}") })?; let enable_cache = executor.enable_cache(); let behavior_version = executor.behavior_version(); - trace!("Finished building executor for transform op `{}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}", reactive_op.name); + trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}"); let function_exec_info = AnalyzedFunctionExecInfo { enable_cache, behavior_version, fingerprinter: logic_fingerprinter .with(&behavior_version)?, - output_type: output_type.typ.clone(), + output_type }; if function_exec_info.enable_cache && function_exec_info.behavior_version.is_none() { api_bail!( - "When caching is enabled, behavior version must be specified for transform op: {}", - reactive_op.name + "When caching is enabled, behavior version must be specified for transform op: {op_name}" ); } Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp { - name: reactive_op.name, + name: op_name, inputs: input_value_mappings, function_exec_info, executor, @@ -756,33 +821,28 @@ impl AnalyzerContext<'_> { } } } - ReactiveOpSpec::ForEach(op) => { - let (local_field_ref, value_type) = - scope.data.analyze_field_path(&op.field_path)?; - let sub_scope = match &value_type.typ { - ValueTypeBuilder::Table(table_type) => &table_type.sub_scope, - _ => api_bail!( - "ForEach only works on collection, field {} is not", - op.field_path - ), - }; - let op_scope_fut = { - let mut sub_scope = sub_scope.lock().unwrap(); - let mut exec_scope = ExecutionScope { - name: 
&op.op_scope.name, - data: &mut sub_scope, - }; - self.analyze_op_scope( - &mut exec_scope, - &op.op_scope.ops, - parent_scopes.prepend(scope), - )? + + ReactiveOpSpec::ForEach(foreach_op) => { + let (local_field_ref, sub_op_scope) = op_scope.new_foreach_op_scope( + foreach_op.op_scope.name.clone(), + &foreach_op.field_path, + )?; + let analyzed_op_scope_fut = { + let analyzed_op_scope_fut = + self.analyze_op_scope(&sub_op_scope, &foreach_op.op_scope.ops)?; + let sub_op_scope_schema = + sub_op_scope.states.lock().unwrap().build_op_scope_schema(); + op_scope.states.lock().unwrap().sub_scopes.insert( + foreach_op.op_scope.name.clone(), + Arc::new(sub_op_scope_schema), + ); + analyzed_op_scope_fut }; let op_name = reactive_op.name.clone(); async move { Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp { local_field_ref, - op_scope: op_scope_fut + op_scope: analyzed_op_scope_fut .await .with_context(|| format!("Analyzing foreach op: {op_name}"))?, name: op_name, @@ -792,8 +852,7 @@ impl AnalyzerContext<'_> { } ReactiveOpSpec::Collect(op) => { - let scopes = parent_scopes.prepend(scope); - let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, scopes)?; + let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, op_scope)?; let has_auto_uuid_field = op.auto_uuid_field.is_some(); let fingerprinter = Fingerprinter::default().with(&fields_schema)?; let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp { @@ -804,7 +863,7 @@ impl AnalyzerContext<'_> { &op.scope_name, op.collector_name.clone(), CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()), - scopes, + op_scope, )?, fingerprinter, }); @@ -902,7 +961,7 @@ impl AnalyzerContext<'_> { fn analyze_export_op_group( &self, target_kind: String, - scope: &mut DataScopeBuilder, + op_scope: &Arc, flow_inst: &FlowInstanceSpec, export_op_group: &AnalyzedExportTargetOpGroup, declarations: Vec, @@ -913,8 +972,11 @@ impl AnalyzerContext<'_> { let mut data_fields_infos = Vec::::new(); for idx in export_op_group.op_idx.iter() { let export_op = &flow_inst.export_ops[*idx]; - let (local_collector_ref, collector_schema) = - scope.consume_collector(&export_op.spec.collector_name)?; + let (local_collector_ref, collector_schema) = op_scope + .states + .lock() + .unwrap() + .consume_collector(&export_op.spec.collector_name)?; let (key_fields_schema, value_fields_schema, data_collection_info) = match &export_op.spec.index_options.primary_key_fields { Some(fields) => { @@ -1040,17 +1102,18 @@ impl AnalyzerContext<'_> { fn analyze_op_scope( &self, - scope: &mut ExecutionScope<'_>, + op_scope: &Arc, reactive_ops: &[NamedSpec], - parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>, ) -> Result> + Send> { let op_futs = reactive_ops .iter() - .map(|reactive_op| self.analyze_reactive_op(scope, reactive_op, parent_scopes)) + .map(|reactive_op| self.analyze_reactive_op(op_scope, reactive_op)) .collect::>>()?; + let collector_len = op_scope.states.lock().unwrap().collectors.len(); let result_fut = async move { Ok(AnalyzedOpScope { reactive_ops: try_join_all(op_futs).await?, + collector_len, }) }; Ok(result_fut) @@ -1068,18 +1131,25 @@ pub fn build_flow_instance_context( }) } +fn build_flow_schema(root_op_scope: &OpScope) -> Result { + let schema = (&root_op_scope.data.lock().unwrap().data).try_into()?; + let root_op_scope_schema = root_op_scope.states.lock().unwrap().build_op_scope_schema(); + Ok(FlowSchema { + schema, + root_op_scope: root_op_scope_schema, + }) +} + pub fn analyze_flow( flow_inst: &FlowInstanceSpec, 
flow_ctx: &Arc, existing_flow_ss: Option<&setup::FlowSetupState>, registry: &ExecutorFactoryRegistry, ) -> Result<( - DataSchema, + FlowSchema, impl Future> + Send, setup::FlowSetupState, )> { - let mut root_data_scope = DataScopeBuilder::new(); - let existing_metadata_versions = || { existing_flow_ss .iter() @@ -1138,28 +1208,22 @@ pub fn analyze_flow( }; let analyzer_ctx = AnalyzerContext { registry, flow_ctx }; - let mut root_exec_scope = ExecutionScope { - name: ROOT_SCOPE_NAME, - data: &mut root_data_scope, - }; + let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new())); + let root_op_scope = OpScope::new(ROOT_SCOPE_NAME.to_string(), None, root_data_scope); let import_ops_futs = flow_inst .import_ops .iter() .map(|import_op| { let existing_source_states = source_states_by_name.get(import_op.name.as_str()); analyzer_ctx.analyze_import_op( - root_exec_scope.data, + &root_op_scope, import_op.clone(), Some(&mut setup_state.metadata), existing_source_states, ) }) .collect::>>()?; - let op_scope_fut = analyzer_ctx.analyze_op_scope( - &mut root_exec_scope, - &flow_inst.reactive_ops, - RefList::Nil, - )?; + let op_scope_fut = analyzer_ctx.analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)?; #[derive(Default)] struct TargetOpGroup { @@ -1197,7 +1261,7 @@ pub fn analyze_flow( }; export_ops_futs.extend(analyzer_ctx.analyze_export_op_group( target_kind, - root_exec_scope.data, + &root_op_scope, flow_inst, &analyzed_target_op_group, op_ids.declarations, @@ -1208,10 +1272,11 @@ pub fn analyze_flow( } let tracking_table_setup = setup_state.tracking_table.clone(); - let data_schema = root_data_scope.into_data_schema()?; + + let flow_schema = build_flow_schema(&root_op_scope)?; let logic_fingerprint = Fingerprinter::default() .with(&flow_inst)? - .with(&data_schema)? + .with(&flow_schema.schema)? 
.into_fingerprint(); let plan_fut = async move { let (import_ops, op_scope, export_ops) = try_join3( @@ -1231,7 +1296,7 @@ pub fn analyze_flow( }) }; - Ok((data_schema, plan_fut, setup_state)) + Ok((flow_schema, plan_fut, setup_state)) } pub fn analyze_transient_flow<'a>( @@ -1240,7 +1305,7 @@ pub fn analyze_transient_flow<'a>( registry: &'a ExecutorFactoryRegistry, ) -> Result<( EnrichedValueType, - DataSchema, + FlowSchema, impl Future> + Send + 'a, )> { let mut root_data_scope = DataScopeBuilder::new(); @@ -1250,19 +1315,15 @@ pub fn analyze_transient_flow<'a>( let analyzed_field = root_data_scope.add_field(field.name.clone(), &field.value_type)?; input_fields.push(analyzed_field); } - let mut root_exec_scope = ExecutionScope { - name: ROOT_SCOPE_NAME, - data: &mut root_data_scope, - }; - let op_scope_fut = analyzer_ctx.analyze_op_scope( - &mut root_exec_scope, - &flow_inst.reactive_ops, - RefList::Nil, - )?; - let (output_value, output_type) = analyze_value_mapping( - &flow_inst.output_value, - RefList::Nil.prepend(&root_exec_scope), - )?; + let root_op_scope = OpScope::new( + ROOT_SCOPE_NAME.to_string(), + None, + Arc::new(Mutex::new(root_data_scope)), + ); + let op_scope_fut = analyzer_ctx.analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)?; + let (output_value, output_type) = + analyze_value_mapping(&flow_inst.output_value, &root_op_scope)?; + let data_schema = build_flow_schema(&root_op_scope)?; let plan_fut = async move { let op_scope = op_scope_fut.await?; Ok(TransientExecutionPlan { @@ -1271,5 +1332,5 @@ pub fn analyze_transient_flow<'a>( output_value, }) }; - Ok((output_type, root_data_scope.into_data_schema()?, plan_fut)) + Ok((output_type, data_schema, plan_fut)) } diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index b69160a2d..e7e39f538 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -1,14 +1,10 @@ use crate::prelude::*; use pyo3::{exceptions::PyException, prelude::*}; -use std::{ - collections::{btree_map, hash_map::Entry, HashMap}, - ops::Deref, -}; +use std::{collections::btree_map, ops::Deref}; use super::analyzer::{ - build_flow_instance_context, AnalyzerContext, CollectorBuilder, DataScopeBuilder, - ExecutionScope, ValueTypeBuilder, + build_flow_instance_context, AnalyzerContext, CollectorBuilder, DataScopeBuilder, OpScope, }; use crate::{ base::{ @@ -19,43 +15,35 @@ use crate::{ ops::interface::FlowInstanceContext, py::IntoPyResult, setup, - utils::immutable::RefList, }; use crate::{lib_context::FlowContext, py}; -#[derive(Debug)] -pub struct DataScopeRefInfo { - scope_name: String, - parent: Option<(DataScopeRef, spec::FieldPath)>, - scope_builder: Arc>, - children: Mutex>>, -} - #[pyclass] #[derive(Debug, Clone)] -pub struct DataScopeRef(Arc); +pub struct OpScopeRef(Arc); -impl Deref for DataScopeRef { - type Target = DataScopeRefInfo; +impl From> for OpScopeRef { + fn from(scope: Arc) -> Self { + Self(scope) + } +} + +impl Deref for OpScopeRef { + type Target = Arc; fn deref(&self) -> &Self::Target { &self.0 } } -impl std::fmt::Display for DataScopeRef { +impl std::fmt::Display for OpScopeRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Some((scope, field_path)) = &self.parent { - write!(f, "{} [{} AS {}]", scope, field_path, self.scope_name)?; - } else { - write!(f, "[{}]", self.scope_name)?; - } - Ok(()) + write!(f, "{}", self.0) } } #[pymethods] -impl DataScopeRef { +impl OpScopeRef { pub fn __str__(&self) -> String { format!("{}", self) } @@ -67,92 +55,13 @@ impl 
DataScopeRef { pub fn add_collector(&mut self, name: String) -> PyResult { let collector = DataCollector { name, - scope: self.clone(), + scope: self.0.clone(), collector: Mutex::new(None), }; Ok(collector) } } -impl DataScopeRef { - fn get_child_scope(&self, field_path: spec::FieldPath) -> Result { - let mut children = self.children.lock().unwrap(); - let result = match children.entry(field_path) { - Entry::Occupied(mut entry) => { - let child = entry.get().upgrade(); - if let Some(child) = child { - DataScopeRef(child) - } else { - let new_scope = self.make_child_scope(entry.key())?; - entry.insert(Arc::downgrade(&new_scope.0)); - new_scope - } - } - Entry::Vacant(entry) => { - let new_scope = self.make_child_scope(entry.key())?; - entry.insert(Arc::downgrade(&new_scope.0)); - new_scope - } - }; - Ok(result) - } - - fn make_child_scope(&self, field_path: &spec::FieldPath) -> Result { - let mut num_parent_layers = 0; - let mut curr_scope = self; - while let Some((parent, _)) = &curr_scope.parent { - curr_scope = parent; - num_parent_layers += 1; - } - - let scope_data = &self.scope_builder.lock().unwrap().data; - let mut field_typ = &scope_data - .find_field( - field_path - .first() - .ok_or_else(|| anyhow!("field path is empty"))?, - ) - .ok_or_else(|| anyhow!("field {} not found", field_path.first().unwrap()))? - .1 - .value_type - .typ; - for field in field_path[1..].iter() { - let struct_builder = match field_typ { - ValueTypeBuilder::Struct(struct_type) => struct_type, - _ => bail!("expect struct type"), - }; - field_typ = &struct_builder - .find_field(field) - .ok_or_else(|| anyhow!("field {} not found", field))? - .1 - .value_type - .typ; - } - let scope_builder = match field_typ { - ValueTypeBuilder::Table(table_type) => table_type.sub_scope.clone(), - _ => api_bail!("expect collection type"), - }; - - let new_scope = DataScopeRef(Arc::new(DataScopeRefInfo { - scope_name: format!("_{}_{}", field_path.join("_"), num_parent_layers), - parent: Some((self.clone(), field_path.clone())), - scope_builder, - children: Mutex::new(HashMap::new()), - })); - Ok(new_scope) - } - - fn is_ds_scope_descendant(&self, other: &Self) -> bool { - if Arc::ptr_eq(&self.0, &other.0) { - return true; - } - match &self.parent { - Some((parent, _)) => parent.is_ds_scope_descendant(other), - None => false, - } - } -} - #[pyclass] #[derive(Debug, Clone)] pub struct DataType { @@ -179,7 +88,7 @@ impl DataType { #[pyclass] #[derive(Debug, Clone)] pub struct DataSlice { - scope: DataScopeRef, + scope: Arc, value: Arc, data_type: DataType, } @@ -243,14 +152,22 @@ impl DataSlice { })) } - pub fn table_row_scope(&self) -> PyResult { + pub fn table_row_scope(&self) -> PyResult { let field_path = match self.value.as_ref() { spec::ValueMapping::Field(v) => &v.field_path, _ => return Err(PyException::new_err("expect field path")), }; - self.scope - .get_child_scope(field_path.clone()) - .into_py_result() + let num_parent_layers = self.scope.ancestors().count(); + let scope_name = format!( + "{}_{}", + field_path.last().map_or("", |s| s.as_str()), + num_parent_layers + ); + let (_, sub_op_scope) = self + .scope + .new_foreach_op_scope(scope_name, field_path) + .into_py_result()?; + Ok(OpScopeRef(sub_op_scope)) } } @@ -259,10 +176,7 @@ impl DataSlice { match self.value.as_ref() { spec::ValueMapping::Field(v) => spec::ValueMapping::Field(spec::FieldMapping { field_path: v.field_path.clone(), - scope: v - .scope - .clone() - .or_else(|| Some(self.scope.scope_name.clone())), + scope: v.scope.clone().or_else(|| 
Some(self.scope.name.clone())), }), v => v.clone(), } @@ -283,7 +197,7 @@ impl std::fmt::Display for DataSlice { #[pyclass] pub struct DataCollector { name: String, - scope: DataScopeRef, + scope: Arc, collector: Mutex>, } @@ -319,9 +233,7 @@ pub struct FlowBuilder { flow_inst_context: Arc, existing_flow_ss: Option>, - root_data_scope: Arc>, - root_data_scope_ref: DataScopeRef, - + root_op_scope: Arc, flow_instance_name: String, reactive_ops: Vec>, @@ -348,20 +260,17 @@ impl FlowBuilder { .flows .get(name) .cloned(); - let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new())); + let root_op_scope = OpScope::new( + spec::ROOT_SCOPE_NAME.to_string(), + None, + Arc::new(Mutex::new(DataScopeBuilder::new())), + ); let flow_inst_context = build_flow_instance_context(name, None); let result = Self { lib_context, flow_inst_context, existing_flow_ss, - - root_data_scope_ref: DataScopeRef(Arc::new(DataScopeRefInfo { - scope_name: spec::ROOT_SCOPE_NAME.to_string(), - parent: None, - scope_builder: root_data_scope.clone(), - children: Mutex::new(HashMap::new()), - })), - root_data_scope, + root_op_scope, flow_instance_name: name.to_string(), reactive_ops: vec![], @@ -379,8 +288,8 @@ impl FlowBuilder { Ok(result) } - pub fn root_scope(&self) -> DataScopeRef { - self.root_data_scope_ref.clone() + pub fn root_scope(&self) -> OpScopeRef { + OpScopeRef(self.root_op_scope.clone()) } #[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))] @@ -388,12 +297,12 @@ impl FlowBuilder { &mut self, kind: String, op_spec: py::Pythonized>, - target_scope: Option, + target_scope: Option, name: String, refresh_options: Option>, ) -> PyResult { if let Some(target_scope) = target_scope { - if !Arc::ptr_eq(&target_scope.0, &self.root_data_scope_ref.0) { + if *target_scope != self.root_op_scope { return Err(PyException::new_err( "source can only be added to the root scope", )); @@ -413,16 +322,12 @@ impl FlowBuilder { registry: &crate::ops::executor_factory_registry(), flow_ctx: &self.flow_inst_context, }; - let mut root_data_scope = self.root_data_scope.lock().unwrap(); - let analyzed = analyzer_ctx - .analyze_import_op(&mut root_data_scope, import_op.clone(), None, None) + .analyze_import_op(&self.root_op_scope, import_op.clone(), None, None) .into_py_result()?; std::mem::drop(analyzed); - let result = - Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone()) - .into_py_result()?; + let result = Self::last_field_to_data_slice(&self.root_op_scope).into_py_result()?; self.import_ops.push(import_op); Ok(result) } @@ -435,7 +340,7 @@ impl FlowBuilder { let schema = value_type.into_inner(); let value = py::value_from_py_object(&schema.typ, &value)?; let slice = DataSlice { - scope: self.root_data_scope_ref.clone(), + scope: self.root_op_scope.clone().into(), value: Arc::new(spec::ValueMapping::Constant(spec::ConstantMapping { schema: schema.clone(), value: serde_json::to_value(value).into_py_result()?, @@ -450,22 +355,21 @@ impl FlowBuilder { name: String, value_type: py::Pythonized, ) -> PyResult { - let mut root_data_scope = self.root_data_scope.lock().unwrap(); - root_data_scope - .add_field(name.clone(), &value_type) - .into_py_result()?; - let result = - Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone()) + let value_type = value_type.into_inner(); + { + let mut root_data_scope = self.root_op_scope.data.lock().unwrap(); + root_data_scope + .add_field(name.clone(), &value_type) .into_py_result()?; - 
self.direct_input_fields.push(FieldSchema { - name, - value_type: value_type.into_inner(), - }); + } + let result = Self::last_field_to_data_slice(&self.root_op_scope).into_py_result()?; + self.direct_input_fields + .push(FieldSchema { name, value_type }); Ok(result) } pub fn set_direct_output(&mut self, data_slice: DataSlice) -> PyResult<()> { - if !Arc::ptr_eq(&data_slice.scope.0, &self.root_data_scope_ref.0) { + if data_slice.scope != self.root_op_scope { return Err(PyException::new_err( "direct output must be value in the root scope", )); @@ -480,44 +384,46 @@ impl FlowBuilder { kind: String, op_spec: py::Pythonized>, args: Vec<(DataSlice, Option)>, - target_scope: Option, + target_scope: Option, name: String, ) -> PyResult { let spec = spec::OpSpec { kind, spec: op_spec.into_inner(), }; - let common_scope = - Self::minimum_common_scope(args.iter().map(|(ds, _)| &ds.scope), target_scope.as_ref()) - .into_py_result()?; - self.do_in_scope( - common_scope, - |reactive_ops, scope, parent_scopes, analyzer_ctx| { - let reactive_op = spec::NamedSpec { - name, - spec: spec::ReactiveOpSpec::Transform(spec::TransformOpSpec { - inputs: args - .iter() - .map(|(ds, arg_name)| spec::OpArgBinding { - arg_name: spec::OpArgName(arg_name.clone()), - value: ds.extract_value_mapping(), - }) - .collect(), - op: spec, - }), - }; - - let analyzed = - analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?; - std::mem::drop(analyzed); - - reactive_ops.push(reactive_op); - let result = Self::last_field_to_data_slice(scope.data, common_scope.clone()) - .into_py_result()?; - Ok(result) - }, + let op_scope = Self::minimum_common_scope( + args.iter().map(|(ds, _)| &ds.scope), + target_scope.as_ref().map(|s| &s.0), ) - .into_py_result() + .into_py_result()?; + + let reactive_op = spec::NamedSpec { + name, + spec: spec::ReactiveOpSpec::Transform(spec::TransformOpSpec { + inputs: args + .iter() + .map(|(ds, arg_name)| spec::OpArgBinding { + arg_name: spec::OpArgName(arg_name.clone()), + value: ds.extract_value_mapping(), + }) + .collect(), + op: spec, + }), + }; + + let analyzer_ctx = AnalyzerContext { + registry: &crate::ops::executor_factory_registry(), + flow_ctx: &self.flow_inst_context, + }; + let analyzed = analyzer_ctx + .analyze_reactive_op(op_scope, &reactive_op) + .into_py_result()?; + std::mem::drop(analyzed); + + self.get_mut_reactive_ops(op_scope).push(reactive_op); + + let result = Self::last_field_to_data_slice(op_scope).into_py_result()?; + Ok(result) } #[pyo3(signature = (collector, fields, auto_uuid_field=None))] @@ -531,36 +437,35 @@ impl FlowBuilder { .into_py_result()?; let name = format!(".collect.{}", self.next_generated_op_id); self.next_generated_op_id += 1; - self.do_in_scope( - common_scope, - |reactive_ops, scope, parent_scopes, analyzer_ctx| { - let reactive_op = spec::NamedSpec { - name, - spec: spec::ReactiveOpSpec::Collect(spec::CollectOpSpec { - input: spec::StructMapping { - fields: fields - .iter() - .map(|(name, ds)| NamedSpec { - name: name.clone(), - spec: ds.extract_value_mapping(), - }) - .collect(), - }, - scope_name: collector.scope.scope_name.clone(), - collector_name: collector.name.clone(), - auto_uuid_field: auto_uuid_field.clone(), - }), - }; - - let analyzed = - analyzer_ctx.analyze_reactive_op(scope, &reactive_op, parent_scopes)?; - std::mem::drop(analyzed); - - reactive_ops.push(reactive_op); - Ok(()) - }, - ) - .into_py_result()?; + + let reactive_op = spec::NamedSpec { + name, + spec: spec::ReactiveOpSpec::Collect(spec::CollectOpSpec { + input: 
spec::StructMapping { + fields: fields + .iter() + .map(|(name, ds)| NamedSpec { + name: name.clone(), + spec: ds.extract_value_mapping(), + }) + .collect(), + }, + scope_name: collector.scope.name.clone(), + collector_name: collector.name.clone(), + auto_uuid_field: auto_uuid_field.clone(), + }), + }; + + let analyzer_ctx = AnalyzerContext { + registry: &crate::ops::executor_factory_registry(), + flow_ctx: &self.flow_inst_context, + }; + let analyzed = analyzer_ctx + .analyze_reactive_op(common_scope, &reactive_op) + .into_py_result()?; + std::mem::drop(analyzed); + + self.get_mut_reactive_ops(common_scope).push(reactive_op); let collector_schema = CollectorSchema::from_fields( fields @@ -599,7 +504,7 @@ impl FlowBuilder { spec: op_spec.into_inner(), }; - if !Arc::ptr_eq(&input.scope.0, &self.root_data_scope_ref.0) { + if input.scope != self.root_op_scope { return Err(PyException::new_err( "Export can only work on collectors belonging to the root scope.", )); @@ -621,13 +526,9 @@ impl FlowBuilder { Ok(()) } - pub fn scope_field( - &self, - scope: DataScopeRef, - field_name: &str, - ) -> PyResult> { + pub fn scope_field(&self, scope: OpScopeRef, field_name: &str) -> PyResult> { let field_type = { - let scope_builder = scope.scope_builder.lock().unwrap(); + let scope_builder = scope.0.data.lock().unwrap(); let (_, field_schema) = scope_builder .data .find_field(field_name) @@ -636,7 +537,7 @@ impl FlowBuilder { .into_py_result()? }; Ok(Some(DataSlice { - scope, + scope: scope.0, value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping { scope: None, field_path: spec::FieldPath(vec![field_name.to_string()]), @@ -766,13 +667,11 @@ impl std::fmt::Display for FlowBuilder { } impl FlowBuilder { - fn last_field_to_data_slice( - data_builder: &DataScopeBuilder, - scope: DataScopeRef, - ) -> Result { - let last_field = data_builder.last_field().unwrap(); + fn last_field_to_data_slice(op_scope: &Arc) -> Result { + let data_scope = op_scope.data.lock().unwrap(); + let last_field = data_scope.last_field().unwrap(); let result = DataSlice { - scope, + scope: op_scope.clone(), value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping { scope: None, field_path: spec::FieldPath(vec![last_field.name.clone()]), @@ -783,17 +682,17 @@ impl FlowBuilder { } fn minimum_common_scope<'a>( - scopes: impl Iterator, - target_scope: Option<&'a DataScopeRef>, - ) -> Result<&'a DataScopeRef> { + scopes: impl Iterator>, + target_scope: Option<&'a Arc>, + ) -> Result<&'a Arc> { let mut scope_iter = scopes; let mut common_scope = scope_iter .next() .ok_or_else(|| PyException::new_err("expect at least one input"))?; for scope in scope_iter { - if scope.is_ds_scope_descendant(common_scope) { + if scope.is_op_scope_descendant(common_scope) { common_scope = scope; - } else if !common_scope.is_ds_scope_descendant(scope) { + } else if !common_scope.is_op_scope_descendant(scope) { api_bail!( "expect all arguments share the common scope, got {} and {} exclusive to each other", common_scope, scope @@ -801,7 +700,7 @@ impl FlowBuilder { } } if let Some(target_scope) = target_scope { - if !target_scope.is_ds_scope_descendant(common_scope) { + if !target_scope.is_op_scope_descendant(common_scope) { api_bail!( "the field can only be attached to a scope or sub-scope of the input value. 
Target scope: {}, input scope: {}", target_scope, common_scope @@ -812,114 +711,57 @@ impl FlowBuilder { Ok(common_scope) } - fn do_in_scope( - &mut self, - data_slice_scope: &DataScopeRef, - f: impl FnOnce( - &mut Vec>, - &mut ExecutionScope<'_>, - RefList<'_, &'_ ExecutionScope<'_>>, - &AnalyzerContext<'_>, - ) -> Result, - ) -> Result { - let mut data_slice_scopes = Vec::new(); - let mut next_ds_scope = data_slice_scope; - while let Some((parent, _)) = &next_ds_scope.parent { - data_slice_scopes.push(next_ds_scope); - next_ds_scope = parent; - } - - Self::do_in_sub_scope( - &mut ExecutionScope { - name: spec::ROOT_SCOPE_NAME, - data: &mut self.root_data_scope.lock().unwrap(), - }, - RefList::Nil, - &data_slice_scopes, + fn get_mut_reactive_ops<'a>( + &'a mut self, + op_scope: &OpScope, + ) -> &'a mut Vec> { + Self::get_mut_reactive_ops_internal( + op_scope, &mut self.reactive_ops, &mut self.next_generated_op_id, - &AnalyzerContext { - registry: &crate::ops::executor_factory_registry(), - flow_ctx: &self.flow_inst_context, - }, - f, ) } - fn do_in_sub_scope( - scope: &mut ExecutionScope<'_>, - parent_scopes: RefList<'_, &'_ ExecutionScope<'_>>, - data_slice_scopes: &[&DataScopeRef], - reactive_ops: &mut Vec>, + fn get_mut_reactive_ops_internal<'a>( + op_scope: &OpScope, + root_reactive_ops: &'a mut Vec>, next_generated_op_id: &mut usize, - analyzer_ctx: &AnalyzerContext<'_>, - f: impl FnOnce( - &mut Vec>, - &mut ExecutionScope<'_>, - RefList<'_, &'_ ExecutionScope<'_>>, - &AnalyzerContext<'_>, - ) -> Result, - ) -> Result { - let curr_ds_scope = if let Some(&ds_scope) = data_slice_scopes.last() { - ds_scope - } else { - return f(reactive_ops, scope, parent_scopes, analyzer_ctx); - }; - let field_path = if let Some((_, field_path)) = &curr_ds_scope.parent { - field_path - } else { - bail!("expect sub scope, got root") - }; - - // Reuse the last foreach if matched, otherwise create a new one. - let reactive_ops = match reactive_ops.last_mut() { - Some(spec::NamedSpec { - spec: spec::ReactiveOpSpec::ForEach(foreach_spec), - .. - }) if &foreach_spec.field_path == field_path - && foreach_spec.op_scope.name == curr_ds_scope.scope_name => - { - &mut foreach_spec.op_scope.ops - } - _ => { - reactive_ops.push(spec::NamedSpec { - name: format!(".foreach.{}", next_generated_op_id), - spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec { - field_path: field_path.clone(), - op_scope: spec::ReactiveOpScope { - name: curr_ds_scope.scope_name.clone(), - ops: vec![], - }, - }), - }); - *next_generated_op_id += 1; - match &mut reactive_ops.last_mut().unwrap().spec { + ) -> &'a mut Vec> { + match &op_scope.parent { + None => root_reactive_ops, + Some((parent_op_scope, field_path)) => { + let parent_reactive_ops = Self::get_mut_reactive_ops_internal( + parent_op_scope, + root_reactive_ops, + next_generated_op_id, + ); + // Reuse the last foreach if matched, otherwise create a new one. + match parent_reactive_ops.last() { + Some(spec::NamedSpec { + spec: spec::ReactiveOpSpec::ForEach(foreach_spec), + .. 
+ }) if &foreach_spec.field_path == field_path + && foreach_spec.op_scope.name == op_scope.name => {} + + _ => { + parent_reactive_ops.push(spec::NamedSpec { + name: format!(".foreach.{}", next_generated_op_id), + spec: spec::ReactiveOpSpec::ForEach(spec::ForEachOpSpec { + field_path: field_path.clone(), + op_scope: spec::ReactiveOpScope { + name: op_scope.name.clone(), + ops: vec![], + }, + }), + }); + *next_generated_op_id += 1; + } + } + match &mut parent_reactive_ops.last_mut().unwrap().spec { spec::ReactiveOpSpec::ForEach(foreach_spec) => &mut foreach_spec.op_scope.ops, _ => unreachable!(), } } - }; - - let (_, field_type) = scope.data.analyze_field_path(field_path)?; - let sub_scope = match &field_type.typ { - ValueTypeBuilder::Table(table_type) => &table_type.sub_scope, - t => api_bail!( - "expect table type, got {}", - TryInto::::try_into(t)? - ), - }; - let mut sub_scope = sub_scope.lock().unwrap(); - Self::do_in_sub_scope( - &mut ExecutionScope { - name: curr_ds_scope.scope_name.as_str(), - data: &mut sub_scope, - }, - parent_scopes.prepend(scope), - &data_slice_scopes[0..data_slice_scopes.len() - 1], - reactive_ops, - next_generated_op_id, - analyzer_ctx, - f, - ) + } } } diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 133bdd0b4..234c695b6 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -66,6 +66,7 @@ pub struct AnalyzedFunctionExecInfo { /// Fingerprinter of the function's behavior. pub fingerprinter: Fingerprinter, + /// To deserialize cached value. pub output_type: schema::ValueType, } @@ -124,6 +125,7 @@ pub enum AnalyzedReactiveOp { pub struct AnalyzedOpScope { pub reactive_ops: Vec, + pub collector_len: usize, } pub struct ExecutionPlan { diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index 25bac21ed..3d3f721ac 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -61,7 +61,7 @@ struct SourceOutputData<'a> { struct Dumper<'a> { plan: &'a ExecutionPlan, - schema: &'a schema::DataSchema, + schema: &'a schema::FlowSchema, pool: &'a PgPool, options: EvaluateAndDumpOptions, } @@ -95,11 +95,7 @@ impl<'a> Dumper<'a> { return Ok(None); }; - *collected_values_buffer = data_builder - .collected_values - .into_iter() - .map(|v| v.into_inner().unwrap()) - .collect(); + *collected_values_buffer = data_builder.collected_values; let exports = self .plan .export_ops @@ -109,7 +105,9 @@ impl<'a> Dumper<'a> { let entry = ( export_op.name.as_str(), TargetExportData { - schema: &self.schema.collectors[collector_idx].spec.fields, + schema: &self.schema.root_op_scope.collectors[collector_idx] + .spec + .fields, data: collected_values_buffer[collector_idx] .iter() .map(|v| -> Result<_> { @@ -225,7 +223,7 @@ impl<'a> Dumper<'a> { pub async fn evaluate_and_dump( plan: &ExecutionPlan, - schema: &schema::DataSchema, + schema: &schema::FlowSchema, options: EvaluateAndDumpOptions, pool: &PgPool, ) -> Result<()> { diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs index 5ec20b02b..dfba75713 100644 --- a/src/execution/evaluator.rs +++ b/src/execution/evaluator.rs @@ -17,8 +17,6 @@ use super::memoization::{evaluate_with_cell, EvaluationMemory, EvaluationMemoryO pub struct ScopeValueBuilder { // TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity. 
pub fields: Vec>>, - - pub collected_values: Vec>>, } impl From<&ScopeValueBuilder> for value::ScopeValue { @@ -46,26 +44,17 @@ impl From for value::ScopeValue { } impl ScopeValueBuilder { - fn new(num_fields: usize, num_collectors: usize) -> Self { + fn new(num_fields: usize) -> Self { let mut fields = Vec::with_capacity(num_fields); fields.resize_with(num_fields, OnceLock::new); - - let mut collected_values = Vec::with_capacity(num_collectors); - collected_values.resize_with(num_collectors, Default::default); - Self { - fields, - collected_values, - } + Self { fields } } - fn augmented_from( - source: &value::ScopeValue, - schema: &schema::TableSchema, - ) -> Result { + fn augmented_from(source: &value::ScopeValue, schema: &schema::TableSchema) -> Result { let val_index_base = if schema.has_key() { 1 } else { 0 }; let len = schema.row.fields.len() - val_index_base; - let mut builder = Self::new(len, schema.collectors.len()); + let mut builder = Self::new(len); let value::ScopeValue(source_fields) = source; for ((v, t), r) in source_fields @@ -149,9 +138,27 @@ struct ScopeEntry<'a> { key: ScopeKey<'a>, value: &'a ScopeValueBuilder, schema: &'a schema::StructSchema, + collected_values: Vec>>, } -impl ScopeEntry<'_> { +impl<'a> ScopeEntry<'a> { + fn new( + key: ScopeKey<'a>, + value: &'a ScopeValueBuilder, + schema: &'a schema::StructSchema, + analyzed_op_scope: &AnalyzedOpScope, + ) -> Self { + let mut collected_values = Vec::with_capacity(analyzed_op_scope.collector_len); + collected_values.resize_with(analyzed_op_scope.collector_len, Default::default); + + Self { + key, + value, + schema, + collected_values, + } + } + fn get_local_field_schema<'b>( schema: &'b schema::StructSchema, indices: &[u32], @@ -351,11 +358,12 @@ async fn evaluate_op_scope( evaluate_child_op_scope( &op.op_scope, scoped_entries, - ScopeEntry { - key: ScopeKey::None, - value: item, - schema: &table_schema.row, - }, + ScopeEntry::new( + ScopeKey::None, + item, + &table_schema.row, + &op.op_scope, + ), memory, ) }) @@ -366,11 +374,12 @@ async fn evaluate_op_scope( evaluate_child_op_scope( &op.op_scope, scoped_entries, - ScopeEntry { - key: ScopeKey::MapKey(k), - value: v, - schema: &table_schema.row, - }, + ScopeEntry::new( + ScopeKey::MapKey(k), + v, + &table_schema.row, + &op.op_scope, + ), memory, ) }) @@ -382,11 +391,12 @@ async fn evaluate_op_scope( evaluate_child_op_scope( &op.op_scope, scoped_entries, - ScopeEntry { - key: ScopeKey::ListIndex(i), - value: item, - schema: &table_schema.row, - }, + ScopeEntry::new( + ScopeKey::ListIndex(i), + item, + &table_schema.row, + &op.op_scope, + ), memory, ) }) @@ -420,9 +430,9 @@ async fn evaluate_op_scope( }; let collector_entry = scoped_entries .headn(op.collector_ref.scope_up_level as usize) - .unwrap(); + .ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?; { - let mut collected_records = collector_entry.value.collected_values + let mut collected_records = collector_entry.collected_values [op.collector_ref.local.collector_idx as usize] .lock() .unwrap(); @@ -436,22 +446,28 @@ async fn evaluate_op_scope( Ok(()) } +#[derive(Debug)] +pub struct EvaluateSourceEntryOutput { + pub data_scope: ScopeValueBuilder, + pub collected_values: Vec>, +} + pub async fn evaluate_source_entry( plan: &ExecutionPlan, import_op: &AnalyzedImportOp, - schema: &schema::DataSchema, + schema: &schema::FlowSchema, key: &value::KeyValue, source_value: value::FieldValues, memory: &EvaluationMemory, -) -> Result { +) -> Result { let root_schema = &schema.schema; - let root_scope_value 
= - ScopeValueBuilder::new(root_schema.fields.len(), schema.collectors.len()); - let root_scope_entry = ScopeEntry { - key: ScopeKey::None, - value: &root_scope_value, - schema: root_schema, - }; + let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len()); + let root_scope_entry = ScopeEntry::new( + ScopeKey::None, + &root_scope_value, + root_schema, + &plan.op_scope, + ); let table_schema = match &root_schema.fields[import_op.output.field_idx as usize] .value_type @@ -476,7 +492,15 @@ pub async fn evaluate_source_entry( memory, ) .await?; - Ok(root_scope_value) + let collected_values = root_scope_entry + .collected_values + .into_iter() + .map(|v| v.into_inner().unwrap()) + .collect::>(); + Ok(EvaluateSourceEntryOutput { + data_scope: root_scope_value, + collected_values, + }) } pub async fn evaluate_transient_flow( @@ -484,13 +508,13 @@ pub async fn evaluate_transient_flow( input_values: &Vec, ) -> Result { let root_schema = &flow.data_schema.schema; - let root_scope_value = - ScopeValueBuilder::new(root_schema.fields.len(), flow.data_schema.collectors.len()); - let root_scope_entry = ScopeEntry { - key: ScopeKey::None, - value: &root_scope_value, - schema: root_schema, - }; + let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len()); + let root_scope_entry = ScopeEntry::new( + ScopeKey::None, + &root_scope_value, + root_schema, + &flow.execution_plan.op_scope, + ); if input_values.len() != flow.execution_plan.input_fields.len() { bail!( diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index 38a301e17..9a28a8ea8 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -6,7 +6,7 @@ use std::collections::{HashMap, HashSet}; use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey}; use super::db_tracking_setup; -use super::evaluator::{evaluate_source_entry, ScopeValueBuilder}; +use super::evaluator::{evaluate_source_entry, EvaluateSourceEntryOutput}; use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo}; use super::stats; @@ -131,7 +131,7 @@ struct TrackingInfoForTarget<'a> { #[derive(Debug)] struct PrecommitData<'a> { - scope_value: &'a ScopeValueBuilder, + evaluate_output: &'a EvaluateSourceEntryOutput, memoization_info: &'a StoredMemoizationInfo, } struct PrecommitMetadata { @@ -224,14 +224,12 @@ async fn precommit_source_tracking_info( let mut new_target_keys_info = db_tracking::TrackedTargetKeyForSource::default(); if let Some(data) = &data { for export_op in export_ops.iter() { - let collected_values = data.scope_value.collected_values - [export_op.input.collector_idx as usize] - .lock() - .unwrap(); let target_info = tracking_info_for_targets .entry(export_op.target_id) .or_default(); let mut keys_info = Vec::new(); + let collected_values = + &data.evaluate_output.collected_values[export_op.input.collector_idx as usize]; for value in collected_values.iter() { let primary_key = extract_primary_key(&export_op.primary_key_def, value)?; let primary_key_json = serde_json::to_value(&primary_key)?; @@ -441,11 +439,11 @@ async fn commit_source_tracking_info( pub async fn evaluate_source_entry_with_memory( plan: &ExecutionPlan, import_op: &AnalyzedImportOp, - schema: &schema::DataSchema, + schema: &schema::FlowSchema, key: &value::KeyValue, options: EvaluationMemoryOptions, pool: &PgPool, -) -> Result> { +) -> Result> { let stored_info = if options.enable_cache || !options.evaluation_only { let source_key_json = serde_json::to_value(key)?; let 
existing_tracking_info = read_source_tracking_info_for_processing( @@ -473,7 +471,7 @@ pub async fn evaluate_source_entry_with_memory( pub async fn update_source_row( plan: &ExecutionPlan, import_op: &AnalyzedImportOp, - schema: &schema::DataSchema, + schema: &schema::FlowSchema, key: &value::KeyValue, source_value: Option, source_version: &SourceVersion, @@ -539,7 +537,7 @@ pub async fn update_source_row( source_version, plan.logic_fingerprint, output.as_ref().map(|scope_value| PrecommitData { - scope_value, + evaluate_output: scope_value, memoization_info: &stored_mem_info, }), &process_timestamp, diff --git a/src/py/mod.rs b/src/py/mod.rs index 3e21262ff..3580eb303 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -382,7 +382,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/service/flows.rs b/src/service/flows.rs index 84d21029e..b9040a270 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use crate::lib_context::LibContext; -use crate::{base::schema::DataSchema, ops::interface::SourceExecutorListOptions}; +use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorListOptions}; use crate::{ execution::memoization, execution::{row_indexer, stats}, @@ -32,7 +32,7 @@ pub async fn get_flow_spec( pub async fn get_flow_schema( Path(flow_name): Path, State(lib_context): State>, -) -> Result, ApiError> { +) -> Result, ApiError> { let flow_ctx = lib_context.get_flow_context(&flow_name)?; Ok(Json(flow_ctx.flow.data_schema.clone())) } @@ -110,7 +110,7 @@ pub struct EvaluateDataParams { #[derive(Serialize)] pub struct EvaluateDataResponse { - schema: DataSchema, + schema: FlowSchema, data: value::ScopeValue, } @@ -146,7 +146,7 @@ pub async fn evaluate_data( .ok_or_else(|| api_error!("field {} does not have a key", query.field))?; let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?; - let value_builder = row_indexer::evaluate_source_entry_with_memory( + let evaluate_output = row_indexer::evaluate_source_entry_with_memory( &plan, import_op, schema, @@ -162,7 +162,7 @@ pub async fn evaluate_data( Ok(Json(EvaluateDataResponse { schema: schema.clone(), - data: value_builder.into(), + data: evaluate_output.data_scope.into(), })) }
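The net effect on evaluation: collected rows come back as a plain per-collector vector on the result (`EvaluateSourceEntryOutput`) rather than living inside the scope value tree, and export ops index into it by `collector_idx`. A sketch of a consumer of that shape, with hypothetical stand-in types:

// Hypothetical stand-ins mirroring the new output shape; not the real types.
struct FieldValuesSketch(Vec<String>);

struct EvaluateOutputSketch {
    // One Vec per collector in the root scope, indexed by collector_idx.
    collected_values: Vec<Vec<FieldValuesSketch>>,
}

struct ExportOpSketch {
    collector_idx: usize,
}

fn rows_for_export<'a>(
    out: &'a EvaluateOutputSketch,
    op: &ExportOpSketch,
) -> &'a [FieldValuesSketch] {
    &out.collected_values[op.collector_idx]
}

fn main() {
    let out = EvaluateOutputSketch {
        collected_values: vec![vec![FieldValuesSketch(vec!["id0".into(), "v0".into()])]],
    };
    let op = ExportOpSketch { collector_idx: 0 };
    assert_eq!(rows_for_export(&out, &op).len(), 1);
}

This is also why `precommit_source_tracking_info` can read `evaluate_output.collected_values[...]` directly, with no per-collector mutex locking.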