Skip to content

Commit 0e24130

Browse files
authored
notification queue (#994)
1 parent c7d19c2 commit 0e24130

File tree

26 files changed

+2643
-760
lines changed

26 files changed

+2643
-760
lines changed

app-server/src/db/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub mod prices;
99
pub mod project_api_keys;
1010
pub mod projects;
1111
pub mod provider_api_keys;
12+
pub mod slack_channel_to_events;
13+
pub mod slack_integrations;
1214
pub mod spans;
1315
pub mod stats;
1416
pub mod summary_trigger_spans;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use sqlx::PgPool;
2+
use uuid::Uuid;
3+
4+
#[derive(Debug, Clone, sqlx::FromRow)]
5+
pub struct SlackChannelToEvent {
6+
pub channel_id: String,
7+
pub integration_id: Uuid,
8+
}
9+
10+
pub async fn get_channels_for_event(
11+
pool: &PgPool,
12+
project_id: Uuid,
13+
event_name: &str,
14+
) -> anyhow::Result<Vec<SlackChannelToEvent>> {
15+
let records = sqlx::query_as::<_, SlackChannelToEvent>(
16+
r#"
17+
SELECT channel_id, integration_id
18+
FROM slack_channel_to_events
19+
WHERE project_id = $1 AND event_name = $2
20+
"#,
21+
)
22+
.bind(project_id)
23+
.bind(event_name)
24+
.fetch_all(pool)
25+
.await?;
26+
27+
Ok(records)
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use anyhow::Result;
2+
use sqlx::{FromRow, PgPool};
3+
use uuid::Uuid;
4+
5+
#[derive(FromRow, Clone, Debug)]
6+
pub struct SlackIntegration {
7+
pub token: String,
8+
pub team_id: String,
9+
pub nonce_hex: String,
10+
}
11+
12+
pub async fn get_integration_by_id(
13+
pool: &PgPool,
14+
integration_id: &Uuid,
15+
) -> Result<Option<SlackIntegration>> {
16+
let integration = sqlx::query_as::<_, SlackIntegration>(
17+
r#"
18+
SELECT token, team_id, nonce_hex
19+
FROM slack_integrations
20+
WHERE id = $1
21+
"#,
22+
)
23+
.bind(integration_id)
24+
.fetch_optional(pool)
25+
.await?;
26+
27+
Ok(integration)
28+
}

app-server/src/main.rs

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use lapin::{
2323
};
2424
use mq::MessageQueue;
2525
use names::NameGenerator;
26+
use notifications::{NOTIFICATIONS_EXCHANGE, NOTIFICATIONS_QUEUE, process_notifications};
2627
use opentelemetry_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer;
2728
use query_engine::{
2829
QueryEngine, query_engine::query_engine_service_client::QueryEngineServiceClient,
@@ -66,6 +67,7 @@ mod instrumentation;
6667
mod language_model;
6768
mod mq;
6869
mod names;
70+
mod notifications;
6971
mod opentelemetry_proto;
7072
mod project_api_keys;
7173
mod provider_api_keys;
@@ -362,6 +364,32 @@ fn main() -> anyhow::Result<()> {
362364
.await
363365
.unwrap();
364366

367+
// ==== 3.6 Notifications message queue ====
368+
channel
369+
.exchange_declare(
370+
NOTIFICATIONS_EXCHANGE,
371+
ExchangeKind::Fanout,
372+
ExchangeDeclareOptions {
373+
durable: true,
374+
..Default::default()
375+
},
376+
FieldTable::default(),
377+
)
378+
.await
379+
.unwrap();
380+
381+
channel
382+
.queue_declare(
383+
NOTIFICATIONS_QUEUE,
384+
QueueDeclareOptions {
385+
durable: true,
386+
..Default::default()
387+
},
388+
quorum_queue_args.clone(),
389+
)
390+
.await
391+
.unwrap();
392+
365393
let max_channel_pool_size = env::var("RABBITMQ_MAX_CHANNEL_POOL_SIZE")
366394
.ok()
367395
.and_then(|v| v.parse().ok())
@@ -389,6 +417,8 @@ fn main() -> anyhow::Result<()> {
389417
queue.register_queue(PAYLOADS_EXCHANGE, PAYLOADS_QUEUE);
390418
// ==== 3.5 Trace summary message queue ====
391419
queue.register_queue(TRACE_SUMMARY_EXCHANGE, TRACE_SUMMARY_QUEUE);
420+
// ==== 3.6 Notifications message queue ====
421+
queue.register_queue(NOTIFICATIONS_EXCHANGE, NOTIFICATIONS_QUEUE);
392422
log::info!("Using tokio mpsc queue");
393423
Arc::new(queue.into())
394424
};
@@ -398,7 +428,10 @@ fn main() -> anyhow::Result<()> {
398428

399429
runtime_handle.spawn(cleanup_closed_connections(sse_connections.clone()));
400430

401-
// ==== 3.6 Worker tracker ====
431+
// ==== Slack client ====
432+
let slack_client = Arc::new(reqwest::Client::new());
433+
434+
// ==== 3.7 Worker tracker ====
402435
let worker_tracker = Arc::new(WorkerTracker::new());
403436

404437
let runtime_handle_for_http = runtime_handle.clone();
@@ -565,13 +598,19 @@ fn main() -> anyhow::Result<()> {
565598
.parse::<u8>()
566599
.unwrap_or(2);
567600

601+
let num_notification_workers = env::var("NUM_NOTIFICATION_WORKERS")
602+
.unwrap_or(String::from("2"))
603+
.parse::<u8>()
604+
.unwrap_or(2);
605+
568606
log::info!(
569-
"Spans workers: {}, Browser events workers: {}, Evaluators workers: {}, Payload workers: {}, Trace summary workers: {}",
607+
"Spans workers: {}, Browser events workers: {}, Evaluators workers: {}, Payload workers: {}, Trace summary workers: {}, Notification workers: {}",
570608
num_spans_workers,
571609
num_browser_events_workers,
572610
num_evaluators_workers,
573611
num_payload_workers,
574-
num_trace_summary_workers
612+
num_trace_summary_workers,
613+
num_notification_workers
575614
);
576615

577616
let connection_for_health_clone = connection_for_health.clone();
@@ -593,6 +632,7 @@ fn main() -> anyhow::Result<()> {
593632
num_evaluators_workers as usize,
594633
num_payload_workers as usize,
595634
num_trace_summary_workers as usize,
635+
num_notification_workers as usize,
596636
);
597637
for _ in 0..num_spans_workers {
598638
let worker_handle = worker_tracker_clone.register_worker(WorkerType::Spans);
@@ -665,10 +705,23 @@ fn main() -> anyhow::Result<()> {
665705
for _ in 0..num_trace_summary_workers {
666706
let worker_handle =
667707
worker_tracker_clone.register_worker(WorkerType::TraceSummaries);
708+
let db_clone = db_for_consumer.clone();
668709
let mq_clone = mq_for_consumer.clone();
669710
tokio::spawn(async move {
670711
let _handle = worker_handle; // Keep handle alive for the worker's lifetime
671-
process_trace_summaries(mq_clone).await;
712+
process_trace_summaries(db_clone, mq_clone).await;
713+
});
714+
}
715+
716+
for _ in 0..num_notification_workers {
717+
let worker_handle =
718+
worker_tracker_clone.register_worker(WorkerType::Notifications);
719+
let db_clone = db_for_consumer.clone();
720+
let mq_clone = mq_for_consumer.clone();
721+
let slack_client_clone = slack_client.clone();
722+
tokio::spawn(async move {
723+
let _handle = worker_handle;
724+
process_notifications(db_clone, slack_client_clone, mq_clone).await;
672725
});
673726
}
674727

0 commit comments

Comments
 (0)