Skip to content

Commit 8fc0fdf

Browse files
authored
Support setup_by_user for target storages. (#271)
1 parent 44f27ca commit 8fc0fdf

File tree

8 files changed

+103
-70
lines changed

8 files changed

+103
-70
lines changed

python/cocoindex/flow.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ def collect(self, **kwargs):
286286

287287
def export(self, name: str, target_spec: op.StorageSpec, /, *,
288288
primary_key_fields: Sequence[str] | None = None,
289-
vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = ()):
289+
vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = (),
290+
setup_by_user: bool = False):
290291
"""
291292
Export the collected data to the specified target.
292293
"""
@@ -298,7 +299,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
298299
for field_name, metric in vector_index]
299300
self._flow_builder_state.engine_flow_builder.export(
300301
name, _spec_kind(target_spec), _dump_engine_object(target_spec),
301-
index_options, self._engine_data_collector)
302+
index_options, self._engine_data_collector, setup_by_user)
302303

303304

304305
_flow_name_builder = _NameBuilder()

src/base/spec.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ pub struct ExportOpSpec {
231231
pub collector_name: FieldName,
232232
pub target: OpSpec,
233233
pub index_options: IndexOptions,
234+
pub setup_by_user: bool,
234235
}
235236

236237
/// A reactive operation reacts on given input values.

src/builder/analyzer.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -904,10 +904,15 @@ impl AnalyzerContext<'_> {
904904
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
905905
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
906906
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
907-
let compatibility = export_factory.check_state_compatibility(
908-
&setup_output.desired_setup_state,
909-
&existing_state.state,
910-
)?;
907+
let compatibility =
908+
if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
909+
export_factory.check_state_compatibility(
910+
&setup_output.desired_setup_state,
911+
&existing_state.state,
912+
)?
913+
} else {
914+
SetupStateCompatibility::NotCompatible
915+
};
911916
let compatible_target_id =
912917
if compatibility != SetupStateCompatibility::NotCompatible {
913918
reusable_schema_version_ids.insert(
@@ -962,6 +967,7 @@ impl AnalyzerContext<'_> {
962967
target_id,
963968
schema_version_id,
964969
max_schema_version_id: max_schema_version_id.max(schema_version_id),
970+
setup_by_user: export_op.spec.setup_by_user,
965971
},
966972
state: setup_output.desired_setup_state,
967973
});

src/builder/flow_builder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,13 +579,15 @@ impl FlowBuilder {
579579
Ok(())
580580
}
581581

582+
#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
582583
pub fn export(
583584
&mut self,
584585
name: String,
585586
kind: String,
586587
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
587588
index_options: py::Pythonized<spec::IndexOptions>,
588589
input: &DataCollector,
590+
setup_by_user: bool,
589591
) -> PyResult<()> {
590592
let spec = spec::OpSpec {
591593
kind,
@@ -603,6 +605,7 @@ impl FlowBuilder {
603605
collector_name: input.name.clone(),
604606
target: spec,
605607
index_options: index_options.into_inner(),
608+
setup_by_user,
606609
},
607610
});
608611
Ok(())

src/ops/factory_bases.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
288288
context: Arc<FlowInstanceContext>,
289289
) -> Result<ExportTargetBuildOutput<Self>>;
290290

