Skip to content

Commit 91069ae

Browse files
committed
feat(attachment): add interface for target attachment
1 parent 5db5971 commit 91069ae

File tree

6 files changed

+62
-1
lines changed

6 files changed

+62
-1
lines changed

python/cocoindex/flow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ def export(
405405
/,
406406
*,
407407
primary_key_fields: Sequence[str],
408+
attachments: Sequence[op.TargetAttachmentSpec],
408409
vector_indexes: Sequence[index.VectorIndexDef] = (),
409410
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
410411
setup_by_user: bool = False,
@@ -436,6 +437,7 @@ def export(
436437
target_name,
437438
_spec_kind(target_spec),
438439
dump_engine_object(target_spec),
440+
dump_engine_object(attachments),
439441
dump_engine_object(index_options),
440442
self._engine_data_collector,
441443
setup_by_user,

python/cocoindex/op.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class OpCategory(Enum):
4646
SOURCE = "source"
4747
TARGET = "target"
4848
DECLARATION = "declaration"
49+
TARGET_ATTACHMENT = "target_attachment"
4950

5051

5152
@dataclass_transform()
@@ -81,6 +82,10 @@ class TargetSpec(metaclass=SpecMeta, category=OpCategory.TARGET): # pylint: dis
8182
"""A target spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8283

8384

85+
class TargetAttachmentSpec(metaclass=SpecMeta, category=OpCategory.TARGET_ATTACHMENT): # pylint: disable=too-few-public-methods
86+
"""A target attachment spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
87+
88+
8489
class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
8590
"""A declaration spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8691

src/base/spec.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,10 @@ impl fmt::Display for IndexOptions {
493493
pub struct ExportOpSpec {
494494
pub collector_name: FieldName,
495495
pub target: OpSpec,
496+
497+
#[serde(default, skip_serializing_if = "Vec::is_empty")]
498+
pub attachments: Vec<OpSpec>,
499+
496500
pub index_options: IndexOptions,
497501
pub setup_by_user: bool,
498502
}

src/builder/flow_builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,12 +549,13 @@ impl FlowBuilder {
549549
Ok(())
550550
}
551551

552-
#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
552+
#[pyo3(signature = (name, kind, op_spec, attachments, index_options, input, setup_by_user=false))]
553553
pub fn export(
554554
&mut self,
555555
name: String,
556556
kind: String,
557557
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
558+
attachments: py::Pythonized<Vec<spec::OpSpec>>,
558559
index_options: py::Pythonized<spec::IndexOptions>,
559560
input: &DataCollector,
560561
setup_by_user: bool,
@@ -574,6 +575,7 @@ impl FlowBuilder {
574575
spec: spec::ExportOpSpec {
575576
collector_name: input.name.clone(),
576577
target: spec,
578+
attachments: attachments.into_inner(),
577579
index_options: index_options.into_inner(),
578580
setup_by_user,
579581
},

src/ops/interface.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,42 @@ pub trait TargetFactory: Send + Sync {
316316
) -> Result<()>;
317317
}
318318

319+
pub struct TargetAttachmentState {
320+
pub setup_key: serde_json::Value,
321+
pub setup_state: serde_json::Value,
322+
}
323+
324+
#[async_trait]
325+
pub trait TargetAttachmentFactory: Send + Sync {
326+
/// Normalize the key. For example, the JSON format may change after a code change (e.g. a new optional field or a different field ordering) even if the underlying value has not changed.
327+
/// This should always return the canonical serialized form.
328+
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
329+
330+
fn get_state(
331+
&self,
332+
target_name: &str,
333+
target_spec: &spec::OpSpec,
334+
attachment_spec: serde_json::Value,
335+
) -> Result<TargetAttachmentState>;
336+
337+
fn describe(&self, key: &serde_json::Value, state: &serde_json::Value) -> Result<String>;
338+
339+
fn has_update(&self, old: &serde_json::Value, new: &serde_json::Value) -> bool {
340+
old != new
341+
}
342+
343+
async fn apply_setup_change(
344+
&self,
345+
key: &serde_json::Value,
346+
old: Option<serde_json::Value>,
347+
new: Option<serde_json::Value>,
348+
) -> Result<()>;
349+
}
350+
319351
#[derive(Clone)]
320352
pub enum ExecutorFactory {
321353
Source(Arc<dyn SourceFactory + Send + Sync>),
322354
SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
323355
ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
356+
TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
324357
}

src/ops/registry.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub struct ExecutorFactoryRegistry {
88
function_factories:
99
HashMap<String, Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>>,
1010
target_factories: HashMap<String, Arc<dyn super::interface::TargetFactory + Send + Sync>>,
11+
target_attachment_factories:
12+
HashMap<String, Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>>,
1113
}
1214

1315
impl Default for ExecutorFactoryRegistry {
@@ -22,6 +24,7 @@ impl ExecutorFactoryRegistry {
2224
source_factories: HashMap::new(),
2325
function_factories: HashMap::new(),
2426
target_factories: HashMap::new(),
27+
target_attachment_factories: HashMap::new(),
2528
}
2629
}
2730

@@ -61,6 +64,18 @@ impl ExecutorFactoryRegistry {
6164
}
6265
}
6366
}
67+
ExecutorFactory::TargetAttachment(target_attachment_factory) => {
68+
match self.target_attachment_factories.entry(name) {
69+
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
70+
"Target attachment factory with name already exists: {}",
71+
entry.key()
72+
)),
73+
std::collections::hash_map::Entry::Vacant(entry) => {
74+
entry.insert(target_attachment_factory);
75+
Ok(())
76+
}
77+
}
78+
}
6479
}
6580
}
6681

0 commit comments

Comments
 (0)