Skip to content

Commit 5aeb1cd

Browse files
authored
Introduce alert state and timeouts (#189)
Implement alert state such that webhooks can be called on both alert trigger and alert resolution. Timeouts are also added to avoid flooding targets ( this is opt in rather than opt out because it adds overhead of state check and timer spawn mechanism ). Numeric rule and String rule are used through Consecutive* rule which takes repeat value as 1 if no repeat value is given.
1 parent 8763abb commit 5aeb1cd

File tree

6 files changed

+548
-211
lines changed

6 files changed

+548
-211
lines changed

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ env_logger = "0.9.0"
2727
futures = "0.3"
2828
filetime = "0.2.17"
2929
http = "0.2.4"
30+
humantime-serde = "1.1.1"
3031
lazy_static = "1.4.0"
3132
log = "0.4.14"
3233
num_cpus = "1.0.0"

server/src/alerts/mod.rs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ pub mod target;
2424

2525
pub use self::rule::Rule;
2626
use self::target::Target;
27-
use crate::event::Event;
2827

2928
#[derive(Default, Debug, Serialize, Deserialize)]
3029
#[serde(rename_all = "camelCase")]
@@ -44,46 +43,58 @@ pub struct Alert {
4443
}
4544

4645
impl Alert {
47-
pub async fn check_alert(&self, event: &Event) -> Result<(), ()> {
48-
let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?;
46+
pub fn check_alert(&self, stream_name: String, event_json: &serde_json::Value) {
47+
let resolves = self.rule.resolves(event_json);
4948

50-
if self.rule.resolves(&event_json) {
51-
log::info!("Alert triggered for stream {}", self.name);
52-
for target in self.targets.clone() {
53-
let context = Context::new(
54-
event.stream_name.clone(),
55-
self.name.clone(),
56-
self.message.clone(),
57-
self.rule.trigger_reason(),
58-
);
59-
actix_web::rt::spawn(async move {
60-
target.call(&context);
61-
});
49+
match resolves {
50+
AlertState::Listening | AlertState::Firing => (),
51+
alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => {
52+
let context = self.get_context(stream_name, alert_state);
53+
for target in &self.targets {
54+
target.call(context.clone());
55+
}
6256
}
6357
}
58+
}
6459

65-
Ok(())
60+
fn get_context(&self, stream_name: String, alert_state: AlertState) -> Context {
61+
Context::new(
62+
stream_name,
63+
self.name.clone(),
64+
self.message.clone(),
65+
self.rule.trigger_reason(),
66+
alert_state,
67+
)
6668
}
6769
}
6870

6971
pub trait CallableTarget {
7072
fn call(&self, payload: &Context);
7173
}
7274

75+
#[derive(Debug, Clone)]
7376
pub struct Context {
7477
stream: String,
7578
alert_name: String,
7679
message: String,
7780
reason: String,
81+
alert_state: AlertState,
7882
}
7983

8084
impl Context {
81-
pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self {
85+
pub fn new(
86+
stream: String,
87+
alert_name: String,
88+
message: String,
89+
reason: String,
90+
alert_state: AlertState,
91+
) -> Self {
8292
Self {
8393
stream,
8494
alert_name,
8595
message,
8696
reason,
97+
alert_state,
8798
}
8899
}
89100

@@ -93,4 +104,22 @@ impl Context {
93104
self.alert_name, self.stream, self.message, self.reason
94105
)
95106
}
107+
108+
fn default_resolved_string(&self) -> String {
109+
format!("{} on {} is now resolved ", self.alert_name, self.stream)
110+
}
111+
}
112+
113+
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
114+
pub enum AlertState {
115+
Listening,
116+
SetToFiring,
117+
Firing,
118+
Resolved,
119+
}
120+
121+
impl Default for AlertState {
122+
fn default() -> Self {
123+
Self::Listening
124+
}
96125
}

0 commit comments

Comments
 (0)