Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app-server/src/ch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod datapoints;
pub mod evaluation_datapoints;
pub mod limits;
pub mod logs;
pub mod notification_log;
pub mod service;
pub mod signal_events;
pub mod signal_run_messages;
Expand Down
66 changes: 66 additions & 0 deletions app-server/src/ch/notification_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use anyhow::Result;
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// ClickHouse row for notification_log table.
/// Records every notification (email or Slack) sent by the system for auditing and inspection.
#[derive(Row, Serialize, Deserialize, Clone, Debug)]
pub struct CHNotificationLog {
#[serde(with = "clickhouse::serde::uuid")]
pub id: Uuid,
#[serde(with = "clickhouse::serde::uuid")]
pub workspace_id: Uuid,
#[serde(with = "clickhouse::serde::uuid")]
pub project_id: Uuid,
/// "report" or "alert"
pub notification_type: String,
/// "email" or "slack"
pub channel: String,
/// Email address or Slack channel ID
pub recipient: String,
/// Email subject or Slack message summary
pub subject: String,
/// Full HTML body (email) or JSON blocks (Slack)
pub body: String,
/// Name of the event that triggered this notification
pub event_name: String,
/// "success" or "error"
pub status: String,
/// Error message if status is "error", empty otherwise
pub error: String,
/// Millisecond-precision timestamp
pub created_at: i64,
}

impl CHNotificationLog {
pub fn now_millis() -> i64 {
chrono::Utc::now().timestamp_millis()
}
}

/// Insert notification log entries into ClickHouse.
pub async fn insert_notification_logs(
clickhouse: &clickhouse::Client,
logs: Vec<CHNotificationLog>,
) -> Result<()> {
if logs.is_empty() {
return Ok(());
}

let mut insert = clickhouse
.insert::<CHNotificationLog>("notification_log")
.await?;
insert = insert.with_option("wait_for_async_insert", "0");

for log in logs {
insert.write(&log).await?;
}

insert
.end()
.await
.map_err(|e| anyhow::anyhow!("ClickHouse notification_log insertion failed: {:?}", e))?;

Ok(())
}
14 changes: 14 additions & 0 deletions app-server/src/db/projects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ pub struct ProjectInfo {
pub name: String,
}

pub async fn get_workspace_id_for_project(
pool: &PgPool,
project_id: &Uuid,
) -> anyhow::Result<Option<Uuid>> {
let row = sqlx::query_scalar::<_, Uuid>(
"SELECT workspace_id FROM projects WHERE id = $1",
)
.bind(project_id)
.fetch_optional(pool)
.await?;

Ok(row)
}

