Skip to content

Commit 50fa20d

Browse files
authored
Refactor alerts (#179)
Alerts are stored on metadata map which is behind and rwlock. This is fine but having to hold writer lock for alert check is not ideal. This PR refactors code around alerts implementation. Each alerts rule should be self contained with its own interior mutable state if it requires any. Interior mutability pattern allows for alert to be resolved with shared reference. An alert can now contain different variant of rules which can be state or stateless.
1 parent 812917b commit 50fa20d

File tree

8 files changed

+364
-181
lines changed

8 files changed

+364
-181
lines changed

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ actix-web-static-files = "4.0"
5353
static-files = "0.2.1"
5454
walkdir = "2"
5555
ureq = { version = "2.5.0", features = ["json"] }
56+
uuid = { version = "1.2.1", features = ["v4", "fast-rng", "serde"] }
5657

5758
[build-dependencies]
5859
static-files = "0.2.1"

server/src/alerts.rs

Lines changed: 248 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,24 @@
1616
*
1717
*/
1818

19-
use log::{error, info};
19+
use std::sync::atomic::{AtomicU32, Ordering};
20+
2021
use serde::{Deserialize, Serialize};
22+
use uuid::Uuid;
23+
24+
use crate::event::Event;
2125

22-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26+
#[derive(Default, Debug, Serialize, Deserialize)]
2327
#[serde(rename_all = "camelCase")]
2428
pub struct Alerts {
2529
pub alerts: Vec<Alert>,
2630
}
2731

28-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32+
#[derive(Debug, Serialize, Deserialize)]
2933
#[serde(rename_all = "camelCase")]
3034
pub struct Alert {
35+
#[serde(default = "crate::utils::uuid::gen")]
36+
pub id: Uuid,
3137
pub name: String,
3238
pub message: String,
3339
pub rule: Rule,
@@ -37,13 +43,20 @@ pub struct Alert {
3743
impl Alert {
3844
// TODO: spawn async tasks to call webhooks if alert rules are met
3945
// This is done to ensure that threads aren't blocked by calls to the webhook
40-
pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), ()> {
41-
if self.rule.resolves(event).await {
42-
info!("Alert triggered; name: {}", self.name);
46+
pub async fn check_alert(&self, event: &Event) -> Result<(), ()> {
47+
let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?;
48+
49+
if self.rule.resolves(&event_json) {
50+
log::info!("Alert triggered for stream {}", self.name);
4351
for target in self.targets.clone() {
44-
let msg = self.message.clone();
52+
let context = Context::new(
53+
event.stream_name.clone(),
54+
self.name.clone(),
55+
self.message.clone(),
56+
self.rule.trigger_reason(),
57+
);
4558
actix_web::rt::spawn(async move {
46-
target.call(&msg);
59+
target.call(&context);
4760
});
4861
}
4962
}
@@ -52,81 +65,270 @@ impl Alert {
5265
}
5366
}
5467

55-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68+
#[derive(Debug, Serialize, Deserialize)]
69+
#[serde(untagged)]
70+
pub enum Rule {
71+
Numeric(NumericRule),
72+
}
73+
74+
impl Rule {
75+
fn resolves(&self, event: &serde_json::Value) -> bool {
76+
match self {
77+
Rule::Numeric(rule) => rule.resolves(event),
78+
}
79+
}
80+
81+
pub fn valid_for_schema(&self, schema: &arrow_schema::Schema) -> bool {
82+
match self {
83+
Rule::Numeric(NumericRule { column, .. }) => match schema.column_with_name(column) {
84+
Some((_, column)) => matches!(
85+
column.data_type(),
86+
arrow_schema::DataType::Int8
87+
| arrow_schema::DataType::Int16
88+
| arrow_schema::DataType::Int32
89+
| arrow_schema::DataType::Int64
90+
| arrow_schema::DataType::UInt8
91+
| arrow_schema::DataType::UInt16
92+
| arrow_schema::DataType::UInt32
93+
| arrow_schema::DataType::UInt64
94+
| arrow_schema::DataType::Float16
95+
| arrow_schema::DataType::Float32
96+
| arrow_schema::DataType::Float64
97+
),
98+
None => false,
99+
},
100+
}
101+
}
102+
103+
pub fn trigger_reason(&self) -> String {
104+
match self {
105+
Rule::Numeric(NumericRule {
106+
column,
107+
operator,
108+
value,
109+
repeats,
110+
..
111+
}) => match operator {
112+
NumericOperator::EqualTo => format!(
113+
"{} column was equal to {}, {} times",
114+
column, value, repeats
115+
),
116+
NumericOperator::NotEqualTo => format!(
117+
"{} column was not equal to {}, {} times",
118+
column, value, repeats
119+
),
120+
NumericOperator::GreaterThan => format!(
121+
"{} column was greater than {}, {} times",
122+
column, value, repeats
123+
),
124+
NumericOperator::GreaterThanEquals => format!(
125+
"{} column was greater than or equal to {}, {} times",
126+
column, value, repeats
127+
),
128+
NumericOperator::LessThan => format!(
129+
"{} column was less than {}, {} times",
130+
column, value, repeats
131+
),
132+
NumericOperator::LessThanEquals => format!(
133+
"{} column was less than or equal to {}, {} times",
134+
column, value, repeats
135+
),
136+
},
137+
}
138+
}
139+
}
140+
141+
#[derive(Debug, Serialize, Deserialize)]
56142
#[serde(rename_all = "camelCase")]
57-
pub struct Rule {
58-
pub field: String,
143+
pub struct NumericRule {
144+
pub column: String,
59145
/// Field that determines what comparison operator is to be used
60146
#[serde(default)]
61-
pub operator: Operator,
62-
pub value: serde_json::Value,
147+
pub operator: NumericOperator,
148+
pub value: serde_json::Number,
63149
pub repeats: u32,
64150
#[serde(skip)]
65-
repeated: u32,
66-
pub within: String,
151+
repeated: AtomicU32,
67152
}
68153

69-
impl Rule {
70-
// TODO: utilise `within` to set a range for validity of rule to trigger alert
71-
pub async fn resolves(&mut self, event: &serde_json::Value) -> bool {
154+
impl NumericRule {
155+
fn resolves(&self, event: &serde_json::Value) -> bool {
156+
let number = match event.get(&self.column).expect("column exists") {
157+
serde_json::Value::Number(number) => number,
158+
_ => unreachable!("right rule is set for right column type"),
159+
};
160+
72161
let comparison = match self.operator {
73-
Operator::EqualTo => event.get(&self.field).unwrap() == &self.value,
74-
// TODO: currently this is a hack, ensure checks are performed in the right way
75-
Operator::GreaterThan => {
76-
event.get(&self.field).unwrap().as_f64().unwrap() > (self.value).as_f64().unwrap()
162+
NumericOperator::EqualTo => number == &self.value,
163+
NumericOperator::NotEqualTo => number != &self.value,
164+
NumericOperator::GreaterThan => number.as_f64().unwrap() > self.value.as_f64().unwrap(),
165+
NumericOperator::GreaterThanEquals => {
166+
number.as_f64().unwrap() >= self.value.as_f64().unwrap()
77167
}
78-
Operator::LessThan => {
79-
event.get(&self.field).unwrap().as_f64().unwrap() < (self.value).as_f64().unwrap()
168+
NumericOperator::LessThan => number.as_f64().unwrap() < self.value.as_f64().unwrap(),
169+
NumericOperator::LessThanEquals => {
170+
number.as_f64().unwrap() <= self.value.as_f64().unwrap()
80171
}
81172
};
82173

83174
// If truthy, increment count of repeated
175+
// acquire lock and load
176+
let mut repeated = self.repeated.load(Ordering::Acquire);
177+
84178
if comparison {
85-
self.repeated += 1;
179+
repeated += 1
86180
}
87181

88182
// If enough repetitions made, return true
89-
if self.repeated >= self.repeats {
90-
self.repeated = 0;
91-
return true;
92-
}
183+
let ret = if repeated >= self.repeats {
184+
repeated = 0;
185+
true
186+
} else {
187+
false
188+
};
189+
// store the value back to repeated and release
190+
self.repeated.store(repeated, Ordering::Release);
93191

94-
false
192+
ret
95193
}
96194
}
97195

98196
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99197
#[serde(rename_all = "camelCase")]
100-
pub enum Operator {
198+
pub enum NumericOperator {
199+
#[serde(alias = "=")]
101200
EqualTo,
201+
#[serde(alias = "!=")]
202+
NotEqualTo,
203+
#[serde(alias = ">")]
102204
GreaterThan,
205+
#[serde(alias = ">=")]
206+
GreaterThanEquals,
207+
#[serde(alias = "<")]
103208
LessThan,
209+
#[serde(alias = "<=")]
210+
LessThanEquals,
104211
}
105212

106-
impl Default for Operator {
213+
impl Default for NumericOperator {
107214
fn default() -> Self {
108215
Self::EqualTo
109216
}
110217
}
111218

112-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
219+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
113220
#[serde(rename_all = "camelCase")]
114-
pub struct Target {
115-
pub name: String,
116-
#[serde(rename = "server_url")]
117-
pub server_url: String,
118-
#[serde(rename = "api_key")]
119-
pub api_key: String,
221+
#[serde(tag = "type")]
222+
pub enum Target {
223+
Slack(targets::slack::SlackWebHook),
224+
#[serde(alias = "webhook")]
225+
Other(targets::other::OtherWebHook),
120226
}
121227

122228
impl Target {
123-
pub fn call(&self, msg: &str) {
124-
if let Err(e) = ureq::post(&self.server_url)
125-
.set("Content-Type", "text/plain; charset=iso-8859-1")
126-
.set("X-API-Key", &self.api_key)
127-
.send_string(msg)
128-
{
129-
error!("Couldn't make call to webhook, error: {}", e)
229+
pub fn call(&self, payload: &Context) {
230+
match self {
231+
Target::Slack(target) => target.call(payload),
232+
Target::Other(target) => target.call(payload),
233+
}
234+
}
235+
}
236+
237+
pub trait CallableTarget {
238+
fn call(&self, payload: &Context);
239+
}
240+
241+
pub mod targets {
242+
pub mod slack {
243+
use serde::{Deserialize, Serialize};
244+
245+
use crate::alerts::{CallableTarget, Context};
246+
247+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
248+
pub struct SlackWebHook {
249+
#[serde(rename = "server_url")]
250+
server_url: String,
251+
}
252+
253+
impl CallableTarget for SlackWebHook {
254+
fn call(&self, payload: &Context) {
255+
if let Err(e) = ureq::post(&self.server_url)
256+
.set("Content-Type", "application/json")
257+
.send_json(ureq::json!({ "text": payload.default_alert_string() }))
258+
{
259+
log::error!("Couldn't make call to webhook, error: {}", e)
260+
}
261+
}
262+
}
263+
}
264+
265+
pub mod other {
266+
use serde::{Deserialize, Serialize};
267+
268+
use crate::alerts::{CallableTarget, Context};
269+
270+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
271+
#[serde(untagged)]
272+
pub enum OtherWebHook {
273+
ApiKey {
274+
#[serde(rename = "server_url")]
275+
server_url: String,
276+
#[serde(rename = "api_key")]
277+
api_key: String,
278+
},
279+
Simple {
280+
#[serde(rename = "server_url")]
281+
server_url: String,
282+
},
130283
}
284+
285+
impl CallableTarget for OtherWebHook {
286+
fn call(&self, payload: &Context) {
287+
let res = match self {
288+
OtherWebHook::Simple { server_url } => ureq::post(server_url)
289+
.set("Content-Type", "text/plain; charset=iso-8859-1")
290+
.send_string(&payload.default_alert_string()),
291+
OtherWebHook::ApiKey {
292+
server_url,
293+
api_key,
294+
} => ureq::post(server_url)
295+
.set("Content-Type", "text/plain; charset=iso-8859-1")
296+
.set("X-API-Key", api_key)
297+
.send_string(&payload.default_alert_string()),
298+
};
299+
300+
if let Err(e) = res {
301+
log::error!("Couldn't make call to webhook, error: {}", e)
302+
}
303+
}
304+
}
305+
}
306+
}
307+
308+
pub struct Context {
309+
stream: String,
310+
alert_name: String,
311+
message: String,
312+
reason: String,
313+
}
314+
315+
impl Context {
316+
pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self {
317+
Self {
318+
stream,
319+
alert_name,
320+
message,
321+
reason,
322+
}
323+
}
324+
325+
// <Alert_Name> Triggered on <Log_stream>
326+
// Message: Ting
327+
// Failing Condition: Status column was equal to 500, 5 times
328+
fn default_alert_string(&self) -> String {
329+
format!(
330+
"{} triggered on {}\nMessage: {}\nFailing Condition: {}",
331+
self.alert_name, self.stream, self.message, self.reason
332+
)
131333
}
132334
}

0 commit comments

Comments
 (0)