Skip to content

Commit e194841

Browse files
authored
feat(publications): Handle tables removed from the publication (#300)
1 parent af649bc commit e194841

File tree

16 files changed

+585
-43
lines changed

16 files changed

+585
-43
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
- Link types and methods as [`Type`], [`Type::method`].
2020
- Keep wording concise, correct, and punctuated; reword for clarity while preserving intent.
2121
- No code examples; include private helpers for maintainers; apply to modules, types, traits, impls, and functions.
22+
- Normal comments should always end with `.`.
2223

2324
## Rust Tests Execution
2425
- If output shows "0 passed; 0 failed; 0 ignored; n filtered out", tests did not run; treat as failure.

etl-api/src/db/pipelines.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,15 @@ pub async fn delete_pipeline_cascading(
220220

221221
// Delete state, schema, and table mappings from the source database, only if ETL tables exist.
222222
if etl_present {
223-
let _ =
224-
state::delete_pipeline_replication_state(source_txn.deref_mut(), pipeline.id).await?;
225-
let _ = schema::delete_pipeline_table_schemas(source_txn.deref_mut(), pipeline.id).await?;
226-
let _ = table_mappings::delete_pipeline_table_mappings(source_txn.deref_mut(), pipeline.id)
223+
let _ = state::delete_replication_state_for_all_tables(source_txn.deref_mut(), pipeline.id)
227224
.await?;
225+
let _ = schema::delete_table_schemas_for_all_tables(source_txn.deref_mut(), pipeline.id)
226+
.await?;
227+
let _ = table_mappings::delete_table_mappings_for_all_tables(
228+
source_txn.deref_mut(),
229+
pipeline.id,
230+
)
231+
.await?;
228232
}
229233

230234
// Here we finish `txn` before `source_txn` since we want the guarantee that the pipeline has

etl-postgres/src/replication/schema.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use sqlx::postgres::PgRow;
2-
use sqlx::postgres::types::Oid;
2+
use sqlx::postgres::types::Oid as SqlxTableId;
33
use sqlx::{PgExecutor, PgPool, Row};
44
use std::collections::HashMap;
55
use tokio_postgres::types::Type as PgType;
@@ -232,7 +232,7 @@ pub async fn load_table_schemas(
232232
let mut table_schemas = HashMap::new();
233233

234234
for row in rows {
235-
let table_oid: Oid = row.get("table_id");
235+
let table_oid: SqlxTableId = row.get("table_id");
236236
let table_id = TableId::new(table_oid.0);
237237
let schema_name: String = row.get("schema_name");
238238
let table_name: String = row.get("table_name");
@@ -251,7 +251,7 @@ pub async fn load_table_schemas(
251251
///
252252
/// Removes all table schema records and associated columns for the specified
253253
/// pipeline, using CASCADE delete for automatic cleanup of related column records.
254-
pub async fn delete_pipeline_table_schemas<'c, E>(
254+
pub async fn delete_table_schemas_for_all_tables<'c, E>(
255255
executor: E,
256256
pipeline_id: i64,
257257
) -> Result<u64, sqlx::Error>
@@ -271,6 +271,31 @@ where
271271
Ok(result.rows_affected())
272272
}
273273

274+
/// Deletes the table schema for a specific table in a pipeline.
275+
///
276+
/// Also cascades to `etl.table_columns` via FK.
277+
pub async fn delete_table_schema_for_table<'c, E>(
278+
executor: E,
279+
pipeline_id: i64,
280+
table_id: TableId,
281+
) -> Result<u64, sqlx::Error>
282+
where
283+
E: PgExecutor<'c>,
284+
{
285+
let result = sqlx::query(
286+
r#"
287+
delete from etl.table_schemas
288+
where pipeline_id = $1 and table_id = $2
289+
"#,
290+
)
291+
.bind(pipeline_id)
292+
.bind(SqlxTableId(table_id.into_inner()))
293+
.execute(executor)
294+
.await?;
295+
296+
Ok(result.rows_affected())
297+
}
298+
274299
/// Builds a [`ColumnSchema`] from a database row.
275300
///
276301
/// Assumes all required fields are present in the row after appropriate joins.

etl-postgres/src/replication/state.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ pub async fn reset_replication_state(
328328
///
329329
/// Removes all replication state records including historical entries
330330
/// for the specified pipeline. Used during pipeline cleanup.
331-
pub async fn delete_pipeline_replication_state<'c, E>(
331+
pub async fn delete_replication_state_for_all_tables<'c, E>(
332332
executor: E,
333333
pipeline_id: i64,
334334
) -> sqlx::Result<u64>
@@ -348,6 +348,29 @@ where
348348
Ok(result.rows_affected())
349349
}
350350

351+
/// Deletes all replication state entries for a specific table in a pipeline.
352+
pub async fn delete_replication_state_for_table<'c, E>(
353+
executor: E,
354+
pipeline_id: i64,
355+
table_id: TableId,
356+
) -> sqlx::Result<u64>
357+
where
358+
E: PgExecutor<'c>,
359+
{
360+
let result = sqlx::query(
361+
r#"
362+
delete from etl.replication_state
363+
where pipeline_id = $1 and table_id = $2
364+
"#,
365+
)
366+
.bind(pipeline_id)
367+
.bind(SqlxTableId(table_id.into_inner()))
368+
.execute(executor)
369+
.await?;
370+
371+
Ok(result.rows_affected())
372+
}
373+
351374
/// Gets all table IDs that have replication state for a given pipeline.
352375
///
353376
/// Returns a vector of table IDs that are currently being replicated for the specified pipeline.

