Skip to content

Commit 41099c3

Browse files
committed
fix: always treat target as not-compatible when key schema changed
1 parent b941fdd commit 41099c3

File tree

9 files changed

+38
-23
lines changed

9 files changed

+38
-23
lines changed

src/builder/analyzer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ fn add_collector(
643643
struct ExportDataFieldsInfo {
644644
local_collector_ref: AnalyzedLocalCollectorReference,
645645
primary_key_def: AnalyzedPrimaryKeyDef,
646-
primary_key_schema: Vec<FieldSchema>,
646+
primary_key_schema: Arc<[FieldSchema]>,
647647
value_fields_idx: Vec<u32>,
648648
value_stable: bool,
649649
}
@@ -851,8 +851,8 @@ impl AnalyzerContext {
851851

852852
let primary_key_schema = pk_fields_idx
853853
.iter()
854-
.map(|idx| collector_schema.fields[*idx].clone())
855-
.collect::<Vec<_>>();
854+
.map(|idx| collector_schema.fields[*idx].without_attrs())
855+
.collect::<Arc<[_]>>();
856856
let mut value_fields_schema: Vec<FieldSchema> = vec![];
857857
let mut value_fields_idx = vec![];
858858
for (idx, field) in collector_schema.fields.iter().enumerate() {
@@ -911,6 +911,7 @@ impl AnalyzerContext {
911911
setup_key: data_coll_output.setup_key,
912912
desired_setup_state: data_coll_output.desired_setup_state,
913913
setup_by_user: export_op.spec.setup_by_user,
914+
key_schema: Some(data_fields_info.primary_key_schema.clone()),
914915
};
915916
targets_analyzed_ss[*idx] = Some(export_op_ss);
916917

@@ -940,6 +941,7 @@ impl AnalyzerContext {
940941
setup_key,
941942
desired_setup_state,
942943
setup_by_user: false,
944+
key_schema: None,
943945
};
944946
declarations_analyzed_ss.push(decl_ss);
945947
}

src/builder/exec_ctx.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub struct AnalyzedTargetSetupState {
2323
pub setup_key: serde_json::Value,
2424
pub desired_setup_state: serde_json::Value,
2525
pub setup_by_user: bool,
26+
/// None for declarations.
27+
pub key_schema: Option<Arc<[schema::FieldSchema]>>,
2628
}
2729

2830
pub struct AnalyzedSetupState {
@@ -121,15 +123,19 @@ fn build_target_id(
121123
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
122124
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
123125
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
124-
let compatibility =
125-
if analyzed_target_ss.setup_by_user == existing_state.common.setup_by_user {
126-
target_factory.check_state_compatibility(
127-
&analyzed_target_ss.desired_setup_state,
128-
&existing_state.state,
129-
)?
130-
} else {
131-
SetupStateCompatibility::NotCompatible
132-
};
126+
let compatibility = if let Some(key_schema) = &analyzed_target_ss.key_schema
127+
&& let Some(existing_key_schema) = &existing_state.common.key_schema
128+
&& key_schema != existing_key_schema
129+
{
130+
SetupStateCompatibility::NotCompatible
131+
} else if analyzed_target_ss.setup_by_user != existing_state.common.setup_by_user {
132+
SetupStateCompatibility::NotCompatible
133+
} else {
134+
target_factory.check_state_compatibility(
135+
&analyzed_target_ss.desired_setup_state,
136+
&existing_state.state,
137+
)?
138+
};
133139
let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible {
134140
reusable_schema_version_ids.insert(
135141
(compatibility == SetupStateCompatibility::Compatible)
@@ -184,6 +190,7 @@ fn build_target_id(
184190
schema_version_id,
185191
max_schema_version_id: max_schema_version_id.max(schema_version_id),
186192
setup_by_user: analyzed_target_ss.setup_by_user,
193+
key_schema: analyzed_target_ss.key_schema.clone(),
187194
},
188195
state: analyzed_target_ss.desired_setup_state.clone(),
189196
});

src/builder/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub struct AnalyzedExportOp {
105105
pub export_target_factory: Arc<dyn TargetFactory + Send + Sync>,
106106
pub export_context: Arc<dyn Any + Send + Sync>,
107107
pub primary_key_def: AnalyzedPrimaryKeyDef,
108-
pub primary_key_schema: Vec<FieldSchema>,
108+
pub primary_key_schema: Arc<[FieldSchema]>,
109109
/// idx for value fields - excluding the primary key field.
110110
pub value_fields: Vec<u32>,
111111
/// If true, value is never changed on the same primary key.

src/ops/factory_bases.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
363363
pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
364364
pub name: String,
365365
pub spec: F::Spec,
366-
pub key_fields_schema: Vec<FieldSchema>,
366+
pub key_fields_schema: Arc<[FieldSchema]>,
367367
pub value_fields_schema: Vec<FieldSchema>,
368368
pub index_options: IndexOptions,
369369
}

src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ pub struct ExportDataCollectionBuildOutput {
258258
pub struct ExportDataCollectionSpec {
259259
pub name: String,
260260
pub spec: serde_json::Value,
261-
pub key_fields_schema: Vec<FieldSchema>,
261+
pub key_fields_schema: Arc<[FieldSchema]>,
262262
pub value_fields_schema: Vec<FieldSchema>,
263263
pub index_options: IndexOptions,
264264
}

src/ops/py_factory.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,14 @@ impl interface::TargetFactory for PyExportTargetFactory {
356356

357357
fn check_state_compatibility(
358358
&self,
359-
_desired_state: &serde_json::Value,
360-
_existing_state: &serde_json::Value,
359+
desired_state: &serde_json::Value,
360+
existing_state: &serde_json::Value,
361361
) -> Result<SetupStateCompatibility> {
362-
// The Python target connector doesn't support state update yet.
363-
Ok(SetupStateCompatibility::Compatible)
362+
Ok(if desired_state == existing_state {
363+
SetupStateCompatibility::Compatible
364+
} else {
365+
SetupStateCompatibility::PartialCompatible
366+
})
364367
}
365368

366369
fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {

src/ops/targets/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ impl TargetFactoryBase for Factory {
632632
db_ref,
633633
db_pool.clone(),
634634
table_name,
635-
d.key_fields_schema,
635+
d.key_fields_schema.iter().cloned().collect(),
636636
d.value_fields_schema,
637637
)?);
638638
Ok(export_context)

src/ops/targets/shared/property_graph.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl<AuthEntry> std::fmt::Display for GraphElementType<AuthEntry> {
112112

113113
pub struct GraphElementSchema {
114114
pub elem_type: ElementType,
115-
pub key_fields: Vec<schema::FieldSchema>,
115+
pub key_fields: Arc<[schema::FieldSchema]>,
116116
pub value_fields: Vec<schema::FieldSchema>,
117117
}
118118

@@ -256,7 +256,7 @@ impl GraphElementSchemaBuilder {
256256
}
257257
Ok(GraphElementSchema {
258258
elem_type: self.elem_type,
259-
key_fields: self.key_fields,
259+
key_fields: self.key_fields.into(),
260260
value_fields: self.value_fields,
261261
})
262262
}
@@ -349,7 +349,7 @@ pub struct DataCollectionGraphMappingInput<'a, AuthEntry> {
349349
pub mapping: &'a GraphElementMapping,
350350
pub index_options: &'a spec::IndexOptions,
351351

352-
pub key_fields_schema: Vec<FieldSchema>,
352+
pub key_fields_schema: Arc<[FieldSchema]>,
353353
pub value_fields_schema: Vec<FieldSchema>,
354354
}
355355

@@ -435,6 +435,7 @@ pub fn analyze_graph_mappings<'a, AuthEntry: 'a>(
435435
.key_fields_schema
436436
.into_iter()
437437
.enumerate()
438+
.map(|(idx, f)| (idx, f.clone()))
438439
.collect(),
439440
data_coll_input
440441
.value_fields_schema

src/setup/states.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ pub struct TargetSetupStateCommon {
180180
pub max_schema_version_id: i32,
181181
#[serde(default)]
182182
pub setup_by_user: bool,
183+
#[serde(default)]
184+
pub key_schema: Option<Arc<[schema::FieldSchema]>>,
183185
}
184186

185187
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]

0 commit comments

Comments
 (0)