Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions etl-benchmarks/benches/table_copies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
id: 1,
publication_name: args.publication_name,
pg_connection: pg_connection_config,
primary_connection: None,
heartbeat: None,
batch: BatchConfig {
max_size: args.batch_max_size,
max_fill_ms: args.batch_max_fill_ms,
Expand Down
195 changes: 195 additions & 0 deletions etl-config/src/shared/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Heartbeat configuration for read replica replication slot maintenance.

use crate::shared::ValidationError;
use serde::{Deserialize, Serialize};
#[cfg(feature = "utoipa")]
use utoipa::ToSchema;

/// Tunables for the heartbeat worker that keeps a replication slot active.
///
/// This matters when the pipeline replicates from a read replica: during idle
/// periods the replication slot on the primary can become inactive, so the
/// heartbeat worker periodically emits WAL messages to keep it active.
///
/// Every field is optional when deserializing; a missing field falls back to the
/// matching `DEFAULT_*` constant on [`HeartbeatConfig`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(feature = "utoipa", derive(ToSchema))]
pub struct HeartbeatConfig {
    /// Time between two heartbeat emissions, in milliseconds.
    ///
    /// Default: 30000 (30 seconds)
    #[serde(default = "default_interval_ms")]
    pub interval_ms: u64,

    /// Shortest backoff, in milliseconds, applied after a failed heartbeat attempt.
    ///
    /// Default: 1000 (1 second)
    #[serde(default = "default_min_backoff_ms")]
    pub min_backoff_ms: u64,

    /// Longest backoff, in milliseconds, applied after repeated failures.
    ///
    /// Default: 60000 (60 seconds)
    #[serde(default = "default_max_backoff_ms")]
    pub max_backoff_ms: u64,

    /// Percentage of jitter (0-100) applied to the backoff duration.
    ///
    /// Spreads out retries so that multiple workers do not reconnect at the same
    /// moment (thundering herd).
    /// Default: 25
    #[serde(default = "default_jitter_percent")]
    pub jitter_percent: u8,
}

impl HeartbeatConfig {
    /// Heartbeat interval used when none is configured: 30 seconds, in milliseconds.
    pub const DEFAULT_INTERVAL_MS: u64 = 30_000;

    /// Minimum backoff used when none is configured: 1 second, in milliseconds.
    pub const DEFAULT_MIN_BACKOFF_MS: u64 = 1_000;

    /// Maximum backoff used when none is configured: 60 seconds, in milliseconds.
    pub const DEFAULT_MAX_BACKOFF_MS: u64 = 60_000;

    /// Jitter used when none is configured: 25%.
    pub const DEFAULT_JITTER_PERCENT: u8 = 25;

    /// Checks that the configured values are usable.
    ///
    /// The rules are evaluated in this order and the first violated one is reported:
    ///
    /// 1. `interval_ms` must be greater than 0.
    /// 2. `jitter_percent` must be at most 100.
    /// 3. `min_backoff_ms` must be greater than 0.
    /// 4. `min_backoff_ms` must not exceed `max_backoff_ms`.
    ///
    /// # Errors
    ///
    /// Returns [`ValidationError::InvalidFieldValue`] carrying the name of the
    /// offending field and the constraint it violated.
    pub fn validate(&self) -> Result<(), ValidationError> {
        // Each entry is (rule violated?, field name, constraint text). The order of
        // the entries is the order in which violations are reported.
        let rules = [
            (self.interval_ms == 0, "interval_ms", "must be greater than 0"),
            (self.jitter_percent > 100, "jitter_percent", "must be <= 100"),
            (self.min_backoff_ms == 0, "min_backoff_ms", "must be greater than 0"),
            (
                self.min_backoff_ms > self.max_backoff_ms,
                "min_backoff_ms",
                "must be <= max_backoff_ms",
            ),
        ];

        match rules.iter().find(|rule| rule.0) {
            Some(&(_, field, constraint)) => Err(ValidationError::InvalidFieldValue {
                field: field.to_string(),
                constraint: constraint.to_string(),
            }),
            None => Ok(()),
        }
    }
}

impl Default for HeartbeatConfig {
    /// Builds a configuration in which every field takes the value of its
    /// `DEFAULT_*` associated constant.
    fn default() -> Self {
        let interval_ms = Self::DEFAULT_INTERVAL_MS;
        let min_backoff_ms = Self::DEFAULT_MIN_BACKOFF_MS;
        let max_backoff_ms = Self::DEFAULT_MAX_BACKOFF_MS;
        let jitter_percent = Self::DEFAULT_JITTER_PERCENT;

        Self {
            interval_ms,
            min_backoff_ms,
            max_backoff_ms,
            jitter_percent,
        }
    }
}

/// Serde fallback for `HeartbeatConfig::interval_ms` when the field is absent.
fn default_interval_ms() -> u64 {
    HeartbeatConfig::DEFAULT_INTERVAL_MS
}

/// Serde fallback for `HeartbeatConfig::min_backoff_ms` when the field is absent.
fn default_min_backoff_ms() -> u64 {
    HeartbeatConfig::DEFAULT_MIN_BACKOFF_MS
}

/// Serde fallback for `HeartbeatConfig::max_backoff_ms` when the field is absent.
fn default_max_backoff_ms() -> u64 {
    HeartbeatConfig::DEFAULT_MAX_BACKOFF_MS
}

/// Serde fallback for `HeartbeatConfig::jitter_percent` when the field is absent.
fn default_jitter_percent() -> u8 {
    HeartbeatConfig::DEFAULT_JITTER_PERCENT
}

/// Session settings applied to heartbeat connections.
///
/// The value is a single string of space-separated `name=value` pairs. It labels
/// the session as `etl_heartbeat` and sets deliberately short timeouts, because a
/// heartbeat is a lightweight health check that should fail fast instead of
/// hanging.
// NOTE(review): the code that consumes this string is not part of this module.
// Confirm it expects bare `name=value` pairs; libpq's own `options` parameter
// normally wants each setting written as `-c name=value`.
pub const ETL_HEARTBEAT_OPTIONS: &str = concat!(
    // Identifies heartbeat sessions by application name.
    "application_name=etl_heartbeat",
    // Statement timeout; presumably milliseconds (PostgreSQL's unit for bare integers).
    " statement_timeout=5000",
    // Lock timeout, same unit as above.
    " lock_timeout=5000",
    // Idle-in-transaction session timeout, same unit as above.
    " idle_in_transaction_session_timeout=30000",
);

#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when validation fails with `InvalidFieldValue` naming
    /// `expected_field`.
    ///
    /// Asserting on the field, not just on `is_err()`, keeps a test from passing
    /// because an unrelated rule happened to reject the configuration.
    fn rejects_field(config: &HeartbeatConfig, expected_field: &str) -> bool {
        matches!(
            config.validate(),
            Err(ValidationError::InvalidFieldValue { field, .. }) if field == expected_field
        )
    }

    #[test]
    fn test_default_config() {
        let config = HeartbeatConfig::default();
        assert_eq!(config.interval_ms, 30_000);
        assert_eq!(config.min_backoff_ms, 1_000);
        assert_eq!(config.max_backoff_ms, 60_000);
        assert_eq!(config.jitter_percent, 25);
    }

    #[test]
    fn test_heartbeat_options() {
        // Pin the exact value: this catches a missing separator, a doubled space
        // or a reordered option, which substring checks alone would miss.
        assert_eq!(
            ETL_HEARTBEAT_OPTIONS,
            "application_name=etl_heartbeat statement_timeout=5000 lock_timeout=5000 \
             idle_in_transaction_session_timeout=30000"
        );
    }

    #[test]
    fn test_validate_valid_config() {
        let config = HeartbeatConfig::default();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_zero_interval() {
        let config = HeartbeatConfig {
            interval_ms: 0,
            ..Default::default()
        };
        assert!(rejects_field(&config, "interval_ms"));
    }

    #[test]
    fn test_validate_jitter_too_high() {
        let config = HeartbeatConfig {
            jitter_percent: 101,
            ..Default::default()
        };
        assert!(rejects_field(&config, "jitter_percent"));
    }

    #[test]
    fn test_validate_jitter_at_upper_bound() {
        // 100 is the largest accepted value; only values above it are rejected.
        let config = HeartbeatConfig {
            jitter_percent: 100,
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_min_greater_than_max() {
        let config = HeartbeatConfig {
            min_backoff_ms: 10_000,
            max_backoff_ms: 1_000,
            ..Default::default()
        };
        assert!(rejects_field(&config, "min_backoff_ms"));
    }

    #[test]
    fn test_validate_min_equal_to_max() {
        // Equal bounds are allowed: the rule is `min_backoff_ms <= max_backoff_ms`.
        let config = HeartbeatConfig {
            min_backoff_ms: 5_000,
            max_backoff_ms: 5_000,
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_validate_zero_min_backoff() {
        let config = HeartbeatConfig {
            min_backoff_ms: 0,
            ..Default::default()
        };
        assert!(rejects_field(&config, "min_backoff_ms"));
    }
}
2 changes: 2 additions & 0 deletions etl-config/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod base;
mod batch;
mod connection;
mod destination;
mod heartbeat;
mod pipeline;
mod replicator;
mod sentry;
Expand All @@ -11,6 +12,7 @@ pub use base::*;
pub use batch::*;
pub use connection::*;
pub use destination::*;
pub use heartbeat::*;
pub use pipeline::*;
pub use replicator::*;
pub use sentry::*;
Expand Down
47 changes: 45 additions & 2 deletions etl-config/src/shared/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::shared::{
PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError, batch::BatchConfig,
HeartbeatConfig, PgConnectionConfig, PgConnectionConfigWithoutSecrets, ValidationError,
batch::BatchConfig,
};

/// c copy should be performed.Selection rules for tables participating in replication.
/// Selection rules for tables participating in replication.
///
/// Controls which tables are eligible for initial table copy and streaming.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
Expand Down Expand Up @@ -67,6 +68,15 @@ pub struct PipelineConfig {
/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfig,
/// Optional connection configuration for the primary database when replicating from a read replica.
///
/// When set, enables replica mode where heartbeat messages are sent to the primary
/// to keep the replication slot active.
pub primary_connection: Option<PgConnectionConfig>,
/// Optional heartbeat configuration for replica mode.
///
/// Only used when `primary_connection` is set. If not provided, default values are used.
pub heartbeat: Option<HeartbeatConfig>,
/// Batch processing configuration.
#[serde(default)]
pub batch: BatchConfig,
Expand Down Expand Up @@ -94,6 +104,26 @@ impl PipelineConfig {
/// Default maximum number of concurrent table sync workers.
pub const DEFAULT_MAX_TABLE_SYNC_WORKERS: u16 = 4;

/// Reports whether this pipeline runs in replica mode.
///
/// Replica mode is on exactly when `primary_connection` is `Some`. A set primary
/// connection signals that replication reads from a read replica and that
/// heartbeat messages should be sent to the primary.
pub fn is_replica_mode(&self) -> bool {
    self.primary_connection.is_some()
}

/// Resolves the heartbeat configuration that applies to this pipeline.
///
/// In replica mode this returns a clone of the explicitly configured `heartbeat`,
/// or `HeartbeatConfig::default()` when none was provided. Outside replica mode
/// (no `primary_connection`) it returns `None`, even if `heartbeat` is set.
pub fn heartbeat_config(&self) -> Option<HeartbeatConfig> {
    self.is_replica_mode()
        .then(|| self.heartbeat.clone().unwrap_or_default())
}

/// Validates pipeline configuration settings.
///
/// Checks batch configuration and ensures worker counts and retry attempts are non-zero.
Expand All @@ -114,6 +144,13 @@ impl PipelineConfig {
});
}

// Only validate heartbeat config when replica mode is enabled (primary_connection is set)
if self.primary_connection.is_some() {
if let Some(ref heartbeat) = self.heartbeat {
heartbeat.validate()?;
}
}

Ok(())
}
}
Expand Down Expand Up @@ -145,6 +182,10 @@ pub struct PipelineConfigWithoutSecrets {
/// The connection configuration for the Postgres instance to which the pipeline connects for
/// replication.
pub pg_connection: PgConnectionConfigWithoutSecrets,
/// Optional connection configuration for the primary database when replicating from a read replica.
pub primary_connection: Option<PgConnectionConfigWithoutSecrets>,
/// Optional heartbeat configuration for replica mode.
pub heartbeat: Option<HeartbeatConfig>,
/// Batch processing configuration.
#[serde(default)]
pub batch: BatchConfig,
Expand All @@ -168,6 +209,8 @@ impl From<PipelineConfig> for PipelineConfigWithoutSecrets {
id: value.id,
publication_name: value.publication_name,
pg_connection: value.pg_connection.into(),
primary_connection: value.primary_connection.map(Into::into),
heartbeat: value.heartbeat,
batch: value.batch,
table_error_retry_delay_ms: value.table_error_retry_delay_ms,
table_error_retry_max_attempts: value.table_error_retry_max_attempts,
Expand Down
4 changes: 4 additions & 0 deletions etl-examples/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
id: 1, // Using a simple ID for the example
publication_name: args.publication,
pg_connection: pg_connection_config,
// Primary database connection for replica mode (None = replicating from primary directly)
primary_connection: None,
// Heartbeat config for keeping replication slot active in replica mode
heartbeat: None,
batch: BatchConfig {
max_size: args.bq_args.max_batch_size,
max_fill_ms: args.bq_args.max_batch_fill_duration_ms,
Expand Down
4 changes: 2 additions & 2 deletions etl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ metrics = { workspace = true }
pg_escape = { workspace = true }
pin-project-lite = { workspace = true }
postgres-replication = { workspace = true }
rand = { workspace = true, features = ["thread_rng"] }
rustls = { workspace = true, features = ["aws-lc-rs", "logging"] }
serde_json = { workspace = true, features = ["std"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-postgres = { workspace = true, features = [
"runtime",
Expand All @@ -50,5 +52,3 @@ etl-postgres = { workspace = true, features = [
"test-utils",
] }
etl-telemetry = { workspace = true }

rand = { workspace = true, features = ["thread_rng"] }
Loading