diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 90aeb16f4..5597f71c3 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -469,7 +469,19 @@ pub enum EvalConfig { #[serde(rename_all = "camelCase")] pub struct AlertEval {} -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default, FromStr)] +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + FromStr, +)] #[serde(rename_all = "camelCase")] pub enum AlertState { Triggered, @@ -488,7 +500,19 @@ impl Display for AlertState { } } -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + FromStr, +)] #[serde(rename_all = "camelCase")] pub enum Severity { Critical, @@ -1156,7 +1180,7 @@ impl AlertConfig { self.id, self.title.clone(), self.state, - self.severity.clone().to_string(), + self.severity.to_string(), ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), String::default(), @@ -1308,8 +1332,8 @@ pub enum AlertError { ParserError(#[from] ParserError), #[error("Invalid alert query")] InvalidAlertQuery, - #[error("Invalid query parameter")] - InvalidQueryParameter, + #[error("Invalid query parameter: {0}")] + InvalidQueryParameter(String), #[error("{0}")] ArrowError(#[from] ArrowError), #[error("Upgrade to Parseable Enterprise for {0} type alerts")] @@ -1336,7 +1360,7 @@ impl actix_web::ResponseError for AlertError { Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, - Self::InvalidQueryParameter => StatusCode::BAD_REQUEST, + Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST, Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, } @@ -1650,7 +1674,7 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } AlertState::Silenced => { @@ -1658,7 +1682,7 @@ pub async fn get_alerts_summary() -> Result { silenced_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } AlertState::Resolved => { @@ -1666,7 +1690,7 @@ pub async fn get_alerts_summary() -> Result { resolved_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } } diff --git a/src/cli.rs b/src/cli.rs index 422eb7c36..e8b9a387e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -35,7 +35,7 @@ use crate::{ pub const DEFAULT_USERNAME: &str = "admin"; pub const DEFAULT_PASSWORD: &str = "admin"; -pub const DATASET_FIELD_COUNT_LIMIT: usize = 250; +pub const DATASET_FIELD_COUNT_LIMIT: usize = 1000; #[derive(Parser)] #[command( name = "parseable", diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 4847cff3a..c86b2041c 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -31,17 +31,28 @@ use actix_web::{ use bytes::Bytes; use ulid::Ulid; -use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState}; +use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity}; // GET /alerts /// User needs at least a read access to the stream(s) that is being referenced in an alert /// Read all alerts then return alerts which satisfy the condition +/// Supports pagination with optional query parameters: +/// - tags: comma-separated list of tags to filter alerts +/// - offset: number of alerts to skip (default: 0) +/// - limit: maximum number of alerts to return (default: 100, max: 1000) pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req)?; let query_map = web::Query::>::from_query(req.query_string()) - .map_err(|_| AlertError::InvalidQueryParameter)?; + .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?; + let mut tags_list = Vec::new(); + let mut offset = 0usize; + let mut limit = 100usize; // Default limit + const MAX_LIMIT: usize = 1000; // Maximum allowed limit + + // Parse query parameters if !query_map.is_empty() { + // Parse tags parameter if let Some(tags) = query_map.get("tags") { tags_list = tags .split(',') @@ -49,10 +60,34 @@ pub async fn list(req: HttpRequest) -> Result { .filter(|s| !s.is_empty()) .collect(); if tags_list.is_empty() { - return Err(AlertError::InvalidQueryParameter); + return Err(AlertError::InvalidQueryParameter( + "empty tags not allowed with query param tags".to_string(), + )); + } + } + + // Parse offset parameter + if let Some(offset_str) = query_map.get("offset") { + offset = offset_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("offset is not a valid number".to_string()) + })?; + } + + // Parse limit parameter + if let Some(limit_str) = query_map.get("limit") { + limit = limit_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("limit is not a valid number".to_string()) + })?; + + // Validate limit bounds + if limit == 0 || limit > MAX_LIMIT { + return Err(AlertError::InvalidQueryParameter( + "limit should be between 1 and 1000".to_string(), + )); } } } + let guard = ALERTS.read().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -61,11 +96,51 @@ pub async fn list(req: HttpRequest) -> Result { }; let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?; - let alerts_summary = alerts + let mut alerts_summary = alerts .iter() .map(|alert| alert.to_summary()) .collect::>(); - Ok(web::Json(alerts_summary)) + + // Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low) + alerts_summary.sort_by(|a, b| { + // Parse state and severity from JSON values back to enums + let state_a = a + .get("state") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(AlertState::Resolved); // Default to lowest priority + + let state_b = b + .get("state") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(AlertState::Resolved); + + let severity_a = a + .get("severity") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(Severity::Low); // Default to lowest priority + + let severity_b = b + .get("severity") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(Severity::Low); + + // First sort by state, then by severity + state_a + .cmp(&state_b) + .then_with(|| severity_a.cmp(&severity_b)) + }); + + let paginated_alerts = alerts_summary + .into_iter() + .skip(offset) + .take(limit) + .collect::>(); + + Ok(web::Json(paginated_alerts)) } // POST /alerts diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 38e30e692..f20341618 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -41,6 +41,7 @@ use parquet::{ use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; +use ulid::Ulid; use crate::{ LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, @@ -185,7 +186,13 @@ impl Stream { parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> String { - let mut hostname = hostname::get().unwrap().into_string().unwrap(); + let mut hostname = hostname::get() + .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) + .into_string() + .unwrap_or_else(|_| Ulid::new().to_string()) + .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') + .collect::(); + if let Some(id) = &self.ingestor_id { hostname.push_str(id); } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b21251cc2..a77e567c6 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1011,15 +1011,23 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf { #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { - match &PARSEABLE.options.mode { - Mode::Ingest => { - let id = INGESTOR_META - .get() - .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) - .get_node_id(); - let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}"); - RelativePathBuf::from_iter([prefix, &manifest_file_name]) - } - _ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]), + let hostname = hostname::get() + .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) + .into_string() + .unwrap_or_else(|_| Ulid::new().to_string()) + .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') + .collect::(); + + if PARSEABLE.options.mode == Mode::Ingest { + let id = INGESTOR_META + .get() + .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) + .get_node_id(); + + let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) + } else { + let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) } }