Skip to content

Commit ebb7a5a

Browse files
authored
Divide alert into submodules (#185)
1 parent 50fa20d commit ebb7a5a

File tree

3 files changed

+195
-166
lines changed

3 files changed

+195
-166
lines changed

server/src/alerts/mod.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Parseable Server (C) 2022 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use serde::{Deserialize, Serialize};
20+
use uuid::Uuid;
21+
22+
pub mod rule;
23+
pub mod target;
24+
25+
pub use self::rule::Rule;
26+
use self::target::Target;
27+
use crate::event::Event;
28+
29+
#[derive(Default, Debug, Serialize, Deserialize)]
30+
#[serde(rename_all = "camelCase")]
31+
pub struct Alerts {
32+
pub alerts: Vec<Alert>,
33+
}
34+
35+
#[derive(Debug, Serialize, Deserialize)]
36+
#[serde(rename_all = "camelCase")]
37+
pub struct Alert {
38+
#[serde(default = "crate::utils::uuid::gen")]
39+
pub id: Uuid,
40+
pub name: String,
41+
pub message: String,
42+
pub rule: Rule,
43+
pub targets: Vec<Target>,
44+
}
45+
46+
impl Alert {
47+
pub async fn check_alert(&self, event: &Event) -> Result<(), ()> {
48+
let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?;
49+
50+
if self.rule.resolves(&event_json) {
51+
log::info!("Alert triggered for stream {}", self.name);
52+
for target in self.targets.clone() {
53+
let context = Context::new(
54+
event.stream_name.clone(),
55+
self.name.clone(),
56+
self.message.clone(),
57+
self.rule.trigger_reason(),
58+
);
59+
actix_web::rt::spawn(async move {
60+
target.call(&context);
61+
});
62+
}
63+
}
64+
65+
Ok(())
66+
}
67+
}
68+
69+
pub trait CallableTarget {
70+
fn call(&self, payload: &Context);
71+
}
72+
73+
pub struct Context {
74+
stream: String,
75+
alert_name: String,
76+
message: String,
77+
reason: String,
78+
}
79+
80+
impl Context {
81+
pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self {
82+
Self {
83+
stream,
84+
alert_name,
85+
message,
86+
reason,
87+
}
88+
}
89+
90+
fn default_alert_string(&self) -> String {
91+
format!(
92+
"{} triggered on {}\nMessage: {}\nFailing Condition: {}",
93+
self.alert_name, self.stream, self.message, self.reason
94+
)
95+
}
96+
}

server/src/alerts.rs renamed to server/src/alerts/rule.rs

Lines changed: 7 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -16,54 +16,8 @@
1616
*
1717
*/
1818

19-
use std::sync::atomic::{AtomicU32, Ordering};
20-
2119
use serde::{Deserialize, Serialize};
22-
use uuid::Uuid;
23-
24-
use crate::event::Event;
25-
26-
#[derive(Default, Debug, Serialize, Deserialize)]
27-
#[serde(rename_all = "camelCase")]
28-
pub struct Alerts {
29-
pub alerts: Vec<Alert>,
30-
}
31-
32-
#[derive(Debug, Serialize, Deserialize)]
33-
#[serde(rename_all = "camelCase")]
34-
pub struct Alert {
35-
#[serde(default = "crate::utils::uuid::gen")]
36-
pub id: Uuid,
37-
pub name: String,
38-
pub message: String,
39-
pub rule: Rule,
40-
pub targets: Vec<Target>,
41-
}
42-
43-
impl Alert {
44-
// TODO: spawn async tasks to call webhooks if alert rules are met
45-
// This is done to ensure that threads aren't blocked by calls to the webhook
46-
pub async fn check_alert(&self, event: &Event) -> Result<(), ()> {
47-
let event_json: serde_json::Value = serde_json::from_str(&event.body).map_err(|_| ())?;
48-
49-
if self.rule.resolves(&event_json) {
50-
log::info!("Alert triggered for stream {}", self.name);
51-
for target in self.targets.clone() {
52-
let context = Context::new(
53-
event.stream_name.clone(),
54-
self.name.clone(),
55-
self.message.clone(),
56-
self.rule.trigger_reason(),
57-
);
58-
actix_web::rt::spawn(async move {
59-
target.call(&context);
60-
});
61-
}
62-
}
63-
64-
Ok(())
65-
}
66-
}
20+
use std::sync::atomic::{AtomicU32, Ordering};
6721

6822
#[derive(Debug, Serialize, Deserialize)]
6923
#[serde(untagged)]
@@ -72,7 +26,7 @@ pub enum Rule {
7226
}
7327

7428
impl Rule {
75-
fn resolves(&self, event: &serde_json::Value) -> bool {
29+
pub(super) fn resolves(&self, event: &serde_json::Value) -> bool {
7630
match self {
7731
Rule::Numeric(rule) => rule.resolves(event),
7832
}
@@ -100,7 +54,7 @@ impl Rule {
10054
}
10155
}
10256

103-
pub fn trigger_reason(&self) -> String {
57+
pub(super) fn trigger_reason(&self) -> String {
10458
match self {
10559
Rule::Numeric(NumericRule {
10660
column,
@@ -138,6 +92,8 @@ impl Rule {
13892
}
13993
}
14094

95+
// Rules for alerts
96+
14197
#[derive(Debug, Serialize, Deserialize)]
14298
#[serde(rename_all = "camelCase")]
14399
pub struct NumericRule {
@@ -193,6 +149,8 @@ impl NumericRule {
193149
}
194150
}
195151

152+
// Operator for comparing values
153+
196154
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
197155
#[serde(rename_all = "camelCase")]
198156
pub enum NumericOperator {
@@ -215,120 +173,3 @@ impl Default for NumericOperator {
215173
Self::EqualTo
216174
}
217175
}
218-
219-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
220-
#[serde(rename_all = "camelCase")]
221-
#[serde(tag = "type")]
222-
pub enum Target {
223-
Slack(targets::slack::SlackWebHook),
224-
#[serde(alias = "webhook")]
225-
Other(targets::other::OtherWebHook),
226-
}
227-
228-
impl Target {
229-
pub fn call(&self, payload: &Context) {
230-
match self {
231-
Target::Slack(target) => target.call(payload),
232-
Target::Other(target) => target.call(payload),
233-
}
234-
}
235-
}
236-
237-
pub trait CallableTarget {
238-
fn call(&self, payload: &Context);
239-
}
240-
241-
pub mod targets {
242-
pub mod slack {
243-
use serde::{Deserialize, Serialize};
244-
245-
use crate::alerts::{CallableTarget, Context};
246-
247-
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
248-
pub struct SlackWebHook {
249-
#[serde(rename = "server_url")]
250-
server_url: String,
251-
}
252-
253-
impl CallableTarget for SlackWebHook {
254-
fn call(&self, payload: &Context) {
255-
if let Err(e) = ureq::post(&self.server_url)
256-
.set("Content-Type", "application/json")
257-
.send_json(ureq::json!({ "text": payload.default_alert_string() }))
258-
{
259-
log::error!("Couldn't make call to webhook, error: {}", e)
260-
}
261-
}
262-
}
263-
}
264-
265-
pub mod other {
266-
use serde::{Deserialize, Serialize};
267-
268-
use crate::alerts::{CallableTarget, Context};
269-
270-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
271-
#[serde(untagged)]
272-
pub enum OtherWebHook {
273-
ApiKey {
274-
#[serde(rename = "server_url")]
275-
server_url: String,
276-
#[serde(rename = "api_key")]
277-
api_key: String,
278-
},
279-
Simple {
280-
#[serde(rename = "server_url")]
281-
server_url: String,
282-
},
283-
}
284-
285-
impl CallableTarget for OtherWebHook {
286-
fn call(&self, payload: &Context) {
287-
let res = match self {
288-
OtherWebHook::Simple { server_url } => ureq::post(server_url)
289-
.set("Content-Type", "text/plain; charset=iso-8859-1")
290-
.send_string(&payload.default_alert_string()),
291-
OtherWebHook::ApiKey {
292-
server_url,
293-
api_key,
294-
} => ureq::post(server_url)
295-
.set("Content-Type", "text/plain; charset=iso-8859-1")
296-
.set("X-API-Key", api_key)
297-
.send_string(&payload.default_alert_string()),
298-
};
299-
300-
if let Err(e) = res {
301-
log::error!("Couldn't make call to webhook, error: {}", e)
302-
}
303-
}
304-
}
305-
}
306-
}
307-
308-
pub struct Context {
309-
stream: String,
310-
alert_name: String,
311-
message: String,
312-
reason: String,
313-
}
314-
315-
impl Context {
316-
pub fn new(stream: String, alert_name: String, message: String, reason: String) -> Self {
317-
Self {
318-
stream,
319-
alert_name,
320-
message,
321-
reason,
322-
}
323-
}
324-
325-
// <Alert_Name> Triggered on <Log_stream>
326-
// Message: Ting
327-
// Failing Condition: Status column was equal to 500, 5 times
328-
fn default_alert_string(&self) -> String {
329-
format!(
330-
"{} triggered on {}\nMessage: {}\nFailing Condition: {}",
331-
self.alert_name, self.stream, self.message, self.reason
332-
)
333-
}
334-
}

0 commit comments

Comments
 (0)