Skip to content

Commit f4b23f3

Browse files
committed
Merge branch 'main' into postgres_embedding
2 parents 29306d0 + e248f6b commit f4b23f3

File tree

5 files changed

+122
-67
lines changed

5 files changed

+122
-67
lines changed

src/builder/analyzer.rs

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::builder::exec_ctx::AnalyzedSetupState;
2-
use crate::ops::get_executor_factory;
2+
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
33
use crate::prelude::*;
44

55
use super::plan::*;
@@ -654,15 +654,7 @@ impl AnalyzerContext {
654654
op_scope: &Arc<OpScope>,
655655
import_op: NamedSpec<ImportOpSpec>,
656656
) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send + use<>> {
657-
let source_factory = match get_executor_factory(&import_op.spec.source.kind)? {
658-
ExecutorFactory::Source(source_executor) => source_executor,
659-
_ => {
660-
return Err(anyhow::anyhow!(
661-
"`{}` is not a source op",
662-
import_op.spec.source.kind
663-
));
664-
}
665-
};
657+
let source_factory = get_source_factory(&import_op.spec.source.kind)?;
666658
let (output_type, executor) = source_factory
667659
.build(
668660
serde_json::Value::Object(import_op.spec.source.spec),
@@ -719,23 +711,22 @@ impl AnalyzerContext {
719711
})?;
720712
let spec = serde_json::Value::Object(op.op.spec.clone());
721713

722-
match get_executor_factory(&op.op.kind)? {
723-
ExecutorFactory::SimpleFunction(fn_executor) => {
724-
let input_value_mappings = input_field_schemas
725-
.iter()
726-
.map(|field| field.analyzed_value.clone())
727-
.collect();
728-
let (output_enriched_type, executor) = fn_executor
729-
.build(spec, input_field_schemas, self.flow_ctx.clone())
730-
.await?;
731-
let logic_fingerprinter = Fingerprinter::default()
732-
.with(&op.op)?
733-
.with(&output_enriched_type.without_attrs())?;
734-
let output_type = output_enriched_type.typ.clone();
735-
let output = op_scope
736-
.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
737-
let op_name = reactive_op.name.clone();
738-
async move {
714+
let fn_executor = get_function_factory(&op.op.kind)?;
715+
let input_value_mappings = input_field_schemas
716+
.iter()
717+
.map(|field| field.analyzed_value.clone())
718+
.collect();
719+
let (output_enriched_type, executor) = fn_executor
720+
.build(spec, input_field_schemas, self.flow_ctx.clone())
721+
.await?;
722+
let logic_fingerprinter = Fingerprinter::default()
723+
.with(&op.op)?
724+
.with(&output_enriched_type.without_attrs())?;
725+
let output_type = output_enriched_type.typ.clone();
726+
let output =
727+
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
728+
let op_name = reactive_op.name.clone();
729+
async move {
739730
trace!("Start building executor for transform op `{op_name}`");
740731
let executor = executor.await.with_context(|| {
741732
format!("Failed to build executor for transform op: {op_name}")
@@ -764,11 +755,8 @@ impl AnalyzerContext {
764755
executor,
765756
output,
766757
}))
767-
}
768-
.boxed()
769-
}
770-
_ => api_bail!("`{}` is not a function op", op.op.kind),
771758
}
759+
.boxed()
772760
}
773761

774762
ReactiveOpSpec::ForEach(foreach_op) => {
@@ -1068,10 +1056,7 @@ pub async fn analyze_flow(
10681056
let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len());
10691057

10701058
for (target_kind, op_ids) in target_op_group.into_iter() {
1071-
let target_factory = match get_executor_factory(&target_kind)? {
1072-
ExecutorFactory::ExportTarget(export_executor) => export_executor,
1073-
_ => api_bail!("`{}` is not a export target op", target_kind),
1074-
};
1059+
let target_factory = get_target_factory(&target_kind)?;
10751060
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
10761061
target_factory,
10771062
op_idx: op_ids.export_op_ids,

src/builder/exec_ctx.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::prelude::*;
22

33
use crate::execution::db_tracking_setup;
4-
use crate::ops::get_executor_factory;
4+
use crate::ops::get_target_factory;
55
use crate::ops::interface::SetupStateCompatibility;
66

77
pub struct ImportOpExecutionContext {
@@ -80,14 +80,7 @@ fn build_target_id(
8080
metadata: &mut setup::FlowSetupMetadata,
8181
target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
8282
) -> Result<i32> {
83-
let interface::ExecutorFactory::ExportTarget(target_factory) =
84-
get_executor_factory(&analyzed_target_ss.target_kind)?
85-
else {
86-
api_bail!(
87-
"`{}` is not a export target op",
88-
analyzed_target_ss.target_kind
89-
)
90-
};
83+
let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?;
9184

9285
let resource_id = setup::ResourceIdentifier {
9386
key: analyzed_target_ss.setup_key.clone(),

src/ops/registration.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,46 @@ static EXECUTOR_FACTORY_REGISTRY: LazyLock<RwLock<ExecutorFactoryRegistry>> = La
3434
RwLock::new(registry)
3535
});
3636

37-
pub fn get_optional_executor_factory(kind: &str) -> Option<ExecutorFactory> {
37+
pub fn get_optional_source_factory(
38+
kind: &str,
39+
) -> Option<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
3840
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
39-
registry.get(kind).cloned()
41+
registry.get_source(kind).cloned()
4042
}
4143

42-
pub fn get_executor_factory(kind: &str) -> Result<ExecutorFactory> {
43-
get_optional_executor_factory(kind)
44-
.ok_or_else(|| anyhow::anyhow!("Executor factory not found for op kind: {}", kind))
44+
pub fn get_optional_function_factory(
45+
kind: &str,
46+
) -> Option<std::sync::Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
47+
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
48+
registry.get_function(kind).cloned()
49+
}
50+
51+
pub fn get_optional_target_factory(
52+
kind: &str,
53+
) -> Option<std::sync::Arc<dyn super::interface::TargetFactory + Send + Sync>> {
54+
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
55+
registry.get_target(kind).cloned()
56+
}
57+
58+
pub fn get_source_factory(
59+
kind: &str,
60+
) -> Result<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
61+
get_optional_source_factory(kind)
62+
.ok_or_else(|| anyhow::anyhow!("Source factory not found for op kind: {}", kind))
63+
}
64+
65+
pub fn get_function_factory(
66+
kind: &str,
67+
) -> Result<std::sync::Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
68+
get_optional_function_factory(kind)
69+
.ok_or_else(|| anyhow::anyhow!("Function factory not found for op kind: {}", kind))
70+
}
71+
72+
pub fn get_target_factory(
73+
kind: &str,
74+
) -> Result<std::sync::Arc<dyn super::interface::TargetFactory + Send + Sync>> {
75+
get_optional_target_factory(kind)
76+
.ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind))
4577
}
4678