etl-postgres/src/replication/table_mappings.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub async fn load_table_mappings(
6565
///
6666
/// Removes all table mapping records for the specified pipeline.
6767
/// Used during pipeline cleanup.
68-
pub async fn delete_pipeline_table_mappings<'c, E>(
68+
pub async fn delete_table_mappings_for_all_tables<'c, E>(
6969
executor: E,
7070
pipeline_id: i64,
7171
) -> Result<u64, sqlx::Error>
@@ -84,3 +84,26 @@ where
8484

8585
Ok(result.rows_affected())
8686
}
87+
88+
/// Deletes a single table mapping for a given pipeline and source table id.
89+
pub async fn delete_table_mappings_for_table<'c, E>(
90+
executor: E,
91+
pipeline_id: i64,
92+
source_table_id: &TableId,
93+
) -> Result<u64, sqlx::Error>
94+
where
95+
E: PgExecutor<'c>,
96+
{
97+
let result = sqlx::query(
98+
r#"
99+
delete from etl.table_mappings
100+
where pipeline_id = $1 and source_table_id = $2
101+
"#,
102+
)
103+
.bind(pipeline_id)
104+
.bind(SqlxTableId(source_table_id.into_inner()))
105+
.execute(executor)
106+
.await?;
107+
108+
Ok(result.rows_affected())
109+
}

etl-replicator/src/core.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use crate::migrations::migrate_state_store;
12
use etl::destination::Destination;
23
use etl::destination::memory::MemoryDestination;
34
use etl::pipeline::Pipeline;
45
use etl::store::both::postgres::PostgresStore;
6+
use etl::store::cleanup::CleanupStore;
57
use etl::store::schema::SchemaStore;
68
use etl::store::state::StateStore;
79
use etl::types::PipelineId;
@@ -13,8 +15,6 @@ use secrecy::ExposeSecret;
1315
use tokio::signal::unix::{SignalKind, signal};
1416
use tracing::{debug, info, warn};
1517

