From b38f064e624e72e8748b44cbe97667677e30f4c4 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 25 Aug 2025 19:01:53 -0700 Subject: [PATCH] refactor: use separate maps for source/fn/target factories --- src/builder/analyzer.rs | 55 +++++++++++-------------------- src/builder/exec_ctx.rs | 11 ++----- src/ops/registration.rs | 42 +++++++++++++++++++++--- src/ops/registry.rs | 73 ++++++++++++++++++++++++++++++++++------- src/setup/driver.rs | 8 ++--- 5 files changed, 122 insertions(+), 67 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 6c597f65a..517bdab86 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1,5 +1,5 @@ use crate::builder::exec_ctx::AnalyzedSetupState; -use crate::ops::get_executor_factory; +use crate::ops::{get_function_factory, get_source_factory, get_target_factory}; use crate::prelude::*; use super::plan::*; @@ -654,15 +654,7 @@ impl AnalyzerContext { op_scope: &Arc, import_op: NamedSpec, ) -> Result> + Send + use<>> { - let source_factory = match get_executor_factory(&import_op.spec.source.kind)? { - ExecutorFactory::Source(source_executor) => source_executor, - _ => { - return Err(anyhow::anyhow!( - "`{}` is not a source op", - import_op.spec.source.kind - )); - } - }; + let source_factory = get_source_factory(&import_op.spec.source.kind)?; let (output_type, executor) = source_factory .build( serde_json::Value::Object(import_op.spec.source.spec), @@ -719,23 +711,22 @@ impl AnalyzerContext { })?; let spec = serde_json::Value::Object(op.op.spec.clone()); - match get_executor_factory(&op.op.kind)? { - ExecutorFactory::SimpleFunction(fn_executor) => { - let input_value_mappings = input_field_schemas - .iter() - .map(|field| field.analyzed_value.clone()) - .collect(); - let (output_enriched_type, executor) = fn_executor - .build(spec, input_field_schemas, self.flow_ctx.clone()) - .await?; - let logic_fingerprinter = Fingerprinter::default() - .with(&op.op)? - .with(&output_enriched_type.without_attrs())?; - let output_type = output_enriched_type.typ.clone(); - let output = op_scope - .add_op_output(reactive_op.name.clone(), output_enriched_type)?; - let op_name = reactive_op.name.clone(); - async move { + let fn_executor = get_function_factory(&op.op.kind)?; + let input_value_mappings = input_field_schemas + .iter() + .map(|field| field.analyzed_value.clone()) + .collect(); + let (output_enriched_type, executor) = fn_executor + .build(spec, input_field_schemas, self.flow_ctx.clone()) + .await?; + let logic_fingerprinter = Fingerprinter::default() + .with(&op.op)? + .with(&output_enriched_type.without_attrs())?; + let output_type = output_enriched_type.typ.clone(); + let output = + op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?; + let op_name = reactive_op.name.clone(); + async move { trace!("Start building executor for transform op `{op_name}`"); let executor = executor.await.with_context(|| { format!("Failed to build executor for transform op: {op_name}") @@ -764,11 +755,8 @@ impl AnalyzerContext { executor, output, })) - } - .boxed() - } - _ => api_bail!("`{}` is not a function op", op.op.kind), } + .boxed() } ReactiveOpSpec::ForEach(foreach_op) => { @@ -1068,10 +1056,7 @@ pub async fn analyze_flow( let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len()); for (target_kind, op_ids) in target_op_group.into_iter() { - let target_factory = match get_executor_factory(&target_kind)? { - ExecutorFactory::ExportTarget(export_executor) => export_executor, - _ => api_bail!("`{}` is not a export target op", target_kind), - }; + let target_factory = get_target_factory(&target_kind)?; let analyzed_target_op_group = AnalyzedExportTargetOpGroup { target_factory, op_idx: op_ids.export_op_ids, diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index c47a058fb..1e6dda603 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use crate::execution::db_tracking_setup; -use crate::ops::get_executor_factory; +use crate::ops::get_target_factory; use crate::ops::interface::SetupStateCompatibility; pub struct ImportOpExecutionContext { @@ -80,14 +80,7 @@ fn build_target_id( metadata: &mut setup::FlowSetupMetadata, target_states: &mut IndexMap, ) -> Result { - let interface::ExecutorFactory::ExportTarget(target_factory) = - get_executor_factory(&analyzed_target_ss.target_kind)? - else { - api_bail!( - "`{}` is not a export target op", - analyzed_target_ss.target_kind - ) - }; + let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?; let resource_id = setup::ResourceIdentifier { key: analyzed_target_ss.setup_key.clone(), diff --git a/src/ops/registration.rs b/src/ops/registration.rs index b000ffa2f..24c17c90d 100644 --- a/src/ops/registration.rs +++ b/src/ops/registration.rs @@ -33,14 +33,46 @@ static EXECUTOR_FACTORY_REGISTRY: LazyLock> = La RwLock::new(registry) }); -pub fn get_optional_executor_factory(kind: &str) -> Option { +pub fn get_optional_source_factory( + kind: &str, +) -> Option> { let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap(); - registry.get(kind).cloned() + registry.get_source(kind).cloned() } -pub fn get_executor_factory(kind: &str) -> Result { - get_optional_executor_factory(kind) - .ok_or_else(|| anyhow::anyhow!("Executor factory not found for op kind: {}", kind)) +pub fn get_optional_function_factory( + kind: &str, +) -> Option> { + let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap(); + registry.get_function(kind).cloned() +} + +pub fn get_optional_target_factory( + kind: &str, +) -> Option> { + let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap(); + registry.get_target(kind).cloned() +} + +pub fn get_source_factory( + kind: &str, +) -> Result> { + get_optional_source_factory(kind) + .ok_or_else(|| anyhow::anyhow!("Source factory not found for op kind: {}", kind)) +} + +pub fn get_function_factory( + kind: &str, +) -> Result> { + get_optional_function_factory(kind) + .ok_or_else(|| anyhow::anyhow!("Function factory not found for op kind: {}", kind)) +} + +pub fn get_target_factory( + kind: &str, +) -> Result> { + get_optional_target_factory(kind) + .ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind)) } pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> { diff --git a/src/ops/registry.rs b/src/ops/registry.rs index 7e7842b11..52417480d 100644 --- a/src/ops/registry.rs +++ b/src/ops/registry.rs @@ -1,9 +1,13 @@ use super::interface::ExecutorFactory; use anyhow::Result; use std::collections::HashMap; +use std::sync::Arc; pub struct ExecutorFactoryRegistry { - factories: HashMap, + source_factories: HashMap>, + function_factories: + HashMap>, + target_factories: HashMap>, } impl Default for ExecutorFactoryRegistry { @@ -15,24 +19,69 @@ impl Default for ExecutorFactoryRegistry { impl ExecutorFactoryRegistry { pub fn new() -> Self { Self { - factories: HashMap::new(), + source_factories: HashMap::new(), + function_factories: HashMap::new(), + target_factories: HashMap::new(), } } pub fn register(&mut self, name: String, factory: ExecutorFactory) -> Result<()> { - match self.factories.entry(name) { - std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!( - "Factory with name already exists: {}", - entry.key() - )), - std::collections::hash_map::Entry::Vacant(entry) => { - entry.insert(factory); - Ok(()) + match factory { + ExecutorFactory::Source(source_factory) => match self.source_factories.entry(name) { + std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!( + "Source factory with name already exists: {}", + entry.key() + )), + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(source_factory); + Ok(()) + } + }, + ExecutorFactory::SimpleFunction(function_factory) => { + match self.function_factories.entry(name) { + std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!( + "Function factory with name already exists: {}", + entry.key() + )), + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(function_factory); + Ok(()) + } + } + } + ExecutorFactory::ExportTarget(target_factory) => { + match self.target_factories.entry(name) { + std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!( + "Target factory with name already exists: {}", + entry.key() + )), + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(target_factory); + Ok(()) + } + } } } } - pub fn get(&self, name: &str) -> Option<&ExecutorFactory> { - self.factories.get(name) + pub fn get_source( + &self, + name: &str, + ) -> Option<&Arc> { + self.source_factories.get(name) + } + + pub fn get_function( + &self, + name: &str, + ) -> Option<&Arc> { + self.function_factories.get(name) + } + + pub fn get_target( + &self, + name: &str, + ) -> Option<&Arc> { + self.target_factories.get(name) } } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 68ea06eea..6e4f520f1 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -1,7 +1,7 @@ use crate::{ lib_context::{FlowContext, FlowExecutionContext, LibSetupContext}, ops::{ - get_optional_executor_factory, + get_optional_target_factory, interface::{FlowInstanceContext, TargetFactory}, }, prelude::*, @@ -20,7 +20,6 @@ use super::{ StateChange, TargetSetupState, db_metadata, }; use crate::execution::db_tracking_setup; -use crate::ops::interface::ExecutorFactory; use std::fmt::Write; enum MetadataRecordType { @@ -81,10 +80,7 @@ fn from_metadata_record( } fn get_export_target_factory(target_type: &str) -> Option> { - match get_optional_executor_factory(target_type) { - Some(ExecutorFactory::ExportTarget(factory)) => Some(factory), - _ => None, - } + get_optional_target_factory(target_type) } pub async fn get_existing_setup_state(pool: &PgPool) -> Result> {