From 8acc1699aaa05bb1bdc2dc47f8b29db763c6ada8 Mon Sep 17 00:00:00 2001 From: LJ Date: Mon, 21 Apr 2025 23:13:09 -0700 Subject: [PATCH] feat(target-factory): support declaration in the framework --- python/cocoindex/flow.py | 6 ++++ python/cocoindex/op.py | 6 +++- src/base/spec.rs | 3 ++ src/builder/analyzer.rs | 55 +++++++++++++++++++++++-------------- src/builder/flow_builder.rs | 10 +++++++ 5 files changed, 58 insertions(+), 22 deletions(-) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 292d61f6d..f65026239 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -290,6 +290,12 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *, name, _spec_kind(target_spec), dump_engine_object(target_spec), dump_engine_object(index_options), self._engine_data_collector, setup_by_user) + def declare(self, spec: op.DeclarationSpec): + """ + Add a declaration to the flow. + """ + self._flow_builder_state.engine_flow_builder.declare(dump_engine_object(spec)) + _flow_name_builder = _NameBuilder() diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index f46f3cbd2..902590345 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -18,7 +18,7 @@ class OpCategory(Enum): FUNCTION = "function" SOURCE = "source" STORAGE = "storage" - + DECLARATION = "declaration" @dataclass_transform() class SpecMeta(type): """Meta class for spec classes.""" @@ -41,6 +41,10 @@ class FunctionSpec(metaclass=SpecMeta, category=OpCategory.FUNCTION): # pylint: class StorageSpec(metaclass=SpecMeta, category=OpCategory.STORAGE): # pylint: disable=too-few-public-methods """A storage spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)""" +class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods + """A declaration spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)""" + kind: str + class Executor(Protocol): """An executor for an operation.""" op_category: OpCategory diff --git a/src/base/spec.rs b/src/base/spec.rs index 570544bea..5b23d37f8 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -274,6 +274,9 @@ pub struct FlowInstanceSpec { #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")] pub export_ops: Vec>, + + #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")] + pub declarations: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index e21ea73ab..bc6bb25b3 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -930,6 +930,7 @@ impl AnalyzerContext<'_> { scope: &mut DataScopeBuilder, flow_inst: &FlowInstanceSpec, export_op_group: &AnalyzedExportTargetOpGroup, + declarations: Vec, flow_setup_state: &mut FlowSetupState, existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>, ) -> Result> + Send>> { @@ -1007,7 +1008,7 @@ impl AnalyzerContext<'_> { } let (data_collections_output, _) = export_op_group.target_factory.clone().build( collection_specs, - vec![], + declarations, self.flow_ctx.clone(), )?; if data_collections_output.len() != data_fields_infos.len() { @@ -1162,37 +1163,49 @@ pub fn analyze_flow( RefList::Nil, )?; - let mut target_groups = IndexMap::::new(); + #[derive(Default)] + struct TargetOpGroup { + export_op_ids: Vec, + declarations: Vec, + } + let mut target_op_group = IndexMap::::new(); for (idx, export_op) in flow_inst.export_ops.iter().enumerate() { - let target_kind = export_op.spec.target.kind.clone(); + target_op_group + .entry(export_op.spec.target.kind.clone()) + .or_default() + .export_op_ids + .push(idx); + } + for declaration in flow_inst.declarations.iter() { + target_op_group + .entry(declaration.kind.clone()) + .or_default() + .declarations + .push(serde_json::Value::Object(declaration.spec.clone())); + } + + let mut export_ops_futs = vec![]; + let mut analyzed_target_op_groups = vec![]; + for (target_kind, op_ids) in target_op_group.into_iter() { let export_factory = match registry.get(&target_kind) { Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor, _ => { - return Err(anyhow::anyhow!( - "Export target kind not found: {}", - export_op.spec.target.kind - )) + bail!("Export target kind not found: {target_kind}"); } }; - target_groups - .entry(target_kind) - .or_insert_with(|| AnalyzedExportTargetOpGroup { - target_factory: export_factory.clone(), - op_idx: vec![], - }) - .op_idx - .push(idx); - } - - let mut export_ops_futs = vec![]; - for group in target_groups.values() { + let analyzed_target_op_group = AnalyzedExportTargetOpGroup { + target_factory: export_factory.clone(), + op_idx: op_ids.export_op_ids, + }; export_ops_futs.extend(analyzer_ctx.analyze_export_op_group( root_exec_scope.data, flow_inst, - group, + &analyzed_target_op_group, + op_ids.declarations, &mut setup_state, &target_states_by_name_type, )?); + analyzed_target_op_groups.push(analyzed_target_op_group); } let tracking_table_setup = setup_state.tracking_table.clone(); @@ -1215,7 +1228,7 @@ pub fn analyze_flow( import_ops, op_scope, export_ops, - export_op_groups: target_groups.into_values().collect(), + export_op_groups: analyzed_target_op_groups, }) }; diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 1354f7344..c52a788b7 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -331,6 +331,8 @@ pub struct FlowBuilder { import_ops: Vec>, export_ops: Vec>, + declarations: Vec, + next_generated_op_id: usize, } @@ -370,6 +372,8 @@ impl FlowBuilder { direct_input_fields: vec![], direct_output_value: None, + declarations: vec![], + next_generated_op_id: 0, }; Ok(result) @@ -612,6 +616,11 @@ impl FlowBuilder { Ok(()) } + pub fn declare(&mut self, op_spec: py::Pythonized) -> PyResult<()> { + self.declarations.push(op_spec.into_inner()); + Ok(()) + } + pub fn scope_field( &self, scope: DataScopeRef, @@ -642,6 +651,7 @@ impl FlowBuilder { import_ops: self.import_ops.clone(), reactive_ops: self.reactive_ops.clone(), export_ops: self.export_ops.clone(), + declarations: self.declarations.clone(), }; let flow_instance_ctx = build_flow_instance_context( &self.flow_instance_name,