pub async fn get_projects_for_workspace(
pool: &PgPool,
workspace_id: &Uuid,
Expand Down
29 changes: 29 additions & 0 deletions app-server/src/db/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ pub struct SignalInfo {
pub project_id: Uuid,
}

#[derive(FromRow, Debug, Clone)]
pub struct SlackReportTarget {
pub channel_id: String,
pub channel_name: String,
pub integration_id: Uuid,
}

/// Fetch Slack targets from report_targets for a given report where type = 'SLACK'.
pub async fn get_report_target_slack_channels(
pool: &PgPool,
report_id: &Uuid,
workspace_id: &Uuid,
) -> anyhow::Result<Vec<SlackReportTarget>> {
let targets = sqlx::query_as::<_, SlackReportTarget>(
"SELECT rt.channel_id, rt.channel_name, rt.integration_id FROM report_targets rt
JOIN reports r ON rt.report_id = r.id
WHERE rt.report_id = $1 AND r.workspace_id = $2
AND rt.type = 'SLACK'
AND rt.channel_id IS NOT NULL
AND rt.integration_id IS NOT NULL",
)
.bind(report_id)
.bind(workspace_id)
.fetch_all(pool)
.await?;

Ok(targets)
}

/// Fetch all signals for all projects in a workspace in a single query.
pub async fn get_signals_for_workspace(
pool: &PgPool,
Expand Down
3 changes: 2 additions & 1 deletion app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1200,14 +1200,15 @@ fn main() -> anyhow::Result<()> {
// Spawn notification workers
{
let db = db_for_consumer.clone();
let ch = clickhouse_for_consumer.clone();
let client = reqwest::Client::new();
let resend = resend_client.clone();

worker_pool_clone.spawn(
WorkerType::Notifications,
num_notification_workers as usize,
move || {
NotificationHandler::new(db.clone(), client.clone(), resend.clone())
NotificationHandler::new(db.clone(), ch.clone(), client.clone(), resend.clone())
},
QueueConfig {
queue_name: NOTIFICATIONS_QUEUE,
Expand Down
85 changes: 78 additions & 7 deletions app-server/src/notifications/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ use resend_rs::types::{CreateAttachment, CreateEmailBaseOptions};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::ch::notification_log::{CHNotificationLog, insert_notification_logs};
use crate::db::DB;
use crate::mq::{MessageQueue, MessageQueueTrait, utils::mq_max_payload};
use crate::worker::{HandlerError, MessageHandler};

mod slack;
pub use slack::{EventIdentificationPayload, SlackMessagePayload};
pub use slack::{
EventIdentificationPayload, ProjectSlackSummary, ReportSummaryPayload, SlackMessagePayload,
};

pub const NOTIFICATIONS_EXCHANGE: &str = "notifications";
pub const NOTIFICATIONS_QUEUE: &str = "notifications";
Expand Down Expand Up @@ -48,6 +51,8 @@ pub struct NotificationMessage {
pub notification_type: NotificationType,
pub event_name: String,
pub payload: serde_json::Value,
#[serde(default)]
pub workspace_id: Uuid,
}

/// Push a notification message to the notification queue.
Expand Down Expand Up @@ -93,17 +98,34 @@ pub async fn push_to_notification_queue(
Ok(())
}

/// Derive the notification category from the event name.
/// Report events are prefixed with "report_"; everything else is an alert.
fn notification_category(event_name: &str) -> &'static str {
if event_name.starts_with("report_") {
"report"
} else {
"alert"
}
}

/// Handler for notifications
pub struct NotificationHandler {
pub db: Arc<DB>,
pub clickhouse: clickhouse::Client,
pub slack_client: reqwest::Client,
pub resend: Option<Arc<Resend>>,
}

impl NotificationHandler {
pub fn new(db: Arc<DB>, slack_client: reqwest::Client, resend: Option<Arc<Resend>>) -> Self {
pub fn new(
db: Arc<DB>,
clickhouse: clickhouse::Client,
slack_client: reqwest::Client,
resend: Option<Arc<Resend>>,
) -> Self {
Self {
db,
clickhouse,
slack_client,
resend,
}
Expand All @@ -129,6 +151,7 @@ impl NotificationHandler {

let integration_id = match &slack_payload {
SlackMessagePayload::EventIdentification(payload) => payload.integration_id,
SlackMessagePayload::ReportSummary(payload) => payload.integration_id,
};

let integration =
Expand All @@ -153,9 +176,35 @@ impl NotificationHandler {

let channel_id = slack::get_channel_id(&slack_payload);

slack::send_message(&self.slack_client, &decrypted_token, channel_id, blocks)
.await
.map_err(|e| anyhow::anyhow!("Failed to send Slack message: {}", e))?;
let send_result =
slack::send_message(&self.slack_client, &decrypted_token, channel_id, blocks.clone())
.await;

let (status, error_msg) = match &send_result {
Ok(()) => ("success".to_string(), String::new()),
Err(e) => ("error".to_string(), e.to_string()),
};

let log_entry = CHNotificationLog {
id: Uuid::new_v4(),
workspace_id: message.workspace_id,
project_id: message.project_id,
notification_type: notification_category(&message.event_name).to_string(),
channel: "slack".to_string(),
recipient: channel_id.to_string(),
subject: message.event_name.clone(),
body: serde_json::to_string(&blocks).unwrap_or_default(),
event_name: message.event_name.clone(),
status,
error: error_msg,
created_at: CHNotificationLog::now_millis(),
};

if let Err(e) = insert_notification_logs(&self.clickhouse, vec![log_entry]).await {
log::warn!("[Notifications] Failed to write Slack notification log to ClickHouse: {:?}", e);
}

send_result.map_err(|e| anyhow::anyhow!("Failed to send Slack message: {}", e))?;

log::debug!(
"Successfully sent Slack notification for trace_id={}",
Expand Down Expand Up @@ -193,6 +242,7 @@ impl NotificationHandler {

let mut send_failures = 0;
let total = email_payload.to.len();
let mut log_entries = Vec::with_capacity(total);

for recipient in &email_payload.to {
let mut email = CreateEmailBaseOptions::new(
Expand All @@ -211,18 +261,39 @@ impl NotificationHandler {
);
}

match send_email_with_retry(&resend, email).await {
let (status, error_msg) = match send_email_with_retry(&resend, email).await {
Ok(response) => {
log::info!(
"[Notifications] Email sent to recipient. Email ID: {:?}",
response.id
);
("success".to_string(), String::new())
}
Err(e) => {
send_failures += 1;
log::error!("[Notifications] Failed to send email: {:?}", e);
("error".to_string(), format!("{:?}", e))
}
}
};

log_entries.push(CHNotificationLog {
id: Uuid::new_v4(),
workspace_id: message.workspace_id,
project_id: message.project_id,
notification_type: notification_category(&message.event_name).to_string(),
channel: "email".to_string(),
recipient: recipient.clone(),
subject: email_payload.subject.clone(),
body: email_payload.html.clone(),
event_name: message.event_name.clone(),
status,
error: error_msg,
created_at: CHNotificationLog::now_millis(),
});
}

if let Err(e) = insert_notification_logs(&self.clickhouse, log_entries).await {
log::warn!("[Notifications] Failed to write email notification logs to ClickHouse: {:?}", e);
}

if send_failures == total {
Expand Down
Loading
Loading