Commit 02a5172

chore: improve logging when a table sync worker is in error (#366)
1 parent: defdb0b

3 files changed (+37 lines, -31 lines)

etl/src/replication/common.rs

Lines changed: 3 additions & 7 deletions

@@ -5,22 +5,18 @@ use crate::error::EtlResult;
 use crate::state::table::TableReplicationPhase;
 use crate::store::state::StateStore;
 
-/// Returns the table replication states that are either done or in active state.
+/// Returns the table replication states that are not yet done.
 ///
 /// A table is considered in done state when the apply worker doesn't need to start/restart a table
 /// sync worker to make that table progress.
-pub async fn get_table_replication_states<S>(
+pub async fn get_active_table_replication_states<S>(
     state_store: &S,
-    done: bool,
 ) -> EtlResult<HashMap<TableId, TableReplicationPhase>>
 where
     S: StateStore + Clone + Send + Sync + 'static,
 {
     let mut table_replication_states = state_store.get_table_replication_states().await?;
-    table_replication_states.retain(|_table_id, state| match done {
-        true => state.as_type().is_done(),
-        false => !state.as_type().is_done(),
-    });
+    table_replication_states.retain(|_table_id, state| !state.as_type().is_done());
 
     Ok(table_replication_states)
 }
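
With the `done: bool` flag gone, the filter is part of the function's name rather than an argument at every call site. A minimal sketch of the new call shape, assuming a `store` value implementing `StateStore` (the loop body is illustrative, not from the commit):

    use crate::replication::common::get_active_table_replication_states;

    // Before this commit: get_table_replication_states(&store, false).await?
    let active = get_active_table_replication_states(&store).await?;
    for (table_id, phase) in active {
        // Every entry returned still needs a table sync worker.
        tracing::debug!(
            "table {table_id} is active in phase {}",
            phase.as_type().as_static_str()
        );
    }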

etl/src/state/table.rs

Lines changed: 5 additions & 0 deletions

@@ -320,6 +320,11 @@ impl TableReplicationPhaseType {
         }
     }
 
+    /// Return `true` if a table with this phase is in error, `false` otherwise.
+    pub fn is_errored(&self) -> bool {
+        matches!(self, Self::Errored)
+    }
+
     pub fn as_static_str(&self) -> &'static str {
         match self {
             TableReplicationPhaseType::Init => "init",
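
`is_errored` mirrors the crate's existing `is_done` predicate. Note that the apply-worker change below checks `is_errored` only in the branch where `is_done` is already true, which suggests `Errored` counts as a done phase (done in the sense that no worker should be started, not that the table synced successfully). A hedged sketch of the three-way classification this enables, with `phase`, `table_id`, and the log text as illustrative stand-ins:

    // `phase: TableReplicationPhaseType` for some table `table_id`.
    if !phase.is_done() {
        // Still in progress: a table sync worker should be running.
    } else if phase.is_errored() {
        // Done only because replication failed: worth a loud log line.
        tracing::warn!("table {table_id} is errored and will not be synced");
    } else {
        // Genuinely finished (e.g. `SyncDone` or `Ready`): nothing to do.
    }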

etl/src/workers/apply.rs

Lines changed: 29 additions & 24 deletions

@@ -4,6 +4,7 @@ use std::sync::Arc;
 use tokio::sync::Semaphore;
 use tokio::task::JoinHandle;
 use tokio_postgres::types::PgLsn;
+use tracing::warn;
 use tracing::{Instrument, debug, error, info};
 
 use crate::concurrency::shutdown::ShutdownRx;
@@ -14,7 +15,7 @@ use crate::error::{ErrorKind, EtlError, EtlResult};
 use crate::etl_error;
 use crate::replication::apply::{ApplyLoopAction, ApplyLoopHook, start_apply_loop};
 use crate::replication::client::PgReplicationClient;
-use crate::replication::common::get_table_replication_states;
+use crate::replication::common::get_active_table_replication_states;
 use crate::state::table::{
     TableReplicationError, TableReplicationPhase, TableReplicationPhaseType,
 };
@@ -387,30 +388,34 @@ where
     async fn before_loop(&self, _start_lsn: PgLsn) -> EtlResult<ApplyLoopAction> {
         info!("starting table sync workers before the main apply loop");
 
-        let active_table_replication_states =
-            get_table_replication_states(&self.store, false).await?;
-
-        for (table_id, table_replication_phase) in active_table_replication_states {
-            // A table in `SyncDone` doesn't need to have its worker started, since the main apply
-            // worker will move it into `Ready` state automatically once the condition is met.
-            if let TableReplicationPhaseType::SyncDone = table_replication_phase.as_type() {
-                continue;
-            }
+        for (table_id, table_replication_phase) in self.store.get_table_replication_states().await?
+        {
+            if !table_replication_phase.as_type().is_done() {
+                // A table in `SyncDone` doesn't need to have its worker started, since the main apply
+                // worker will move it into `Ready` state automatically once the condition is met.
+                if let TableReplicationPhaseType::SyncDone = table_replication_phase.as_type() {
+                    continue;
+                }
 
-            // If there is already an active worker for this table in the pool, we can avoid starting
-            // it.
-            let mut pool = self.pool.lock().await;
-            if pool.get_active_worker_state(table_id).is_some() {
-                continue;
-            }
+                // If there is already an active worker for this table in the pool, we can avoid starting
+                // it.
+                let mut pool = self.pool.lock().await;
+                if pool.get_active_worker_state(table_id).is_some() {
+                    continue;
+                }
 
-            // If we fail, we just show an error, and hopefully we will succeed when starting it
-            // during syncing tables.
-            let table_sync_worker = self.build_table_sync_worker(table_id).await;
-            if let Err(err) = pool.start_worker(table_sync_worker).await {
-                error!(
-                    "error starting table sync worker for table {} during initialization: {}",
-                    table_id, err
+                // If we fail, we just show an error, and hopefully we will succeed when starting it
+                // during syncing tables.
+                let table_sync_worker = self.build_table_sync_worker(table_id).await;
+                if let Err(err) = pool.start_worker(table_sync_worker).await {
+                    error!(
+                        "error starting table sync worker for table {} during initialization: {}",
+                        table_id, err
+                    );
+                }
+            } else if table_replication_phase.as_type().is_errored() {
+                warn!(
+                    "table sync worker for table {table_id} won't run because it is in an errored state."
                 );
             }
         }
@@ -429,7 +434,7 @@
         update_state: bool,
     ) -> EtlResult<ApplyLoopAction> {
         let active_table_replication_states =
-            get_table_replication_states(&self.store, false).await?;
+            get_active_table_replication_states(&self.store).await?;
         debug!(
             "processing syncing tables for apply worker with lsn {}",
            current_lsn
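
Distilled, `before_loop` now iterates the raw state map and classifies each phase in one pass, instead of pre-filtering to active tables; that restructuring is what gives the errored case a place to be logged. A simplified schematic of the control flow above (comments stand in for the elided worker-pool logic):

    for (table_id, phase) in self.store.get_table_replication_states().await? {
        if !phase.as_type().is_done() {
            // Active table: start its sync worker unless it is `SyncDone`
            // or a worker is already in the pool; log an error!() on failure.
        } else if phase.as_type().is_errored() {
            // New in this commit: errored tables are surfaced instead of
            // being silently skipped by the old pre-filtering.
            warn!(
                "table sync worker for table {table_id} won't run because it is in an errored state."
            );
        }
        // Other done phases (e.g. `Ready`) still require no action here.
    }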
