Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 20 additions & 35 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::builder::exec_ctx::AnalyzedSetupState;
use crate::ops::get_executor_factory;
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
use crate::prelude::*;

use super::plan::*;
Expand Down Expand Up @@ -654,15 +654,7 @@ impl AnalyzerContext {
op_scope: &Arc<OpScope>,
import_op: NamedSpec<ImportOpSpec>,
) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send + use<>> {
let source_factory = match get_executor_factory(&import_op.spec.source.kind)? {
ExecutorFactory::Source(source_executor) => source_executor,
_ => {
return Err(anyhow::anyhow!(
"`{}` is not a source op",
import_op.spec.source.kind
));
}
};
let source_factory = get_source_factory(&import_op.spec.source.kind)?;
let (output_type, executor) = source_factory
.build(
serde_json::Value::Object(import_op.spec.source.spec),
Expand Down Expand Up @@ -719,23 +711,22 @@ impl AnalyzerContext {
})?;
let spec = serde_json::Value::Object(op.op.spec.clone());

match get_executor_factory(&op.op.kind)? {
ExecutorFactory::SimpleFunction(fn_executor) => {
let input_value_mappings = input_field_schemas
.iter()
.map(|field| field.analyzed_value.clone())
.collect();
let (output_enriched_type, executor) = fn_executor
.build(spec, input_field_schemas, self.flow_ctx.clone())
.await?;
let logic_fingerprinter = Fingerprinter::default()
.with(&op.op)?
.with(&output_enriched_type.without_attrs())?;
let output_type = output_enriched_type.typ.clone();
let output = op_scope
.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
let op_name = reactive_op.name.clone();
async move {
let fn_executor = get_function_factory(&op.op.kind)?;
let input_value_mappings = input_field_schemas
.iter()
.map(|field| field.analyzed_value.clone())
.collect();
let (output_enriched_type, executor) = fn_executor
.build(spec, input_field_schemas, self.flow_ctx.clone())
.await?;
let logic_fingerprinter = Fingerprinter::default()
.with(&op.op)?
.with(&output_enriched_type.without_attrs())?;
let output_type = output_enriched_type.typ.clone();
let output =
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
let op_name = reactive_op.name.clone();
async move {
trace!("Start building executor for transform op `{op_name}`");
let executor = executor.await.with_context(|| {
format!("Failed to build executor for transform op: {op_name}")
Expand Down Expand Up @@ -764,11 +755,8 @@ impl AnalyzerContext {
executor,
output,
}))
}
.boxed()
}
_ => api_bail!("`{}` is not a function op", op.op.kind),
}
.boxed()
}

ReactiveOpSpec::ForEach(foreach_op) => {
Expand Down Expand Up @@ -1068,10 +1056,7 @@ pub async fn analyze_flow(
let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len());

for (target_kind, op_ids) in target_op_group.into_iter() {
let target_factory = match get_executor_factory(&target_kind)? {
ExecutorFactory::ExportTarget(export_executor) => export_executor,
_ => api_bail!("`{}` is not a export target op", target_kind),
};
let target_factory = get_target_factory(&target_kind)?;
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
target_factory,
op_idx: op_ids.export_op_ids,
Expand Down
11 changes: 2 additions & 9 deletions src/builder/exec_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::prelude::*;

use crate::execution::db_tracking_setup;
use crate::ops::get_executor_factory;
use crate::ops::get_target_factory;
use crate::ops::interface::SetupStateCompatibility;

pub struct ImportOpExecutionContext {
Expand Down Expand Up @@ -80,14 +80,7 @@ fn build_target_id(
metadata: &mut setup::FlowSetupMetadata,
target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
) -> Result<i32> {
let interface::ExecutorFactory::ExportTarget(target_factory) =
get_executor_factory(&analyzed_target_ss.target_kind)?
else {
api_bail!(
"`{}` is not a export target op",
analyzed_target_ss.target_kind
)
};
let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?;

let resource_id = setup::ResourceIdentifier {
key: analyzed_target_ss.setup_key.clone(),
Expand Down
42 changes: 37 additions & 5 deletions src/ops/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,46 @@ static EXECUTOR_FACTORY_REGISTRY: LazyLock<RwLock<ExecutorFactoryRegistry>> = La
RwLock::new(registry)
});

pub fn get_optional_executor_factory(kind: &str) -> Option<ExecutorFactory> {
pub fn get_optional_source_factory(
kind: &str,
) -> Option<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
registry.get(kind).cloned()
registry.get_source(kind).cloned()
}

pub fn get_executor_factory(kind: &str) -> Result<ExecutorFactory> {
get_optional_executor_factory(kind)
.ok_or_else(|| anyhow::anyhow!("Executor factory not found for op kind: {}", kind))
pub fn get_optional_function_factory(
kind: &str,
) -> Option<std::sync::Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
registry.get_function(kind).cloned()
}

pub fn get_optional_target_factory(
kind: &str,
) -> Option<std::sync::Arc<dyn super::interface::TargetFactory + Send + Sync>> {
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
registry.get_target(kind).cloned()
}

pub fn get_source_factory(
kind: &str,
) -> Result<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
get_optional_source_factory(kind)
.ok_or_else(|| anyhow::anyhow!("Source factory not found for op kind: {}", kind))
}

pub fn get_function_factory(
kind: &str,
) -> Result<std::sync::Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
get_optional_function_factory(kind)
.ok_or_else(|| anyhow::anyhow!("Function factory not found for op kind: {}", kind))
}

pub fn get_target_factory(
kind: &str,
) -> Result<std::sync::Arc<dyn super::interface::TargetFactory + Send + Sync>> {
get_optional_target_factory(kind)
.ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind))
}

pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> {
Expand Down
73 changes: 61 additions & 12 deletions src/ops/registry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use super::interface::ExecutorFactory;
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;

pub struct ExecutorFactoryRegistry {
factories: HashMap<String, ExecutorFactory>,
source_factories: HashMap<String, Arc<dyn super::interface::SourceFactory + Send + Sync>>,
function_factories:
HashMap<String, Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>>,
target_factories: HashMap<String, Arc<dyn super::interface::TargetFactory + Send + Sync>>,
}

impl Default for ExecutorFactoryRegistry {
Expand All @@ -15,24 +19,69 @@ impl Default for ExecutorFactoryRegistry {
impl ExecutorFactoryRegistry {
pub fn new() -> Self {
Self {
factories: HashMap::new(),
source_factories: HashMap::new(),
function_factories: HashMap::new(),
target_factories: HashMap::new(),
}
}

pub fn register(&mut self, name: String, factory: ExecutorFactory) -> Result<()> {
match self.factories.entry(name) {
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
"Factory with name already exists: {}",
entry.key()
)),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(factory);
Ok(())
match factory {
ExecutorFactory::Source(source_factory) => match self.source_factories.entry(name) {
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
"Source factory with name already exists: {}",
entry.key()
)),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(source_factory);
Ok(())
}
},
ExecutorFactory::SimpleFunction(function_factory) => {
match self.function_factories.entry(name) {
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
"Function factory with name already exists: {}",
entry.key()
)),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(function_factory);
Ok(())
}
}
}
ExecutorFactory::ExportTarget(target_factory) => {
match self.target_factories.entry(name) {
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
"Target factory with name already exists: {}",
entry.key()
)),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(target_factory);
Ok(())
}
}
}
}
}

pub fn get(&self, name: &str) -> Option<&ExecutorFactory> {
self.factories.get(name)
pub fn get_source(
&self,
name: &str,
) -> Option<&Arc<dyn super::interface::SourceFactory + Send + Sync>> {
self.source_factories.get(name)
}

pub fn get_function(
&self,
name: &str,
) -> Option<&Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
self.function_factories.get(name)
}

pub fn get_target(
&self,
name: &str,
) -> Option<&Arc<dyn super::interface::TargetFactory + Send + Sync>> {
self.target_factories.get(name)
}
}
8 changes: 2 additions & 6 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
lib_context::{FlowContext, FlowExecutionContext, LibSetupContext},
ops::{
get_optional_executor_factory,
get_optional_target_factory,
interface::{FlowInstanceContext, TargetFactory},
},
prelude::*,
Expand All @@ -20,7 +20,6 @@ use super::{
StateChange, TargetSetupState, db_metadata,
};
use crate::execution::db_tracking_setup;
use crate::ops::interface::ExecutorFactory;
use std::fmt::Write;

enum MetadataRecordType {
Expand Down Expand Up @@ -81,10 +80,7 @@ fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
}

fn get_export_target_factory(target_type: &str) -> Option<Arc<dyn TargetFactory + Send + Sync>> {
match get_optional_executor_factory(target_type) {
Some(ExecutorFactory::ExportTarget(factory)) => Some(factory),
_ => None,
}
get_optional_target_factory(target_type)
}

pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupStates<ExistingMode>> {
Expand Down
Loading