Skip to content

Commit 6e1617b

Browse files
committed
refactor: bugfixes
1 parent 6db3908 commit 6e1617b

File tree

6 files changed

+206
-95
lines changed

6 files changed

+206
-95
lines changed

src/handlers/http/alerts/alerts_utils.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919
use datafusion::{
2020
common::tree_node::TreeNode,
21+
functions_aggregate::{
22+
count::count,
23+
expr_fn::avg,
24+
min_max::{max, min},
25+
sum::sum,
26+
},
2127
prelude::{col, lit, Expr},
2228
};
2329
use tracing::trace;
@@ -77,8 +83,8 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
7783
}
7884

7985
/// This function contains the logic to run the alert evaluation task
80-
pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
81-
println!("RUNNING EVAL TASK FOR- {alert:?}");
86+
pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
87+
trace!("RUNNING EVAL TASK FOR- {alert:?}");
8288

8389
let (start_time, end_time) = match &alert.eval_type {
8490
super::EvalConfig::RollingWindow(rolling_window) => {
@@ -87,13 +93,11 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
8793
};
8894

8995
let session_state = QUERY_SESSION.state();
90-
let raw_logical_plan = session_state
91-
.create_logical_plan(&alert.query)
92-
.await
93-
.unwrap();
96+
let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
9497

9598
// TODO: Filter tags should be taken care of!!!
96-
let time_range = TimeRange::parse_human_time(start_time, end_time).unwrap();
99+
let time_range = TimeRange::parse_human_time(start_time, end_time)
100+
.map_err(|err| AlertError::CustomError(err.to_string()))?;
97101
let query = crate::query::Query {
98102
raw_logical_plan,
99103
time_range,
@@ -102,12 +106,28 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
102106

103107
// for now proceed in a similar fashion as we do in query
104108
// TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
105-
let stream_name = query.first_table_name().unwrap();
109+
let stream_name = if let Some(stream_name) = query.first_table_name() {
110+
stream_name
111+
} else {
112+
return Err(AlertError::CustomError(format!(
113+
"Table name not found in query- {}",
114+
alert.query
115+
)));
116+
};
106117

107-
let df = query.get_dataframe(stream_name).await.unwrap();
118+
let df = query
119+
.get_dataframe(stream_name)
120+
.await
121+
.map_err(|err| AlertError::CustomError(err.to_string()))?;
108122

109123
// let df = DataFrame::new(session_state, raw_logical_plan);
110124

125+
// for now group by is empty, we can include this later
126+
let group_expr = vec![];
127+
128+
// agg expression
129+
let mut aggr_expr = vec![];
130+
111131
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
112132
for threshold in &alert.thresholds {
113133
let res = match threshold.operator {
@@ -137,10 +157,29 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
137157
}
138158
};
139159

160+
aggr_expr.push(match threshold.agg {
161+
crate::handlers::http::alerts::Aggregate::Avg => {
162+
avg(col(&threshold.column)).alias(&threshold.column)
163+
}
164+
crate::handlers::http::alerts::Aggregate::Count => {
165+
count(col(&threshold.column)).alias(&threshold.column)
166+
}
167+
crate::handlers::http::alerts::Aggregate::Min => {
168+
min(col(&threshold.column)).alias(&threshold.column)
169+
}
170+
crate::handlers::http::alerts::Aggregate::Max => {
171+
max(col(&threshold.column)).alias(&threshold.column)
172+
}
173+
crate::handlers::http::alerts::Aggregate::Sum => {
174+
sum(col(&threshold.column)).alias(&threshold.column)
175+
}
176+
});
140177
expr = expr.and(res);
141178
}
142179

143-
let nrows = df.clone().filter(expr).unwrap().count().await.unwrap();
180+
let df = df.aggregate(group_expr, aggr_expr)?;
181+
182+
let nrows = df.clone().filter(expr)?.count().await?;
144183
trace!("dataframe-\n{:?}", df.collect().await);
145184

146185
if nrows > 0 {

src/handlers/http/alerts/http_handlers.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818

1919
use crate::{
2020
option::CONFIG,
21-
storage::object_storage::alert_json_path,
21+
storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY},
2222
sync::schedule_alert_task,
23-
utils::{actix::extract_session_key_from_req, uid::Uid},
23+
utils::actix::extract_session_key_from_req,
2424
};
2525
use actix_web::{web, HttpRequest, Responder};
2626
use bytes::Bytes;
27-
use tracing::warn;
27+
use relative_path::RelativePathBuf;
2828

29-
use super::{alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertState, ALERTS};
29+
use super::{
30+
alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
31+
};
3032

3133
// GET /alerts
3234
/// User needs at least a read access to the stream(s) that is being referenced in an alert
@@ -39,10 +41,11 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
3941
}
4042

