|
1 | 1 | use crate::builder::exec_ctx::AnalyzedSetupState; |
2 | | -use crate::ops::get_executor_factory; |
| 2 | +use crate::ops::{get_function_factory, get_source_factory, get_target_factory}; |
3 | 3 | use crate::prelude::*; |
4 | 4 |
|
5 | 5 | use super::plan::*; |
@@ -654,15 +654,7 @@ impl AnalyzerContext { |
654 | 654 | op_scope: &Arc<OpScope>, |
655 | 655 | import_op: NamedSpec<ImportOpSpec>, |
656 | 656 | ) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send + use<>> { |
657 | | - let source_factory = match get_executor_factory(&import_op.spec.source.kind)? { |
658 | | - ExecutorFactory::Source(source_executor) => source_executor, |
659 | | - _ => { |
660 | | - return Err(anyhow::anyhow!( |
661 | | - "`{}` is not a source op", |
662 | | - import_op.spec.source.kind |
663 | | - )); |
664 | | - } |
665 | | - }; |
| 657 | + let source_factory = get_source_factory(&import_op.spec.source.kind)?; |
666 | 658 | let (output_type, executor) = source_factory |
667 | 659 | .build( |
668 | 660 | serde_json::Value::Object(import_op.spec.source.spec), |
@@ -719,23 +711,22 @@ impl AnalyzerContext { |
719 | 711 | })?; |
720 | 712 | let spec = serde_json::Value::Object(op.op.spec.clone()); |
721 | 713 |
|
722 | | - match get_executor_factory(&op.op.kind)? { |
723 | | - ExecutorFactory::SimpleFunction(fn_executor) => { |
724 | | - let input_value_mappings = input_field_schemas |
725 | | - .iter() |
726 | | - .map(|field| field.analyzed_value.clone()) |
727 | | - .collect(); |
728 | | - let (output_enriched_type, executor) = fn_executor |
729 | | - .build(spec, input_field_schemas, self.flow_ctx.clone()) |
730 | | - .await?; |
731 | | - let logic_fingerprinter = Fingerprinter::default() |
732 | | - .with(&op.op)? |
733 | | - .with(&output_enriched_type.without_attrs())?; |
734 | | - let output_type = output_enriched_type.typ.clone(); |
735 | | - let output = op_scope |
736 | | - .add_op_output(reactive_op.name.clone(), output_enriched_type)?; |
737 | | - let op_name = reactive_op.name.clone(); |
738 | | - async move { |
| 714 | + let fn_executor = get_function_factory(&op.op.kind)?; |
| 715 | + let input_value_mappings = input_field_schemas |
| 716 | + .iter() |
| 717 | + .map(|field| field.analyzed_value.clone()) |
| 718 | + .collect(); |
| 719 | + let (output_enriched_type, executor) = fn_executor |
| 720 | + .build(spec, input_field_schemas, self.flow_ctx.clone()) |
| 721 | + .await?; |
| 722 | + let logic_fingerprinter = Fingerprinter::default() |
| 723 | + .with(&op.op)? |
| 724 | + .with(&output_enriched_type.without_attrs())?; |
| 725 | + let output_type = output_enriched_type.typ.clone(); |
| 726 | + let output = |
| 727 | + op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?; |
| 728 | + let op_name = reactive_op.name.clone(); |
| 729 | + async move { |
739 | 730 | trace!("Start building executor for transform op `{op_name}`"); |
740 | 731 | let executor = executor.await.with_context(|| { |
741 | 732 | format!("Failed to build executor for transform op: {op_name}") |
@@ -764,11 +755,8 @@ impl AnalyzerContext { |
764 | 755 | executor, |
765 | 756 | output, |
766 | 757 | })) |
767 | | - } |
768 | | - .boxed() |
769 | | - } |
770 | | - _ => api_bail!("`{}` is not a function op", op.op.kind), |
771 | 758 | } |
| 759 | + .boxed() |
772 | 760 | } |
773 | 761 |
|
774 | 762 | ReactiveOpSpec::ForEach(foreach_op) => { |
@@ -1068,10 +1056,7 @@ pub async fn analyze_flow( |
1068 | 1056 | let mut declarations_analyzed_ss = Vec::with_capacity(flow_inst.declarations.len()); |
1069 | 1057 |
|
1070 | 1058 | for (target_kind, op_ids) in target_op_group.into_iter() { |
1071 | | - let target_factory = match get_executor_factory(&target_kind)? { |
1072 | | - ExecutorFactory::ExportTarget(export_executor) => export_executor, |
1073 | | - _ => api_bail!("`{}` is not a export target op", target_kind), |
1074 | | - }; |
| 1059 | + let target_factory = get_target_factory(&target_kind)?; |
1075 | 1060 | let analyzed_target_op_group = AnalyzedExportTargetOpGroup { |
1076 | 1061 | target_factory, |
1077 | 1062 | op_idx: op_ids.export_op_ids, |
|
0 commit comments