Skip to content

Commit 414d2c9

Browse files
authored
Add AlertManager as Target (#306)
Partly fixes #288
1 parent 796995f commit 414d2c9

File tree

2 files changed

+125
-35
lines changed

2 files changed

+125
-35
lines changed

server/src/alerts/rule.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,14 @@ pub struct ConsecutiveNumericRule {
163163

164164
impl ConsecutiveNumericRule {
165165
fn resolves(&self, event: &serde_json::Value) -> AlertState {
166-
if self.base_rule.resolves(event) {
167-
self.state.update_and_fetch_state()
166+
if let Some(resolved) = self.base_rule.resolves(event) {
167+
if resolved {
168+
self.state.update_and_fetch_state()
169+
} else {
170+
self.state.fetch_state()
171+
}
168172
} else {
169-
self.state.fetch_state()
173+
self.state.existing_state()
170174
}
171175
}
172176
}
@@ -182,10 +186,14 @@ pub struct ConsecutiveStringRule {
182186

183187
impl ConsecutiveStringRule {
184188
fn resolves(&self, event: &serde_json::Value) -> AlertState {
185-
if self.base_rule.resolves(event) {
186-
self.state.update_and_fetch_state()
189+
if let Some(resolved) = self.base_rule.resolves(event) {
190+
if resolved {
191+
self.state.update_and_fetch_state()
192+
} else {
193+
self.state.fetch_state()
194+
}
187195
} else {
188-
self.state.fetch_state()
196+
self.state.existing_state()
189197
}
190198
}
191199
}
@@ -211,6 +219,15 @@ impl ConsecutiveRepeatState {
211219
self._fetch_state(false)
212220
}
213221

222+
fn existing_state(&self) -> AlertState {
223+
let repeated = self.repeated.load(Ordering::Acquire);
224+
if repeated >= self.repeats {
225+
AlertState::Firing
226+
} else {
227+
AlertState::Listening
228+
}
229+
}
230+
214231
fn _fetch_state(&self, update: bool) -> AlertState {
215232
let mut repeated = self.repeated.load(Ordering::Acquire);
216233
let mut state = AlertState::Listening;
@@ -290,13 +307,13 @@ pub mod base {
290307
}
291308

292309
impl NumericRule {
293-
pub fn resolves(&self, event: &serde_json::Value) -> bool {
294-
let number = match event.get(&self.column).expect("column exists") {
310+
pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
311+
let number = match event.get(&self.column)? {
295312
serde_json::Value::Number(number) => number,
296313
_ => unreachable!("right rule is set for right column type"),
297314
};
298315

299-
match self.operator {
316+
let res = match self.operator {
300317
NumericOperator::EqualTo => number == &self.value,
301318
NumericOperator::NotEqualTo => number != &self.value,
302319
NumericOperator::GreaterThan => {
@@ -311,7 +328,9 @@ pub mod base {
311328
NumericOperator::LessThanEquals => {
312329
number.as_f64().unwrap() <= self.value.as_f64().unwrap()
313330
}
314-
}
331+
};
332+
333+
Some(res)
315334
}
316335
}
317336

@@ -326,13 +345,13 @@ pub mod base {
326345
}
327346

328347
impl StringRule {
329-
pub fn resolves(&self, event: &serde_json::Value) -> bool {
330-
let string = match event.get(&self.column).expect("column exists") {
348+
pub fn resolves(&self, event: &serde_json::Value) -> Option<bool> {
349+
let string = match event.get(&self.column)? {
331350
serde_json::Value::String(s) => s,
332351
_ => unreachable!("right rule is set for right column type"),
333352
};
334353

335-
if self.ignore_case.unwrap_or_default() {
354+
let res = if self.ignore_case.unwrap_or_default() {
336355
match self.operator {
337356
StringOperator::Exact => string.eq_ignore_ascii_case(&self.value),
338357
StringOperator::NotExact => !string.eq_ignore_ascii_case(&self.value),
@@ -350,7 +369,9 @@ pub mod base {
350369
StringOperator::Contains => string.contains(&self.value),
351370
StringOperator::NotContains => !string.contains(&self.value),
352371
}
353-
}
372+
};
373+
374+
Some(res)
354375
}
355376
}
356377

server/src/alerts/target.rs

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919
use std::{
20+
ops::RangeTo,
2021
sync::{Arc, Mutex},
2122
time::Duration,
2223
};
@@ -26,6 +27,11 @@ use serde::{Deserialize, Serialize};
2627

2728
use super::{AlertState, CallableTarget, Context};
2829

30+
enum Retry {
31+
Infinity,
32+
Range(RangeTo<usize>),
33+
}
34+
2935
#[derive(Debug, Serialize, Deserialize)]
3036
#[serde(rename_all = "camelCase")]
3137
#[serde(try_from = "TargetVerifier")]
@@ -45,14 +51,18 @@ impl Target {
4551
match resolves {
4652
AlertState::SetToFiring => {
4753
state.alert_state = AlertState::Firing;
48-
4954
if !state.timed_out {
5055
// set state
5156
state.timed_out = true;
5257
state.awaiting_resolve = true;
5358
drop(state);
5459

55-
self.spawn_timeout_task(timeout, context.clone());
60+
let retry = match self.target {
61+
TargetType::AlertManager(_) => Retry::Infinity,
62+
_ => Retry::Range(..10),
63+
};
64+
65+
self.spawn_timeout_task(timeout, context.clone(), retry);
5666
call_target(self.target.clone(), context)
5767
}
5868
}
@@ -78,38 +88,53 @@ impl Target {
7888
}
7989
}
8090

81-
fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context) {
91+
fn spawn_timeout_task(&self, timeout: &Timeout, alert_context: Context, retry: Retry) {
8292
let state = Arc::clone(&timeout.state);
8393
let timeout = timeout.timeout;
8494
let target = self.target.clone();
8595

86-
actix_web::rt::spawn(async move {
87-
const RETRIES: usize = 10;
88-
// sleep for timeout period
89-
for _ in 0..RETRIES {
96+
let sleep_and_check_if_call = move |timeout_state: Arc<Mutex<TimeoutState>>| {
97+
async move {
9098
tokio::time::sleep(timeout).await;
91-
let mut state = state.lock().unwrap();
99+
let mut state = timeout_state.lock().unwrap();
92100
if state.alert_state == AlertState::Firing {
93101
// it is still firing .. sleep more and come back
94102
state.awaiting_resolve = true;
95-
96-
call_target(target.clone(), alert_context.clone())
103+
true
97104
} else {
98105
state.timed_out = false;
99-
return;
106+
false
100107
}
101108
}
109+
};
102110

103-
// fallback for if this task only observed FIRING on all RETRIES
104-
// Stream might be dead and sending too many alerts is not great
105-
// Send and alert stating that this alert will only work once it has seen a RESOLVE
106-
state.lock().unwrap().timed_out = false;
107-
let mut context = alert_context;
108-
context.message = format!(
109-
"Triggering alert did not resolve itself after {RETRIES} retries, This alert is paused until it resolves",
110-
);
111-
// Send and exit this task.
112-
call_target(target, context);
111+
actix_web::rt::spawn(async move {
112+
match retry {
113+
Retry::Infinity => loop {
114+
let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
115+
if should_call {
116+
call_target(target.clone(), alert_context.clone())
117+
}
118+
},
119+
Retry::Range(range) => {
120+
for _ in 0..range.end {
121+
let should_call = sleep_and_check_if_call(Arc::clone(&state)).await;
122+
if should_call {
123+
call_target(target.clone(), alert_context.clone())
124+
}
125+
}
126+
// fallback for if this task only observed FIRING on all RETRIES
127+
// Stream might be dead and sending too many alerts is not great
128+
// Send and alert stating that this alert will only work once it has seen a RESOLVE
129+
state.lock().unwrap().timed_out = false;
130+
let mut context = alert_context;
131+
context.message = format!(
132+
"Triggering alert did not resolve itself after {} retries, This alert is paused until it resolves",
133+
range.end);
134+
// Send and exit this task.
135+
call_target(target, context);
136+
}
137+
}
113138
});
114139
}
115140
}
@@ -156,13 +181,15 @@ pub enum TargetType {
156181
Slack(SlackWebHook),
157182
#[serde(rename = "webhook")]
158183
Other(OtherWebHook),
184+
AlertManager(AlertManager),
159185
}
160186

161187
impl TargetType {
162188
pub fn call(&self, payload: &Context) {
163189
match self {
164190
TargetType::Slack(target) => target.call(payload),
165191
TargetType::Other(target) => target.call(payload),
192+
TargetType::AlertManager(target) => target.call(payload),
166193
}
167194
}
168195
}
@@ -226,6 +253,48 @@ impl CallableTarget for OtherWebHook {
226253
}
227254
}
228255

256+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
257+
pub struct AlertManager {
258+
url: String,
259+
}
260+
261+
impl CallableTarget for AlertManager {
262+
fn call(&self, payload: &Context) {
263+
let alert = match payload.alert_state {
264+
AlertState::SetToFiring => ureq::json!([{
265+
"labels": {
266+
"status": "firing",
267+
"alertname": payload.alert_name,
268+
"streamname": payload.stream
269+
},
270+
"annotations": {
271+
"message": payload.message,
272+
"reason": payload.reason
273+
}
274+
}]),
275+
AlertState::Resolved => ureq::json!([{
276+
"labels": {
277+
"status": "resolved",
278+
"alertname": payload.alert_name,
279+
"streamname": payload.stream
280+
},
281+
"annotations": {
282+
"message": payload.message,
283+
"reason": payload.reason
284+
}
285+
}]),
286+
_ => unreachable!(),
287+
};
288+
289+
if let Err(e) = ureq::post(&self.url)
290+
.set("Content-Type", "application/json")
291+
.send_json(alert)
292+
{
293+
log::error!("Couldn't make call to alertmanager, error: {}", e)
294+
}
295+
}
296+
}
297+
229298
#[derive(Debug, Serialize, Deserialize)]
230299
pub struct Timeout {
231300
#[serde(with = "humantime_serde")]

0 commit comments

Comments
 (0)