Skip to content

Commit f60278c

Browse files
committed
feat(attachment): support target attachment in the core setup engine
1 parent 91069ae commit f60278c

File tree

8 files changed

+273
-50
lines changed

8 files changed

+273
-50
lines changed

python/cocoindex/flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ def export(
405405
/,
406406
*,
407407
primary_key_fields: Sequence[str],
408-
attachments: Sequence[op.TargetAttachmentSpec],
408+
attachments: Sequence[op.TargetAttachmentSpec] = (),
409409
vector_indexes: Sequence[index.VectorIndexDef] = (),
410410
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
411411
setup_by_user: bool = False,

src/builder/analyzer.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::builder::exec_ctx::AnalyzedSetupState;
2-
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
2+
use crate::ops::{
3+
get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
4+
};
35
use crate::prelude::*;
46

57
use super::plan::*;
@@ -913,6 +915,27 @@ impl AnalyzerContext {
913915
let op_name = export_op.name.clone();
914916
let export_target_factory = export_op_group.target_factory.clone();
915917

918+
let attachments = export_op
919+
.spec
920+
.attachments
921+
.iter()
922+
.map(|attachment| {
923+
let attachment_factory = get_attachment_factory(&attachment.kind)?;
924+
let attachment_state = attachment_factory.get_state(
925+
&op_name,
926+
&export_op.spec.target.spec,
927+
serde_json::Value::Object(attachment.spec.clone()),
928+
)?;
929+
Ok((
930+
interface::AttachmentSetupKey(
931+
attachment.kind.clone(),
932+
attachment_state.setup_key,
933+
),
934+
attachment_state.setup_state,
935+
))
936+
})
937+
.collect::<Result<IndexMap<_, _>>>()?;
938+
916939
let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
917940
target_kind: target_kind.to_string(),
918941
setup_key: data_coll_output.setup_key,
@@ -925,6 +948,7 @@ impl AnalyzerContext {
925948
.map(|field| field.value_type.typ.clone())
926949
.collect::<Box<[_]>>(),
927950
),
951+
attachments,
928952
};
929953
targets_analyzed_ss[*idx] = Some(export_op_ss);
930954

@@ -956,6 +980,7 @@ impl AnalyzerContext {
956980
desired_setup_state,
957981
setup_by_user: false,
958982
key_type: None,
983+
attachments: IndexMap::new(),
959984
};
960985
declarations_analyzed_ss.push(decl_ss);
961986
}

src/builder/exec_ctx.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ pub struct AnalyzedTargetSetupState {
2626
pub setup_by_user: bool,
2727
/// None for declarations.
2828
pub key_type: Option<Box<[schema::ValueType]>>,
29+
30+
pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
2931
}
3032

3133
pub struct AnalyzedSetupState {
@@ -176,6 +178,7 @@ fn build_export_op_exec_ctx(
176178
} else {
177179
max_schema_version_id + 1
178180
};
181+
179182
match target_states.entry(resource_id) {
180183
indexmap::map::Entry::Occupied(entry) => {
181184
api_bail!(
@@ -194,6 +197,7 @@ fn build_export_op_exec_ctx(
194197
key_type: analyzed_target_ss.key_type.clone(),
195198
},
196199
state: analyzed_target_ss.desired_setup_state.clone(),
200+
attachments: analyzed_target_ss.attachments.clone(),
197201
});
198202
}
199203
}

src/ops/interface.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,12 @@ pub struct TargetAttachmentState {
322322
}
323323

324324
#[async_trait]
325+
pub trait AttachmentSetupChangeAction {
326+
fn describe_change(&self) -> String;
327+
328+
async fn apply_change(&self) -> Result<()>;
329+
}
330+
325331
pub trait TargetAttachmentFactory: Send + Sync {
326332
/// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
327333
/// This should always return the canonical serialized form.
@@ -330,22 +336,17 @@ pub trait TargetAttachmentFactory: Send + Sync {
330336
fn get_state(
331337
&self,
332338
target_name: &str,
333-
target_spec: &spec::OpSpec,
339+
target_spec: &serde_json::Map<String, serde_json::Value>,
334340
attachment_spec: serde_json::Value,
335341
) -> Result<TargetAttachmentState>;
336342

337-
fn describe(&self, key: &serde_json::Value, state: &serde_json::Value) -> Result<String>;
338-
339-
fn has_update(&self, old: &serde_json::Value, new: &serde_json::Value) -> bool {
340-
old != new
341-
}
342-
343-
async fn apply_setup_change(
343+
/// Should return Some if and only if any changes are needed.
344+
fn diff_setup_states(
344345
&self,
345346
key: &serde_json::Value,
346-
old: Option<serde_json::Value>,
347-
new: Option<serde_json::Value>,
348-
) -> Result<()>;
347+
new_state: Option<serde_json::Value>,
348+
existing_states: setup::CombinedState<serde_json::Value>,
349+
) -> Result<Option<Box<dyn AttachmentSetupChangeAction + Send + Sync>>>;
349350
}
350351

351352
#[derive(Clone)]
@@ -355,3 +356,12 @@ pub enum ExecutorFactory {
355356
ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
356357
TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
357358
}
359+
360+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
361+
pub struct AttachmentSetupKey(pub String, pub serde_json::Value);
362+
363+
impl std::fmt::Display for AttachmentSetupKey {
364+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365+
write!(f, "{}:{}", self.0, self.1)
366+
}
367+
}

src/ops/registration.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub fn get_optional_target_factory(
5656
registry.get_target(kind).cloned()
5757
}
5858

59+
pub fn get_optional_attachment_factory(
60+
kind: &str,
61+
) -> Option<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
62+
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
63+
registry.get_target_attachment(kind).cloned()
64+
}
65+
5966
pub fn get_source_factory(
6067
kind: &str,
6168
) -> Result<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
@@ -77,6 +84,13 @@ pub fn get_target_factory(
7784
.ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind))
7885
}
7986

87+
pub fn get_attachment_factory(
88+
kind: &str,
89+
) -> Result<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
90+
get_optional_attachment_factory(kind)
91+
.ok_or_else(|| anyhow::anyhow!("Attachment factory not found for op kind: {}", kind))
92+
}
93+
8094
pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> {
8195
let mut registry = EXECUTOR_FACTORY_REGISTRY.write().unwrap();
8296
registry.register(name, factory)

src/ops/registry.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,11 @@ impl ExecutorFactoryRegistry {
9999
) -> Option<&Arc<dyn super::interface::TargetFactory + Send + Sync>> {
100100
self.target_factories.get(name)
101101
}
102+
103+
pub fn get_target_attachment(
104+
&self,
105+
name: &str,
106+
) -> Option<&Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
107+
self.target_attachment_factories.get(name)
108+
}
102109
}

0 commit comments

Comments
 (0)