16-
use crate::migrations::migrate_state_store;
17-
1818
/// Starts the replicator service with the provided configuration.
1919
///
2020
/// Initializes the state store, creates the appropriate destination based on
@@ -138,7 +138,7 @@ fn log_batch_config(config: &BatchConfig) {
138138
async fn init_store(
139139
pipeline_id: PipelineId,
140140
pg_connection_config: PgConnectionConfig,
141-
) -> anyhow::Result<impl StateStore + SchemaStore + Clone> {
141+
) -> anyhow::Result<impl StateStore + SchemaStore + CleanupStore + Clone> {
142142
migrate_state_store(&pg_connection_config).await?;
143143

144144
Ok(PostgresStore::new(pipeline_id, pg_connection_config))
@@ -152,7 +152,7 @@ async fn init_store(
152152
#[tracing::instrument(skip(pipeline), fields(pipeline_id = pipeline.id()))]
153153
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
154154
where
155-
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
155+
S: StateStore + SchemaStore + CleanupStore + Clone + Send + Sync + 'static,
156156
D: Destination + Clone + Send + Sync + 'static,
157157
{
158158
// Start the pipeline.

etl/src/failpoints.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::bail;
99
use crate::error::{ErrorKind, EtlError, EtlResult};
1010

1111
pub const START_TABLE_SYNC__BEFORE_DATA_SYNC_SLOT_CREATION: &str =
12-
"start_table_sync.befor_data_sync_slot_creation";
12+
"start_table_sync.before_data_sync_slot_creation";
1313
pub const START_TABLE_SYNC__DURING_DATA_SYNC: &str = "start_table_sync.during_data_sync";
1414

1515
/// Executes a configurable failpoint for testing error scenarios.

etl/src/pipeline.rs

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@
33
//! Contains the main [`Pipeline`] struct that coordinates Postgres logical replication
44
//! with destination systems. Manages worker lifecycles, shutdown coordination, and error handling.
55
6-
use etl_config::shared::PipelineConfig;
7-
use std::sync::Arc;
8-
use tokio::sync::Semaphore;
9-
use tracing::{error, info};
10-
116
use crate::bail;
127
use crate::concurrency::shutdown::{ShutdownTx, create_shutdown_channel};
138
use crate::destination::Destination;
149
use crate::error::{ErrorKind, EtlError, EtlResult};
1510
use crate::metrics::register_metrics;
1611
use crate::replication::client::PgReplicationClient;
1712
use crate::state::table::TableReplicationPhase;
13+
use crate::store::cleanup::CleanupStore;
1814
use crate::store::schema::SchemaStore;
1915
use crate::store::state::StateStore;
2016
use crate::types::PipelineId;
2117
use crate::workers::apply::{ApplyWorker, ApplyWorkerHandle};
2218
use crate::workers::base::{Worker, WorkerHandle};
2319
use crate::workers::pool::TableSyncWorkerPool;
20+
use etl_config::shared::PipelineConfig;
21+
use etl_postgres::types::TableId;
22+
use std::collections::HashSet;
23+
use std::sync::Arc;
24+
use tokio::sync::Semaphore;
25+
use tracing::{error, info};
2426

2527
/// Internal state tracking for pipeline lifecycle.
2628
///
@@ -62,7 +64,7 @@ pub struct Pipeline<S, D> {
6264

6365
impl<S, D> Pipeline<S, D>
6466
where
65-
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
67+
S: StateStore + SchemaStore + CleanupStore + Clone + Send + Sync + 'static,
6668
D: Destination + Clone + Send + Sync + 'static,
6769
{
6870
/// Creates a new pipeline with the given configuration.
@@ -252,11 +254,17 @@ where
252254
self.wait().await
253255
}
254256

255-
/// Initializes table replication states for all tables in the publication.
257+
/// Initializes table replication states for tables in the publication and
258+
/// purges state for tables removed from it.
259+
///
260+
/// Ensures each table currently in the Postgres publication has a
261+
/// corresponding replication state; tables without existing states are
262+
/// initialized to [`TableReplicationPhase::Init`].
256263
///
257-
/// This private method ensures that each table in the Postgres publication has
258-
/// a corresponding replication state record. Tables without existing states are
259-
/// initialized to the [`TableReplicationPhase::Init`] phase.
264+
/// Also detects tables that still have stored state but are no longer
265+
/// part of the publication, and deletes their stored state (replication
266+
/// state, table mappings, and table schemas) without touching the actual
267+
/// destination tables.
260268
async fn initialize_table_states(
261269
&self,
262270
replication_client: &PgReplicationClient,
@@ -281,26 +289,43 @@ where
281289
);
282290
}
283291

284-
let table_ids = replication_client
292+
let publication_table_ids = replication_client
285293
.get_publication_table_ids(&self.config.publication_name)
286294
.await?;
287295

288296
info!(
289297
"the publication '{}' contains {} tables",
290298
self.config.publication_name,
291-
table_ids.len()
299+
publication_table_ids.len()
292300
);
293301

294302
self.store.load_table_replication_states().await?;
295-
let states = self.store.get_table_replication_states().await?;
296-
for table_id in table_ids {
297-
if !states.contains_key(&table_id) {
303+
let table_replication_states = self.store.get_table_replication_states().await?;
304+
305+
// Initialize states for newly added tables in the publication.
306+
for table_id in &publication_table_ids {
307+
if !table_replication_states.contains_key(table_id) {
298308
self.store
299-
.update_table_replication_state(table_id, TableReplicationPhase::Init)
309+
.update_table_replication_state(*table_id, TableReplicationPhase::Init)
300310
.await?;
301311
}
302312
}
303313

314+
// Detect tables that have been removed from the publication and purge their stored state.
315+
//
316+
// We must not delete the destination table, only the internal state.
317+
let publication_set: HashSet<TableId> = publication_table_ids.iter().copied().collect();
318+
for (table_id, _) in table_replication_states {
319+
if !publication_set.contains(&table_id) {
320+
info!(
321+
"table {} removed from publication, purging stored state",
322+
table_id
323+
);
324+
325+
self.store.cleanup_table_state(table_id).await?;
326+
}
327+
}
328+
304329
Ok(())
305330
}
306331
}

etl/src/store/both/memory.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tokio::sync::Mutex;
66
use crate::error::{ErrorKind, EtlError, EtlResult};
77
use crate::etl_error;
88
use crate::state::table::TableReplicationPhase;
9+
use crate::store::cleanup::CleanupStore;
910
use crate::store::schema::SchemaStore;
1011
use crate::store::state::StateStore;
1112

@@ -198,3 +199,16 @@ impl SchemaStore for MemoryStore {
198199
Ok(())
199200
}
200201
}
202+
203+
impl CleanupStore for MemoryStore {
204+
async fn cleanup_table_state(&self, table_id: TableId) -> EtlResult<()> {
205+
let mut inner = self.inner.lock().await;
206+
207+
inner.table_replication_states.remove(&table_id);
208+
inner.table_state_history.remove(&table_id);
209+
inner.table_schemas.remove(&table_id);
210+
inner.table_mappings.remove(&table_id);
211+
212+
Ok(())
213+
}
214+
}

0 commit comments

Comments
 (0)