1
1
import asyncio
2
+ import json
3
+ import os
2
4
from copy import deepcopy
5
+ from datetime import datetime , timedelta
3
6
from typing import Any , Awaitable , Dict , List
4
7
8
+ from loguru import logger
5
9
from prometheus_client import Gauge
6
10
7
11
from pyth_observer .check import Check , State
10
14
from pyth_observer .event import DatadogEvent # Used dynamically
11
15
from pyth_observer .event import LogEvent # Used dynamically
12
16
from pyth_observer .event import TelegramEvent # Used dynamically
17
+ from pyth_observer .event import ZendutyEvent # Used dynamically
13
18
from pyth_observer .event import Event
19
+ from pyth_observer .zenduty import send_zenduty_alert
14
20
15
21
assert DatadogEvent
16
22
assert LogEvent
17
23
assert TelegramEvent
24
+ assert ZendutyEvent
18
25
19
26
20
27
class Dispatch :
@@ -36,6 +43,16 @@ def __init__(self, config, publishers):
36
43
"Publisher check failure status" ,
37
44
["check" , "symbol" , "publisher" ],
38
45
)
46
+ if "ZendutyEvent" in self .config ["events" ]:
47
+ self .open_alerts_file = os .environ ["OPEN_ALERTS_FILE" ]
48
+ self .open_alerts = self .load_alerts ()
49
+
50
def load_alerts(self):
    """Load the persisted open-alerts mapping from ``self.open_alerts_file``.

    Returns:
        The JSON object stored in the file, as a dict. An empty dict is
        returned when the file does not exist, when its content cannot be
        decoded as JSON (for example an empty or truncated file), or when
        the decoded JSON value is not an object.
    """
    try:
        with open(self.open_alerts_file, "r", encoding="utf-8") as file:
            alerts = json.load(file)
    except FileNotFoundError:
        return {}  # Return an empty dict if the file doesn't exist
    except (json.JSONDecodeError, UnicodeDecodeError):
        # The alerts file is rewritten in truncating "w" mode elsewhere in
        # this class, so an interrupted write can leave it empty or only
        # partially written. Start with no open alerts instead of failing.
        return {}

    # The rest of the class indexes and iterates this value as a dict, so
    # any other JSON type (list, string, number, null) is treated as empty.
    return alerts if isinstance(alerts, dict) else {}
39
56
40
57
async def run (self , states : List [State ]):
41
58
# First, run each check and store the ones that failed
@@ -62,8 +79,41 @@ async def run(self, states: List[State]):
62
79
63
80
sent_events .append (event .send ())
64
81
82
+ if event_type == "ZendutyEvent" :
83
+ # Add failed check to open alerts
84
+ alert_identifier = (
85
+ f"{ check .__class__ .__name__ } -{ check .state ().symbol } "
86
+ )
87
+ state = check .state ()
88
+ if isinstance (state , PublisherState ):
89
+ alert_identifier += f"-{ state .publisher_name } "
90
+ self .open_alerts [alert_identifier ] = datetime .now ().isoformat ()
91
+
65
92
await asyncio .gather (* sent_events )
66
93
94
+ # Check open alerts and resolve those that are older than 2 minutes
95
+ if "ZendutyEvent" in self .config ["events" ]:
96
+
97
+ to_remove = []
98
+ current_time = datetime .now ()
99
+ for identifier , last_failure in self .open_alerts .items ():
100
+ if current_time - datetime .fromisoformat (last_failure ) >= timedelta (
101
+ minutes = 2
102
+ ):
103
+ logger .debug (f"Resolving Zenduty alert { identifier } " )
104
+ response = await send_zenduty_alert (
105
+ alert_identifier = identifier , message = identifier , resolved = True
106
+ )
107
+ if response and 200 <= response .status < 300 :
108
+ to_remove .append (identifier )
109
+
110
+ for identifier in to_remove :
111
+ del self .open_alerts [identifier ]
112
+
113
+ # Write open alerts to file to ensure persistence
114
+ with open (self .open_alerts_file , "w" ) as file :
115
+ json .dump (self .open_alerts , file )
116
+
67
117
def check_price_feed (self , state : PriceFeedState ) -> List [Check ]:
68
118
failed_checks : List [Check ] = []
69
119
0 commit comments