Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ def export(
/,
*,
primary_key_fields: Sequence[str],
attachments: Sequence[op.TargetAttachmentSpec] = (),
vector_indexes: Sequence[index.VectorIndexDef] = (),
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
setup_by_user: bool = False,
Expand Down Expand Up @@ -436,6 +437,7 @@ def export(
target_name,
_spec_kind(target_spec),
dump_engine_object(target_spec),
dump_engine_object(attachments),
dump_engine_object(index_options),
self._engine_data_collector,
setup_by_user,
Expand Down
5 changes: 5 additions & 0 deletions python/cocoindex/op.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class OpCategory(Enum):
SOURCE = "source"
TARGET = "target"
DECLARATION = "declaration"
TARGET_ATTACHMENT = "target_attachment"


@dataclass_transform()
Expand Down Expand Up @@ -81,6 +82,10 @@ class TargetSpec(metaclass=SpecMeta, category=OpCategory.TARGET): # pylint: dis
"""A target spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""


class TargetAttachmentSpec(metaclass=SpecMeta, category=OpCategory.TARGET_ATTACHMENT): # pylint: disable=too-few-public-methods
"""A target attachment spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""


class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
"""A declaration spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""

Expand Down
4 changes: 4 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ impl fmt::Display for IndexOptions {
pub struct ExportOpSpec {
pub collector_name: FieldName,
pub target: OpSpec,

#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub attachments: Vec<OpSpec>,

pub index_options: IndexOptions,
pub setup_by_user: bool,
}
Expand Down
27 changes: 26 additions & 1 deletion src/builder/analyzer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::builder::exec_ctx::AnalyzedSetupState;
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
use crate::ops::{
get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
};
use crate::prelude::*;

use super::plan::*;
Expand Down Expand Up @@ -913,6 +915,27 @@ impl AnalyzerContext {
let op_name = export_op.name.clone();
let export_target_factory = export_op_group.target_factory.clone();

let attachments = export_op
.spec
.attachments
.iter()
.map(|attachment| {
let attachment_factory = get_attachment_factory(&attachment.kind)?;
let attachment_state = attachment_factory.get_state(
&op_name,
&export_op.spec.target.spec,
serde_json::Value::Object(attachment.spec.clone()),
)?;
Ok((
interface::AttachmentSetupKey(
attachment.kind.clone(),
attachment_state.setup_key,
),
attachment_state.setup_state,
))
})
.collect::<Result<IndexMap<_, _>>>()?;

let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
target_kind: target_kind.to_string(),
setup_key: data_coll_output.setup_key,
Expand All @@ -925,6 +948,7 @@ impl AnalyzerContext {
.map(|field| field.value_type.typ.clone())
.collect::<Box<[_]>>(),
),
attachments,
};
targets_analyzed_ss[*idx] = Some(export_op_ss);

Expand Down Expand Up @@ -956,6 +980,7 @@ impl AnalyzerContext {
desired_setup_state,
setup_by_user: false,
key_type: None,
attachments: IndexMap::new(),
};
declarations_analyzed_ss.push(decl_ss);
}
Expand Down
4 changes: 4 additions & 0 deletions src/builder/exec_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct AnalyzedTargetSetupState {
pub setup_by_user: bool,
/// None for declarations.
pub key_type: Option<Box<[schema::ValueType]>>,

pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
}

pub struct AnalyzedSetupState {
Expand Down Expand Up @@ -176,6 +178,7 @@ fn build_export_op_exec_ctx(
} else {
max_schema_version_id + 1
};

match target_states.entry(resource_id) {
indexmap::map::Entry::Occupied(entry) => {
api_bail!(
Expand All @@ -194,6 +197,7 @@ fn build_export_op_exec_ctx(
key_type: analyzed_target_ss.key_type.clone(),
},
state: analyzed_target_ss.desired_setup_state.clone(),
attachments: analyzed_target_ss.attachments.clone(),
});
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,13 @@ impl FlowBuilder {
Ok(())
}

#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
#[pyo3(signature = (name, kind, op_spec, attachments, index_options, input, setup_by_user=false))]
pub fn export(
&mut self,
name: String,
kind: String,
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
attachments: py::Pythonized<Vec<spec::OpSpec>>,
index_options: py::Pythonized<spec::IndexOptions>,
input: &DataCollector,
setup_by_user: bool,
Expand All @@ -574,6 +575,7 @@ impl FlowBuilder {
spec: spec::ExportOpSpec {
collector_name: input.name.clone(),
target: spec,
attachments: attachments.into_inner(),
index_options: index_options.into_inner(),
setup_by_user,
},
Expand Down
43 changes: 43 additions & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,52 @@ pub trait TargetFactory: Send + Sync {
) -> Result<()>;
}

pub struct TargetAttachmentState {
pub setup_key: serde_json::Value,
pub setup_state: serde_json::Value,
}

#[async_trait]
pub trait AttachmentSetupChangeAction {
fn describe_change(&self) -> String;

async fn apply_change(&self) -> Result<()>;
}

pub trait TargetAttachmentFactory: Send + Sync {
/// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
/// This should always return the canonical serialized form.
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;

fn get_state(
&self,
target_name: &str,
target_spec: &serde_json::Map<String, serde_json::Value>,
attachment_spec: serde_json::Value,
) -> Result<TargetAttachmentState>;

/// Should return Some if and only if any changes are needed.
fn diff_setup_states(
&self,
key: &serde_json::Value,
new_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
) -> Result<Option<Box<dyn AttachmentSetupChangeAction + Send + Sync>>>;
}

#[derive(Clone)]
pub enum ExecutorFactory {
Source(Arc<dyn SourceFactory + Send + Sync>),
SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AttachmentSetupKey(pub String, pub serde_json::Value);

impl std::fmt::Display for AttachmentSetupKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.0, self.1)
}
}
14 changes: 14 additions & 0 deletions src/ops/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ pub fn get_optional_target_factory(
registry.get_target(kind).cloned()
}

pub fn get_optional_attachment_factory(
kind: &str,
) -> Option<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
registry.get_target_attachment(kind).cloned()
}

pub fn get_source_factory(
kind: &str,
) -> Result<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
Expand All @@ -77,6 +84,13 @@ pub fn get_target_factory(
.ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind))
}

pub fn get_attachment_factory(
kind: &str,
) -> Result<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
get_optional_attachment_factory(kind)
.ok_or_else(|| anyhow::anyhow!("Attachment factory not found for op kind: {}", kind))
}

pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> {
let mut registry = EXECUTOR_FACTORY_REGISTRY.write().unwrap();
registry.register(name, factory)
Expand Down
22 changes: 22 additions & 0 deletions src/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub struct ExecutorFactoryRegistry {
function_factories:
HashMap<String, Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>>,
target_factories: HashMap<String, Arc<dyn super::interface::TargetFactory + Send + Sync>>,
target_attachment_factories:
HashMap<String, Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>>,
}

impl Default for ExecutorFactoryRegistry {
Expand All @@ -22,6 +24,7 @@ impl ExecutorFactoryRegistry {
source_factories: HashMap::new(),
function_factories: HashMap::new(),
target_factories: HashMap::new(),
target_attachment_factories: HashMap::new(),
}
}

Expand Down Expand Up @@ -61,6 +64,18 @@ impl ExecutorFactoryRegistry {
}
}
}
ExecutorFactory::TargetAttachment(target_attachment_factory) => {
match self.target_attachment_factories.entry(name) {
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
"Target attachment factory with name already exists: {}",
entry.key()
)),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(target_attachment_factory);
Ok(())
}
}
}
}
}

Expand All @@ -84,4 +99,11 @@ impl ExecutorFactoryRegistry {
) -> Option<&Arc<dyn super::interface::TargetFactory + Send + Sync>> {
self.target_factories.get(name)
}

pub fn get_target_attachment(
&self,
name: &str,
) -> Option<&Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
self.target_attachment_factories.get(name)
}
}
Loading