Commit 4a965b2

feat(api): Expose replication lag metrics from source Postgres (#374)
1 parent 17547bb commit 4a965b2

15 files changed: +421 additions, -148 deletions

etl-api/src/db/pipelines.rs

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ pub enum PipelinesDbError {
     DestinationsDb(#[from] DestinationsDbError),

     #[error("Slot operation failed: {0}")]
-    SlotError(#[from] slots::SlotError),
+    SlotError(#[from] slots::EtlReplicationSlotError),
 }

 pub async fn count_pipelines_for_tenant<'c, E>(

etl-api/src/routes/pipelines.rs

Lines changed: 51 additions & 13 deletions
@@ -9,7 +9,7 @@ use etl_config::{
     Environment,
     shared::{ReplicatorConfig, SupabaseConfig, TlsConfig},
 };
-use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, health, state};
+use etl_postgres::replication::{TableLookupError, get_table_name_from_oid, health, lag, state};
 use etl_postgres::types::TableId;
 use secrecy::ExposeSecret;
 use serde::{Deserialize, Serialize};
@@ -271,9 +271,7 @@ pub enum SimpleTableReplicationState {
     Queued,
     CopyingTable,
     CopiedTable,
-    FollowingWal {
-        lag: u64,
-    },
+    FollowingWal,
     Error {
         reason: String,
         #[serde(skip_serializing_if = "Option::is_none")]
@@ -301,13 +299,10 @@ impl From<state::TableReplicationState> for SimpleTableReplicationState {
             state::TableReplicationState::Init => SimpleTableReplicationState::Queued,
             state::TableReplicationState::DataSync => SimpleTableReplicationState::CopyingTable,
             state::TableReplicationState::FinishedCopy => SimpleTableReplicationState::CopiedTable,
-            // TODO: add lag metric when available.
             state::TableReplicationState::SyncDone { .. } => {
-                SimpleTableReplicationState::FollowingWal { lag: 0 }
-            }
-            state::TableReplicationState::Ready => {
-                SimpleTableReplicationState::FollowingWal { lag: 0 }
+                SimpleTableReplicationState::FollowingWal
             }
+            state::TableReplicationState::Ready => SimpleTableReplicationState::FollowingWal,
             state::TableReplicationState::Errored {
                 reason,
                 solution,
@@ -340,12 +335,52 @@ pub struct TableReplicationStatus {
     #[schema(example = "public.users")]
     pub table_name: String,
     pub state: SimpleTableReplicationState,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[schema(nullable = true)]
+    pub table_sync_lag: Option<SlotLagMetricsResponse>,
+}
+
+/// Lag metrics reported for replication slots.
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+pub struct SlotLagMetricsResponse {
+    /// Bytes between the current WAL location and the slot restart LSN.
+    #[schema(example = 1024)]
+    pub restart_lsn_bytes: i64,
+    /// Bytes between the current WAL location and the confirmed flush LSN.
+    #[schema(example = 2048)]
+    pub confirmed_flush_lsn_bytes: i64,
+    /// How many bytes of WAL are still safe to build up before the limit of the slot is reached.
+    #[schema(example = 8192)]
+    pub safe_wal_size_bytes: i64,
+    /// Write lag expressed in milliseconds.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[schema(example = 1500, nullable = true)]
+    pub write_lag: Option<i64>,
+    /// Flush lag expressed in milliseconds.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[schema(example = 1200, nullable = true)]
+    pub flush_lag: Option<i64>,
+}
+
+impl From<lag::SlotLagMetrics> for SlotLagMetricsResponse {
+    fn from(metrics: lag::SlotLagMetrics) -> Self {
+        Self {
+            restart_lsn_bytes: metrics.restart_lsn_bytes,
+            confirmed_flush_lsn_bytes: metrics.confirmed_flush_lsn_bytes,
+            safe_wal_size_bytes: metrics.safe_wal_size_bytes,
+            write_lag: metrics.write_lag_ms,
+            flush_lag: metrics.flush_lag_ms,
+        }
+    }
 }

 #[derive(Debug, Serialize, Deserialize, ToSchema)]
 pub struct GetPipelineReplicationStatusResponse {
     #[schema(example = 1)]
     pub pipeline_id: i64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[schema(nullable = true)]
+    pub apply_lag: Option<SlotLagMetricsResponse>,
     pub table_statuses: Vec<TableReplicationStatus>,
 }

@@ -924,13 +959,14 @@ pub async fn get_pipeline_replication_status(

     // Fetch replication state for all tables in this pipeline
     let state_rows = state::get_table_replication_state_rows(&source_pool, pipeline_id).await?;
+    let mut lag_metrics = lag::get_pipeline_lag_metrics(&source_pool, pipeline_id as u64).await?;
+    let apply_lag = lag_metrics.apply.map(Into::into);

     // Convert database states to UI-friendly format and fetch table names
     let mut tables: Vec<TableReplicationStatus> = Vec::new();
     for row in state_rows {
-        let table_id = row.table_id.0;
-        let table_name =
-            get_table_name_from_oid(&source_pool, TableId::new(row.table_id.0)).await?;
+        let table_id = TableId::new(row.table_id.0);
+        let table_name = get_table_name_from_oid(&source_pool, table_id).await?;

         // Extract the metadata row from the database
         let table_replication_state = row
@@ -939,14 +975,16 @@ pub async fn get_pipeline_replication_status(
             .ok_or(PipelineError::MissingTableReplicationState)?;

         tables.push(TableReplicationStatus {
-            table_id,
+            table_id: table_id.into_inner(),
             table_name: table_name.to_string(),
             state: table_replication_state.into(),
+            table_sync_lag: lag_metrics.table_sync.remove(&table_id).map(Into::into),
         });
     }

     let response = GetPipelineReplicationStatusResponse {
         pipeline_id,
+        apply_lag,
         table_statuses: tables,
     };

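The two optional response fields only serialize when lag data is present, because of `skip_serializing_if`. Below is a minimal sketch of the resulting JSON shape, assuming the `SlotLagMetricsResponse` struct above and `serde_json` are in scope; the numeric values are purely illustrative:

    let lag = SlotLagMetricsResponse {
        restart_lsn_bytes: 1024,
        confirmed_flush_lsn_bytes: 2048,
        safe_wal_size_bytes: 8192,
        write_lag: Some(1500),
        // None values are omitted from the serialized output entirely.
        flush_lag: None,
    };

    // Prints:
    // {"restart_lsn_bytes":1024,"confirmed_flush_lsn_bytes":2048,"safe_wal_size_bytes":8192,"write_lag":1500}
    println!("{}", serde_json::to_string(&lag).unwrap());
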

etl-api/tests/pipelines.rs

Lines changed: 4 additions & 2 deletions
@@ -1007,6 +1007,7 @@ async fn pipeline_replication_status_returns_table_states_and_names() {

     assert_eq!(response.pipeline_id, pipeline_id);
     assert_eq!(response.table_statuses.len(), 2);
+    assert!(response.apply_lag.is_none());

     // Verify table states
     for (table_oid, table_name) in &tables {
@@ -1017,6 +1018,7 @@ async fn pipeline_replication_status_returns_table_states_and_names() {
             .expect("Table not found in response");

         assert_eq!(table_status.table_id, table_oid.0);
+        assert!(table_status.table_sync_lag.is_none());

         match table_name.as_str() {
             "test.test_table_users" => assert!(matches!(
@@ -1025,7 +1027,7 @@ async fn pipeline_replication_status_returns_table_states_and_names() {
             )),
             "test.test_table_orders" => assert!(matches!(
                 table_status.state,
-                SimpleTableReplicationState::FollowingWal { .. }
+                SimpleTableReplicationState::FollowingWal
             )),
             _ => panic!("Unexpected table name: {table_name}"),
         }
@@ -1069,7 +1071,7 @@ async fn rollback_table_state_succeeds_for_manual_retry_errors() {
     assert_eq!(response.table_id, table_oid.0);
     assert!(matches!(
         response.new_state,
-        SimpleTableReplicationState::FollowingWal { .. }
+        SimpleTableReplicationState::FollowingWal
    ));

    drop_pg_database(&source_db_config).await;

etl-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ sqlx = { workspace = true, features = [
     "postgres",
     "json",
     "migrate",
+    "time",
 ] }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

etl-postgres/src/replication/lag.rs

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+use sqlx::{FromRow, PgPool};
+use std::collections::BTreeMap;
+
+use crate::replication::slots::EtlReplicationSlot;
+use crate::types::TableId;
+
+/// Lag metrics associated with a logical replication slot.
+#[derive(Debug)]
+pub struct SlotLagMetrics {
+    /// The number of bytes between the current WAL LSN and the slot restart LSN.
+    pub restart_lsn_bytes: i64,
+    /// The number of bytes between the current WAL LSN and the confirmed flush LSN.
+    pub confirmed_flush_lsn_bytes: i64,
+    /// How many bytes of WAL are still safe to build up before the limit of the slot is reached.
+    pub safe_wal_size_bytes: i64,
+    /// Write lag in milliseconds relative to the primary.
+    pub write_lag_ms: Option<i64>,
+    /// Flush lag in milliseconds relative to the primary.
+    pub flush_lag_ms: Option<i64>,
+}
+
+/// Lag metrics for pipeline apply and table sync workers.
+#[derive(Debug, Default)]
+pub struct PipelineLagMetrics {
+    /// Lag metrics for the apply worker slot.
+    pub apply: Option<SlotLagMetrics>,
+    /// Lag metrics keyed by table OID for table sync worker slots.
+    pub table_sync: BTreeMap<TableId, SlotLagMetrics>,
+}
+
+/// Database row returned by the replication slot lag query.
+#[derive(Debug, FromRow)]
+struct SlotLagRow {
+    slot_name: String,
+    restart_lsn_bytes: i64,
+    confirmed_flush_lsn_bytes: i64,
+    safe_wal_size_bytes: i64,
+    write_lag_ms: Option<i64>,
+    flush_lag_ms: Option<i64>,
+}
+
+/// Fetches replication lag metrics for the given pipeline by inspecting logical replication slots.
+///
+/// Returns aggregated lag metrics for the apply worker slot and each table sync slot associated
+/// with the pipeline. Slots that are not currently active in `pg_stat_replication` still report
+/// their WAL metrics, while the write and flush lag values remain `None`.
+pub async fn get_pipeline_lag_metrics(
+    pool: &PgPool,
+    pipeline_id: u64,
+) -> sqlx::Result<PipelineLagMetrics> {
+    let Ok(apply_prefix) = EtlReplicationSlot::apply_prefix(pipeline_id) else {
+        return Ok(PipelineLagMetrics::default());
+    };
+    let Ok(table_sync_prefix) = EtlReplicationSlot::table_sync_prefix(pipeline_id) else {
+        return Ok(PipelineLagMetrics::default());
+    };
+
+    let rows: Vec<SlotLagRow> = sqlx::query_as(
+        r#"
+        select
+            s.slot_name,
+            coalesce(pg_wal_lsn_diff(pg_current_wal_lsn(), s.restart_lsn), 0)::bigint as restart_lsn_bytes,
+            coalesce(pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn), 0)::bigint as confirmed_flush_lsn_bytes,
+            coalesce(s.safe_wal_size, 0) as safe_wal_size_bytes,
+            round(extract(epoch from r.write_lag) * 1000)::bigint as write_lag_ms,
+            round(extract(epoch from r.flush_lag) * 1000)::bigint as flush_lag_ms
+        from pg_replication_slots as s
+        left outer join pg_stat_replication as r on s.active_pid = r.pid
+        where s.slot_type = 'logical'
+        and (s.slot_name = $1 or s.slot_name like $2)
+        "#,
+    )
+    .bind(apply_prefix)
+    .bind(format!("{table_sync_prefix}%"))
+    .fetch_all(pool)
+    .await?;
+
+    let mut metrics = PipelineLagMetrics::default();
+
+    for row in rows {
+        let slot_lag_metrics = SlotLagMetrics {
+            restart_lsn_bytes: row.restart_lsn_bytes,
+            confirmed_flush_lsn_bytes: row.confirmed_flush_lsn_bytes,
+            safe_wal_size_bytes: row.safe_wal_size_bytes,
+            write_lag_ms: row.write_lag_ms,
+            flush_lag_ms: row.flush_lag_ms,
+        };
+
+        match EtlReplicationSlot::try_from(row.slot_name.as_str()) {
+            Ok(EtlReplicationSlot::Apply {
+                pipeline_id: slot_pipeline_id,
+            }) if slot_pipeline_id == pipeline_id => {
+                metrics.apply = Some(slot_lag_metrics);
+            }
+            Ok(EtlReplicationSlot::TableSync {
+                pipeline_id: slot_pipeline_id,
+                table_id,
+            }) if slot_pipeline_id == pipeline_id => {
+                metrics.table_sync.insert(table_id, slot_lag_metrics);
+            }
+            _ => {}
+        }
+    }
+
+    Ok(metrics)
+}
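
A minimal caller sketch for the new module, assuming a consumer binary with `etl_postgres`, `sqlx`, and `tokio` as dependencies; the connection string and the pipeline id 1 are placeholders. Active slots populate the millisecond lag fields, while inactive ones only report the WAL byte distances:

    use etl_postgres::replication::lag;
    use sqlx::postgres::PgPoolOptions;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder connection string; point this at the source Postgres instance.
        let pool = PgPoolOptions::new()
            .connect("postgres://postgres:postgres@localhost:5432/postgres")
            .await?;

        let metrics = lag::get_pipeline_lag_metrics(&pool, 1).await?;

        if let Some(apply) = &metrics.apply {
            println!(
                "apply slot: {} bytes behind, write lag: {:?} ms",
                apply.confirmed_flush_lsn_bytes, apply.write_lag_ms
            );
        }
        println!("{} table sync slot(s) reporting lag", metrics.table_sync.len());

        Ok(())
    }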

etl-postgres/src/replication/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 mod db;
 pub mod health;
+pub mod lag;
 pub mod schema;
 pub mod slots;
 pub mod state;