4779
pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> {

src/ops/registry.rs

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use super::interface::ExecutorFactory;
22
use anyhow::Result;
33
use std::collections::HashMap;
4+
use std::sync::Arc;
45

56
pub struct ExecutorFactoryRegistry {
6-
factories: HashMap<String, ExecutorFactory>,
7+
source_factories: HashMap<String, Arc<dyn super::interface::SourceFactory + Send + Sync>>,
8+
function_factories:
9+
HashMap<String, Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>>,
10+
target_factories: HashMap<String, Arc<dyn super::interface::TargetFactory + Send + Sync>>,
711
}
812

913
impl Default for ExecutorFactoryRegistry {
@@ -15,24 +19,69 @@ impl Default for ExecutorFactoryRegistry {
1519
impl ExecutorFactoryRegistry {
1620
pub fn new() -> Self {
1721
Self {
18-
factories: HashMap::new(),
22+
source_factories: HashMap::new(),
23+
function_factories: HashMap::new(),
24+
target_factories: HashMap::new(),
1925
}
2026
}
2127

2228
pub fn register(&mut self, name: String, factory: ExecutorFactory) -> Result<()> {
23-
match self.factories.entry(name) {
24-
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
25-
"Factory with name already exists: {}",
26-
entry.key()
27-
)),
28-
std::collections::hash_map::Entry::Vacant(entry) => {
29-
entry.insert(factory);
30-
Ok(())
29+
match factory {
30+
ExecutorFactory::Source(source_factory) => match self.source_factories.entry(name) {
31+
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
32+
"Source factory with name already exists: {}",
33+
entry.key()
34+
)),
35+
std::collections::hash_map::Entry::Vacant(entry) => {
36+
entry.insert(source_factory);
37+
Ok(())
38+
}
39+
},
40+
ExecutorFactory::SimpleFunction(function_factory) => {
41+
match self.function_factories.entry(name) {
42+
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
43+
"Function factory with name already exists: {}",
44+
entry.key()
45+
)),
46+
std::collections::hash_map::Entry::Vacant(entry) => {
47+
entry.insert(function_factory);
48+
Ok(())
49+
}
50+
}
51+
}
52+
ExecutorFactory::ExportTarget(target_factory) => {
53+
match self.target_factories.entry(name) {
54+
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
55+
"Target factory with name already exists: {}",
56+
entry.key()
57+
)),
58+
std::collections::hash_map::Entry::Vacant(entry) => {
59+
entry.insert(target_factory);
60+
Ok(())
61+
}
62+
}
3163
}
3264
}
3365
}
3466

35-
pub fn get(&self, name: &str) -> Option<&ExecutorFactory> {
36-
self.factories.get(name)
67+
pub fn get_source(
68+
&self,
69+
name: &str,
70+
) -> Option<&Arc<dyn super::interface::SourceFactory + Send + Sync>> {
71+
self.source_factories.get(name)
72+
}
73+
74+
pub fn get_function(
75+
&self,
76+
name: &str,
77+
) -> Option<&Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>> {
78+
self.function_factories.get(name)
79+
}
80+
81+
pub fn get_target(
82+
&self,
83+
name: &str,
84+
) -> Option<&Arc<dyn super::interface::TargetFactory + Send + Sync>> {
85+
self.target_factories.get(name)
3786
}
3887
}

src/setup/driver.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
lib_context::{FlowContext, FlowExecutionContext, LibSetupContext},
33
ops::{
4-
get_optional_executor_factory,
4+
get_optional_target_factory,
55
interface::{FlowInstanceContext, TargetFactory},
66
},
77
prelude::*,
@@ -20,7 +20,6 @@ use super::{
2020
StateChange, TargetSetupState, db_metadata,
2121
};
2222
use crate::execution::db_tracking_setup;
23-
use crate::ops::interface::ExecutorFactory;
2423
use std::fmt::Write;
2524

2625
enum MetadataRecordType {
@@ -81,10 +80,7 @@ fn from_metadata_record<S: DeserializeOwned + Debug + Clone>(
8180
}
8281

8382
fn get_export_target_factory(target_type: &str) -> Option<Arc<dyn TargetFactory + Send + Sync>> {
84-
match get_optional_executor_factory(target_type) {
85-
Some(ExecutorFactory::ExportTarget(factory)) => Some(factory),
86-
_ => None,
87-
}
83+
get_optional_target_factory(target_type)
8884
}
8985

9086
pub async fn get_existing_setup_state(pool: &PgPool) -> Result<AllSetupStates<ExistingMode>> {

0 commit comments

Comments
 (0)