Skip to content

Commit 7e8e873

Browse files
authored
feat(source-state-table): create source state table for new flows (#865)
1 parent 1937f8d commit 7e8e873

File tree

5 files changed

+175
-48
lines changed

5 files changed

+175
-48
lines changed

src/builder/exec_ctx.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ fn build_import_op_exec_ctx(
7676
fn build_target_id(
7777
analyzed_target_ss: &AnalyzedTargetSetupState,
7878
existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>,
79-
flow_setup_state: &mut setup::FlowSetupState<setup::DesiredMode>,
79+
metadata: &mut setup::FlowSetupMetadata,
80+
target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
8081
) -> Result<i32> {
8182
let interface::ExecutorFactory::ExportTarget(target_factory) =
8283
get_executor_factory(&analyzed_target_ss.target_kind)?
@@ -125,8 +126,8 @@ fn build_target_id(
125126
None
126127
};
127128
let target_id = target_id.unwrap_or_else(|| {
128-
flow_setup_state.metadata.last_target_id += 1;
129-
flow_setup_state.metadata.last_target_id
129+
metadata.last_target_id += 1;
130+
metadata.last_target_id
130131
});
131132
let max_schema_version_id = existing_target_states
132133
.iter()
@@ -143,7 +144,7 @@ fn build_target_id(
143144
} else {
144145
max_schema_version_id + 1
145146
};
146-
match flow_setup_state.targets.entry(resource_id) {
147+
match target_states.entry(resource_id) {
147148
indexmap::map::Entry::Occupied(entry) => {
148149
api_bail!(
149150
"Target resource already exists: kind = {}, key = {}",
@@ -199,34 +200,27 @@ pub fn build_flow_setup_execution_context(
199200
}
200201
}
201202

202-
let mut setup_state = setup::FlowSetupState::<setup::DesiredMode> {
203-
seen_flow_metadata_version: existing_flow_ss
204-
.and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
205-
metadata: setup::FlowSetupMetadata {
206-
last_source_id: existing_metadata_versions()
207-
.map(|metadata| metadata.last_source_id)
208-
.max()
209-
.unwrap_or(0),
210-
last_target_id: existing_metadata_versions()
211-
.map(|metadata| metadata.last_target_id)
212-
.max()
213-
.unwrap_or(0),
214-
sources: BTreeMap::new(),
215-
},
216-
tracking_table: db_tracking_setup::TrackingTableSetupState {
217-
table_name: existing_flow_ss
218-
.and_then(|flow_ss| {
219-
flow_ss
220-
.tracking_table
221-
.current
222-
.as_ref()
223-
.map(|v| v.table_name.clone())
224-
})
225-
.unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)),
226-
version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
227-
},
228-
targets: IndexMap::new(),
203+
let mut metadata = setup::FlowSetupMetadata {
204+
last_source_id: existing_metadata_versions()
205+
.map(|metadata| metadata.last_source_id)
206+
.max()
207+
.unwrap_or(0),
208+
last_target_id: existing_metadata_versions()
209+
.map(|metadata| metadata.last_target_id)
210+
.max()
211+
.unwrap_or(0),
212+
sources: BTreeMap::new(),
213+
features: existing_flow_ss
214+
.map(|m| {
215+
m.metadata
216+
.possible_versions()
217+
.flat_map(|v| v.features.iter())
218+
.cloned()
219+
.collect::<BTreeSet<_>>()
220+
})
221+
.unwrap_or_else(setup::flow_features::default_features),
229222
};
223+
let mut target_states = IndexMap::new();
230224

231225
let import_op_exec_ctx = flow_inst
232226
.import_ops
@@ -241,7 +235,7 @@ pub fn build_flow_setup_execution_context(
241235
&import_op.name,
242236
output_type,
243237
source_states_by_name.get(&import_op.name.as_str()),
244-
&mut setup_state.metadata,
238+
&mut metadata,
245239
)
246240
})
247241
.collect::<Result<Vec<_>>>()?;
@@ -253,7 +247,8 @@ pub fn build_flow_setup_execution_context(
253247
let target_id = build_target_id(
254248
analyzed_target_ss,
255249
&target_states_by_name_type,
256-
&mut setup_state,
250+
&mut metadata,
251+
&mut target_states,
257252
)?;
258253
Ok(ExportOpExecutionContext { target_id })
259254
})
@@ -263,10 +258,40 @@ pub fn build_flow_setup_execution_context(
263258
build_target_id(
264259
analyzed_target_ss,
265260
&target_states_by_name_type,
266-
&mut setup_state,
261+
&mut metadata,
262+
&mut target_states,
267263
)?;
268264
}
269265

266+
let setup_state = setup::FlowSetupState::<setup::DesiredMode> {
267+
seen_flow_metadata_version: existing_flow_ss
268+
.and_then(|flow_ss| flow_ss.seen_flow_metadata_version),
269+
tracking_table: db_tracking_setup::TrackingTableSetupState {
270+
table_name: existing_flow_ss
271+
.and_then(|flow_ss| {
272+
flow_ss
273+
.tracking_table
274+
.current
275+
.as_ref()
276+
.map(|v| v.table_name.clone())
277+
})
278+
.unwrap_or_else(|| db_tracking_setup::default_tracking_table_name(&flow_inst.name)),
279+
version_id: db_tracking_setup::CURRENT_TRACKING_TABLE_VERSION,
280+
source_state_table_name: metadata
281+
.features
282+
.contains(setup::flow_features::SOURCE_STATE_TABLE)
283+
.then(|| {
284+
existing_flow_ss
285+
.and_then(|flow_ss| flow_ss.tracking_table.current.as_ref())
286+
.and_then(|v| v.source_state_table_name.clone())
287+
.unwrap_or_else(|| {
288+
db_tracking_setup::default_source_state_table_name(&flow_inst.name)
289+
})
290+
}),
291+
},
292+
targets: target_states,
293+
metadata,
294+
};
270295
Ok(FlowSetupExecutionContext {
271296
setup_state,
272297
import_ops: import_op_exec_ctx,

src/execution/db_tracking_setup.rs

Lines changed: 106 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ pub fn default_tracking_table_name(flow_name: &str) -> String {
1111
)
1212
}
1313

14+
pub fn default_source_state_table_name(flow_name: &str) -> String {
15+
format!(
16+
"{}__cocoindex_srcstate",
17+
utils::db::sanitize_identifier(flow_name)
18+
)
19+
}
20+
1421
pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;
1522

1623
async fn upgrade_tracking_table(
@@ -46,19 +53,38 @@ async fn upgrade_tracking_table(
4653
Ok(())
4754
}
4855

56+
async fn create_source_state_table(pool: &PgPool, table_name: &str) -> Result<()> {
57+
let query = format!(
58+
"CREATE TABLE IF NOT EXISTS {table_name} (
59+
source_id INTEGER NOT NULL,
60+
key JSONB NOT NULL,
61+
value JSONB NOT NULL,
62+
63+
PRIMARY KEY (source_id, key)
64+
)"
65+
);
66+
sqlx::query(&query).execute(pool).await?;
67+
Ok(())
68+
}
69+
4970
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
5071
pub struct TrackingTableSetupState {
5172
pub table_name: String,
5273
pub version_id: i32,
74+
#[serde(default)]
75+
pub source_state_table_name: Option<String>,
5376
}
5477

5578
#[derive(Debug)]
5679
pub struct TrackingTableSetupStatus {
5780
pub desired_state: Option<TrackingTableSetupState>,
5881

59-
pub legacy_table_names: Vec<String>,
60-
6182
pub min_existing_version_id: Option<i32>,
83+
pub legacy_tracking_table_names: BTreeSet<String>,
84+
85+
pub source_state_table_always_exists: bool,
86+
pub legacy_source_state_table_names: BTreeSet<String>,
87+
6288
pub source_ids_to_delete: Vec<i32>,
6389
}
6490

@@ -68,19 +94,29 @@ impl TrackingTableSetupStatus {
6894
existing: &CombinedState<TrackingTableSetupState>,
6995
source_ids_to_delete: Vec<i32>,
7096
) -> Option<Self> {
71-
let legacy_table_names = existing
97+
let legacy_tracking_table_names = existing
7298
.legacy_values(desired, |v| &v.table_name)
7399
.into_iter()
74100
.cloned()
75-
.collect();
101+
.collect::<BTreeSet<_>>();
102+
let legacy_source_state_table_names = existing
103+
.legacy_values(desired, |v| &v.source_state_table_name)
104+
.into_iter()
105+
.filter_map(|v| v.clone())
106+
.collect::<BTreeSet<_>>();
76107
let min_existing_version_id = existing
77108
.always_exists()
78109
.then(|| existing.possible_versions().map(|v| v.version_id).min())
79110
.flatten();
80111
if desired.is_some() || min_existing_version_id.is_some() {
81112
Some(Self {
82113
desired_state: desired.cloned(),
83-
legacy_table_names,
114+
legacy_tracking_table_names,
115+
source_state_table_always_exists: existing.always_exists()
116+
&& existing
117+
.possible_versions()
118+
.all(|v| v.source_state_table_name.is_some()),
119+
legacy_source_state_table_names,
84120
min_existing_version_id,
85121
source_ids_to_delete,
86122
})
@@ -95,7 +131,7 @@ impl TrackingTableSetupStatus {
95131
ResourceSetupInfo {
96132
key: (),
97133
state: self.desired_state.clone(),
98-
description: "Tracking Table".to_string(),
134+
description: "Internal Storage for Tracking".to_string(),
99135
setup_status: Some(self),
100136
legacy_key: None,
101137
}
@@ -105,10 +141,10 @@ impl TrackingTableSetupStatus {
105141
impl ResourceSetupStatus for TrackingTableSetupStatus {
106142
fn describe_changes(&self) -> Vec<setup::ChangeDescription> {
107143
let mut changes: Vec<setup::ChangeDescription> = vec![];
108-
if self.desired_state.is_some() && !self.legacy_table_names.is_empty() {
144+
if self.desired_state.is_some() && !self.legacy_tracking_table_names.is_empty() {
109145
changes.push(setup::ChangeDescription::Action(format!(
110146
"Rename legacy tracking tables: {}. ",
111-
self.legacy_table_names.join(", ")
147+
self.legacy_tracking_table_names.iter().join(", ")
112148
)));
113149
}
114150
match (self.min_existing_version_id, &self.desired_state) {
@@ -127,10 +163,37 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
127163
}
128164
(Some(_), None) => changes.push(setup::ChangeDescription::Action(format!(
129165
"Drop existing tracking table: {}. ",
130-
self.legacy_table_names.join(", ")
166+
self.legacy_tracking_table_names.iter().join(", ")
131167
))),
132168
(None, None) => (),
133169
}
170+
171+
let source_state_table_name = self
172+
.desired_state
173+
.as_ref()
174+
.and_then(|v| v.source_state_table_name.as_ref());
175+
if let Some(source_state_table_name) = source_state_table_name {
176+
if !self.legacy_source_state_table_names.is_empty() {
177+
changes.push(setup::ChangeDescription::Action(format!(
178+
"Rename legacy source state tables: {}. ",
179+
self.legacy_source_state_table_names.iter().join(", ")
180+
)));
181+
}
182+
if !self.source_state_table_always_exists {
183+
changes.push(setup::ChangeDescription::Action(format!(
184+
"Create the source state table: {}. ",
185+
source_state_table_name
186+
)));
187+
}
188+
} else if !self.source_state_table_always_exists
189+
&& !self.legacy_source_state_table_names.is_empty()
190+
{
191+
changes.push(setup::ChangeDescription::Action(format!(
192+
"Drop existing source state table: {}. ",
193+
self.legacy_source_state_table_names.iter().join(", ")
194+
)));
195+
}
196+
134197
if !self.source_ids_to_delete.is_empty() {
135198
changes.push(setup::ChangeDescription::Action(format!(
136199
"Delete source IDs: {}. ",
@@ -145,12 +208,21 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
145208
}
146209

147210
fn change_type(&self) -> SetupChangeType {
211+
let source_state_table_up_to_date = self.legacy_source_state_table_names.is_empty()
212+
&& (self.source_state_table_always_exists
213+
|| self
214+
.desired_state
215+
.as_ref()
216+
.map_or(true, |v| v.source_state_table_name.is_none()));
148217
match (self.min_existing_version_id, &self.desired_state) {
149218
(None, Some(_)) => SetupChangeType::Create,
150219
(Some(min_version_id), Some(desired)) => {
151-
if min_version_id == desired.version_id && self.legacy_table_names.is_empty() {
220+
if min_version_id == desired.version_id
221+
&& self.legacy_tracking_table_names.is_empty()
222+
&& source_state_table_up_to_date
223+
{
152224
SetupChangeType::NoChange
153-
} else if min_version_id < desired.version_id {
225+
} else if min_version_id < desired.version_id || !source_state_table_up_to_date {
154226
SetupChangeType::Update
155227
} else {
156228
SetupChangeType::Invalid
@@ -167,7 +239,7 @@ impl TrackingTableSetupStatus {
167239
let lib_context = get_lib_context()?;
168240
let pool = lib_context.require_builtin_db_pool()?;
169241
if let Some(desired) = &self.desired_state {
170-
for lagacy_name in self.legacy_table_names.iter() {
242+
for lagacy_name in self.legacy_tracking_table_names.iter() {
171243
let query = format!(
172244
"ALTER TABLE IF EXISTS {} RENAME TO {}",
173245
lagacy_name, desired.table_name
@@ -185,11 +257,31 @@ impl TrackingTableSetupStatus {
185257
.await?;
186258
}
187259
} else {
188-
for lagacy_name in self.legacy_table_names.iter() {
260+
for lagacy_name in self.legacy_tracking_table_names.iter() {
261+
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
262+
sqlx::query(&query).execute(pool).await?;
263+
}
264+
}
265+
266+
let source_state_table_name = self
267+
.desired_state
268+
.as_ref()
269+
.and_then(|v| v.source_state_table_name.as_ref());
270+
if let Some(source_state_table_name) = source_state_table_name {
271+
for lagacy_name in self.legacy_source_state_table_names.iter() {
272+
let query = format!(
273+
"ALTER TABLE IF EXISTS {lagacy_name} RENAME TO {source_state_table_name}"
274+
);
275+
sqlx::query(&query).execute(pool).await?;
276+
}
277+
if !self.source_state_table_always_exists {
278+
create_source_state_table(pool, source_state_table_name).await?;
279+
}
280+
} else {
281+
for lagacy_name in self.legacy_source_state_table_names.iter() {
189282
let query = format!("DROP TABLE IF EXISTS {lagacy_name}");
190283
sqlx::query(&query).execute(pool).await?;
191284
}
192-
return Ok(());
193285
}
194286
Ok(())
195287
}

src/setup/flow_features.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use crate::prelude::*;
2+
3+
pub const SOURCE_STATE_TABLE: &str = "source_state_table";
4+
5+
pub fn default_features() -> BTreeSet<String> {
6+
vec![SOURCE_STATE_TABLE.to_string()].into_iter().collect()
7+
}

src/setup/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod driver;
44
mod states;
55

66
pub mod components;
7+
pub mod flow_features;
78

89
pub use auth_registry::AuthRegistry;
910
pub use driver::*;

src/setup/states.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ pub struct FlowSetupMetadata {
190190
pub last_source_id: i32,
191191
pub last_target_id: i32,
192192
pub sources: BTreeMap<String, SourceSetupState>,
193+
#[serde(default)]
194+
pub features: BTreeSet<String>,
193195
}
194196

195197
#[derive(Debug, Clone)]

0 commit comments

Comments
 (0)