Commit af649bc

fix(state): Reset slot and table also on Init (#299)
1 parent 70337ce commit af649bc

2 files changed, +25 -19 lines changed

etl/src/destination/base.rs

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,10 @@ pub trait Destination {
     /// This operation is called during initial table synchronization to ensure the
     /// destination table starts from a clean state before bulk loading. The operation
     /// should be atomic and handle cases where the table may not exist.
+    ///
+    /// The implementation should assume that when truncation is called, the table might not be
+    /// present since truncation could be called after a failure that happened before the table copy
+    /// was started.
     fn truncate_table(&self, table_id: TableId) -> impl Future<Output = EtlResult<()>> + Send;

     /// Writes a batch of table rows to the destination.
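
The doc comment above asks implementations to tolerate a missing table. A minimal sketch of that contract follows, assuming a hypothetical in-memory `MemoryDestination`, a `u32` stand-in for `TableId`, and a synchronous method in place of the trait's `impl Future` return; none of these are part of this commit.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's `TableId` (assumption for this sketch).
type TableId = u32;

/// Hypothetical in-memory destination: table id -> rows.
#[derive(Default)]
struct MemoryDestination {
    tables: HashMap<TableId, Vec<String>>,
}

#[allow(dead_code)]
impl MemoryDestination {
    /// Synchronous simplification of `truncate_table`.
    /// A missing table is treated as already empty, so the call still succeeds.
    fn truncate_table(&mut self, table_id: TableId) -> Result<(), String> {
        if let Some(rows) = self.tables.get_mut(&table_id) {
            rows.clear();
        }
        Ok(())
    }

    /// Appends rows, creating the table on first write.
    fn write_table_rows(&mut self, table_id: TableId, rows: Vec<String>) {
        self.tables.entry(table_id).or_default().extend(rows);
    }
}
```

With this shape, calling `truncate_table` before any copy has created the table is a no-op rather than an error, which is the case the added doc lines describe.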

etl/src/replication/table_sync.rs

Lines changed: 21 additions & 19 deletions
@@ -133,28 +133,30 @@ where
     // In case the phase is any other phase, we will return an error.
     let start_lsn = match phase_type {
         TableReplicationPhaseType::Init | TableReplicationPhaseType::DataSync => {
-            if phase_type == TableReplicationPhaseType::DataSync {
-                // If we are in `DataSync` it means we failed during table copying, so we want to delete the
-                // existing slot before continuing.
-                if let Err(err) = replication_client.delete_slot(&slot_name).await {
-                    // If the slot is not found, we are safe to continue, for any other error, we bail.
-                    if err.kind() != ErrorKind::ReplicationSlotNotFound {
-                        return Err(err);
-                    }
+            // When we are in these states, it could be for the following reasons:
+            // - `Init` -> we can be in this state because we just started replicating the table or the state
+            // was reset. In this case we don't want to make assumptions about the previous state, so we
+            // just try to delete the slot and truncate the table.
+            // - `DataSync` -> we can be in this state because we failed during data sync, meaning that table
+            // copy failed. In this case, we want to delete the slot and truncate the table.
+            if let Err(err) = replication_client.delete_slot(&slot_name).await {
+                // If the slot is not found, we are safe to continue, for any other error, we bail.
+                if err.kind() != ErrorKind::ReplicationSlotNotFound {
+                    return Err(err);
                 }
-
-                // We must truncate the destination table before starting a copy to avoid data inconsistencies.
-                // Example scenario:
-                // 1. The source table has a single row (id = 1) that is copied to the destination during the initial copy.
-                // 2. Before the table’s phase is set to `FinishedCopy`, the process crashes.
-                // 3. While down, the source deletes row id = 1 and inserts row id = 2.
-                // 4. When restarted, the process sees the table in the ` DataSync ` state, deletes the slot, and copies again.
-                // 5. This time, only row id = 2 is copied, but row id = 1 still exists in the destination.
-                // Result: the destination has two rows (id = 1 and id = 2) instead of only one (id = 2).
-                // Fix: Always truncate the destination table before starting a copy.
-                destination.truncate_table(table_id).await?;
             }

+            // We must truncate the destination table before starting a copy to avoid data inconsistencies.
+            // Example scenario:
+            // 1. The source table has a single row (id = 1) that is copied to the destination during the initial copy.
+            // 2. Before the table’s phase is set to `FinishedCopy`, the process crashes.
+            // 3. While down, the source deletes row id = 1 and inserts row id = 2.
+            // 4. When restarted, the process sees the table in the ` DataSync ` state, deletes the slot, and copies again.
+            // 5. This time, only row id = 2 is copied, but row id = 1 still exists in the destination.
+            // Result: the destination has two rows (id = 1 and id = 2) instead of only one (id = 2).
+            // Fix: Always truncate the destination table before starting a copy.
+            destination.truncate_table(table_id).await?;
+
             // We are ready to start copying table data, and we update the state accordingly.
             info!("starting data copy for table {}", table_id);
             {
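
The example scenario in the comments of this hunk can be reproduced with a small model. This is a sketch under stated assumptions: `TableSync`, `SlotError`, `reset`, and `copy` are hypothetical names, the slot and destination are plain in-memory values, and everything is synchronous. It only illustrates the delete-slot-then-truncate order that the commit applies to both `Init` and `DataSync`.

```rust
use std::collections::BTreeSet;

/// Hypothetical error type; `NotFound` plays the role of `ErrorKind::ReplicationSlotNotFound`.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum SlotError {
    NotFound,
    Other(String),
}

/// Hypothetical in-memory model of one table's sync state.
#[derive(Default)]
struct TableSync {
    slot_exists: bool,
    destination: BTreeSet<u32>, // row ids present at the destination
}

impl TableSync {
    /// Deleting a slot that does not exist reports `NotFound`.
    fn delete_slot(&mut self) -> Result<(), SlotError> {
        if self.slot_exists {
            self.slot_exists = false;
            Ok(())
        } else {
            Err(SlotError::NotFound)
        }
    }

    /// Reset used for both phases: drop the slot, tolerate a missing one, then truncate.
    fn reset(&mut self) -> Result<(), SlotError> {
        if let Err(err) = self.delete_slot() {
            if err != SlotError::NotFound {
                return Err(err);
            }
        }
        self.destination.clear();
        Ok(())
    }

    /// Copies a snapshot of source row ids, optionally resetting first.
    fn copy(&mut self, source: &[u32], reset_first: bool) -> Result<(), SlotError> {
        if reset_first {
            self.reset()?;
        }
        self.slot_exists = true;
        self.destination.extend(source.iter().copied());
        Ok(())
    }
}
```

Copying `[1]` and then `[2]` without a reset leaves both ids in `destination`, which is the inconsistency from steps 1 to 5. With `reset_first` set to `true`, the second copy leaves only id 2, and the first copy still succeeds even though no slot exists yet.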