291-
/// This is only called for non-user-setup targets.
291+
/// Will not be called if the target is set up by the user.
292+
/// Returns an error if the target only supports being set up by the user.
292293
fn check_setup_status(
293294
&self,
294295
key: Self::Key,

src/ops/interface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,6 @@ pub struct ExportTargetBuildOutput {
162162
}
163163

164164
pub trait ExportTargetFactory {
165-
// The first field of the `input_schema` is the primary key field.
166-
// If it has struct type, it should be converted to composite primary key.
167165
fn build(
168166
self: Arc<Self>,
169167
name: String,
@@ -174,6 +172,8 @@ pub trait ExportTargetFactory {
174172
context: Arc<FlowInstanceContext>,
175173
) -> Result<ExportTargetBuildOutput>;
176174

175+
/// Will not be called if it's setup by user.
176+
/// It returns an error if the target only supports setup by user.
177177
fn check_setup_status(
178178
&self,
179179
key: &serde_json::Value,

src/setup/driver.rs

Lines changed: 70 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use super::{
1414
db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatusCheck,
1515
ObjectSetupStatusCheck, ObjectStatus, ResourceIdentifier, ResourceSetupStatusCheck,
1616
SetupChangeType, StateChange, TargetResourceSetupStatusCheck, TargetSetupState,
17-
TargetSetupStateCommon,
1817
};
1918
use super::{AllSetupState, AllSetupStatusCheck};
2019
use crate::execution::db_tracking_setup;
@@ -166,9 +165,8 @@ fn to_object_status<A, B>(existing: Option<A>, desired: Option<B>) -> Result<Obj
166165

167166
#[derive(Debug, Default)]
168167
struct GroupedResourceStates {
169-
desired_common: Option<TargetSetupStateCommon>,
170-
desired: Option<serde_json::Value>,
171-
existing: CombinedState<serde_json::Value>,
168+
desired: Option<TargetSetupState>,
169+
existing: CombinedState<TargetSetupState>,
172170
}
173171

174172
fn group_resource_states<'a>(
@@ -181,8 +179,7 @@ fn group_resource_states<'a>(
181179
(
182180
key,
183181
GroupedResourceStates {
184-
desired_common: Some(state.common.clone()),
185-
desired: Some(state.state.clone()),
182+
desired: Some(state.clone()),
186183
existing: CombinedState::default(),
187184
},
188185
)
@@ -199,14 +196,13 @@ fn group_resource_states<'a>(
199196
}
200197
let entry = entry.or_default();
201198
if let Some(current) = &state.current {
202-
entry.existing.current = Some(current.state.clone());
199+
entry.existing.current = Some(current.clone());
203200
}
204201
for s in state.staging.iter() {
205202
match s {
206-
StateChange::Upsert(v) => entry
207-
.existing
208-
.staging
209-
.push(StateChange::Upsert(v.state.clone())),
203+
StateChange::Upsert(v) => {
204+
entry.existing.staging.push(StateChange::Upsert(v.clone()))
205+
}
210206
StateChange::Delete => entry.existing.staging.push(StateChange::Delete),
211207
}
212208
}
@@ -247,41 +243,72 @@ pub fn check_flow_setup_status(
247243
.collect(),
248244
);
249245

250-
let target_resources = {
251-
let grouped_target_resources = group_resource_states(
252-
desired_state.iter().flat_map(|d| d.targets.iter()),
253-
existing_state.iter().flat_map(|e| e.targets.iter()),
254-
)?;
255-
let registry = executor_factory_registry();
256-
grouped_target_resources
257-
.into_iter()
258-
.map(|(resource_id, v)| -> Result<_> {
259-
let factory = registry.get(&resource_id.target_kind).ok_or_else(|| {
260-
anyhow::anyhow!(
261-
"Target resource type not found: {}",
262-
resource_id.target_kind
263-
)
264-
})?;
265-
let status_check = match factory {
266-
ExecutorFactory::ExportTarget(factory) => {
267-
factory.check_setup_status(&resource_id.key, v.desired, v.existing)?
246+
let mut target_setup_state_updates = Vec::new();
247+
let mut target_resources = Vec::new();
248+
249+
let grouped_target_resources = group_resource_states(
250+
desired_state.iter().flat_map(|d| d.targets.iter()),
251+
existing_state.iter().flat_map(|e| e.targets.iter()),
252+
)?;
253+
let registry = executor_factory_registry();
254+
for (resource_id, v) in grouped_target_resources.into_iter() {
255+
let factory = registry.get(&resource_id.target_kind).ok_or_else(|| {
256+
anyhow::anyhow!(
257+
"Target resource type not found: {}",
258+
resource_id.target_kind
259+
)
260+
})?;
261+
target_setup_state_updates.push((resource_id.clone(), v.desired.clone()));
262+
let (desired_state, desired_common) = match v.desired {
263+
Some(desired) => (
264+
(!desired.common.setup_by_user).then_some(desired.state),
265+
Some(desired.common),
266+
),
267+
None => (None, None),
268+
};
269+
let existing_without_setup_by_user = CombinedState {
270+
current: v
271+
.existing
272+
.current
273+
.and_then(|s| s.state_unless_setup_by_user()),
274+
staging: v
275+
.existing
276+
.staging
277+
.into_iter()
278+
.filter_map(|s| match s {
279+
StateChange::Upsert(s) => {
280+
s.state_unless_setup_by_user().map(StateChange::Upsert)
268281
}
269-
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
270-
};
271-
Ok(TargetResourceSetupStatusCheck {
272-
target_kind: resource_id.target_kind.clone(),
273-
common: v.desired_common,
274-
status_check,
282+
StateChange::Delete => Some(StateChange::Delete),
275283
})
276-
})
277-
.collect::<Result<Vec<_>>>()?
278-
};
284+
.collect(),
285+
};
286+
let never_setup_by_sys = desired_state.is_none()
287+
&& existing_without_setup_by_user.current.is_none()
288+
&& existing_without_setup_by_user.staging.is_empty();
289+
if !never_setup_by_sys {
290+
let status_check = match factory {
291+
ExecutorFactory::ExportTarget(factory) => factory.check_setup_status(
292+
&resource_id.key,
293+
desired_state,
294+
existing_without_setup_by_user,
295+
)?,
296+
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
297+
};
298+
target_resources.push(TargetResourceSetupStatusCheck {
299+
target_kind: resource_id.target_kind.clone(),
300+
common: desired_common,
301+
status_check,
302+
});
303+
}
304+
}
279305
Ok(FlowSetupStatusCheck {
280306
status: to_object_status(existing_state, desired_state)?,
281307
seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version),
282308
metadata_change,
283309
tracking_table: tracking_table_change,
284310
target_resources,
311+
target_setup_state_updates,
285312
})
286313
}
287314

@@ -392,25 +419,15 @@ pub async fn apply_changes(
392419
.transpose()?,
393420
);
394421
}
395-
for target_resource in &flow_status.target_resources {
422+
for (resource_id, state_update) in &flow_status.target_setup_state_updates {
396423
state_updates.insert(
397424
db_metadata::ResourceTypeKey::new(
398-
MetadataRecordType::Target(target_resource.target_kind.clone()).to_string(),
399-
target_resource.status_check.key().clone(),
425+
MetadataRecordType::Target(resource_id.target_kind.clone()).to_string(),
426+
resource_id.key.clone(),
400427
),
401-
target_resource
402-
.common
428+
state_update
403429
.as_ref()
404-
.map(|c| {
405-
serde_json::to_value(TargetSetupState {
406-
common: c.clone(),
407-
state: target_resource
408-
.status_check
409-
.desired_state()
410-
.cloned()
411-
.unwrap_or_default(),
412-
})
413-
})
430+
.map(serde_json::to_value)
414431
.transpose()?,
415432
);
416433
}

src/setup/states.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,14 @@
1111
/// - [resource: tracking table]
1212
/// - Target
1313
/// - [resource: target-specific stuff]
14-
use anyhow::Result;
15-
use axum::async_trait;
14+
use crate::prelude::*;
15+
1616
use indenter::indented;
17-
use indexmap::IndexMap;
18-
use serde::de::DeserializeOwned;
19-
use serde::{Deserialize, Serialize};
20-
use std::collections::BTreeSet;
17+
use std::fmt::Debug;
2118
use std::fmt::{Display, Write};
2219
use std::hash::Hash;
23-
use std::{collections::BTreeMap, fmt::Debug};
2420

2521
use super::db_metadata;
26-
use crate::base::schema;
2722
use crate::execution::db_tracking_setup;
2823

2924
const INDENT: &str = " ";
@@ -142,6 +137,8 @@ pub struct TargetSetupStateCommon {
142137
pub target_id: i32,
143138
pub schema_version_id: i32,
144139
pub max_schema_version_id: i32,
140+
#[serde(default)]
141+
pub setup_by_user: bool,
145142
}
146143

147144
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -151,6 +148,12 @@ pub struct TargetSetupState {
151148
pub state: serde_json::Value,
152149
}
153150

151+
impl TargetSetupState {
152+
pub fn state_unless_setup_by_user(self) -> Option<serde_json::Value> {
153+
(!self.common.setup_by_user).then_some(self.state)
154+
}
155+
}
156+
154157
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
155158
pub struct FlowSetupMetadata {
156159
pub last_source_id: i32,
@@ -269,6 +272,7 @@ pub struct FlowSetupStatusCheck {
269272

270273
pub tracking_table: db_tracking_setup::TrackingTableSetupStatusCheck,
271274
pub target_resources: Vec<TargetResourceSetupStatusCheck>,
275+
pub target_setup_state_updates: Vec<(ResourceIdentifier, Option<TargetSetupState>)>,
272276
}
273277
impl ObjectSetupStatusCheck for FlowSetupStatusCheck {
274278
fn status(&self) -> ObjectStatus {

0 commit comments

Comments
 (0)