Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
name, _spec_kind(target_spec), dump_engine_object(target_spec),
dump_engine_object(index_options), self._engine_data_collector, setup_by_user)

def declare(self, spec: op.DeclarationSpec):
"""
Add a declaration to the flow.
"""
self._flow_builder_state.engine_flow_builder.declare(dump_engine_object(spec))


_flow_name_builder = _NameBuilder()

Expand Down
6 changes: 5 additions & 1 deletion python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class OpCategory(Enum):
FUNCTION = "function"
SOURCE = "source"
STORAGE = "storage"

DECLARATION = "declaration"
@dataclass_transform()
class SpecMeta(type):
"""Meta class for spec classes."""
Expand All @@ -41,6 +41,10 @@ class FunctionSpec(metaclass=SpecMeta, category=OpCategory.FUNCTION): # pylint:
class StorageSpec(metaclass=SpecMeta, category=OpCategory.STORAGE): # pylint: disable=too-few-public-methods
"""A storage spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""

class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
"""A declaration spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
kind: str

class Executor(Protocol):
"""An executor for an operation."""
op_category: OpCategory
Expand Down
3 changes: 3 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ pub struct FlowInstanceSpec {

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub export_ops: Vec<NamedSpec<ExportOpSpec>>,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub declarations: Vec<OpSpec>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
55 changes: 34 additions & 21 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ impl AnalyzerContext<'_> {
scope: &mut DataScopeBuilder,
flow_inst: &FlowInstanceSpec,
export_op_group: &AnalyzedExportTargetOpGroup,
declarations: Vec<serde_json::Value>,
flow_setup_state: &mut FlowSetupState<DesiredMode>,
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send>> {
Expand Down Expand Up @@ -1007,7 +1008,7 @@ impl AnalyzerContext<'_> {
}
let (data_collections_output, _) = export_op_group.target_factory.clone().build(
collection_specs,
vec![],
declarations,
self.flow_ctx.clone(),
)?;
if data_collections_output.len() != data_fields_infos.len() {
Expand Down Expand Up @@ -1162,37 +1163,49 @@ pub fn analyze_flow(
RefList::Nil,
)?;

let mut target_groups = IndexMap::<String, AnalyzedExportTargetOpGroup>::new();
#[derive(Default)]
struct TargetOpGroup {
export_op_ids: Vec<usize>,
declarations: Vec<serde_json::Value>,
}
let mut target_op_group = IndexMap::<String, TargetOpGroup>::new();
for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
let target_kind = export_op.spec.target.kind.clone();
target_op_group
.entry(export_op.spec.target.kind.clone())
.or_default()
.export_op_ids
.push(idx);
}
for declaration in flow_inst.declarations.iter() {
target_op_group
.entry(declaration.kind.clone())
.or_default()
.declarations
.push(serde_json::Value::Object(declaration.spec.clone()));
}

let mut export_ops_futs = vec![];
let mut analyzed_target_op_groups = vec![];
for (target_kind, op_ids) in target_op_group.into_iter() {
let export_factory = match registry.get(&target_kind) {
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
_ => {
return Err(anyhow::anyhow!(
"Export target kind not found: {}",
export_op.spec.target.kind
))
bail!("Export target kind not found: {target_kind}");
}
};
target_groups
.entry(target_kind)
.or_insert_with(|| AnalyzedExportTargetOpGroup {
target_factory: export_factory.clone(),
op_idx: vec![],
})
.op_idx
.push(idx);
}

let mut export_ops_futs = vec![];
for group in target_groups.values() {
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
target_factory: export_factory.clone(),
op_idx: op_ids.export_op_ids,
};
export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
root_exec_scope.data,
flow_inst,
group,
&analyzed_target_op_group,
op_ids.declarations,
&mut setup_state,
&target_states_by_name_type,
)?);
analyzed_target_op_groups.push(analyzed_target_op_group);
}

let tracking_table_setup = setup_state.tracking_table.clone();
Expand All @@ -1215,7 +1228,7 @@ pub fn analyze_flow(
import_ops,
op_scope,
export_ops,
export_op_groups: target_groups.into_values().collect(),
export_op_groups: analyzed_target_op_groups,
})
};

Expand Down
10 changes: 10 additions & 0 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ pub struct FlowBuilder {
import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,

declarations: Vec<spec::OpSpec>,

next_generated_op_id: usize,
}

Expand Down Expand Up @@ -370,6 +372,8 @@ impl FlowBuilder {
direct_input_fields: vec![],
direct_output_value: None,

declarations: vec![],

next_generated_op_id: 0,
};
Ok(result)
Expand Down Expand Up @@ -612,6 +616,11 @@ impl FlowBuilder {
Ok(())
}

pub fn declare(&mut self, op_spec: py::Pythonized<spec::OpSpec>) -> PyResult<()> {
self.declarations.push(op_spec.into_inner());
Ok(())
}

pub fn scope_field(
&self,
scope: DataScopeRef,
Expand Down Expand Up @@ -642,6 +651,7 @@ impl FlowBuilder {
import_ops: self.import_ops.clone(),
reactive_ops: self.reactive_ops.clone(),
export_ops: self.export_ops.clone(),
declarations: self.declarations.clone(),
};
let flow_instance_ctx = build_flow_instance_context(
&self.flow_instance_name,
Expand Down