From c718018e233e8e8b492e266f6a6f17b4ef7231a7 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Sep 2025 17:49:20 +0200 Subject: [PATCH 1/5] feat(alerts): Send email notifications when a pipeline fails --- etl-config/Cargo.toml | 1 + etl-config/src/shared/mod.rs | 2 + etl-config/src/shared/notification.rs | 35 ++++++++++++ etl-config/src/shared/replicator.rs | 5 +- etl-replicator/Cargo.toml | 3 + etl-replicator/README.md | 18 ++++++ etl-replicator/src/main.rs | 52 ++++++++--------- etl-replicator/src/notifications.rs | 82 +++++++++++++++++++++++++++ etl-telemetry/src/metrics.rs | 4 +- 9 files changed, 173 insertions(+), 29 deletions(-) create mode 100644 etl-config/src/shared/notification.rs create mode 100644 etl-replicator/src/notifications.rs diff --git a/etl-config/Cargo.toml b/etl-config/Cargo.toml index 98f982515..055085735 100644 --- a/etl-config/Cargo.toml +++ b/etl-config/Cargo.toml @@ -14,6 +14,7 @@ utoipa = ["dep:utoipa"] config = { workspace = true, features = ["yaml"] } secrecy = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } sqlx = { workspace = true, features = ["postgres"] } thiserror = { workspace = true } tokio-postgres = { workspace = true } diff --git a/etl-config/src/shared/mod.rs b/etl-config/src/shared/mod.rs index e9f87563e..3e7bfc24b 100644 --- a/etl-config/src/shared/mod.rs +++ b/etl-config/src/shared/mod.rs @@ -2,6 +2,7 @@ mod base; mod batch; mod connection; mod destination; +mod notification; mod pipeline; mod replicator; mod sentry; @@ -11,6 +12,7 @@ pub use base::*; pub use batch::*; pub use connection::*; pub use destination::*; +pub use notification::*; pub use pipeline::*; pub use replicator::*; pub use sentry::*; diff --git a/etl-config/src/shared/notification.rs b/etl-config/src/shared/notification.rs new file mode 100644 index 000000000..9f8dae0a6 --- /dev/null +++ b/etl-config/src/shared/notification.rs @@ -0,0 +1,35 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::BTreeMap; + +/// Provides the default timeout in seconds used for the email notification request. +fn default_timeout_seconds() -> u64 { + 5 +} + +/// Configuration for sending notifications. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NotificationsConfig { + /// Emails configuration for notifications. + pub email: EmailNotificationsConfig +} + +/// Configuration for sending error notifications via email. +/// +/// Carries the endpoint information and template details required to enqueue +/// emails whenever the replicator reports a failure. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailNotificationsConfig { + /// Base URL of the environment-specific API that receives email requests. + pub base_url: String, + /// Email addresses that should receive the notification. + pub addresses: Vec, + /// Postmark template alias that should be used to render the message. + pub template_alias: String, + /// Additional properties passed with every notification. + #[serde(default)] + pub custom_properties: BTreeMap, + /// Timeout in seconds applied to the HTTP request made to the email service. + #[serde(default = "default_timeout_seconds")] + pub timeout_seconds: u64, +} diff --git a/etl-config/src/shared/replicator.rs b/etl-config/src/shared/replicator.rs index 26f3342ee..c84399fed 100644 --- a/etl-config/src/shared/replicator.rs +++ b/etl-config/src/shared/replicator.rs @@ -1,6 +1,6 @@ use crate::Config; use crate::shared::pipeline::PipelineConfig; -use crate::shared::{DestinationConfig, SentryConfig, SupabaseConfig, ValidationError}; +use crate::shared::{DestinationConfig, EmailNotificationsConfig, NotificationsConfig, SentryConfig, SupabaseConfig, ValidationError}; use serde::{Deserialize, Serialize}; /// Complete configuration for the replicator service. @@ -24,6 +24,9 @@ pub struct ReplicatorConfig { /// If provided, enables Supabase-specific features or reporting. If `None`, the replicator operates independently of Supabase. #[serde(skip_serializing_if = "Option::is_none")] pub supabase: Option, + /// Optional configuration for sending notifications. + #[serde(skip_serializing_if = "Option::is_none")] + pub notifications: Option, } impl ReplicatorConfig { diff --git a/etl-replicator/Cargo.toml b/etl-replicator/Cargo.toml index abf2c75f9..25f65884f 100644 --- a/etl-replicator/Cargo.toml +++ b/etl-replicator/Cargo.toml @@ -24,3 +24,6 @@ sqlx = { workspace = true, features = [ thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } tracing = { workspace = true, default-features = true } +reqwest = { workspace = true, features = ["blocking", "json", "rustls-tls"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } diff --git a/etl-replicator/README.md b/etl-replicator/README.md index f6f76db71..ea2cbde21 100644 --- a/etl-replicator/README.md +++ b/etl-replicator/README.md @@ -1,3 +1,21 @@ # `etl` - Replicator Long-lived process that performs Postgres logical replication using the `etl` crate. + +## Error Notifications + +Set the `notifications.email` section in the replicator configuration to trigger +an HTTP request to `/system/email/send` whenever the process exits with +an error: + +```yaml +notifications: + email: + base_url: "https://example.supabase.co" + addresses: + - "oncall@example.com" + template_alias: "replicator_failure" + custom_properties: + service: "replicator" + timeout_seconds: 5 +``` \ No newline at end of file diff --git a/etl-replicator/src/main.rs b/etl-replicator/src/main.rs index fd111c8c8..19060f708 100644 --- a/etl-replicator/src/main.rs +++ b/etl-replicator/src/main.rs @@ -4,8 +4,6 @@ //! and routes data to configured destinations. Includes telemetry, error handling, and //! graceful shutdown capabilities. -use crate::config::load_replicator_config; -use crate::core::start_replicator_with_config; use etl_config::Environment; use etl_config::shared::ReplicatorConfig; use etl_telemetry::metrics::init_metrics; @@ -14,9 +12,14 @@ use std::sync::Arc; use thiserror::__private::AsDynError; use tracing::{error, info}; +use crate::config::load_replicator_config; +use crate::core::start_replicator_with_config; +use crate::notifications::send_error_notification; + mod config; mod core; mod migrations; +mod notifications; /// The name of the environment variable which contains version information for this replicator. const APP_VERSION_ENV_NAME: &str = "APP_VERSION"; @@ -27,9 +30,23 @@ const APP_VERSION_ENV_NAME: &str = "APP_VERSION"; /// and launches the replicator pipeline. Handles all errors and ensures proper /// service initialization sequence. fn main() -> anyhow::Result<()> { - // Load replicator config let replicator_config = load_replicator_config()?; + if let Err(err) = guarded_main(replicator_config.clone()) { + sentry::capture_error(err.as_dyn_error()); + error!("an error occurred in the replicator: {err}"); + + if let Err(notification_err) = send_error_notification(&replicator_config, &err) { + error!("failed to dispatch error notification email: {notification_err}"); + } + + return Err(err); + } + + Ok(()) +} + +fn guarded_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { // Extract project reference to use in logs let project_ref = replicator_config .supabase @@ -44,32 +61,17 @@ fn main() -> anyhow::Result<()> { )?; // Initialize Sentry before the async runtime starts - let _sentry_guard = init_sentry()?; + let _sentry_guard = init_sentry(&replicator_config)?; // Initialize metrics collection init_metrics(project_ref)?; - // We start the runtime. + // We start the runtime and propagate any errors to Sentry and logs. The reason for why we do + // this here is that we know that at this point Sentry and logging are configured. tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? - .block_on(async_main(replicator_config))?; - - Ok(()) -} - -/// Main async entry point that starts the replicator pipeline. -/// -/// Launches the replicator with the provided configuration and captures any errors -/// to Sentry before propagating them up. -async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { - // We start the replicator and catch any errors. - if let Err(err) = start_replicator_with_config(replicator_config).await { - sentry::capture_error(err.as_dyn_error()); - error!("an error occurred in the replicator: {err}"); - - return Err(err); - } + .block_on(start_replicator_with_config(replicator_config))?; Ok(()) } @@ -79,10 +81,8 @@ async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { /// Loads configuration and sets up Sentry if a DSN is provided in the config. /// Tags all errors with the "replicator" service identifier and configures /// panic handling to automatically capture and send panics to Sentry. -fn init_sentry() -> anyhow::Result> { - if let Ok(config) = load_replicator_config() - && let Some(sentry_config) = &config.sentry - { +fn init_sentry(config: &ReplicatorConfig) -> anyhow::Result> { + if let Some(sentry_config) = &config.sentry { info!("initializing sentry with supplied dsn"); let environment = Environment::load()?; diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs new file mode 100644 index 000000000..0d473b212 --- /dev/null +++ b/etl-replicator/src/notifications.rs @@ -0,0 +1,82 @@ +use anyhow::Context; +use etl_config::shared::{EmailNotificationsConfig, ReplicatorConfig}; +use reqwest::blocking::Client; +use serde::Serialize; +use serde_json::Value; +use std::collections::BTreeMap; +use std::time::Duration; + +/// Represents the JSON payload accepted by the email service. +#[derive(Debug, Serialize)] +struct SendEmailRequest { + /// Target email addresses. + addresses: Vec, + /// Postmark template alias. + #[serde(rename = "template_alias")] + template_alias: String, + /// Arbitrary template variables. + #[serde(rename = "custom_properties")] + custom_properties: BTreeMap, +} + +/// Dispatches an email notification when the replicator terminates with an error. +/// +/// The notification is only sent when the `email_notifications` configuration is present. +/// Any failure encountered while preparing or sending the request is surfaced to the caller +/// so it can be logged without masking the original error. +pub fn send_error_notification(replicator_config: &ReplicatorConfig, error: &anyhow::Error) -> anyhow::Result<()> { + let Some(notification_config) = &replicator_config.notifications.as_ref().map(|n| n.email) else { + return Ok(()); + }; + + let payload = build_payload(notification_config, replicator_config, error); + + let timeout = Duration::from_secs(notification_config.timeout_seconds.max(1)); + let client = Client::builder() + .timeout(timeout) + .build() + .context("failed to build HTTP client for email notifications")?; + + let endpoint = format!( + "{}/system/email/send", + notification_config.base_url.trim_end_matches('/'), + ); + + let response = client + .post(&endpoint) + .json(&payload) + .send() + .with_context(|| format!("failed to send error notification to {endpoint}"))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .unwrap_or_else(|_| "".to_string()); + + anyhow::bail!("email notification endpoint responded with {status} and body `{body}`",); + } + + Ok(()) +} + +/// Builds the request payload combining static configuration with runtime context. +fn build_payload( + notification_config: &EmailNotificationsConfig, + replicator_config: &ReplicatorConfig, + error: &anyhow::Error, +) -> SendEmailRequest { + let mut custom_properties = notification_config.custom_properties.clone(); + custom_properties.insert( + "pipeline_id".into(), + Value::String(replicator_config.pipeline.id.to_string()), + ); + custom_properties.insert("error_message".into(), Value::String(error.to_string())); + custom_properties.insert("error_chain".into(), Value::String(format!("{error:#}"))); + + SendEmailRequest { + addresses: notification_config.addresses.clone(), + template_alias: notification_config.template_alias.clone(), + custom_properties, + } +} diff --git a/etl-telemetry/src/metrics.rs b/etl-telemetry/src/metrics.rs index 0374ec2fd..3691aed45 100644 --- a/etl-telemetry/src/metrics.rs +++ b/etl-telemetry/src/metrics.rs @@ -5,8 +5,8 @@ use tracing::trace; // One time initialization objects like Once, OnceCell, OnceLock or other variants // are not used here and instead a mutex is used because the initialization code -// is fallible and ideally we'd like to use OnceLock::get_or_try_init which -// allows fallibe code to run as part of initialization but it is currently +// is fallible, and ideally we'd like to use OnceLock::get_or_try_init which +// allows fallible code to run as part of initialization, but it is currently // unstable. // // The reason we want to only initialize this once is because the call to From 9a7c40e53bcda0225593e7fd4b7ddc7477b5f794 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Sep 2025 10:18:10 +0200 Subject: [PATCH 2/5] Improve --- etl-config/src/shared/notification.rs | 12 ++------- etl-config/src/shared/replicator.rs | 5 +--- etl-config/src/shared/supabase.rs | 5 ++++ etl-replicator/configuration/dev.yaml | 10 +++++++- etl-replicator/src/main.rs | 2 ++ etl-replicator/src/notifications.rs | 36 ++++++++++++++------------- 6 files changed, 38 insertions(+), 32 deletions(-) diff --git a/etl-config/src/shared/notification.rs b/etl-config/src/shared/notification.rs index 9f8dae0a6..925a1eb29 100644 --- a/etl-config/src/shared/notification.rs +++ b/etl-config/src/shared/notification.rs @@ -2,16 +2,11 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::BTreeMap; -/// Provides the default timeout in seconds used for the email notification request. -fn default_timeout_seconds() -> u64 { - 5 -} - /// Configuration for sending notifications. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NotificationsConfig { /// Emails configuration for notifications. - pub email: EmailNotificationsConfig + pub email: EmailNotificationConfig } /// Configuration for sending error notifications via email. @@ -19,7 +14,7 @@ pub struct NotificationsConfig { /// Carries the endpoint information and template details required to enqueue /// emails whenever the replicator reports a failure. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct EmailNotificationsConfig { +pub struct EmailNotificationConfig { /// Base URL of the environment-specific API that receives email requests. pub base_url: String, /// Email addresses that should receive the notification. @@ -29,7 +24,4 @@ pub struct EmailNotificationsConfig { /// Additional properties passed with every notification. #[serde(default)] pub custom_properties: BTreeMap, - /// Timeout in seconds applied to the HTTP request made to the email service. - #[serde(default = "default_timeout_seconds")] - pub timeout_seconds: u64, } diff --git a/etl-config/src/shared/replicator.rs b/etl-config/src/shared/replicator.rs index c84399fed..5d7a0d114 100644 --- a/etl-config/src/shared/replicator.rs +++ b/etl-config/src/shared/replicator.rs @@ -1,6 +1,6 @@ use crate::Config; use crate::shared::pipeline::PipelineConfig; -use crate::shared::{DestinationConfig, EmailNotificationsConfig, NotificationsConfig, SentryConfig, SupabaseConfig, ValidationError}; +use crate::shared::{DestinationConfig, EmailNotificationConfig, NotificationsConfig, SentryConfig, SupabaseConfig, ValidationError}; use serde::{Deserialize, Serialize}; /// Complete configuration for the replicator service. @@ -24,9 +24,6 @@ pub struct ReplicatorConfig { /// If provided, enables Supabase-specific features or reporting. If `None`, the replicator operates independently of Supabase. #[serde(skip_serializing_if = "Option::is_none")] pub supabase: Option, - /// Optional configuration for sending notifications. - #[serde(skip_serializing_if = "Option::is_none")] - pub notifications: Option, } impl ReplicatorConfig { diff --git a/etl-config/src/shared/supabase.rs b/etl-config/src/shared/supabase.rs index 2c7423d58..13baba0fc 100644 --- a/etl-config/src/shared/supabase.rs +++ b/etl-config/src/shared/supabase.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::fmt; +use crate::shared::NotificationsConfig; + /// Supabase integration configuration. /// /// Contains Supabase-specific settings for ETL applications that @@ -9,6 +11,9 @@ use std::fmt; pub struct SupabaseConfig { /// Supabase project reference identifier. pub project_ref: String, + /// Optional configuration for sending notifications. + #[serde(skip_serializing_if = "Option::is_none")] + pub notifications: Option, } impl fmt::Debug for SupabaseConfig { diff --git a/etl-replicator/configuration/dev.yaml b/etl-replicator/configuration/dev.yaml index 3ce27d846..df81a93a1 100644 --- a/etl-replicator/configuration/dev.yaml +++ b/etl-replicator/configuration/dev.yaml @@ -9,4 +9,12 @@ pipeline: password: "postgres" tls: trusted_root_certs: "" - enabled: false \ No newline at end of file + enabled: false +supabase: + notifications: + email: + base_url: "http://localhost:8080" + addresses: + - "test_email@example.com" + template_alias: "etl-pipeline-error" + custom_properties: {} \ No newline at end of file diff --git a/etl-replicator/src/main.rs b/etl-replicator/src/main.rs index 19060f708..4055f8070 100644 --- a/etl-replicator/src/main.rs +++ b/etl-replicator/src/main.rs @@ -36,7 +36,9 @@ fn main() -> anyhow::Result<()> { sentry::capture_error(err.as_dyn_error()); error!("an error occurred in the replicator: {err}"); + // We try to send an error notification, if we fail, we just report the failure. if let Err(notification_err) = send_error_notification(&replicator_config, &err) { + sentry::capture_error(notification_err.as_dyn_error()); error!("failed to dispatch error notification email: {notification_err}"); } diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs index 0d473b212..1a9075389 100644 --- a/etl-replicator/src/notifications.rs +++ b/etl-replicator/src/notifications.rs @@ -1,5 +1,5 @@ use anyhow::Context; -use etl_config::shared::{EmailNotificationsConfig, ReplicatorConfig}; +use etl_config::shared::{EmailNotificationConfig, ReplicatorConfig, SupabaseConfig}; use reqwest::blocking::Client; use serde::Serialize; use serde_json::Value; @@ -8,14 +8,13 @@ use std::time::Duration; /// Represents the JSON payload accepted by the email service. #[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case")] struct SendEmailRequest { /// Target email addresses. addresses: Vec, /// Postmark template alias. - #[serde(rename = "template_alias")] template_alias: String, /// Arbitrary template variables. - #[serde(rename = "custom_properties")] custom_properties: BTreeMap, } @@ -25,21 +24,23 @@ struct SendEmailRequest { /// Any failure encountered while preparing or sending the request is surfaced to the caller /// so it can be logged without masking the original error. pub fn send_error_notification(replicator_config: &ReplicatorConfig, error: &anyhow::Error) -> anyhow::Result<()> { - let Some(notification_config) = &replicator_config.notifications.as_ref().map(|n| n.email) else { + let Some(supabase_config) = &replicator_config.supabase else { return Ok(()); }; - let payload = build_payload(notification_config, replicator_config, error); + let Some(email_config) = supabase_config.notifications.as_ref().map(|n| &n.email) else { + return Ok(()); + }; + + let payload = build_send_email_request(supabase_config, email_config, replicator_config, error); - let timeout = Duration::from_secs(notification_config.timeout_seconds.max(1)); let client = Client::builder() - .timeout(timeout) .build() .context("failed to build HTTP client for email notifications")?; let endpoint = format!( "{}/system/email/send", - notification_config.base_url.trim_end_matches('/'), + email_config.base_url.trim_end_matches('/'), ); let response = client @@ -61,22 +62,23 @@ pub fn send_error_notification(replicator_config: &ReplicatorConfig, error: &any } /// Builds the request payload combining static configuration with runtime context. -fn build_payload( - notification_config: &EmailNotificationsConfig, +fn build_send_email_request( + supabase_config: &SupabaseConfig, + email_config: &EmailNotificationConfig, replicator_config: &ReplicatorConfig, error: &anyhow::Error, -) -> SendEmailRequest { - let mut custom_properties = notification_config.custom_properties.clone(); +) -> Option { + let mut custom_properties = email_config.custom_properties.clone(); + custom_properties.insert("project_ref".into(), Value::String(supabase_config.project_ref.clone())); custom_properties.insert( "pipeline_id".into(), Value::String(replicator_config.pipeline.id.to_string()), ); custom_properties.insert("error_message".into(), Value::String(error.to_string())); - custom_properties.insert("error_chain".into(), Value::String(format!("{error:#}"))); - SendEmailRequest { - addresses: notification_config.addresses.clone(), - template_alias: notification_config.template_alias.clone(), + Some(SendEmailRequest { + addresses: email_config.addresses.clone(), + template_alias: email_config.template_alias.clone(), custom_properties, - } + }) } From 87d962d6056c75cd7694d9dda3285b0eeccf5e50 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Sep 2025 10:27:43 +0200 Subject: [PATCH 3/5] Improve --- etl-config/src/shared/notification.rs | 2 +- etl-config/src/shared/replicator.rs | 5 +- etl-replicator/src/notifications.rs | 10 +- etl/src/error.rs | 139 +++++++++++++++++++++----- 4 files changed, 125 insertions(+), 31 deletions(-) diff --git a/etl-config/src/shared/notification.rs b/etl-config/src/shared/notification.rs index 925a1eb29..839da1524 100644 --- a/etl-config/src/shared/notification.rs +++ b/etl-config/src/shared/notification.rs @@ -6,7 +6,7 @@ use std::collections::BTreeMap; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NotificationsConfig { /// Emails configuration for notifications. - pub email: EmailNotificationConfig + pub email: EmailNotificationConfig, } /// Configuration for sending error notifications via email. diff --git a/etl-config/src/shared/replicator.rs b/etl-config/src/shared/replicator.rs index 5d7a0d114..6f1e37de3 100644 --- a/etl-config/src/shared/replicator.rs +++ b/etl-config/src/shared/replicator.rs @@ -1,6 +1,9 @@ use crate::Config; use crate::shared::pipeline::PipelineConfig; -use crate::shared::{DestinationConfig, EmailNotificationConfig, NotificationsConfig, SentryConfig, SupabaseConfig, ValidationError}; +use crate::shared::{ + DestinationConfig, EmailNotificationConfig, NotificationsConfig, SentryConfig, SupabaseConfig, + ValidationError, +}; use serde::{Deserialize, Serialize}; /// Complete configuration for the replicator service. diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs index 1a9075389..d33189f0e 100644 --- a/etl-replicator/src/notifications.rs +++ b/etl-replicator/src/notifications.rs @@ -23,7 +23,10 @@ struct SendEmailRequest { /// The notification is only sent when the `email_notifications` configuration is present. /// Any failure encountered while preparing or sending the request is surfaced to the caller /// so it can be logged without masking the original error. -pub fn send_error_notification(replicator_config: &ReplicatorConfig, error: &anyhow::Error) -> anyhow::Result<()> { +pub fn send_error_notification( + replicator_config: &ReplicatorConfig, + error: &anyhow::Error, +) -> anyhow::Result<()> { let Some(supabase_config) = &replicator_config.supabase else { return Ok(()); }; @@ -69,7 +72,10 @@ fn build_send_email_request( error: &anyhow::Error, ) -> Option { let mut custom_properties = email_config.custom_properties.clone(); - custom_properties.insert("project_ref".into(), Value::String(supabase_config.project_ref.clone())); + custom_properties.insert( + "project_ref".into(), + Value::String(supabase_config.project_ref.clone()), + ); custom_properties.insert( "pipeline_id".into(), Value::String(replicator_config.pipeline.id.to_string()), diff --git a/etl/src/error.rs b/etl/src/error.rs index 1f421e279..29f2ac6a2 100644 --- a/etl/src/error.rs +++ b/etl/src/error.rs @@ -199,42 +199,127 @@ impl PartialEq for EtlError { impl fmt::Display for EtlError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self.repr { + match &self.repr { ErrorRepr::WithDescription(kind, desc) => { - fmt::Debug::fmt(&kind, f)?; - f.write_str(": ")?; - desc.fmt(f)?; - - Ok(()) + write_single_error_line(f, *kind, desc, None, 0) } - ErrorRepr::WithDescriptionAndDetail(kind, desc, ref detail) => { - fmt::Debug::fmt(&kind, f)?; - f.write_str(": ")?; - desc.fmt(f)?; - f.write_str(" -> ")?; - detail.fmt(f)?; - - Ok(()) + ErrorRepr::WithDescriptionAndDetail(kind, desc, detail) => { + write_single_error_line(f, *kind, desc, Some(detail.as_str()), 0) } - ErrorRepr::Many(ref errors) => { - if errors.is_empty() { - write!(f, "Multiple errors occurred (empty)")?; - } else if errors.len() == 1 { - // If there's only one error, just display it directly - errors[0].fmt(f)?; - } else { - write!(f, "Multiple errors occurred ({} total):", errors.len())?; - for (i, error) in errors.iter().enumerate() { - write!(f, "\n {}: {}", i + 1, error)?; - } + ErrorRepr::Many(errors) => write_many_errors(f, errors), + } + } +} + +impl error::Error for EtlError {} + +fn write_single_error_line( + f: &mut fmt::Formatter<'_>, + kind: ErrorKind, + description: &str, + detail: Option<&str>, + indent_level: usize, +) -> Result<(), fmt::Error> { + let prefix = " ".repeat(indent_level); + write!(f, "{}{}: {}", prefix, format_error_kind(kind), description)?; + + if let Some(detail) = detail { + if !detail.trim().is_empty() { + write!(f, "\n{} Details: {}", prefix, detail)?; + } + } + + Ok(()) +} + +fn write_many_errors(f: &mut fmt::Formatter<'_>, errors: &[EtlError]) -> Result<(), fmt::Error> { + match errors.len() { + 0 => write!(f, "Multiple errors (empty)"), + 1 => fmt::Display::fmt(&errors[0], f), + count => { + write!(f, "Multiple errors ({}):", count)?; + for (index, error) in errors.iter().enumerate() { + let rendered = error.to_string(); + let mut lines = rendered.lines(); + + if let Some(first) = lines.next() { + write!(f, "\n {}. {}", index + 1, first)?; + } + + for line in lines { + write!(f, "\n {}", line)?; } - Ok(()) } + Ok(()) } } } -impl error::Error for EtlError {} +fn format_error_kind(kind: ErrorKind) -> String { + let identifier = format!("{:?}", kind); + let mut words = Vec::new(); + let mut current = String::new(); + let mut chars = identifier.chars().peekable(); + + while let Some(ch) = chars.next() { + if current.is_empty() { + current.push(ch); + continue; + } + + let is_upper = ch.is_uppercase(); + let prev_is_upper = current + .chars() + .last() + .map(|c| c.is_uppercase()) + .unwrap_or(false); + let next_is_lower = chars + .peek() + .map(|next| next.is_lowercase()) + .unwrap_or(false); + + if is_upper && (!prev_is_upper || next_is_lower) { + words.push(current); + current = String::new(); + current.push(ch); + } else { + current.push(ch); + } + } + + if !current.is_empty() { + words.push(current); + } + + let mut words_iter = words.into_iter(); + let mut formatted = String::new(); + + if let Some(first) = words_iter.next() { + formatted.push_str(&capitalize(&first)); + } + + for word in words_iter { + formatted.push(' '); + formatted.push_str(&word.to_lowercase()); + } + + formatted +} + +fn capitalize(segment: &str) -> String { + let mut chars = segment.chars(); + match chars.next() { + None => String::new(), + Some(first) => { + let mut result = String::with_capacity(segment.len()); + result.push(first.to_ascii_uppercase()); + for ch in chars { + result.push(ch.to_ascii_lowercase()); + } + result + } + } +} /// Creates an [`EtlError`] from an error kind and static description. impl From<(ErrorKind, &'static str)> for EtlError { From 288a5383b7ce1ee516217726bbf9b35c0c34ffe7 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Sep 2025 10:29:24 +0200 Subject: [PATCH 4/5] Improve --- etl-api/src/routes/pipelines.rs | 2 ++ etl-config/src/shared/replicator.rs | 2 +- etl-replicator/src/notifications.rs | 1 - etl/src/error.rs | 8 ++++---- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index 803bd4254..f63c7023f 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -1226,6 +1226,8 @@ async fn create_or_update_pipeline_in_k8s( pipeline, SupabaseConfig { project_ref: tenant_id.to_owned(), + // TODO: build actual notifications configuration. + notifications: None }, ) .await?; diff --git a/etl-config/src/shared/replicator.rs b/etl-config/src/shared/replicator.rs index 6f1e37de3..5dff8c262 100644 --- a/etl-config/src/shared/replicator.rs +++ b/etl-config/src/shared/replicator.rs @@ -1,7 +1,7 @@ use crate::Config; use crate::shared::pipeline::PipelineConfig; use crate::shared::{ - DestinationConfig, EmailNotificationConfig, NotificationsConfig, SentryConfig, SupabaseConfig, + DestinationConfig, SentryConfig, SupabaseConfig, ValidationError, }; use serde::{Deserialize, Serialize}; diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs index d33189f0e..315e0c502 100644 --- a/etl-replicator/src/notifications.rs +++ b/etl-replicator/src/notifications.rs @@ -4,7 +4,6 @@ use reqwest::blocking::Client; use serde::Serialize; use serde_json::Value; use std::collections::BTreeMap; -use std::time::Duration; /// Represents the JSON payload accepted by the email service. #[derive(Debug, Serialize)] diff --git a/etl/src/error.rs b/etl/src/error.rs index 29f2ac6a2..2a8b0ef18 100644 --- a/etl/src/error.rs +++ b/etl/src/error.rs @@ -225,7 +225,7 @@ fn write_single_error_line( if let Some(detail) = detail { if !detail.trim().is_empty() { - write!(f, "\n{} Details: {}", prefix, detail)?; + write!(f, "\n{prefix} Details: {detail}")?; } } @@ -237,7 +237,7 @@ fn write_many_errors(f: &mut fmt::Formatter<'_>, errors: &[EtlError]) -> Result< 0 => write!(f, "Multiple errors (empty)"), 1 => fmt::Display::fmt(&errors[0], f), count => { - write!(f, "Multiple errors ({}):", count)?; + write!(f, "Multiple errors ({count}):")?; for (index, error) in errors.iter().enumerate() { let rendered = error.to_string(); let mut lines = rendered.lines(); @@ -247,7 +247,7 @@ fn write_many_errors(f: &mut fmt::Formatter<'_>, errors: &[EtlError]) -> Result< } for line in lines { - write!(f, "\n {}", line)?; + write!(f, "\n {line}")?; } } Ok(()) @@ -256,7 +256,7 @@ fn write_many_errors(f: &mut fmt::Formatter<'_>, errors: &[EtlError]) -> Result< } fn format_error_kind(kind: ErrorKind) -> String { - let identifier = format!("{:?}", kind); + let identifier = format!("{kind:?}"); let mut words = Vec::new(); let mut current = String::new(); let mut chars = identifier.chars().peekable(); From 6487b3bc35e894e8f845d0b2af1809dfe602138b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Sep 2025 13:02:19 +0200 Subject: [PATCH 5/5] Improve --- etl-config/src/shared/notification.rs | 2 ++ etl-replicator/configuration/dev.yaml | 1 + etl-replicator/src/notifications.rs | 2 ++ 3 files changed, 5 insertions(+) diff --git a/etl-config/src/shared/notification.rs b/etl-config/src/shared/notification.rs index 839da1524..f85d16236 100644 --- a/etl-config/src/shared/notification.rs +++ b/etl-config/src/shared/notification.rs @@ -17,6 +17,8 @@ pub struct NotificationsConfig { pub struct EmailNotificationConfig { /// Base URL of the environment-specific API that receives email requests. pub base_url: String, + /// The API key to authenticate with the API that sends emails. + pub api_key: String, /// Email addresses that should receive the notification. pub addresses: Vec, /// Postmark template alias that should be used to render the message. diff --git a/etl-replicator/configuration/dev.yaml b/etl-replicator/configuration/dev.yaml index df81a93a1..0fe4bb812 100644 --- a/etl-replicator/configuration/dev.yaml +++ b/etl-replicator/configuration/dev.yaml @@ -14,6 +14,7 @@ supabase: notifications: email: base_url: "http://localhost:8080" + api_key: "000000000000000000009" addresses: - "test_email@example.com" template_alias: "etl-pipeline-error" diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs index 315e0c502..c111a07bc 100644 --- a/etl-replicator/src/notifications.rs +++ b/etl-replicator/src/notifications.rs @@ -40,6 +40,7 @@ pub fn send_error_notification( .build() .context("failed to build HTTP client for email notifications")?; + // TODO: change endpoint when the new one is built. let endpoint = format!( "{}/system/email/send", email_config.base_url.trim_end_matches('/'), @@ -47,6 +48,7 @@ pub fn send_error_notification( let response = client .post(&endpoint) + .header("apikey", email_config.api_key.as_str()) .json(&payload) .send() .with_context(|| format!("failed to send error notification to {endpoint}"))?;