Skip to content

Commit 66f9c7f

Browse files
authored
feat(state): Delete previous state on rollback (#303)
1 parent e735f7f commit 66f9c7f

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

etl-postgres/src/replication/state.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,12 @@ pub async fn rollback_replication_state(
232232
.await?;
233233

234234
if let Some((current_id, Some(prev_id))) = current_row {
235-
// Set current row to not current
235+
// Delete the row we are rolling back from to avoid buildup. Technically, we could keep the
236+
// previous row for tracking purposes, but especially during timed retries, we might end up
237+
// with an infinite growth of the database.
236238
sqlx::query(
237239
r#"
238-
update etl.replication_state
239-
set is_current = false, updated_at = now()
240+
delete from etl.replication_state
240241
where id = $1
241242
"#,
242243
)

etl/tests/integration/postgres_store.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use etl::store::cleanup::CleanupStore;
44
use etl::store::schema::SchemaStore;
55
use etl::store::state::StateStore;
66
use etl::test_utils::database::spawn_source_database_for_store;
7+
use etl_postgres::replication::connect_to_source_database;
78
use etl_postgres::types::{ColumnSchema, TableId, TableName, TableSchema};
89
use etl_telemetry::tracing::init_test_tracing;
10+
use sqlx::postgres::types::Oid as SqlxTableId;
911
use tokio_postgres::types::{PgLsn, Type as PgType};
1012

1113
fn create_sample_table_schema() -> TableSchema {
@@ -128,6 +130,20 @@ async fn test_state_store_rollback() {
128130
.await
129131
.unwrap();
130132

133+
// Verify two rows exist before rollback (init + data_sync)
134+
let pool = connect_to_source_database(&database.config, 1, 1)
135+
.await
136+
.expect("Failed to connect to source database with sqlx");
137+
let count_before: i64 = sqlx::query_scalar(
138+
"select count(*) from etl.replication_state where pipeline_id = $1 and table_id = $2",
139+
)
140+
.bind(pipeline_id as i64)
141+
.bind(SqlxTableId(table_id.into_inner()))
142+
.fetch_one(&pool)
143+
.await
144+
.unwrap();
145+
assert_eq!(count_before, 2);
146+
131147
// Verify current state
132148
let state = store.get_table_replication_state(table_id).await.unwrap();
133149
assert_eq!(state, Some(data_sync_phase));
@@ -143,6 +159,17 @@ async fn test_state_store_rollback() {
143159
let state = store.get_table_replication_state(table_id).await.unwrap();
144160
assert_eq!(state, Some(init_phase));
145161

162+
// Verify the rolled-from row was deleted to avoid buildup
163+
let count_after: i64 = sqlx::query_scalar(
164+
"select count(*) from etl.replication_state where pipeline_id = $1 and table_id = $2",
165+
)
166+
.bind(pipeline_id as i64)
167+
.bind(SqlxTableId(table_id.into_inner()))
168+
.fetch_one(&pool)
169+
.await
170+
.unwrap();
171+
assert_eq!(count_after, 1);
172+
146173
// Test rollback when there's no previous state
147174
let result = store.rollback_table_replication_state(table_id).await;
148175
assert!(result.is_err());

0 commit comments

Comments
 (0)