4 changes: 4 additions & 0 deletions pgdog.toml
@@ -12,6 +12,10 @@ read_write_strategy = "aggressive"
prepared_statements_limit = 500
# client_idle_timeout = 5_000

# Replication lag detection settings
# replication_lag_check_interval = 10_000 # Check every 10 seconds (default)
# max_replication_lag_bytes = 1048576 # Ban replicas lagging by more than 1MB (default)

Review comment on the max_replication_lag_bytes line:

This threshold is too low and can lead to frequent bans of replicas. I think it should be at least 10MB, or even 100MB, by default.
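If the default moves in that direction, the commented example in pgdog.toml could read as follows (illustrative only; these values reflect the reviewer's suggestion, not a committed default):

# max_replication_lag_bytes = 10485760 # Ban replicas lagging by more than 10MB
# max_replication_lag_bytes = 104857600 # Ban replicas lagging by more than 100MB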


#
# Admin database password.
#
18 changes: 18 additions & 0 deletions pgdog/src/backend/pool/config.rs
@@ -51,6 +51,10 @@ pub struct Config {
pub read_only: bool,
/// Maximum prepared statements per connection.
pub prepared_statements_limit: usize,
/// Replication lag check interval.
pub replication_lag_check_interval: Duration,
/// Maximum allowed replication lag in bytes.
pub max_replication_lag_bytes: u64,
}

impl Config {
@@ -104,6 +108,16 @@ impl Config {
self.rollback_timeout
}

/// Replication lag check interval.
pub fn replication_lag_check_interval(&self) -> Duration {
self.replication_lag_check_interval
}

/// Maximum allowed replication lag in bytes.
pub fn max_replication_lag_bytes(&self) -> u64 {
self.max_replication_lag_bytes
}

/// Read timeout.
pub fn read_timeout(&self) -> Duration {
self.read_timeout
@@ -162,6 +176,8 @@ impl Config {
.read_only
.unwrap_or(user.read_only.unwrap_or_default()),
prepared_statements_limit: general.prepared_statements_limit,
replication_lag_check_interval: Duration::from_millis(general.replication_lag_check_interval),
max_replication_lag_bytes: general.max_replication_lag_bytes,
..Default::default()
}
}
@@ -191,6 +207,8 @@ impl Default for Config {
pooler_mode: PoolerMode::default(),
read_only: false,
prepared_statements_limit: usize::MAX,
replication_lag_check_interval: Duration::from_millis(10_000), // 10 seconds
max_replication_lag_bytes: 1024 * 1024, // 1MB
}
}
}
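For reference, a minimal sketch (not part of this diff) of overriding the new fields when building a Config by hand, e.g. in a test; it assumes the Duration type already imported by this file and the Default impl shown above:

let lag_config = Config {
    replication_lag_check_interval: Duration::from_millis(30_000), // check every 30 seconds
    max_replication_lag_bytes: 100 * 1024 * 1024, // tolerate up to 100MB of lag
    ..Default::default()
};
assert_eq!(lag_config.max_replication_lag_bytes(), 100 * 1024 * 1024);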
3 changes: 3 additions & 0 deletions pgdog/src/backend/pool/error.rs
@@ -45,6 +45,9 @@ pub enum Error {
#[error("all replicas down")]
AllReplicasDown,

#[error("replication lag")]
ReplicationLag,

#[error("router error")]
Router,
}
190 changes: 190 additions & 0 deletions pgdog/src/backend/pool/lag_check.rs
@@ -0,0 +1,190 @@
//! Replication lag checking.

use crate::backend::{pool::Pool, Server};
use crate::net::messages::DataRow;

use super::{Error, Replicas};

/// Replication lag information for a single replica.
#[derive(Debug, Clone)]
pub struct ReplicationLagInfo {
/// Application name (replica identifier).
pub application_name: String,
/// Replica's flush LSN.
pub flush_lsn: u64,
/// Calculated lag in bytes.
pub lag_bytes: u64,
}

/// Check replication lag for all replicas against the primary.
pub struct ReplicationLagChecker<'a> {
primary: &'a Pool,
replicas: &'a Replicas,
max_lag_bytes: u64,
}

impl<'a> ReplicationLagChecker<'a> {
/// Create a new replication lag checker.
pub fn new(primary: &'a Pool, replicas: &'a Replicas, max_lag_bytes: u64) -> Self {
Self {
primary,
replicas,
max_lag_bytes,
}
}

/// Check replication lag and ban lagging replicas.
pub async fn check_and_ban_lagging_replicas(&self) -> Result<(), Error> {
// Get a connection to the primary to query replication status
let mut primary_conn = match self.primary.get(&super::Request::default()).await {
Ok(conn) => conn,
Err(err) => {
tracing::debug!("Failed to get primary connection for lag check: {}", err);
return Err(err);
}
};

// Get current WAL position on primary
let current_lsn = match self.get_current_wal_lsn(&mut primary_conn).await {
Ok(lsn) => lsn,
Err(err) => {
tracing::error!("Failed to get current WAL LSN from primary: {}", err);
return Err(err);
}
};

// Get replication status for all replicas
let replica_status = match self.get_replica_status(&mut primary_conn).await {
Ok(status) => status,
Err(err) => {
tracing::error!("Failed to get replica status from primary: {}", err);
return Err(err);
}
};

// Check each replica's lag and ban if necessary
for pool in &self.replicas.pools {
let pool_addr = pool.addr().to_string();

// Find this replica in the status (match by host:port)
if let Some(status) = self.find_replica_status(&pool_addr, &replica_status) {
let lag_bytes = current_lsn.saturating_sub(status.flush_lsn);

if lag_bytes > self.max_lag_bytes {
tracing::warn!(
"Replica {} is lagging by {} bytes (max: {}), banning",
pool_addr, lag_bytes, self.max_lag_bytes
);
pool.ban(Error::ReplicationLag);
} else {
tracing::debug!(
"Replica {} lag: {} bytes (within limit: {})",
pool_addr, lag_bytes, self.max_lag_bytes
);
}
} else {
tracing::debug!("Replica {} not found in pg_stat_replication", pool_addr);
}
}

Ok(())
}

/// Get current WAL flush LSN from the primary.
async fn get_current_wal_lsn(&self, conn: &mut impl std::ops::DerefMut<Target = Server>) -> Result<u64, Error> {
let query = "SELECT pg_current_wal_flush_lsn()";

let rows: Vec<WalLsnRow> = conn.fetch_all(query).await
.map_err(|_| Error::HealthcheckError)?;

if let Some(row) = rows.first() {
self.parse_lsn(&row.lsn).ok_or(Error::HealthcheckError)
} else {
Err(Error::HealthcheckError)
}
}

/// Get replication status for all replicas from pg_stat_replication.
async fn get_replica_status(&self, conn: &mut impl std::ops::DerefMut<Target = Server>) -> Result<Vec<ReplicationLagInfo>, Error> {
let query = "SELECT application_name, client_addr, client_port, flush_lsn FROM pg_stat_replication";

let rows: Vec<ReplicationStatusRow> = conn.fetch_all(query).await
.map_err(|_| Error::HealthcheckError)?;

let mut result = Vec::new();
for row in rows {
if let Some(flush_lsn) = self.parse_lsn(&row.flush_lsn) {
result.push(ReplicationLagInfo {
application_name: row.application_name.clone(),
flush_lsn,
lag_bytes: 0, // Will be calculated later
});
}
}

Ok(result)
}

/// Find replica status by matching address.
fn find_replica_status<'b>(&self, _pool_addr: &str, statuses: &'b [ReplicationLagInfo]) -> Option<&'b ReplicationLagInfo> {
// For now, just return the first status if any exist
// In a real implementation, we'd need to match by client_addr and client_port
// which would require extending the ReplicationLagInfo struct
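// Illustrative sketch, not part of this change: if client_addr and client_port were
// also carried on ReplicationLagInfo, the lookup could match the pool's "host:port"
// address instead of returning the first row, e.g.:
//     statuses.iter().find(|s| format!("{}:{}", s.client_addr, s.client_port) == pool_addr)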
statuses.first()
}

/// Parse PostgreSQL LSN format (e.g., "0/1234ABCD") to u64.
pub fn parse_lsn(&self, lsn_str: &str) -> Option<u64> {
let parts: Vec<&str> = lsn_str.split('/').collect();
if parts.len() != 2 {
return None;
}

let high = u64::from_str_radix(parts[0], 16).ok()?;
let low = u64::from_str_radix(parts[1], 16).ok()?;

Some((high << 32) | low)
}
}

/// Row structure for WAL LSN query result.
struct WalLsnRow {
lsn: String,
}

impl From<DataRow> for WalLsnRow {
fn from(row: DataRow) -> Self {
Self {
lsn: row.column(0)
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.unwrap_or_default(),
}
}
}

/// Row structure for replication status query result.
struct ReplicationStatusRow {
application_name: String,
client_addr: String,
client_port: String,
flush_lsn: String,
}

impl From<DataRow> for ReplicationStatusRow {
fn from(row: DataRow) -> Self {
Self {
application_name: row.column(0)
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.unwrap_or_default(),
client_addr: row.column(1)
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.unwrap_or_default(),
client_port: row.column(2)
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.unwrap_or_default(),
flush_lsn: row.column(3)
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.unwrap_or_default(),
}
}
}
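For reference, a standalone sketch (not part of the diff) of the LSN arithmetic the checker relies on: PostgreSQL prints an LSN as two hexadecimal halves, the numeric value is (high << 32) | low, and lag is the byte difference between the primary's flush LSN and a replica's.

// Mirrors parse_lsn and the saturating_sub calculation above; the values are illustrative.
fn lsn_to_u64(lsn: &str) -> Option<u64> {
    let (high, low) = lsn.split_once('/')?;
    Some((u64::from_str_radix(high, 16).ok()? << 32) | u64::from_str_radix(low, 16).ok()?)
}

fn main() {
    let primary = lsn_to_u64("16/B374D848").unwrap();
    let replica = lsn_to_u64("16/B364D848").unwrap();
    let lag_bytes = primary.saturating_sub(replica);
    // Exactly 1MiB behind; with the 1MB default, only lag strictly greater than this triggers a ban.
    assert_eq!(lag_bytes, 1024 * 1024);
}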
2 changes: 2 additions & 0 deletions pgdog/src/backend/pool/mod.rs
@@ -11,6 +11,7 @@ pub mod error;
pub mod guard;
pub mod healthcheck;
pub mod inner;
pub mod lag_check;
pub mod mapping;
pub mod monitor;
pub mod oids;
@@ -30,6 +31,7 @@ pub use connection::Connection;
pub use error::Error;
pub use guard::Guard;
pub use healthcheck::Healtcheck;
pub use lag_check::ReplicationLagChecker;
use monitor::Monitor;
pub use oids::Oids;
pub use pool_impl::Pool;
58 changes: 57 additions & 1 deletion pgdog/src/backend/pool/shard.rs
@@ -4,8 +4,10 @@ use crate::{
config::{LoadBalancingStrategy, ReadWriteSplit, Role},
net::messages::BackendKeyData,
};
use tokio::{spawn, time::interval};
use tracing::{debug, error};

use super::{Error, Guard, Pool, PoolConfig, Replicas, Request};
use super::{Error, Guard, Pool, PoolConfig, Replicas, Request, ReplicationLagChecker};

/// Primary and replicas.
#[derive(Clone, Default, Debug)]
@@ -134,12 +136,66 @@ impl Shard {
/// Launch the shard, bringing all pools online.
pub fn launch(&self) {
self.pools().iter().for_each(|pool| pool.launch());

// Launch replication lag monitoring if we have both primary and replicas
if let Some(primary) = &self.primary {
if !self.replicas.is_empty() {
Self::launch_lag_monitoring(primary.clone(), self.replicas.clone());
}
}
}

/// Shutdown all pools, taking the shard offline.
pub fn shutdown(&self) {
self.pools().iter().for_each(|pool| pool.shutdown());
}

/// Launch replication lag monitoring task.
fn launch_lag_monitoring(primary: Pool, replicas: Replicas) {
spawn(async move {
Self::replication_lag_monitor(primary, replicas).await;
});
}

/// Monitor replication lag and ban lagging replicas.
async fn replication_lag_monitor(primary: Pool, replicas: Replicas) {
debug!("Starting replication lag monitoring");

// Get configuration from one of the replica pools
let (check_interval, max_lag_bytes) = {
if let Some(pool) = replicas.pools.first() {
let config = pool.lock().config;
(config.replication_lag_check_interval(), config.max_replication_lag_bytes())
} else {
return; // No replicas to monitor
}
};

let mut interval = interval(check_interval);

loop {
// Wait for next check interval
interval.tick().await;

// Check if primary is online
{
let primary_guard = primary.lock();
if !primary_guard.online {
debug!("Primary is offline, stopping replication lag monitoring");
break;
}
}

// Perform replication lag check
let checker = ReplicationLagChecker::new(&primary, &replicas, max_lag_bytes);
if let Err(err) = checker.check_and_ban_lagging_replicas().await {
error!("Replication lag check failed: {}", err);
// Continue monitoring even if this check failed
}
}

debug!("Replication lag monitoring stopped");
}
}

#[cfg(test)]
1 change: 1 addition & 0 deletions pgdog/src/backend/pool/test/mod.rs
@@ -17,6 +17,7 @@ use crate::state::State;
use super::*;

mod replica;
mod replication_lag;

pub fn pool() -> Pool {
let config = Config {