Skip to content

Commit 4c634b5

Browse files
chore: server side improvements (#1396)
1. increase dataset fields limit to 1000 2. add hostname to manifest file path 3. update hostname handling 4. list alerts to be sorted on state and then on severity
1 parent ecdb27f commit 4c634b5

File tree

5 files changed

+140
-26
lines changed

5 files changed

+140
-26
lines changed

src/alerts/mod.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,19 @@ pub enum EvalConfig {
469469
#[serde(rename_all = "camelCase")]
470470
pub struct AlertEval {}
471471

472-
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default, FromStr)]
472+
#[derive(
473+
Debug,
474+
serde::Serialize,
475+
serde::Deserialize,
476+
Clone,
477+
Copy,
478+
PartialEq,
479+
Eq,
480+
PartialOrd,
481+
Ord,
482+
Default,
483+
FromStr,
484+
)]
473485
#[serde(rename_all = "camelCase")]
474486
pub enum AlertState {
475487
Triggered,
@@ -488,7 +500,19 @@ impl Display for AlertState {
488500
}
489501
}
490502

491-
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)]
503+
#[derive(
504+
Debug,
505+
serde::Serialize,
506+
serde::Deserialize,
507+
Clone,
508+
Copy,
509+
PartialEq,
510+
Eq,
511+
PartialOrd,
512+
Ord,
513+
Default,
514+
FromStr,
515+
)]
492516
#[serde(rename_all = "camelCase")]
493517
pub enum Severity {
494518
Critical,
@@ -1156,7 +1180,7 @@ impl AlertConfig {
11561180
self.id,
11571181
self.title.clone(),
11581182
self.state,
1159-
self.severity.clone().to_string(),
1183+
self.severity.to_string(),
11601184
),
11611185
DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
11621186
String::default(),
@@ -1308,8 +1332,8 @@ pub enum AlertError {
13081332
ParserError(#[from] ParserError),
13091333
#[error("Invalid alert query")]
13101334
InvalidAlertQuery,
1311-
#[error("Invalid query parameter")]
1312-
InvalidQueryParameter,
1335+
#[error("Invalid query parameter: {0}")]
1336+
InvalidQueryParameter(String),
13131337
#[error("{0}")]
13141338
ArrowError(#[from] ArrowError),
13151339
#[error("Upgrade to Parseable Enterprise for {0} type alerts")]
@@ -1336,7 +1360,7 @@ impl actix_web::ResponseError for AlertError {
13361360
Self::TargetInUse => StatusCode::CONFLICT,
13371361
Self::ParserError(_) => StatusCode::BAD_REQUEST,
13381362
Self::InvalidAlertQuery => StatusCode::BAD_REQUEST,
1339-
Self::InvalidQueryParameter => StatusCode::BAD_REQUEST,
1363+
Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,
13401364
Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
13411365
Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST,
13421366
}
@@ -1650,23 +1674,23 @@ pub async fn get_alerts_summary() -> Result<AlertsSummary, AlertError> {
16501674
triggered_alerts.push(AlertsInfo {
16511675
title: alert.get_title().to_string(),
16521676
id: *alert.get_id(),
1653-
severity: alert.get_severity().clone(),
1677+
severity: *alert.get_severity(),
16541678
});
16551679
}
16561680
AlertState::Silenced => {
16571681
silenced += 1;
16581682
silenced_alerts.push(AlertsInfo {
16591683
title: alert.get_title().to_string(),
16601684
id: *alert.get_id(),
1661-
severity: alert.get_severity().clone(),
1685+
severity: *alert.get_severity(),
16621686
});
16631687
}
16641688
AlertState::Resolved => {
16651689
resolved += 1;
16661690
resolved_alerts.push(AlertsInfo {
16671691
title: alert.get_title().to_string(),
16681692
id: *alert.get_id(),
1669-
severity: alert.get_severity().clone(),
1693+
severity: *alert.get_severity(),
16701694
});
16711695
}
16721696
}

src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::{
3535
pub const DEFAULT_USERNAME: &str = "admin";
3636
pub const DEFAULT_PASSWORD: &str = "admin";
3737

38-
pub const DATASET_FIELD_COUNT_LIMIT: usize = 250;
38+
pub const DATASET_FIELD_COUNT_LIMIT: usize = 1000;
3939
#[derive(Parser)]
4040
#[command(
4141
name = "parseable",

src/handlers/http/alerts.rs

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,28 +31,63 @@ use actix_web::{
3131
use bytes::Bytes;
3232
use ulid::Ulid;
3333

34-
use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState};
34+
use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity};
3535

3636
// GET /alerts
3737
/// User needs at least a read access to the stream(s) that is being referenced in an alert
3838
/// Read all alerts then return alerts which satisfy the condition
39+
/// Supports pagination with optional query parameters:
40+
/// - tags: comma-separated list of tags to filter alerts
41+
/// - offset: number of alerts to skip (default: 0)
42+
/// - limit: maximum number of alerts to return (default: 100, max: 1000)
3943
pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
4044
let session_key = extract_session_key_from_req(&req)?;
4145
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
42-
.map_err(|_| AlertError::InvalidQueryParameter)?;
46+
.map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?;
47+
4348
let mut tags_list = Vec::new();
49+
let mut offset = 0usize;
50+
let mut limit = 100usize; // Default limit
51+
const MAX_LIMIT: usize = 1000; // Maximum allowed limit
52+
53+
// Parse query parameters
4454
if !query_map.is_empty() {
55+
// Parse tags parameter
4556
if let Some(tags) = query_map.get("tags") {
4657
tags_list = tags
4758
.split(',')
4859
.map(|s| s.trim().to_string())
4960
.filter(|s| !s.is_empty())
5061
.collect();
5162
if tags_list.is_empty() {
52-
return Err(AlertError::InvalidQueryParameter);
63+
return Err(AlertError::InvalidQueryParameter(
64+
"empty tags not allowed with query param tags".to_string(),
65+
));
66+
}
67+
}
68+
69+
// Parse offset parameter
70+
if let Some(offset_str) = query_map.get("offset") {
71+
offset = offset_str.parse().map_err(|_| {
72+
AlertError::InvalidQueryParameter("offset is not a valid number".to_string())
73+
})?;
74+
}
75+
76+
// Parse limit parameter
77+
if let Some(limit_str) = query_map.get("limit") {
78+
limit = limit_str.parse().map_err(|_| {
79+
AlertError::InvalidQueryParameter("limit is not a valid number".to_string())
80+
})?;
81+
82+
// Validate limit bounds
83+
if limit == 0 || limit > MAX_LIMIT {
84+
return Err(AlertError::InvalidQueryParameter(
85+
"limit should be between 1 and 1000".to_string(),
86+
));
5387
}
5488
}
5589
}
90+
5691
let guard = ALERTS.read().await;
5792
let alerts = if let Some(alerts) = guard.as_ref() {
5893
alerts
@@ -61,11 +96,51 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
6196
};
6297

6398
let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?;
64-
let alerts_summary = alerts
99+
let mut alerts_summary = alerts
65100
.iter()
66101
.map(|alert| alert.to_summary())
67102
.collect::<Vec<_>>();
68-
Ok(web::Json(alerts_summary))
103+
104+
// Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low)
105+
alerts_summary.sort_by(|a, b| {
106+
// Parse state and severity from JSON values back to enums
107+
let state_a = a
108+
.get("state")
109+
.and_then(|v| v.as_str())
110+
.and_then(|s| s.parse::<AlertState>().ok())
111+
.unwrap_or(AlertState::Resolved); // Default to lowest priority
112+
113+
let state_b = b
114+
.get("state")
115+
.and_then(|v| v.as_str())
116+
.and_then(|s| s.parse::<AlertState>().ok())
117+
.unwrap_or(AlertState::Resolved);
118+
119+
let severity_a = a
120+
.get("severity")
121+
.and_then(|v| v.as_str())
122+
.and_then(|s| s.parse::<Severity>().ok())
123+
.unwrap_or(Severity::Low); // Default to lowest priority
124+
125+
let severity_b = b
126+
.get("severity")
127+
.and_then(|v| v.as_str())
128+
.and_then(|s| s.parse::<Severity>().ok())
129+
.unwrap_or(Severity::Low);
130+
131+
// First sort by state, then by severity
132+
state_a
133+
.cmp(&state_b)
134+
.then_with(|| severity_a.cmp(&severity_b))
135+
});
136+
137+
let paginated_alerts = alerts_summary
138+
.into_iter()
139+
.skip(offset)
140+
.take(limit)
141+
.collect::<Vec<_>>();
142+
143+
Ok(web::Json(paginated_alerts))
69144
}
70145

71146
// POST /alerts

src/parseable/streams.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use parquet::{
4141
use relative_path::RelativePathBuf;
4242
use tokio::task::JoinSet;
4343
use tracing::{error, info, trace, warn};
44+
use ulid::Ulid;
4445

4546
use crate::{
4647
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
@@ -185,7 +186,13 @@ impl Stream {
185186
parsed_timestamp: NaiveDateTime,
186187
custom_partition_values: &HashMap<String, String>,
187188
) -> String {
188-
let mut hostname = hostname::get().unwrap().into_string().unwrap();
189+
let mut hostname = hostname::get()
190+
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
191+
.into_string()
192+
.unwrap_or_else(|_| Ulid::new().to_string())
193+
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
194+
.collect::<String>();
195+
189196
if let Some(id) = &self.ingestor_id {
190197
hostname.push_str(id);
191198
}

src/storage/object_storage.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,15 +1011,23 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf {
10111011

10121012
#[inline(always)]
10131013
pub fn manifest_path(prefix: &str) -> RelativePathBuf {
1014-
match &PARSEABLE.options.mode {
1015-
Mode::Ingest => {
1016-
let id = INGESTOR_META
1017-
.get()
1018-
.unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
1019-
.get_node_id();
1020-
let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}");
1021-
RelativePathBuf::from_iter([prefix, &manifest_file_name])
1022-
}
1023-
_ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]),
1014+
let hostname = hostname::get()
1015+
.unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string()))
1016+
.into_string()
1017+
.unwrap_or_else(|_| Ulid::new().to_string())
1018+
.matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_')
1019+
.collect::<String>();
1020+
1021+
if PARSEABLE.options.mode == Mode::Ingest {
1022+
let id = INGESTOR_META
1023+
.get()
1024+
.unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT))
1025+
.get_node_id();
1026+
1027+
let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}");
1028+
RelativePathBuf::from_iter([prefix, &manifest_file_name])
1029+
} else {
1030+
let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}");
1031+
RelativePathBuf::from_iter([prefix, &manifest_file_name])
10241032
}
10251033
}

0 commit comments

Comments
 (0)