Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions etl-api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,8 @@ async fn create_or_update_pipeline_in_k8s(
pipeline,
SupabaseConfig {
project_ref: tenant_id.to_owned(),
// TODO: build actual notifications configuration.
notifications: None
},
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions etl-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ utoipa = ["dep:utoipa"]
config = { workspace = true, features = ["yaml"] }
secrecy = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["postgres"] }
thiserror = { workspace = true }
tokio-postgres = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions etl-config/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod base;
mod batch;
mod connection;
mod destination;
mod notification;
mod pipeline;
mod replicator;
mod sentry;
Expand All @@ -11,6 +12,7 @@ pub use base::*;
pub use batch::*;
pub use connection::*;
pub use destination::*;
pub use notification::*;
pub use pipeline::*;
pub use replicator::*;
pub use sentry::*;
Expand Down
29 changes: 29 additions & 0 deletions etl-config/src/shared/notification.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;

/// Configuration for sending notifications.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationsConfig {
/// Emails configuration for notifications.
pub email: EmailNotificationConfig,
}

/// Configuration for sending error notifications via email.
///
/// Carries the endpoint information and template details required to enqueue
/// emails whenever the replicator reports a failure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailNotificationConfig {
/// Base URL of the environment-specific API that receives email requests.
pub base_url: String,
/// The API key to authenticate with the API that sends emails.
pub api_key: String,
/// Email addresses that should receive the notification.
pub addresses: Vec<String>,
/// Postmark template alias that should be used to render the message.
pub template_alias: String,
/// Additional properties passed with every notification.
#[serde(default)]
pub custom_properties: BTreeMap<String, Value>,
}
5 changes: 4 additions & 1 deletion etl-config/src/shared/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::Config;
use crate::shared::pipeline::PipelineConfig;
use crate::shared::{DestinationConfig, SentryConfig, SupabaseConfig, ValidationError};
use crate::shared::{
DestinationConfig, SentryConfig, SupabaseConfig,
ValidationError,
};
use serde::{Deserialize, Serialize};

/// Complete configuration for the replicator service.
Expand Down
5 changes: 5 additions & 0 deletions etl-config/src/shared/supabase.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::fmt;

use crate::shared::NotificationsConfig;

/// Supabase integration configuration.
///
/// Contains Supabase-specific settings for ETL applications that
Expand All @@ -9,6 +11,9 @@ use std::fmt;
pub struct SupabaseConfig {
/// Supabase project reference identifier.
pub project_ref: String,
/// Optional configuration for sending notifications.
#[serde(skip_serializing_if = "Option::is_none")]
pub notifications: Option<NotificationsConfig>,
}

