Skip to content

Bring targets out of alerts #1353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ use ulid::Ulid;
pub mod alerts_utils;
pub mod target;

use crate::alerts::target::TARGETS;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::ObjectStorageError;
use crate::sync::alert_runtime;
use crate::utils::user_auth_for_query;

use self::target::Target;

// these types describe the scheduled task for an alert
pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);

Expand Down Expand Up @@ -513,23 +512,28 @@ pub struct AlertRequest {
pub alert_type: AlertType,
pub aggregates: Aggregates,
pub eval_config: EvalConfig,
pub targets: Vec<Target>,
pub targets: Vec<Ulid>,
}

impl From<AlertRequest> for AlertConfig {
fn from(val: AlertRequest) -> AlertConfig {
AlertConfig {
impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
// let mut targets = Vec::new();
// for id in self.targets {
// targets.push(TARGETS.get_target_by_id(id).await?);
// }
let config = AlertConfig {
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
id: Ulid::new(),
severity: val.severity,
title: val.title,
stream: val.stream,
alert_type: val.alert_type,
aggregates: val.aggregates,
eval_config: val.eval_config,
targets: val.targets,
severity: self.severity,
title: self.title,
stream: self.stream,
alert_type: self.alert_type,
aggregates: self.aggregates,
eval_config: self.eval_config,
targets: self.targets,
state: AlertState::default(),
}
};
Ok(config)
}
}

Expand All @@ -545,21 +549,26 @@ pub struct AlertConfig {
pub alert_type: AlertType,
pub aggregates: Aggregates,
pub eval_config: EvalConfig,
pub targets: Vec<Target>,
pub targets: Vec<Ulid>,
// for new alerts, state should be resolved
#[serde(default)]
pub state: AlertState,
}

