Skip to content

Commit 7726912

Browse files
authored
feat: bring targets out of alerts (#1353)
New Features - Introduced RESTful API endpoints for managing alert targets, including creating, listing, retrieving, updating, and deleting targets. - Added unique identifiers and names to alert targets for precise identification. - Enabled asynchronous, concurrent management and validation of alert targets. - Added support for loading, persisting, and deleting targets in storage with concurrency safety. - Added new error responses for invalid, duplicate, or in-use target configurations. - Implemented asynchronous validation and error handling during alert creation and modification. Chores - Updated internal alert configuration to reference targets by ID, improving performance and maintainability. - Added new constants for target-related storage directories.
1 parent b8a9bd0 commit 7726912

File tree

10 files changed

+273
-20
lines changed

10 files changed

+273
-20
lines changed

src/alerts/mod.rs

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,14 @@ use ulid::Ulid;
3737
pub mod alerts_utils;
3838
pub mod target;
3939

40+
use crate::alerts::target::TARGETS;
4041
use crate::parseable::{StreamNotFound, PARSEABLE};
4142
use crate::rbac::map::SessionKey;
4243
use crate::storage;
4344
use crate::storage::ObjectStorageError;
4445
use crate::sync::alert_runtime;
4546
use crate::utils::user_auth_for_query;
4647

47-
use self::target::Target;
48-
4948
// these types describe the scheduled task for an alert
5049
pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
5150

@@ -531,23 +530,28 @@ pub struct AlertRequest {
531530
pub alert_type: AlertType,
532531
pub aggregates: Aggregates,
533532
pub eval_config: EvalConfig,
534-
pub targets: Vec<Target>,
533+
pub targets: Vec<Ulid>,
535534
}
536535

537-
impl From<AlertRequest> for AlertConfig {
538-
fn from(val: AlertRequest) -> AlertConfig {
539-
AlertConfig {
536+
impl AlertRequest {
537+
pub async fn into(self) -> Result<AlertConfig, AlertError> {
538+
// Validate that all target IDs exist
539+
for id in &self.targets {
540+
TARGETS.get_target_by_id(id).await?;
541+
}
542+
let config = AlertConfig {
540543
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
541544
id: Ulid::new(),
542-
severity: val.severity,
543-
title: val.title,
544-
stream: val.stream,
545-
alert_type: val.alert_type,
546-
aggregates: val.aggregates,
547-
eval_config: val.eval_config,
548-
targets: val.targets,
545+
severity: self.severity,
546+
title: self.title,
547+
stream: self.stream,
548+
alert_type: self.alert_type,
549+
aggregates: self.aggregates,
550+
eval_config: self.eval_config,
551+
targets: self.targets,
549552
state: AlertState::default(),
550-
}
553+
};
554+
Ok(config)
551555
}
552556
}
553557

@@ -563,21 +567,26 @@ pub struct AlertConfig {
563567
pub alert_type: AlertType,
564568
pub aggregates: Aggregates,
565569
pub eval_config: EvalConfig,
566-
pub targets: Vec<Target>,
570+
pub targets: Vec<Ulid>,
567571
// for new alerts, state should be resolved
568572
#[serde(default)]
569573
pub state: AlertState,
570574
}
571575

572576
impl AlertConfig {
573-
pub fn modify(&mut self, alert: AlertRequest) {
577+
pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
578+
// Validate that all target IDs exist
579+
for id in &alert.targets {
580+
TARGETS.get_target_by_id(id).await?;
581+
}
574582
self.title = alert.title;
575583
self.stream = alert.stream;
576584
self.alert_type = alert.alert_type;
577585
self.aggregates = alert.aggregates;
578586
self.eval_config = alert.eval_config;
579587
self.targets = alert.targets;
580588
self.state = AlertState::default();
589+
Ok(())
581590
}
582591

583592
pub fn get_base_query(&self) -> String {
@@ -599,7 +608,8 @@ impl AlertConfig {
599608
};
600609

601610
// validate that target repeat notifs !> eval_frequency
602-
for target in &self.targets {
611+
for target_id in &self.targets {
612+
let target = TARGETS.get_target_by_id(target_id).await?;
603613
match &target.timeout.times {
604614
target::Retry::Infinite => {}
605615
target::Retry::Finite(repeat) => {
@@ -806,7 +816,8 @@ impl AlertConfig {
806816
pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
807817
let mut context = self.get_context();
808818
context.message = message;
809-
for target in &self.targets {
819+
for target_id in &self.targets {
820+
let target = TARGETS.get_target_by_id(target_id).await?;
810821
trace!("Target (trigger_notifications)-\n{target:?}");
811822
target.call(context.clone());
812823
}
@@ -840,6 +851,12 @@ pub enum AlertError {
840851
InvalidAlertModifyRequest,
841852
#[error("{0}")]
842853
FromStrError(#[from] FromStrError),
854+
#[error("Invalid Target ID- {0}")]
855+
InvalidTargetID(String),
856+
#[error("Target already exists")]
857+
DuplicateTargetConfig,
858+
#[error("Can't delete a Target which is being used")]
859+
TargetInUse,
843860
}
844861

845862
impl actix_web::ResponseError for AlertError {
@@ -857,6 +874,9 @@ impl actix_web::ResponseError for AlertError {
857874
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
858875
Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST,
859876
Self::FromStrError(_) => StatusCode::BAD_REQUEST,
877+
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
878+
Self::DuplicateTargetConfig => StatusCode::BAD_REQUEST,
879+
Self::TargetInUse => StatusCode::CONFLICT,
860880
}
861881
}
862882

src/alerts/target.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,110 @@ use std::{
2424

2525
use async_trait::async_trait;
2626
use base64::Engine;
27+
use bytes::Bytes;
2728
use chrono::Utc;
2829
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
2930
use humantime_serde::re::humantime;
31+
use itertools::Itertools;
32+
use once_cell::sync::Lazy;
3033
use reqwest::ClientBuilder;
34+
use tokio::sync::RwLock;
3135
use tracing::{error, trace, warn};
36+
use ulid::Ulid;
3237
use url::Url;
3338

39+
use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path};
40+
3441
use super::ALERTS;
3542

3643
use super::{AlertState, CallableTarget, Context};
3744

38-
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
45+
pub static TARGETS: Lazy<TargetConfigs> = Lazy::new(|| TargetConfigs {
46+
target_configs: RwLock::new(HashMap::new()),
47+
});
48+
49+
#[derive(Debug)]
50+
pub struct TargetConfigs {
51+
pub target_configs: RwLock<HashMap<Ulid, Target>>,
52+
}
53+
54+
impl TargetConfigs {
55+
/// Loads alerts from disk, blocks
56+
pub async fn load(&self) -> anyhow::Result<()> {
57+
let mut map = self.target_configs.write().await;
58+
let store = PARSEABLE.storage.get_object_store();
59+
60+
for alert in store.get_targets().await.unwrap_or_default() {
61+
map.insert(alert.id, alert);
62+
}
63+
64+
Ok(())
65+
}
66+
67+
pub async fn update(&self, target: Target) -> Result<(), AlertError> {
68+
let mut map = self.target_configs.write().await;
69+
if map.values().any(|t| {
70+
t.target == target.target
71+
&& t.timeout.interval == target.timeout.interval
72+
&& t.timeout.times == target.timeout.times
73+
&& t.id != target.id
74+
}) {
75+
return Err(AlertError::DuplicateTargetConfig);
76+
}
77+
map.insert(target.id, target.clone());
78+
79+
let path = target_json_path(&target.id);
80+
81+
let store = PARSEABLE.storage.get_object_store();
82+
let target_bytes = serde_json::to_vec(&target)?;
83+
store.put_object(&path, Bytes::from(target_bytes)).await?;
84+
Ok(())
85+
}
86+
87+
pub async fn list(&self) -> Result<Vec<Target>, AlertError> {
88+
let targets = self
89+
.target_configs
90+
.read()
91+
.await
92+
.values()
93+
.cloned()
94+
.collect_vec();
95+
Ok(targets)
96+
}
97+
98+
pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result<Target, AlertError> {
99+
let target = self
100+
.target_configs
101+
.read()
102+
.await
103+
.get(target_id)
104+
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
105+
.cloned()?;
106+
107+
Ok(target)
108+
}
109+
110+
pub async fn delete(&self, target_id: &Ulid) -> Result<Target, AlertError> {
111+
// ensure that the target is not being used by any alert
112+
for (_, alert) in ALERTS.alerts.read().await.iter() {
113+
if alert.targets.contains(target_id) {
114+
return Err(AlertError::TargetInUse);
115+
}
116+
}
117+
let target = self
118+
.target_configs
119+
.write()
120+
.await
121+
.remove(target_id)
122+
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?;
123+
let path = target_json_path(&target.id);
124+
let store = PARSEABLE.storage.get_object_store();
125+
store.delete_object(&path).await?;
126+
Ok(target)
127+
}
128+
}
129+
130+
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)]
39131
#[serde(rename_all = "camelCase")]
40132
#[serde(untagged)]
41133
pub enum Retry {
@@ -53,10 +145,13 @@ impl Default for Retry {
53145
#[serde(rename_all = "camelCase")]
54146
#[serde(try_from = "TargetVerifier")]
55147
pub struct Target {
148+
pub name: String,
56149
#[serde(flatten)]
57150
pub target: TargetType,
58151
#[serde(default, rename = "repeat")]
59152
pub timeout: Timeout,
153+
#[serde(default = "Ulid::new")]
154+
pub id: Ulid,
60155
}
61156

62157
impl Target {
@@ -189,10 +284,13 @@ pub struct RepeatVerifier {
189284
#[derive(Debug, serde::Deserialize)]
190285
#[serde(rename_all = "camelCase")]
191286
pub struct TargetVerifier {
287+
pub name: String,
192288
#[serde(flatten)]
193289
pub target: TargetType,
194290
#[serde(default)]
195291
pub repeat: Option<RepeatVerifier>,
292+
#[serde(default = "Ulid::new")]
293+
pub id: Ulid,
196294
}
197295

198296
impl TryFrom<TargetVerifier> for Target {
@@ -223,8 +321,10 @@ impl TryFrom<TargetVerifier> for Target {
223321
}
224322

225323
Ok(Target {
324+
name: value.name,
226325
target: value.target,
227326
timeout,
327+
id: value.id,
228328
})
229329
}
230330
}

src/handlers/http/alerts.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub async fn post(
4848
req: HttpRequest,
4949
Json(alert): Json<AlertRequest>,
5050
) -> Result<impl Responder, AlertError> {
51-
let alert: AlertConfig = alert.into();
51+
let alert: AlertConfig = alert.into().await?;
5252
alert.validate().await?;
5353

5454
// validate the incoming alert query

src/handlers/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub mod query;
4848
pub mod rbac;
4949
pub mod resource_check;
5050
pub mod role;
51+
pub mod targets;
5152
pub mod users;
5253
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
5354
pub const API_BASE_PATH: &str = "api";

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl ParseableServer for QueryServer {
7474
)))
7575
.service(Server::get_metrics_webscope())
7676
.service(Server::get_alerts_webscope())
77+
.service(Server::get_targets_webscope())
7778
.service(Self::get_cluster_web_scope()),
7879
)
7980
.service(

src/handlers/http/modal/server.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::handlers::http::health_check;
2727
use crate::handlers::http::prism_base_path;
2828
use crate::handlers::http::query;
2929
use crate::handlers::http::resource_check;
30+
use crate::handlers::http::targets;
3031
use crate::handlers::http::users::dashboards;
3132
use crate::handlers::http::users::filters;
3233
use crate::hottier::HotTierManager;
@@ -95,6 +96,7 @@ impl ParseableServer for Server {
9596
resource_check::check_resource_utilization_middleware,
9697
)))
9798
.service(Self::get_alerts_webscope())
99+
.service(Self::get_targets_webscope())
98100
.service(Self::get_metrics_webscope()),
99101
)
100102
.service(
@@ -263,6 +265,25 @@ impl Server {
263265
)
264266
}
265267

268+
pub fn get_targets_webscope() -> Scope {
269+
web::scope("/targets")
270+
.service(
271+
web::resource("")
272+
.route(web::get().to(targets::list).authorize(Action::GetAlert))
273+
.route(web::post().to(targets::post).authorize(Action::PutAlert)),
274+
)
275+
.service(
276+
web::resource("/{target_id}")
277+
.route(web::get().to(targets::get).authorize(Action::GetAlert))
278+
.route(web::put().to(targets::update).authorize(Action::PutAlert))
279+
.route(
280+
web::delete()
281+
.to(targets::delete)
282+
.authorize(Action::DeleteAlert),
283+
),
284+
)
285+
}
286+
266287
// get the dashboards web scope
267288
pub fn get_dashboards_webscope() -> Scope {
268289
web::scope("/dashboards")

0 commit comments

Comments
 (0)