diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 9f7badd85..b9f0ab587 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -405,6 +405,7 @@ def export( /, *, primary_key_fields: Sequence[str], + attachments: Sequence[op.TargetAttachmentSpec] = (), vector_indexes: Sequence[index.VectorIndexDef] = (), vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (), setup_by_user: bool = False, @@ -436,6 +437,7 @@ def export( target_name, _spec_kind(target_spec), dump_engine_object(target_spec), + dump_engine_object(attachments), dump_engine_object(index_options), self._engine_data_collector, setup_by_user, diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 380f60188..0ce5cffcf 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -46,6 +46,7 @@ class OpCategory(Enum): SOURCE = "source" TARGET = "target" DECLARATION = "declaration" + TARGET_ATTACHMENT = "target_attachment" @dataclass_transform() @@ -81,6 +82,10 @@ class TargetSpec(metaclass=SpecMeta, category=OpCategory.TARGET): # pylint: dis """A target spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)""" +class TargetAttachmentSpec(metaclass=SpecMeta, category=OpCategory.TARGET_ATTACHMENT): # pylint: disable=too-few-public-methods + """A target attachment spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)""" + + class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods """A declaration spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)""" diff --git a/src/base/spec.rs b/src/base/spec.rs index 671e96dc2..378fcc42a 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -493,6 +493,10 @@ impl fmt::Display for IndexOptions { pub struct ExportOpSpec { pub collector_name: FieldName, pub target: OpSpec, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub attachments: Vec, + pub index_options: IndexOptions, pub setup_by_user: bool, } diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index a6a8c0456..fa2b131f9 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1,5 +1,7 @@ use crate::builder::exec_ctx::AnalyzedSetupState; -use crate::ops::{get_function_factory, get_source_factory, get_target_factory}; +use crate::ops::{ + get_attachment_factory, get_function_factory, get_source_factory, get_target_factory, +}; use crate::prelude::*; use super::plan::*; @@ -913,6 +915,27 @@ impl AnalyzerContext { let op_name = export_op.name.clone(); let export_target_factory = export_op_group.target_factory.clone(); + let attachments = export_op + .spec + .attachments + .iter() + .map(|attachment| { + let attachment_factory = get_attachment_factory(&attachment.kind)?; + let attachment_state = attachment_factory.get_state( + &op_name, + &export_op.spec.target.spec, + serde_json::Value::Object(attachment.spec.clone()), + )?; + Ok(( + interface::AttachmentSetupKey( + attachment.kind.clone(), + attachment_state.setup_key, + ), + attachment_state.setup_state, + )) + }) + .collect::>>()?; + let export_op_ss = exec_ctx::AnalyzedTargetSetupState { target_kind: target_kind.to_string(), setup_key: data_coll_output.setup_key, @@ -925,6 +948,7 @@ impl AnalyzerContext { .map(|field| field.value_type.typ.clone()) .collect::>(), ), + attachments, }; targets_analyzed_ss[*idx] = Some(export_op_ss); @@ -956,6 +980,7 @@ impl AnalyzerContext { desired_setup_state, setup_by_user: false, key_type: None, + attachments: IndexMap::new(), }; declarations_analyzed_ss.push(decl_ss); } diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index de9e6b99f..0410d43b4 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -26,6 +26,8 @@ pub struct AnalyzedTargetSetupState { pub setup_by_user: bool, /// None for declarations. pub key_type: Option>, + + pub attachments: IndexMap, } pub struct AnalyzedSetupState { @@ -176,6 +178,7 @@ fn build_export_op_exec_ctx( } else { max_schema_version_id + 1 }; + match target_states.entry(resource_id) { indexmap::map::Entry::Occupied(entry) => { api_bail!( @@ -194,6 +197,7 @@ fn build_export_op_exec_ctx( key_type: analyzed_target_ss.key_type.clone(), }, state: analyzed_target_ss.desired_setup_state.clone(), + attachments: analyzed_target_ss.attachments.clone(), }); } } diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 7643efc20..daa40332f 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -549,12 +549,13 @@ impl FlowBuilder { Ok(()) } - #[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))] + #[pyo3(signature = (name, kind, op_spec, attachments, index_options, input, setup_by_user=false))] pub fn export( &mut self, name: String, kind: String, op_spec: py::Pythonized>, + attachments: py::Pythonized>, index_options: py::Pythonized, input: &DataCollector, setup_by_user: bool, @@ -574,6 +575,7 @@ impl FlowBuilder { spec: spec::ExportOpSpec { collector_name: input.name.clone(), target: spec, + attachments: attachments.into_inner(), index_options: index_options.into_inner(), setup_by_user, }, diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 6553c6789..8ea589d24 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -316,9 +316,52 @@ pub trait TargetFactory: Send + Sync { ) -> Result<()>; } +pub struct TargetAttachmentState { + pub setup_key: serde_json::Value, + pub setup_state: serde_json::Value, +} + +#[async_trait] +pub trait AttachmentSetupChangeAction { + fn describe_change(&self) -> String; + + async fn apply_change(&self) -> Result<()>; +} + +pub trait TargetAttachmentFactory: Send + Sync { + /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed. + /// This should always return the canonical serialized form. + fn normalize_setup_key(&self, key: &serde_json::Value) -> Result; + + fn get_state( + &self, + target_name: &str, + target_spec: &serde_json::Map, + attachment_spec: serde_json::Value, + ) -> Result; + + /// Should return Some if and only if any changes are needed. + fn diff_setup_states( + &self, + key: &serde_json::Value, + new_state: Option, + existing_states: setup::CombinedState, + ) -> Result>>; +} + #[derive(Clone)] pub enum ExecutorFactory { Source(Arc), SimpleFunction(Arc), ExportTarget(Arc), + TargetAttachment(Arc), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct AttachmentSetupKey(pub String, pub serde_json::Value); + +impl std::fmt::Display for AttachmentSetupKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.0, self.1) + } } diff --git a/src/ops/registration.rs b/src/ops/registration.rs index efe3857c8..f91ee1c62 100644 --- a/src/ops/registration.rs +++ b/src/ops/registration.rs @@ -56,6 +56,13 @@ pub fn get_optional_target_factory( registry.get_target(kind).cloned() } +pub fn get_optional_attachment_factory( + kind: &str, +) -> Option> { + let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap(); + registry.get_target_attachment(kind).cloned() +} + pub fn get_source_factory( kind: &str, ) -> Result> { @@ -77,6 +84,13 @@ pub fn get_target_factory( .ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind)) } +pub fn get_attachment_factory( + kind: &str, +) -> Result> { + get_optional_attachment_factory(kind) + .ok_or_else(|| anyhow::anyhow!("Attachment factory not found for op kind: {}", kind)) +} + pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> { let mut registry = EXECUTOR_FACTORY_REGISTRY.write().unwrap(); registry.register(name, factory) diff --git a/src/ops/registry.rs b/src/ops/registry.rs index 52417480d..4e66daa53 100644 --- a/src/ops/registry.rs +++ b/src/ops/registry.rs @@ -8,6 +8,8 @@ pub struct ExecutorFactoryRegistry { function_factories: HashMap>, target_factories: HashMap>, + target_attachment_factories: + HashMap>, } impl Default for ExecutorFactoryRegistry { @@ -22,6 +24,7 @@ impl ExecutorFactoryRegistry { source_factories: HashMap::new(), function_factories: HashMap::new(), target_factories: HashMap::new(), + target_attachment_factories: HashMap::new(), } } @@ -61,6 +64,18 @@ impl ExecutorFactoryRegistry { } } } + ExecutorFactory::TargetAttachment(target_attachment_factory) => { + match self.target_attachment_factories.entry(name) { + std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!( + "Target attachment factory with name already exists: {}", + entry.key() + )), + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(target_attachment_factory); + Ok(()) + } + } + } } } @@ -84,4 +99,11 @@ impl ExecutorFactoryRegistry { ) -> Option<&Arc> { self.target_factories.get(name) } + + pub fn get_target_attachment( + &self, + name: &str, + ) -> Option<&Arc> { + self.target_attachment_factories.get(name) + } } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 38dc22e49..6ea254c33 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -1,10 +1,11 @@ use crate::{ lib_context::{FlowContext, FlowExecutionContext, LibSetupContext}, ops::{ - get_optional_target_factory, - interface::{FlowInstanceContext, TargetFactory}, + get_attachment_factory, get_optional_target_factory, + interface::{AttachmentSetupKey, FlowInstanceContext, TargetFactory}, }, prelude::*, + setup::{AttachmentsSetupChange, TargetSetupChange}, }; use sqlx::PgPool; @@ -185,17 +186,26 @@ fn to_object_status(existing: Option, desired: Option) -> Option, - existing: CombinedState, +#[derive(Debug)] +struct GroupedResourceStates { + desired: Option, + existing: CombinedState, } -fn group_resource_states<'a>( - desired: impl Iterator, - existing: impl Iterator)>, -) -> Result> { - let mut grouped: IndexMap<&'a ResourceIdentifier, GroupedResourceStates> = desired +impl Default for GroupedResourceStates { + fn default() -> Self { + Self { + desired: None, + existing: CombinedState::default(), + } + } +} + +fn group_states( + desired: impl Iterator, + existing: impl Iterator)>, +) -> Result>> { + let mut grouped: IndexMap> = desired .into_iter() .map(|(key, state)| { ( @@ -208,7 +218,7 @@ fn group_resource_states<'a>( }) .collect(); for (key, state) in existing { - let entry = grouped.entry(key); + let entry = grouped.entry(key.clone()); if state.current.is_some() { if let indexmap::map::Entry::Occupied(entry) = &entry { if entry.get().existing.current.is_some() { @@ -228,8 +238,8 @@ fn group_resource_states<'a>( .is_some_and(|v| v != legacy_state_key) { warn!( - "inconsistent legacy key: {:?}, {:?}", - key, entry.existing.legacy_state_key + "inconsistent legacy key: {key}, {:?}", + entry.existing.legacy_state_key ); } entry.existing.legacy_state_key = Some(legacy_state_key.clone()); @@ -246,6 +256,72 @@ fn group_resource_states<'a>( Ok(grouped) } +fn collect_attachments_setup_change( + desired: Option<&TargetSetupState>, + existing: &CombinedState, +) -> Result { + let existing_current_attachments = existing + .current + .iter() + .flat_map(|s| s.attachments.iter()) + .map(|(key, state)| (key.clone(), CombinedState::current(state.clone()))); + let existing_staging_attachments = existing.staging.iter().flat_map(|s| { + match s { + StateChange::Upsert(s) => Some(s.attachments.iter().map(|(key, state)| { + ( + key.clone(), + CombinedState::staging(StateChange::Upsert(state.clone())), + ) + })), + StateChange::Delete => None, + } + .into_iter() + .flatten() + }); + let mut grouped_attachment_states = group_states( + desired.iter().flat_map(|s| { + s.attachments + .iter() + .map(|(key, state)| (key.clone(), state.clone())) + }), + (existing_current_attachments.into_iter()) + .chain(existing_staging_attachments) + .rev(), + )?; + if existing + .staging + .iter() + .any(|s| matches!(s, StateChange::Delete)) + { + for state in grouped_attachment_states.values_mut() { + if state + .existing + .staging + .iter() + .all(|s| matches!(s, StateChange::Delete)) + { + state.existing.staging.push(StateChange::Delete); + } + } + } + + let mut attachments_change = AttachmentsSetupChange::default(); + for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() { + let factory = get_attachment_factory(&kind)?; + let is_upsertion = setup_state.desired.is_some(); + if let Some(action) = + factory.diff_setup_states(&key, setup_state.desired, setup_state.existing)? + { + if is_upsertion { + attachments_change.upserts.push(action); + } else { + attachments_change.deletes.push(action); + } + } + } + Ok(attachments_change) +} + pub async fn diff_flow_setup_states( desired_state: Option<&FlowSetupState>, existing_state: Option<&FlowSetupState>, @@ -317,11 +393,15 @@ pub async fn diff_flow_setup_states( let mut target_resources = Vec::new(); let mut unknown_resources = Vec::new(); - let grouped_target_resources = group_resource_states( - desired_state.iter().flat_map(|d| d.targets.iter()), - existing_state.iter().flat_map(|e| e.targets.iter()), + let grouped_target_resources = group_states( + desired_state + .iter() + .flat_map(|d| d.targets.iter().map(|(k, v)| (k.clone(), v.clone()))), + existing_state + .iter() + .flat_map(|e| e.targets.iter().map(|(k, v)| (k.clone(), v.clone()))), )?; - for (resource_id, v) in grouped_target_resources.into_iter() { + for (resource_id, target_states_group) in grouped_target_resources.into_iter() { let factory = match get_export_target_factory(&resource_id.target_kind) { Some(factory) => factory, None => { @@ -329,16 +409,22 @@ pub async fn diff_flow_setup_states( continue; } }; - let state = v.desired.clone(); - let target_state = v + + let attachments_change = collect_attachments_setup_change( + target_states_group.desired.as_ref(), + &target_states_group.existing, + )?; + + let desired_state = target_states_group.desired.clone(); + let target_state = target_states_group .desired .and_then(|state| (!state.common.setup_by_user).then_some(state.state)); let existing_without_setup_by_user = CombinedState { - current: v + current: target_states_group .existing .current .and_then(|s| s.state_unless_setup_by_user()), - staging: v + staging: target_states_group .existing .staging .into_iter() @@ -349,7 +435,7 @@ pub async fn diff_flow_setup_states( StateChange::Delete => Some(StateChange::Delete), }) .collect(), - legacy_state_key: v.existing.legacy_state_key.clone(), + legacy_state_key: target_states_group.existing.legacy_state_key.clone(), }; let never_setup_by_sys = target_state.is_none() && existing_without_setup_by_user.current.is_none() @@ -357,8 +443,8 @@ pub async fn diff_flow_setup_states( let setup_change = if never_setup_by_sys { None } else { - Some( - factory + Some(TargetSetupChange { + target_change: factory .diff_setup_states( &resource_id.key, target_state, @@ -366,14 +452,16 @@ pub async fn diff_flow_setup_states( flow_instance_ctx.clone(), ) .await?, - ) + attachments_change, + }) }; + target_resources.push(ResourceSetupInfo { key: resource_id.clone(), - state, + state: desired_state, description: factory.describe_resource(&resource_id.key)?, setup_change, - legacy_key: v + legacy_key: target_states_group .existing .legacy_state_key .map(|legacy_state_key| ResourceIdentifier { @@ -532,22 +620,32 @@ async fn apply_changes_for_flow( target_kind, write, resources.into_iter(), - |setup_change| async move { + |targets_change| async move { let factory = get_export_target_factory(target_kind).ok_or_else(|| { anyhow::anyhow!("No factory found for target kind: {}", target_kind) })?; + for target_change in targets_change.iter() { + for delete in target_change.setup_change.attachments_change.deletes.iter() { + delete.apply_change().await?; + } + } factory .apply_setup_changes( - setup_change - .into_iter() + targets_change + .iter() .map(|s| interface::ResourceSetupChangeItem { key: &s.key.key, - setup_change: s.setup_change.as_ref(), + setup_change: s.setup_change.target_change.as_ref(), }) .collect(), flow_ctx.flow.flow_instance_ctx.clone(), ) .await?; + for target_change in targets_change.iter() { + for delete in target_change.setup_change.attachments_change.upserts.iter() { + delete.apply_change().await?; + } + } Ok(()) }, ) @@ -592,7 +690,7 @@ async fn apply_changes_for_flow( Some(state) => { targets.insert( target_resource.key.clone(), - CombinedState::from_desired(state.clone()), + CombinedState::current(state.clone()), ); } None => { diff --git a/src/setup/states.rs b/src/setup/states.rs index f633ffd9d..5e838761e 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -1,3 +1,4 @@ +use crate::ops::interface::AttachmentSetupChangeAction; /// Concepts: /// - Resource: some setup that needs to be tracked and maintained. /// - Setup State: current state of a resource. @@ -49,7 +50,7 @@ pub struct CombinedState { } impl CombinedState { - pub fn from_desired(desired: T) -> Self { + pub fn current(desired: T) -> Self { Self { current: Some(desired), staging: vec![], @@ -57,6 +58,14 @@ impl CombinedState { } } + pub fn staging(change: StateChange) -> Self { + Self { + current: None, + staging: vec![change], + legacy_state_key: None, + } + } + pub fn from_change(prev: Option>, change: Option>) -> Self where T: Clone, @@ -196,6 +205,13 @@ pub struct TargetSetupState { pub common: TargetSetupStateCommon, pub state: serde_json::Value, + + #[serde( + default, + with = "indexmap::map::serde_seq", + skip_serializing_if = "IndexMap::is_empty" + )] + pub attachments: IndexMap, } impl TargetSetupState { @@ -270,7 +286,7 @@ pub enum ChangeDescription { Note(String), } -pub trait ResourceSetupChange: Send + Sync + Debug + Any + 'static { +pub trait ResourceSetupChange: Send + Sync + Any + 'static { fn describe_changes(&self) -> Vec; fn change_type(&self) -> SetupChangeType; @@ -384,7 +400,56 @@ pub trait ObjectSetupChange { } } -#[derive(Debug)] +#[derive(Default)] +pub struct AttachmentsSetupChange { + pub deletes: Vec>, + pub upserts: Vec>, +} + +impl AttachmentsSetupChange { + pub fn is_empty(&self) -> bool { + self.deletes.is_empty() && self.upserts.is_empty() + } +} + +pub struct TargetSetupChange { + pub target_change: Box, + pub attachments_change: AttachmentsSetupChange, +} + +impl ResourceSetupChange for TargetSetupChange { + fn describe_changes(&self) -> Vec { + let mut result = vec![]; + result.extend( + self.attachments_change + .deletes + .iter() + .map(|a| ChangeDescription::Action(a.describe_change())), + ); + result.extend(self.target_change.describe_changes()); + result.extend( + self.attachments_change + .upserts + .iter() + .map(|a| ChangeDescription::Action(a.describe_change())), + ); + result + } + + fn change_type(&self) -> SetupChangeType { + match self.target_change.change_type() { + SetupChangeType::NoChange => { + if self.attachments_change.is_empty() { + SetupChangeType::NoChange + } else { + SetupChangeType::Update + } + } + t => t, + } + } +} + pub struct FlowSetupChange { pub status: Option, pub seen_flow_metadata_version: Option, @@ -394,7 +459,7 @@ pub struct FlowSetupChange { pub tracking_table: Option>, pub target_resources: - Vec>>, + Vec>, pub unknown_resources: Vec, }