diff --git a/etl-api/src/routes/pipelines.rs b/etl-api/src/routes/pipelines.rs index 803bd4254..f63c7023f 100644 --- a/etl-api/src/routes/pipelines.rs +++ b/etl-api/src/routes/pipelines.rs @@ -1226,6 +1226,8 @@ async fn create_or_update_pipeline_in_k8s( pipeline, SupabaseConfig { project_ref: tenant_id.to_owned(), + // TODO: build actual notifications configuration. + notifications: None }, ) .await?; diff --git a/etl-config/Cargo.toml b/etl-config/Cargo.toml index 98f982515..055085735 100644 --- a/etl-config/Cargo.toml +++ b/etl-config/Cargo.toml @@ -14,6 +14,7 @@ utoipa = ["dep:utoipa"] config = { workspace = true, features = ["yaml"] } secrecy = { workspace = true } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } sqlx = { workspace = true, features = ["postgres"] } thiserror = { workspace = true } tokio-postgres = { workspace = true } diff --git a/etl-config/src/shared/mod.rs b/etl-config/src/shared/mod.rs index e9f87563e..3e7bfc24b 100644 --- a/etl-config/src/shared/mod.rs +++ b/etl-config/src/shared/mod.rs @@ -2,6 +2,7 @@ mod base; mod batch; mod connection; mod destination; +mod notification; mod pipeline; mod replicator; mod sentry; @@ -11,6 +12,7 @@ pub use base::*; pub use batch::*; pub use connection::*; pub use destination::*; +pub use notification::*; pub use pipeline::*; pub use replicator::*; pub use sentry::*; diff --git a/etl-config/src/shared/notification.rs b/etl-config/src/shared/notification.rs new file mode 100644 index 000000000..f85d16236 --- /dev/null +++ b/etl-config/src/shared/notification.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::BTreeMap; + +/// Configuration for sending notifications. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NotificationsConfig { + /// Emails configuration for notifications. + pub email: EmailNotificationConfig, +} + +/// Configuration for sending error notifications via email. +/// +/// Carries the endpoint information and template details required to enqueue +/// emails whenever the replicator reports a failure. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailNotificationConfig { + /// Base URL of the environment-specific API that receives email requests. + pub base_url: String, + /// The API key to authenticate with the API that sends emails. + pub api_key: String, + /// Email addresses that should receive the notification. + pub addresses: Vec, + /// Postmark template alias that should be used to render the message. + pub template_alias: String, + /// Additional properties passed with every notification. + #[serde(default)] + pub custom_properties: BTreeMap, +} diff --git a/etl-config/src/shared/replicator.rs b/etl-config/src/shared/replicator.rs index 26f3342ee..5dff8c262 100644 --- a/etl-config/src/shared/replicator.rs +++ b/etl-config/src/shared/replicator.rs @@ -1,6 +1,9 @@ use crate::Config; use crate::shared::pipeline::PipelineConfig; -use crate::shared::{DestinationConfig, SentryConfig, SupabaseConfig, ValidationError}; +use crate::shared::{ + DestinationConfig, SentryConfig, SupabaseConfig, + ValidationError, +}; use serde::{Deserialize, Serialize}; /// Complete configuration for the replicator service. diff --git a/etl-config/src/shared/supabase.rs b/etl-config/src/shared/supabase.rs index 2c7423d58..13baba0fc 100644 --- a/etl-config/src/shared/supabase.rs +++ b/etl-config/src/shared/supabase.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::fmt; +use crate::shared::NotificationsConfig; + /// Supabase integration configuration. /// /// Contains Supabase-specific settings for ETL applications that @@ -9,6 +11,9 @@ use std::fmt; pub struct SupabaseConfig { /// Supabase project reference identifier. pub project_ref: String, + /// Optional configuration for sending notifications. + #[serde(skip_serializing_if = "Option::is_none")] + pub notifications: Option, } impl fmt::Debug for SupabaseConfig { diff --git a/etl-replicator/Cargo.toml b/etl-replicator/Cargo.toml index abf2c75f9..25f65884f 100644 --- a/etl-replicator/Cargo.toml +++ b/etl-replicator/Cargo.toml @@ -24,3 +24,6 @@ sqlx = { workspace = true, features = [ thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } tracing = { workspace = true, default-features = true } +reqwest = { workspace = true, features = ["blocking", "json", "rustls-tls"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } diff --git a/etl-replicator/README.md b/etl-replicator/README.md index f6f76db71..ea2cbde21 100644 --- a/etl-replicator/README.md +++ b/etl-replicator/README.md @@ -1,3 +1,21 @@ # `etl` - Replicator Long-lived process that performs Postgres logical replication using the `etl` crate. + +## Error Notifications + +Set the `notifications.email` section in the replicator configuration to trigger +an HTTP request to `/system/email/send` whenever the process exits with +an error: + +```yaml +notifications: + email: + base_url: "https://example.supabase.co" + addresses: + - "oncall@example.com" + template_alias: "replicator_failure" + custom_properties: + service: "replicator" + timeout_seconds: 5 +``` \ No newline at end of file diff --git a/etl-replicator/configuration/dev.yaml b/etl-replicator/configuration/dev.yaml index 3ce27d846..0fe4bb812 100644 --- a/etl-replicator/configuration/dev.yaml +++ b/etl-replicator/configuration/dev.yaml @@ -9,4 +9,13 @@ pipeline: password: "postgres" tls: trusted_root_certs: "" - enabled: false \ No newline at end of file + enabled: false +supabase: + notifications: + email: + base_url: "http://localhost:8080" + api_key: "000000000000000000009" + addresses: + - "test_email@example.com" + template_alias: "etl-pipeline-error" + custom_properties: {} \ No newline at end of file diff --git a/etl-replicator/src/main.rs b/etl-replicator/src/main.rs index fd111c8c8..4055f8070 100644 --- a/etl-replicator/src/main.rs +++ b/etl-replicator/src/main.rs @@ -4,8 +4,6 @@ //! and routes data to configured destinations. Includes telemetry, error handling, and //! graceful shutdown capabilities. -use crate::config::load_replicator_config; -use crate::core::start_replicator_with_config; use etl_config::Environment; use etl_config::shared::ReplicatorConfig; use etl_telemetry::metrics::init_metrics; @@ -14,9 +12,14 @@ use std::sync::Arc; use thiserror::__private::AsDynError; use tracing::{error, info}; +use crate::config::load_replicator_config; +use crate::core::start_replicator_with_config; +use crate::notifications::send_error_notification; + mod config; mod core; mod migrations; +mod notifications; /// The name of the environment variable which contains version information for this replicator. const APP_VERSION_ENV_NAME: &str = "APP_VERSION"; @@ -27,9 +30,25 @@ const APP_VERSION_ENV_NAME: &str = "APP_VERSION"; /// and launches the replicator pipeline. Handles all errors and ensures proper /// service initialization sequence. fn main() -> anyhow::Result<()> { - // Load replicator config let replicator_config = load_replicator_config()?; + if let Err(err) = guarded_main(replicator_config.clone()) { + sentry::capture_error(err.as_dyn_error()); + error!("an error occurred in the replicator: {err}"); + + // We try to send an error notification, if we fail, we just report the failure. + if let Err(notification_err) = send_error_notification(&replicator_config, &err) { + sentry::capture_error(notification_err.as_dyn_error()); + error!("failed to dispatch error notification email: {notification_err}"); + } + + return Err(err); + } + + Ok(()) +} + +fn guarded_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { // Extract project reference to use in logs let project_ref = replicator_config .supabase @@ -44,32 +63,17 @@ fn main() -> anyhow::Result<()> { )?; // Initialize Sentry before the async runtime starts - let _sentry_guard = init_sentry()?; + let _sentry_guard = init_sentry(&replicator_config)?; // Initialize metrics collection init_metrics(project_ref)?; - // We start the runtime. + // We start the runtime and propagate any errors to Sentry and logs. The reason for why we do + // this here is that we know that at this point Sentry and logging are configured. tokio::runtime::Builder::new_multi_thread() .enable_all() .build()? - .block_on(async_main(replicator_config))?; - - Ok(()) -} - -/// Main async entry point that starts the replicator pipeline. -/// -/// Launches the replicator with the provided configuration and captures any errors -/// to Sentry before propagating them up. -async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { - // We start the replicator and catch any errors. - if let Err(err) = start_replicator_with_config(replicator_config).await { - sentry::capture_error(err.as_dyn_error()); - error!("an error occurred in the replicator: {err}"); - - return Err(err); - } + .block_on(start_replicator_with_config(replicator_config))?; Ok(()) } @@ -79,10 +83,8 @@ async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> { /// Loads configuration and sets up Sentry if a DSN is provided in the config. /// Tags all errors with the "replicator" service identifier and configures /// panic handling to automatically capture and send panics to Sentry. -fn init_sentry() -> anyhow::Result> { - if let Ok(config) = load_replicator_config() - && let Some(sentry_config) = &config.sentry - { +fn init_sentry(config: &ReplicatorConfig) -> anyhow::Result> { + if let Some(sentry_config) = &config.sentry { info!("initializing sentry with supplied dsn"); let environment = Environment::load()?; diff --git a/etl-replicator/src/notifications.rs b/etl-replicator/src/notifications.rs new file mode 100644 index 000000000..c111a07bc --- /dev/null +++ b/etl-replicator/src/notifications.rs @@ -0,0 +1,91 @@ +use anyhow::Context; +use etl_config::shared::{EmailNotificationConfig, ReplicatorConfig, SupabaseConfig}; +use reqwest::blocking::Client; +use serde::Serialize; +use serde_json::Value; +use std::collections::BTreeMap; + +/// Represents the JSON payload accepted by the email service. +#[derive(Debug, Serialize)] +#[serde(rename_all = "snake_case")] +struct SendEmailRequest { + /// Target email addresses. + addresses: Vec, + /// Postmark template alias. + template_alias: String, + /// Arbitrary template variables. + custom_properties: BTreeMap, +} + +/// Dispatches an email notification when the replicator terminates with an error. +/// +/// The notification is only sent when the `email_notifications` configuration is present. +/// Any failure encountered while preparing or sending the request is surfaced to the caller +/// so it can be logged without masking the original error. +pub fn send_error_notification( + replicator_config: &ReplicatorConfig, + error: &anyhow::Error, +) -> anyhow::Result<()> { + let Some(supabase_config) = &replicator_config.supabase else { + return Ok(()); + }; + + let Some(email_config) = supabase_config.notifications.as_ref().map(|n| &n.email) else { + return Ok(()); + }; + + let payload = build_send_email_request(supabase_config, email_config, replicator_config, error); + + let client = Client::builder() + .build() + .context("failed to build HTTP client for email notifications")?; + + // TODO: change endpoint when the new one is built. + let endpoint = format!( + "{}/system/email/send", + email_config.base_url.trim_end_matches('/'), + ); + + let response = client + .post(&endpoint) + .header("apikey", email_config.api_key.as_str()) + .json(&payload) + .send() + .with_context(|| format!("failed to send error notification to {endpoint}"))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .unwrap_or_else(|_| "".to_string()); + + anyhow::bail!("email notification endpoint responded with {status} and body `{body}`",); + } + + Ok(()) +} + +/// Builds the request payload combining static configuration with runtime context. +fn build_send_email_request( + supabase_config: &SupabaseConfig, + email_config: &EmailNotificationConfig, + replicator_config: &ReplicatorConfig, + error: &anyhow::Error, +) -> Option { + let mut custom_properties = email_config.custom_properties.clone(); + custom_properties.insert( + "project_ref".into(), + Value::String(supabase_config.project_ref.clone()), + ); + custom_properties.insert( + "pipeline_id".into(), + Value::String(replicator_config.pipeline.id.to_string()), + ); + custom_properties.insert("error_message".into(), Value::String(error.to_string())); + + Some(SendEmailRequest { + addresses: email_config.addresses.clone(), + template_alias: email_config.template_alias.clone(), + custom_properties, + }) +} diff --git a/etl-telemetry/src/metrics.rs b/etl-telemetry/src/metrics.rs index 0374ec2fd..3691aed45 100644 --- a/etl-telemetry/src/metrics.rs +++ b/etl-telemetry/src/metrics.rs @@ -5,8 +5,8 @@ use tracing::trace; // One time initialization objects like Once, OnceCell, OnceLock or other variants // are not used here and instead a mutex is used because the initialization code -// is fallible and ideally we'd like to use OnceLock::get_or_try_init which -// allows fallibe code to run as part of initialization but it is currently +// is fallible, and ideally we'd like to use OnceLock::get_or_try_init which +// allows fallible code to run as part of initialization, but it is currently // unstable. // // The reason we want to only initialize this once is because the call to diff --git a/etl/src/error.rs b/etl/src/error.rs index 1f421e279..2a8b0ef18 100644 --- a/etl/src/error.rs +++ b/etl/src/error.rs @@ -199,42 +199,127 @@ impl PartialEq for EtlError { impl fmt::Display for EtlError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self.repr { + match &self.repr { ErrorRepr::WithDescription(kind, desc) => { - fmt::Debug::fmt(&kind, f)?; - f.write_str(": ")?; - desc.fmt(f)?; - - Ok(()) + write_single_error_line(f, *kind, desc, None, 0) } - ErrorRepr::WithDescriptionAndDetail(kind, desc, ref detail) => { - fmt::Debug::fmt(&kind, f)?; - f.write_str(": ")?; - desc.fmt(f)?; - f.write_str(" -> ")?; - detail.fmt(f)?; - - Ok(()) + ErrorRepr::WithDescriptionAndDetail(kind, desc, detail) => { + write_single_error_line(f, *kind, desc, Some(detail.as_str()), 0) } - ErrorRepr::Many(ref errors) => { - if errors.is_empty() { - write!(f, "Multiple errors occurred (empty)")?; - } else if errors.len() == 1 { - // If there's only one error, just display it directly - errors[0].fmt(f)?; - } else { - write!(f, "Multiple errors occurred ({} total):", errors.len())?; - for (i, error) in errors.iter().enumerate() { - write!(f, "\n {}: {}", i + 1, error)?; - } + ErrorRepr::Many(errors) => write_many_errors(f, errors), + } + } +} + +impl error::Error for EtlError {} + +fn write_single_error_line( + f: &mut fmt::Formatter<'_>, + kind: ErrorKind, + description: &str, + detail: Option<&str>, + indent_level: usize, +) -> Result<(), fmt::Error> { + let prefix = " ".repeat(indent_level); + write!(f, "{}{}: {}", prefix, format_error_kind(kind), description)?; + + if let Some(detail) = detail { + if !detail.trim().is_empty() { + write!(f, "\n{prefix} Details: {detail}")?; + } + } + + Ok(()) +} + +fn write_many_errors(f: &mut fmt::Formatter<'_>, errors: &[EtlError]) -> Result<(), fmt::Error> { + match errors.len() { + 0 => write!(f, "Multiple errors (empty)"), + 1 => fmt::Display::fmt(&errors[0], f), + count => { + write!(f, "Multiple errors ({count}):")?; + for (index, error) in errors.iter().enumerate() { + let rendered = error.to_string(); + let mut lines = rendered.lines(); + + if let Some(first) = lines.next() { + write!(f, "\n {}. {}", index + 1, first)?; + } + + for line in lines { + write!(f, "\n {line}")?; } - Ok(()) } + Ok(()) } } } -impl error::Error for EtlError {} +fn format_error_kind(kind: ErrorKind) -> String { + let identifier = format!("{kind:?}"); + let mut words = Vec::new(); + let mut current = String::new(); + let mut chars = identifier.chars().peekable(); + + while let Some(ch) = chars.next() { + if current.is_empty() { + current.push(ch); + continue; + } + + let is_upper = ch.is_uppercase(); + let prev_is_upper = current + .chars() + .last() + .map(|c| c.is_uppercase()) + .unwrap_or(false); + let next_is_lower = chars + .peek() + .map(|next| next.is_lowercase()) + .unwrap_or(false); + + if is_upper && (!prev_is_upper || next_is_lower) { + words.push(current); + current = String::new(); + current.push(ch); + } else { + current.push(ch); + } + } + + if !current.is_empty() { + words.push(current); + } + + let mut words_iter = words.into_iter(); + let mut formatted = String::new(); + + if let Some(first) = words_iter.next() { + formatted.push_str(&capitalize(&first)); + } + + for word in words_iter { + formatted.push(' '); + formatted.push_str(&word.to_lowercase()); + } + + formatted +} + +fn capitalize(segment: &str) -> String { + let mut chars = segment.chars(); + match chars.next() { + None => String::new(), + Some(first) => { + let mut result = String::with_capacity(segment.len()); + result.push(first.to_ascii_uppercase()); + for ch in chars { + result.push(ch.to_ascii_lowercase()); + } + result + } + } +} /// Creates an [`EtlError`] from an error kind and static description. impl From<(ErrorKind, &'static str)> for EtlError {