impl AlertConfig {
pub fn modify(&mut self, alert: AlertRequest) {
pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
// let mut targets = Vec::new();
// for id in alert.targets {
// targets.push(id);
// }
self.title = alert.title;
self.stream = alert.stream;
self.alert_type = alert.alert_type;
self.aggregates = alert.aggregates;
self.eval_config = alert.eval_config;
self.targets = alert.targets;
self.state = AlertState::default();
Ok(())
}

pub fn get_base_query(&self) -> String {
Expand All @@ -581,7 +590,8 @@ impl AlertConfig {
};

// validate that target repeat notifs !> eval_frequency
for target in &self.targets {
for target_id in &self.targets {
let target = TARGETS.get_target_by_id(target_id).await?;
match &target.timeout.times {
target::Retry::Infinite => {}
target::Retry::Finite(repeat) => {
Expand Down Expand Up @@ -767,7 +777,8 @@ impl AlertConfig {
pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
let mut context = self.get_context();
context.message = message;
for target in &self.targets {
for target_id in &self.targets {
let target = TARGETS.get_target_by_id(target_id).await?;
trace!("Target (trigger_notifications)-\n{target:?}");
target.call(context.clone());
}
Expand Down Expand Up @@ -801,6 +812,8 @@ pub enum AlertError {
InvalidAlertModifyRequest,
#[error("{0}")]
FromStrError(#[from] FromStrError),
#[error("Invalid Target ID- {0}")]
InvalidTargetID(String),
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -818,6 +831,7 @@ impl actix_web::ResponseError for AlertError {
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST,
Self::FromStrError(_) => StatusCode::BAD_REQUEST,
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down
73 changes: 73 additions & 0 deletions src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,77 @@ use base64::Engine;
use chrono::Utc;
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
use humantime_serde::re::humantime;
use itertools::Itertools;
use once_cell::sync::Lazy;
use reqwest::ClientBuilder;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use ulid::Ulid;
use url::Url;

use crate::{alerts::AlertError, parseable::PARSEABLE};

use super::ALERTS;

use super::{AlertState, CallableTarget, Context};

pub static TARGETS: Lazy<TargetConfigs> = Lazy::new(|| TargetConfigs {
target_configs: RwLock::new(HashMap::new()),
});

#[derive(Debug)]
pub struct TargetConfigs {
pub target_configs: RwLock<HashMap<Ulid, Target>>,
}

impl TargetConfigs {
/// Loads alerts from disk, blocks
pub async fn load(&self) -> anyhow::Result<()> {
let mut map = self.target_configs.write().await;
let store = PARSEABLE.storage.get_object_store();

for alert in store.get_targets().await.unwrap_or_default() {
map.insert(alert.id, alert);
}

Ok(())
}

pub async fn update(&self, target: Target) -> Result<(), AlertError> {
let id = target.id;
self.target_configs.write().await.insert(id, target);
Ok(())
}

pub async fn list(&self) -> Result<Vec<Target>, AlertError> {
let targets = self
.target_configs
.read()
.await
.iter()
.map(|(_, v)| v.clone())
.collect_vec();
Ok(targets)
}

pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result<Target, AlertError> {
self.target_configs
.read()
.await
.get(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
.cloned()
}

pub async fn delete(&self, target_id: &Ulid) -> Result<Target, AlertError> {
self.target_configs
.write()
.await
.remove(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
}
}

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(untagged)]
Expand All @@ -57,9 +120,16 @@ pub struct Target {
pub target: TargetType,
#[serde(default, rename = "repeat")]
pub timeout: Timeout,
#[serde(default = "Ulid::new")]
pub id: Ulid,
}

impl Target {
pub async fn validate(&self) {
// just check for liveness
// what if the target is not live yet but is added by the user?
}

pub fn call(&self, context: Context) {
trace!("target.call context- {context:?}");
let timeout = &self.timeout;
Expand Down Expand Up @@ -193,6 +263,8 @@ pub struct TargetVerifier {
pub target: TargetType,
#[serde(default)]
pub repeat: Option<RepeatVerifier>,
#[serde(default = "Ulid::new")]
pub id: Ulid,
}

impl TryFrom<TargetVerifier> for Target {
Expand Down Expand Up @@ -225,6 +297,7 @@ impl TryFrom<TargetVerifier> for Target {
Ok(Target {
target: value.target,
timeout,
id: value.id,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn post(
req: HttpRequest,
Json(alert): Json<AlertRequest>,
) -> Result<impl Responder, AlertError> {
let alert: AlertConfig = alert.into();
let alert: AlertConfig = alert.into().await?;
alert.validate().await?;

// validate the incoming alert query
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod prism_logstream;
pub mod query;
pub mod rbac;
pub mod role;
pub mod targets;
pub mod users;
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
pub const API_BASE_PATH: &str = "api";
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl ParseableServer for QueryServer {
.service(Server::get_counts_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_alerts_webscope())
.service(Server::get_targets_webscope())
.service(Self::get_cluster_web_scope()),
)
.service(
Expand Down
21 changes: 21 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::handlers::http::base_path;
use crate::handlers::http::health_check;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::query;
use crate::handlers::http::targets;
use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
use crate::hottier::HotTierManager;
Expand Down Expand Up @@ -87,6 +88,7 @@ impl ParseableServer for Server {
.service(Self::get_roles_webscope())
.service(Self::get_counts_webscope())
.service(Self::get_alerts_webscope())
.service(Self::get_targets_webscope())
.service(Self::get_metrics_webscope()),
)
.service(
Expand Down Expand Up @@ -253,6 +255,25 @@ impl Server {
)
}

pub fn get_targets_webscope() -> Scope {
web::scope("/targets")
.service(
web::resource("")
.route(web::get().to(targets::list).authorize(Action::GetAlert))
.route(web::post().to(targets::post).authorize(Action::PutAlert)),
)
.service(
web::resource("/{target_id}")
.route(web::get().to(targets::get).authorize(Action::GetAlert))
.route(web::put().to(targets::update).authorize(Action::PutAlert))
.route(
web::delete()
.to(targets::delete)
.authorize(Action::DeleteAlert),
),
)
}

// get the dashboards web scope
pub fn get_dashboards_webscope() -> Scope {
web::scope("/dashboards")
Expand Down
92 changes: 92 additions & 0 deletions src/handlers/http/targets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use actix_web::{
web::{self, Json, Path},
HttpRequest, Responder,
};
use bytes::Bytes;
use ulid::Ulid;

use crate::{
alerts::{
target::{Target, TARGETS},
AlertError,
},
parseable::PARSEABLE,
storage::object_storage::target_json_path,
};

// POST /targets
pub async fn post(
_req: HttpRequest,
Json(target): Json<Target>,
) -> Result<impl Responder, AlertError> {
// should check for duplicacy and liveness (??)
target.validate().await;

let path = target_json_path(target.id);

let store = PARSEABLE.storage.get_object_store();
let target_bytes = serde_json::to_vec(&target)?;
store.put_object(&path, Bytes::from(target_bytes)).await?;

// add to the map
TARGETS.update(target.clone()).await?;

Ok(web::Json(target))
}

// GET /targets
pub async fn list(_req: HttpRequest) -> Result<impl Responder, AlertError> {
// add to the map
let list = TARGETS.list().await?;

Ok(web::Json(list))
}

// GET /targets/{target_id}
pub async fn get(_req: HttpRequest, target_id: Path<Ulid>) -> Result<impl Responder, AlertError> {
let target_id = target_id.into_inner();

let target = TARGETS.get_target_by_id(&target_id).await?;

Ok(web::Json(target))
}

// PUT /targets/{target_id}
pub async fn update(
_req: HttpRequest,
target_id: Path<Ulid>,
Json(mut target): Json<Target>,
) -> Result<impl Responder, AlertError> {
let target_id = target_id.into_inner();

// if target_id does not exist, error
TARGETS.get_target_by_id(&target_id).await?;

// esnure that the supplied target id is assigned to the target config
target.id = target_id;
// should check for duplicacy and liveness (??)
target.validate().await;

let path = target_json_path(target.id);

let store = PARSEABLE.storage.get_object_store();
let target_bytes = serde_json::to_vec(&target)?;
store.put_object(&path, Bytes::from(target_bytes)).await?;

// add to the map
TARGETS.update(target.clone()).await?;

Ok(web::Json(target))
}

// DELETE /targets/{target_id}
pub async fn delete(
_req: HttpRequest,
target_id: Path<Ulid>,
) -> Result<impl Responder, AlertError> {
let target_id = target_id.into_inner();

let target = TARGETS.delete(&target_id).await?;

Ok(web::Json(target))
}
Loading