impl fmt::Debug for SupabaseConfig {
Expand Down
3 changes: 3 additions & 0 deletions etl-replicator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ sqlx = { workspace = true, features = [
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
tracing = { workspace = true, default-features = true }
reqwest = { workspace = true, features = ["blocking", "json", "rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
18 changes: 18 additions & 0 deletions etl-replicator/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# `etl` - Replicator

Long-lived process that performs Postgres logical replication using the `etl` crate.

## Error Notifications

Set the `notifications.email` section in the replicator configuration to trigger
an HTTP request to `<env_url>/system/email/send` whenever the process exits with
an error:

```yaml
notifications:
email:
base_url: "https://example.supabase.co"
addresses:
- "[email protected]"
template_alias: "replicator_failure"
custom_properties:
service: "replicator"
timeout_seconds: 5
```
11 changes: 10 additions & 1 deletion etl-replicator/configuration/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ pipeline:
password: "postgres"
tls:
trusted_root_certs: ""
enabled: false
enabled: false
supabase:
notifications:
email:
base_url: "http://localhost:8080"
api_key: "000000000000000000009"
addresses:
- "[email protected]"
template_alias: "etl-pipeline-error"
custom_properties: {}
54 changes: 28 additions & 26 deletions etl-replicator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
//! and routes data to configured destinations. Includes telemetry, error handling, and
//! graceful shutdown capabilities.

use crate::config::load_replicator_config;
use crate::core::start_replicator_with_config;
use etl_config::Environment;
use etl_config::shared::ReplicatorConfig;
use etl_telemetry::metrics::init_metrics;
Expand All @@ -14,9 +12,14 @@ use std::sync::Arc;
use thiserror::__private::AsDynError;
use tracing::{error, info};

use crate::config::load_replicator_config;
use crate::core::start_replicator_with_config;
use crate::notifications::send_error_notification;

mod config;
mod core;
mod migrations;
mod notifications;

/// The name of the environment variable which contains version information for this replicator.
const APP_VERSION_ENV_NAME: &str = "APP_VERSION";
Expand All @@ -27,9 +30,25 @@ const APP_VERSION_ENV_NAME: &str = "APP_VERSION";
/// and launches the replicator pipeline. Handles all errors and ensures proper
/// service initialization sequence.
fn main() -> anyhow::Result<()> {
// Load replicator config
let replicator_config = load_replicator_config()?;

if let Err(err) = guarded_main(replicator_config.clone()) {
sentry::capture_error(err.as_dyn_error());
error!("an error occurred in the replicator: {err}");

// We try to send an error notification, if we fail, we just report the failure.
if let Err(notification_err) = send_error_notification(&replicator_config, &err) {
sentry::capture_error(notification_err.as_dyn_error());
error!("failed to dispatch error notification email: {notification_err}");
}

return Err(err);
}

Ok(())
}

fn guarded_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> {
// Extract project reference to use in logs
let project_ref = replicator_config
.supabase
Expand All @@ -44,32 +63,17 @@ fn main() -> anyhow::Result<()> {
)?;

// Initialize Sentry before the async runtime starts
let _sentry_guard = init_sentry()?;
let _sentry_guard = init_sentry(&replicator_config)?;

// Initialize metrics collection
init_metrics(project_ref)?;

// We start the runtime.
// We start the runtime and propagate any errors to Sentry and logs. The reason for why we do
// this here is that we know that at this point Sentry and logging are configured.
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async_main(replicator_config))?;

Ok(())
}

/// Main async entry point that starts the replicator pipeline.
///
/// Launches the replicator with the provided configuration and captures any errors
/// to Sentry before propagating them up.
async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> {
// We start the replicator and catch any errors.
if let Err(err) = start_replicator_with_config(replicator_config).await {
sentry::capture_error(err.as_dyn_error());
error!("an error occurred in the replicator: {err}");

return Err(err);
}
.block_on(start_replicator_with_config(replicator_config))?;

Ok(())
}
Expand All @@ -79,10 +83,8 @@ async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> {
/// Loads configuration and sets up Sentry if a DSN is provided in the config.
/// Tags all errors with the "replicator" service identifier and configures
/// panic handling to automatically capture and send panics to Sentry.
fn init_sentry() -> anyhow::Result<Option<sentry::ClientInitGuard>> {
if let Ok(config) = load_replicator_config()
&& let Some(sentry_config) = &config.sentry
{
fn init_sentry(config: &ReplicatorConfig) -> anyhow::Result<Option<sentry::ClientInitGuard>> {
if let Some(sentry_config) = &config.sentry {
info!("initializing sentry with supplied dsn");

let environment = Environment::load()?;
Expand Down
91 changes: 91 additions & 0 deletions etl-replicator/src/notifications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use anyhow::Context;
use etl_config::shared::{EmailNotificationConfig, ReplicatorConfig, SupabaseConfig};
use reqwest::blocking::Client;
use serde::Serialize;
use serde_json::Value;
use std::collections::BTreeMap;

/// Represents the JSON payload accepted by the email service.
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
struct SendEmailRequest {
/// Target email addresses.
addresses: Vec<String>,
/// Postmark template alias.
template_alias: String,
/// Arbitrary template variables.
custom_properties: BTreeMap<String, Value>,
}

/// Dispatches an email notification when the replicator terminates with an error.
///
/// The notification is only sent when the `email_notifications` configuration is present.
/// Any failure encountered while preparing or sending the request is surfaced to the caller
/// so it can be logged without masking the original error.
pub fn send_error_notification(
replicator_config: &ReplicatorConfig,
error: &anyhow::Error,
) -> anyhow::Result<()> {
let Some(supabase_config) = &replicator_config.supabase else {
return Ok(());
};

let Some(email_config) = supabase_config.notifications.as_ref().map(|n| &n.email) else {
return Ok(());
};

let payload = build_send_email_request(supabase_config, email_config, replicator_config, error);

let client = Client::builder()
.build()
.context("failed to build HTTP client for email notifications")?;

// TODO: change endpoint when the new one is built.
let endpoint = format!(
"{}/system/email/send",
email_config.base_url.trim_end_matches('/'),
);

let response = client
.post(&endpoint)
.header("apikey", email_config.api_key.as_str())
.json(&payload)
.send()
.with_context(|| format!("failed to send error notification to {endpoint}"))?;

if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.unwrap_or_else(|_| "<unavailable>".to_string());

anyhow::bail!("email notification endpoint responded with {status} and body `{body}`",);
}

Ok(())
}

/// Builds the request payload combining static configuration with runtime context.
fn build_send_email_request(
supabase_config: &SupabaseConfig,
email_config: &EmailNotificationConfig,
replicator_config: &ReplicatorConfig,
error: &anyhow::Error,
) -> Option<SendEmailRequest> {
let mut custom_properties = email_config.custom_properties.clone();
custom_properties.insert(
"project_ref".into(),
Value::String(supabase_config.project_ref.clone()),
);
custom_properties.insert(
"pipeline_id".into(),
Value::String(replicator_config.pipeline.id.to_string()),
);
custom_properties.insert("error_message".into(), Value::String(error.to_string()));

Some(SendEmailRequest {
addresses: email_config.addresses.clone(),
template_alias: email_config.template_alias.clone(),
custom_properties,
})
}
4 changes: 2 additions & 2 deletions etl-telemetry/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use tracing::trace;

// One time initialization objects like Once, OnceCell, OnceLock or other variants
// are not used here and instead a mutex is used because the initialization code
// is fallible and ideally we'd like to use OnceLock::get_or_try_init which
// allows fallibe code to run as part of initialization but it is currently
// is fallible, and ideally we'd like to use OnceLock::get_or_try_init which
// allows fallible code to run as part of initialization, but it is currently
// unstable.
//
// The reason we want to only initialize this once is because the call to
Expand Down
Loading
Loading