Skip to content

chore: server side improvements #1396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,19 @@ pub enum EvalConfig {
#[serde(rename_all = "camelCase")]
pub struct AlertEval {}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default, FromStr)]
#[derive(
Debug,
serde::Serialize,
serde::Deserialize,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Default,
FromStr,
)]
#[serde(rename_all = "camelCase")]
pub enum AlertState {
Triggered,
Expand All @@ -488,7 +500,19 @@ impl Display for AlertState {
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)]
#[derive(
Debug,
serde::Serialize,
serde::Deserialize,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Default,
FromStr,
)]
#[serde(rename_all = "camelCase")]
pub enum Severity {
Critical,
Expand Down Expand Up @@ -1308,8 +1332,8 @@ pub enum AlertError {
ParserError(#[from] ParserError),
#[error("Invalid alert query")]
InvalidAlertQuery,
#[error("Invalid query parameter")]
InvalidQueryParameter,
#[error("Invalid query parameter: {0}")]
InvalidQueryParameter(String),
#[error("{0}")]
ArrowError(#[from] ArrowError),
#[error("Upgrade to Parseable Enterprise for {0} type alerts")]
Expand All @@ -1336,7 +1360,7 @@ impl actix_web::ResponseError for AlertError {
Self::TargetInUse => StatusCode::CONFLICT,
Self::ParserError(_) => StatusCode::BAD_REQUEST,
Self::InvalidAlertQuery => StatusCode::BAD_REQUEST,
Self::InvalidQueryParameter => StatusCode::BAD_REQUEST,
Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,
Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST,
}
Expand Down Expand Up @@ -1650,23 +1674,23 @@ pub async fn get_alerts_summary() -> Result<AlertsSummary, AlertError> {
triggered_alerts.push(AlertsInfo {
title: alert.get_title().to_string(),
id: *alert.get_id(),
severity: alert.get_severity().clone(),
severity: *alert.get_severity(),
});
}
AlertState::Silenced => {
silenced += 1;
silenced_alerts.push(AlertsInfo {
title: alert.get_title().to_string(),
id: *alert.get_id(),
severity: alert.get_severity().clone(),
severity: *alert.get_severity(),
});
}
AlertState::Resolved => {
resolved += 1;
resolved_alerts.push(AlertsInfo {
title: alert.get_title().to_string(),
id: *alert.get_id(),
severity: alert.get_severity().clone(),
severity: *alert.get_severity(),
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
pub const DEFAULT_USERNAME: &str = "admin";
pub const DEFAULT_PASSWORD: &str = "admin";

pub const DATASET_FIELD_COUNT_LIMIT: usize = 250;
pub const DATASET_FIELD_COUNT_LIMIT: usize = 1000;
#[derive(Parser)]
#[command(
name = "parseable",
Expand Down
85 changes: 80 additions & 5 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,63 @@ use actix_web::{
use bytes::Bytes;
use ulid::Ulid;

use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState};
use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity};

// GET /alerts
/// User needs at least a read access to the stream(s) that is being referenced in an alert
/// Read all alerts then return alerts which satisfy the condition
/// Supports pagination with optional query parameters:
/// - tags: comma-separated list of tags to filter alerts
/// - offset: number of alerts to skip (default: 0)
/// - limit: maximum number of alerts to return (default: 100, max: 1000)
pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
let session_key = extract_session_key_from_req(&req)?;
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
.map_err(|_| AlertError::InvalidQueryParameter)?;
.map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?;

let mut tags_list = Vec::new();
let mut offset = 0usize;
let mut limit = 100usize; // Default limit
const MAX_LIMIT: usize = 1000; // Maximum allowed limit

// Parse query parameters
if !query_map.is_empty() {
// Parse tags parameter
if let Some(tags) = query_map.get("tags") {
tags_list = tags
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if tags_list.is_empty() {
return Err(AlertError::InvalidQueryParameter);
return Err(AlertError::InvalidQueryParameter(
"empty tags not allowed with query param tags".to_string(),
));
}
}

// Parse offset parameter
if let Some(offset_str) = query_map.get("offset") {
offset = offset_str.parse().map_err(|_| {
AlertError::InvalidQueryParameter("offset is not a valid number".to_string())
})?;
}

// Parse limit parameter
if let Some(limit_str) = query_map.get("limit") {
limit = limit_str.parse().map_err(|_| {
AlertError::InvalidQueryParameter("limit is not a valid number".to_string())
})?;

// Validate limit bounds
if limit == 0 || limit > MAX_LIMIT {
return Err(AlertError::InvalidQueryParameter(
"limit should be between 1 and 1000".to_string(),
));
}
}
}

let guard = ALERTS.read().await;
let alerts = if let Some(alerts) = guard.as_ref() {
alerts
Expand All @@ -61,11 +96,51 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
};

let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?;
let alerts_summary = alerts
let mut alerts_summary = alerts
.iter()
.map(|alert| alert.to_summary())
.collect::<Vec<_>>();
Ok(web::Json(alerts_summary))

// Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low)
alerts_summary.sort_by(|a, b| {
// Parse state and severity from JSON values back to enums
let state_a = a
.get("state")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<AlertState>().ok())
.unwrap_or(AlertState::Resolved); // Default to lowest priority

let state_b = b
.get("state")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<AlertState>().ok())
.unwrap_or(AlertState::Resolved);

let severity_a = a
.get("severity")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<Severity>().ok())
.unwrap_or(Severity::Low); // Default to lowest priority

let severity_b = b
.get("severity")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<Severity>().ok())
.unwrap_or(Severity::Low);

// First sort by state, then by severity
state_a
.cmp(&state_b)
.then_with(|| severity_a.cmp(&severity_b))
});

let paginated_alerts = alerts_summary
.into_iter()
.skip(offset)
.take(limit)
.collect::<Vec<_>>();

Ok(web::Json(paginated_alerts))
}

// POST /alerts
Expand Down
9 changes: 8 additions & 1 deletion src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use parquet::{
use relative_path::RelativePathBuf;
use tokio::task::JoinSet;
use tracing::{error, info, trace, warn};
use ulid::Ulid;

use crate::{
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
Expand Down Expand Up @@ -185,7 +186,13 @@ impl Stream {
parsed_timestamp: NaiveDateTime,
custom_partition_values: &HashMap<String, String>,
) -> String {
let mut hostname = hostname::get().unwrap().into_string().unwrap();
let mut hostname = hostname::get()
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
.into_string()
.unwrap_or_else(|_| Ulid::new().to_string())
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
.collect::<String>();

if let Some(id) = &self.ingestor_id {
hostname.push_str(id);
}
Expand Down
28 changes: 18 additions & 10 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,15 +1011,23 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf {

#[inline(always)]
pub fn manifest_path(prefix: &str) -> RelativePathBuf {
match &PARSEABLE.options.mode {
Mode::Ingest => {
let id = INGESTOR_META
.get()
.unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
.get_node_id();
let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}");
RelativePathBuf::from_iter([prefix, &manifest_file_name])
}
_ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]),
let hostname = hostname::get()
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
.into_string()
.unwrap_or_else(|_| Ulid::new().to_string())
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
.collect::<String>();

if PARSEABLE.options.mode == Mode::Ingest {
let id = INGESTOR_META
.get()
.unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
.get_node_id();

let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}");
RelativePathBuf::from_iter([prefix, &manifest_file_name])
} else {
let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}");
RelativePathBuf::from_iter([prefix, &manifest_file_name])
}
}
Loading