4143
// POST /alerts
42-
pub async fn post(req: HttpRequest, alert: AlertConfig) -> Result<impl Responder, AlertError> {
44+
pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
45+
let alert: AlertConfig = alert.into();
4346
// validate the incoming alert query
4447
// does the user have access to these tables or not?
45-
let session_key = extract_session_key_from_req(&req).unwrap();
48+
let session_key = extract_session_key_from_req(&req)?;
4649
user_auth_for_query(&session_key, &alert.query).await?;
4750

4851
// now that we've validated that the user can run this query
@@ -71,18 +74,27 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, AlertError> {
7174
.get("alert_id")
7275
.ok_or(AlertError::Metadata("No alert ID Provided"))?;
7376

74-
let alert = ALERTS.get_alert_by_id(session_key, id).await?;
77+
let alert = ALERTS.get_alert_by_id(id).await?;
78+
// validate that the user has access to the tables mentioned
79+
user_auth_for_query(&session_key, &alert.query).await?;
80+
7581
Ok(web::Json(alert))
7682
}
7783

7884
// DELETE /alerts/{alert_id}
7985
/// Deletion should happen from disk, sheduled tasks, then memory
8086
pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
87+
let session_key = extract_session_key_from_req(&req)?;
8188
let alert_id = req
8289
.match_info()
8390
.get("alert_id")
8491
.ok_or(AlertError::Metadata("No alert ID Provided"))?;
8592

93+
let alert = ALERTS.get_alert_by_id(alert_id).await?;
94+
95+
// validate that the user has access to the tables mentioned
96+
user_auth_for_query(&session_key, &alert.query).await?;
97+
8698
// delete from disk and memory
8799
ALERTS.delete(alert_id).await?;
88100

@@ -95,31 +107,34 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
95107
// PUT /alerts/{alert_id}
96108
/// first save on disk, then in memory
97109
/// then modify scheduled task
98-
pub async fn modify(
99-
req: HttpRequest,
100-
mut alert: AlertConfig,
101-
) -> Result<impl Responder, AlertError> {
110+
pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
102111
let session_key = extract_session_key_from_req(&req)?;
103112
let alert_id = req
104113
.match_info()
105114
.get("alert_id")
106115
.ok_or(AlertError::Metadata("No alert ID Provided"))?;
107116

108-
// ensure that the user doesn't unknowingly change the ID
109-
if alert_id != alert.id.to_string() {
110-
warn!("Alert modify request is trying to change Alert ID, reverting ID");
111-
alert.id = Uid::from_string(alert_id)
112-
.map_err(|_| AlertError::CustomError("Unable to get Uid from String".to_owned()))?;
113-
}
117+
// check if alert id exists in map
118+
ALERTS.get_alert_by_id(alert_id).await?;
114119

115120
// validate that the user has access to the tables mentioned
116121
user_auth_for_query(&session_key, &alert.query).await?;
117122

118-
// // fetch the alert from this ID to get AlertState
119-
// let state = ALERTS.get_alert_by_id(session_key, alert_id).await?.state;
120-
121123
let store = CONFIG.storage().get_object_store();
122124

125+
// fetch the alert object for the relevant ID
126+
let old_alert_config: AlertConfig = serde_json::from_slice(
127+
&store
128+
.get_object(&RelativePathBuf::from_iter([
129+
PARSEABLE_ROOT_DIRECTORY,
130+
ALERTS_ROOT_DIRECTORY,
131+
&format!("{alert_id}.json"),
132+
]))
133+
.await?,
134+
)?;
135+
136+
let alert = alert.modify(old_alert_config);
137+
123138
// modify on disk
124139
store.put_alert(&alert.id.to_string(), &alert).await?;
125140

@@ -136,11 +151,18 @@ pub async fn modify(
136151

137152
// PUT /alerts/{alert_id}/update_state
138153
pub async fn update_state(req: HttpRequest, state: String) -> Result<impl Responder, AlertError> {
154+
let session_key = extract_session_key_from_req(&req)?;
139155
let alert_id = req
140156
.match_info()
141157
.get("alert_id")
142158
.ok_or(AlertError::Metadata("No alert ID Provided"))?;
143159

160+
// check if alert id exists in map
161+
let alert = ALERTS.get_alert_by_id(alert_id).await?;
162+
163+
// validate that the user has access to the tables mentioned
164+
user_auth_for_query(&session_key, &alert.query).await?;
165+
144166
// get current state
145167
let current_state = ALERTS.get_state(alert_id).await?;
146168

0 commit comments

Comments
 (0)