Skip to content

Commit afb5a96

Browse files
committed
Resolve merge conflict in src/base/json_schema.rs
2 parents 33fdd66 + afea19d commit afb5a96

File tree

17 files changed

+615
-54
lines changed

17 files changed

+615
-54
lines changed

python/cocoindex/flow.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -405,6 +405,7 @@ def export(
405405
/,
406406
*,
407407
primary_key_fields: Sequence[str],
408+
attachments: Sequence[op.TargetAttachmentSpec] = (),
408409
vector_indexes: Sequence[index.VectorIndexDef] = (),
409410
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
410411
setup_by_user: bool = False,
@@ -436,6 +437,10 @@ def export(
436437
target_name,
437438
_spec_kind(target_spec),
438439
dump_engine_object(target_spec),
440+
[
441+
{"kind": _spec_kind(att), **dump_engine_object(att)}
442+
for att in attachments
443+
],
439444
dump_engine_object(index_options),
440445
self._engine_data_collector,
441446
setup_by_user,

python/cocoindex/op.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@ class OpCategory(Enum):
4646
SOURCE = "source"
4747
TARGET = "target"
4848
DECLARATION = "declaration"
49+
TARGET_ATTACHMENT = "target_attachment"
4950

5051

5152
@dataclass_transform()
@@ -81,6 +82,10 @@ class TargetSpec(metaclass=SpecMeta, category=OpCategory.TARGET): # pylint: dis
8182
"""A target spec. All its subclasses can be instantiated similarly to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8283

8384

85+
class TargetAttachmentSpec(metaclass=SpecMeta, category=OpCategory.TARGET_ATTACHMENT): # pylint: disable=too-few-public-methods
86+
"""A target attachment spec. All its subclasses can be instantiated similarly to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
87+
88+
8489
class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
8590
"""A declaration spec. All its subclasses can be instantiated similarly to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8691

python/cocoindex/targets/_engine_builtin_specs.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,14 @@ class Postgres(op.TargetSpec):
1616
table_name: str | None = None
1717

1818

19+
class PostgresSqlAttachment(op.TargetAttachmentSpec):
20+
"""Attachment to execute specified SQL statements for Postgres targets."""
21+
22+
name: str
23+
setup_sql: str
24+
teardown_sql: str | None = None
25+
26+
1927
@dataclass
2028
class QdrantConnection:
2129
"""Connection spec for Qdrant."""

src/base/json_schema.rs

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -395,7 +395,6 @@ pub fn build_json_schema(
395395
#[cfg(test)]
396396
mod tests {
397397
use super::*;
398-
<<<<<<< HEAD
399398
use crate::base::schema::*;
400399
use expect_test::expect;
401400
use serde_json::json;
@@ -1368,11 +1367,7 @@ mod tests {
13681367
"type": "string"
13691368
}"#]]
13701369
.assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap());
1371-
=======
1372-
use crate::base::schema::{
1373-
BasicValueType, EnrichedValueType, FieldSchema, StructSchema, ValueType,
1374-
};
1375-
use std::sync::Arc;
1370+
}
13761371

13771372
#[test]
13781373
fn test_description_concatenation() {
@@ -1421,6 +1416,5 @@ mod tests {
14211416
} else {
14221417
panic!("No description found in the schema");
14231418
}
1424-
>>>>>>> ef87b68dac10345e67361c4d5d6a3edf44a4af0f
14251419
}
14261420
}

src/base/spec.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -493,6 +493,10 @@ impl fmt::Display for IndexOptions {
493493
pub struct ExportOpSpec {
494494
pub collector_name: FieldName,
495495
pub target: OpSpec,
496+
497+
#[serde(default, skip_serializing_if = "Vec::is_empty")]
498+
pub attachments: Vec<OpSpec>,
499+
496500
pub index_options: IndexOptions,
497501
pub setup_by_user: bool,
498502
}

src/builder/analyzer.rs

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
use crate::builder::exec_ctx::AnalyzedSetupState;
2-
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
2+
use crate::ops::{
3+
get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
4+
};
35
use crate::prelude::*;
46

57
use super::plan::*;
@@ -913,6 +915,27 @@ impl AnalyzerContext {
913915
let op_name = export_op.name.clone();
914916
let export_target_factory = export_op_group.target_factory.clone();
915917

918+
let attachments = export_op
919+
.spec
920+
.attachments
921+
.iter()
922+
.map(|attachment| {
923+
let attachment_factory = get_attachment_factory(&attachment.kind)?;
924+
let attachment_state = attachment_factory.get_state(
925+
&op_name,
926+
&export_op.spec.target.spec,
927+
serde_json::Value::Object(attachment.spec.clone()),
928+
)?;
929+
Ok((
930+
interface::AttachmentSetupKey(
931+
attachment.kind.clone(),
932+
attachment_state.setup_key,
933+
),
934+
attachment_state.setup_state,
935+
))
936+
})
937+
.collect::<Result<IndexMap<_, _>>>()?;
938+
916939
let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
917940
target_kind: target_kind.to_string(),
918941
setup_key: data_coll_output.setup_key,
@@ -925,6 +948,7 @@ impl AnalyzerContext {
925948
.map(|field| field.value_type.typ.clone())
926949
.collect::<Box<[_]>>(),
927950
),
951+
attachments,
928952
};
929953
targets_analyzed_ss[*idx] = Some(export_op_ss);
930954

@@ -956,6 +980,7 @@ impl AnalyzerContext {
956980
desired_setup_state,
957981
setup_by_user: false,
958982
key_type: None,
983+
attachments: IndexMap::new(),
959984
};
960985
declarations_analyzed_ss.push(decl_ss);
961986
}

src/builder/exec_ctx.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,8 @@ pub struct AnalyzedTargetSetupState {
2626
pub setup_by_user: bool,
2727
/// None for declarations.
2828
pub key_type: Option<Box<[schema::ValueType]>>,
29+
30+
pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
2931
}
3032

3133
pub struct AnalyzedSetupState {
@@ -176,6 +178,7 @@ fn build_export_op_exec_ctx(
176178
} else {
177179
max_schema_version_id + 1
178180
};
181+
179182
match target_states.entry(resource_id) {
180183
indexmap::map::Entry::Occupied(entry) => {
181184
api_bail!(
@@ -194,6 +197,7 @@ fn build_export_op_exec_ctx(
194197
key_type: analyzed_target_ss.key_type.clone(),
195198
},
196199
state: analyzed_target_ss.desired_setup_state.clone(),
200+
attachments: analyzed_target_ss.attachments.clone(),
197201
});
198202
}
199203
}

src/builder/flow_builder.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -549,12 +549,13 @@ impl FlowBuilder {
549549
Ok(())
550550
}
551551

552-
#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
552+
#[pyo3(signature = (name, kind, op_spec, attachments, index_options, input, setup_by_user=false))]
553553
pub fn export(
554554
&mut self,
555555
name: String,
556556
kind: String,
557557
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
558+
attachments: py::Pythonized<Vec<spec::OpSpec>>,
558559
index_options: py::Pythonized<spec::IndexOptions>,
559560
input: &DataCollector,
560561
setup_by_user: bool,
@@ -574,6 +575,7 @@ impl FlowBuilder {
574575
spec: spec::ExportOpSpec {
575576
collector_name: input.name.clone(),
576577
target: spec,
578+
attachments: attachments.into_inner(),
577579
index_options: index_options.into_inner(),
578580
setup_by_user,
579581
},

src/execution/db_tracking_setup.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,8 @@ pub struct TrackingTableSetupChange {
103103
pub legacy_source_state_table_names: BTreeSet<String>,
104104

105105
pub source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
106+
107+
has_state_change: bool,
106108
}
107109

108110
impl TrackingTableSetupChange {
@@ -136,6 +138,7 @@ impl TrackingTableSetupChange {
136138
legacy_source_state_table_names,
137139
min_existing_version_id,
138140
source_names_need_state_cleanup,
141+
has_state_change: existing.has_state_diff(desired, |v| v),
139142
})
140143
} else {
141144
None
@@ -148,6 +151,7 @@ impl TrackingTableSetupChange {
148151
ResourceSetupInfo {
149152
key: (),
150153
state: self.desired_state.clone(),
154+
has_tracked_state_change: self.has_state_change,
151155
description: "Internal Storage for Tracking".to_string(),
152156
setup_change: Some(self),
153157
legacy_key: None,

src/ops/factory_bases.rs

Lines changed: 99 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -374,7 +374,7 @@ pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
374374
}
375375

376376
#[async_trait]
377-
pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
377+
pub trait TargetFactoryBase: Send + Sync + 'static {
378378
type Spec: DeserializeOwned + Send + Sync;
379379
type DeclarationSpec: DeserializeOwned + Send + Sync;
380380

@@ -635,3 +635,101 @@ fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
635635
legacy_state_key: existing_states.legacy_state_key,
636636
})
637637
}
638+
639+
pub struct TypedTargetAttachmentState<F: TargetSpecificAttachmentFactoryBase + ?Sized> {
640+
pub setup_key: F::SetupKey,
641+
pub setup_state: F::SetupState,
642+
}
643+
644+
/// A factory for target-specific attachments.
645+
#[async_trait]
646+
pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static {
647+
type TargetKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
648+
type TargetSpec: DeserializeOwned + Send + Sync;
649+
type Spec: DeserializeOwned + Send + Sync;
650+
type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
651+
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
652+
type SetupChange: interface::AttachmentSetupChange + Send + Sync;
653+
654+
fn name(&self) -> &str;
655+
656+
fn get_state(
657+
&self,
658+
target_name: &str,
659+
target_spec: &Self::TargetSpec,
660+
attachment_spec: Self::Spec,
661+
) -> Result<TypedTargetAttachmentState<Self>>;
662+
663+
async fn diff_setup_states(
664+
&self,
665+
target_key: &Self::TargetKey,
666+
attachment_key: &Self::SetupKey,
667+
new_state: Option<Self::SetupState>,
668+
existing_states: setup::CombinedState<Self::SetupState>,
669+
context: &interface::FlowInstanceContext,
670+
) -> Result<Option<Self::SetupChange>>;
671+
672+
/// Deserialize the setup key from a JSON value.
673+
/// Override this method to provide custom deserialization logic, e.g. to perform backward-compatible deserialization.
674+
fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
675+
Ok(utils::deser::from_json_value(key)?)
676+
}
677+
678+
fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
679+
where
680+
Self: Sized,
681+
{
682+
registry.register(
683+
self.name().to_string(),
684+
ExecutorFactory::TargetAttachment(Arc::new(self)),
685+
)
686+
}
687+
}
688+
689+
#[async_trait]
690+
impl<T: TargetSpecificAttachmentFactoryBase> TargetAttachmentFactory for T {
691+
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
692+
let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
693+
Ok(serde_json::to_value(key)?)
694+
}
695+
696+
fn get_state(
697+
&self,
698+
target_name: &str,
699+
target_spec: &serde_json::Map<String, serde_json::Value>,
700+
attachment_spec: serde_json::Value,
701+
) -> Result<interface::TargetAttachmentState> {
702+
let state = TargetSpecificAttachmentFactoryBase::get_state(
703+
self,
704+
target_name,
705+
&utils::deser::from_json_value(serde_json::Value::Object(target_spec.clone()))?,
706+
utils::deser::from_json_value(attachment_spec)?,
707+
)?;
708+
Ok(interface::TargetAttachmentState {
709+
setup_key: serde_json::to_value(state.setup_key)?,
710+
setup_state: serde_json::to_value(state.setup_state)?,
711+
})
712+
}
713+
714+
async fn diff_setup_states(
715+
&self,
716+
target_key: &serde_json::Value,
717+
attachment_key: &serde_json::Value,
718+
new_state: Option<serde_json::Value>,
719+
existing_states: setup::CombinedState<serde_json::Value>,
720+
context: &interface::FlowInstanceContext,
721+
) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>> {
722+
let setup_change = self
723+
.diff_setup_states(
724+
&utils::deser::from_json_value(target_key.clone())?,
725+
&utils::deser::from_json_value(attachment_key.clone())?,
726+
new_state
727+
.map(|v| utils::deser::from_json_value(v))
728+
.transpose()?,
729+
from_json_combined_state(existing_states)?,
730+
context,
731+
)
732+
.await?;
733+
Ok(setup_change.map(|s| Box::new(s) as Box<dyn AttachmentSetupChange + Send + Sync>))
734+
}
735+
}

0 commit comments

Comments
 (0)