Skip to content

Commit 0b0a046

Browse files
committed
feat(reliability) Add metrics
1 parent 8ab2e0e commit 0b0a046

File tree

3 files changed

+447
-74
lines changed

3 files changed

+447
-74
lines changed

pyth_observer/__init__.py

Lines changed: 132 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from pyth_observer.crosschain import CrosschainPrice
2525
from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
2626
from pyth_observer.dispatch import Dispatch
27+
from pyth_observer.metrics import metrics
2728
from pyth_observer.models import Publisher
2829

2930
PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
@@ -71,7 +72,16 @@ def __init__(
7172
self.crosschain_throttler = Throttler(rate_limit=1, period=1)
7273
self.coingecko_mapping = coingecko_mapping
7374

75+
metrics.set_observer_info(
76+
network=config["network"]["name"],
77+
config=config,
78+
)
79+
80+
metrics.observer_up = 1
81+
7482
async def run(self):
83+
# global states
84+
states = []
7585
while True:
7686
try:
7787
logger.info("Running checks")
@@ -81,6 +91,10 @@ async def run(self):
8191
crosschain_prices = await self.get_crosschain_prices()
8292

8393
health_server.observer_ready = True
94+
metrics.observer_ready = 1
95+
96+
processed_feeds = 0
97+
active_publishers_by_symbol = {}
8498

8599
for product in products:
86100
# Skip tombstone accounts with blank metadata
@@ -121,80 +135,139 @@ async def run(self):
121135
if not price_account.aggregate_price_info:
122136
raise RuntimeError("Aggregate price info is missing")
123137

124-
states.append(
125-
PriceFeedState(
126-
symbol=product.attrs["symbol"],
127-
asset_type=product.attrs["asset_type"],
128-
schedule=MarketSchedule(product.attrs["schedule"]),
129-
public_key=price_account.key,
130-
status=price_account.aggregate_price_status,
131-
# this is the solana block slot when price account was fetched
132-
latest_block_slot=latest_block_slot,
133-
latest_trading_slot=price_account.last_slot,
134-
price_aggregate=price_account.aggregate_price_info.price,
135-
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
136-
coingecko_price=coingecko_prices.get(
137-
product.attrs["base"]
138-
),
139-
coingecko_update=coingecko_updates.get(
140-
product.attrs["base"]
141-
),
142-
crosschain_price=crosschain_price,
143-
)
138+
price_feed_state = PriceFeedState(
139+
symbol=product.attrs["symbol"],
140+
asset_type=product.attrs["asset_type"],
141+
schedule=MarketSchedule(product.attrs["schedule"]),
142+
public_key=price_account.key,
143+
status=price_account.aggregate_price_status,
144+
# this is the solana block slot when price account was fetched
145+
latest_block_slot=latest_block_slot,
146+
latest_trading_slot=price_account.last_slot,
147+
price_aggregate=price_account.aggregate_price_info.price,
148+
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
149+
coingecko_price=coingecko_prices.get(product.attrs["base"]),
150+
coingecko_update=coingecko_updates.get(
151+
product.attrs["base"]
152+
),
153+
crosschain_price=crosschain_price,
144154
)
145155

156+
states.append(price_feed_state)
157+
processed_feeds += 1
158+
159+
metrics.update_price_feed_metrics(price_feed_state)
160+
161+
symbol = product.attrs["symbol"]
162+
if symbol not in active_publishers_by_symbol:
163+
active_publishers_by_symbol[symbol] = {
164+
"count": 0,
165+
"asset_type": product.attrs["asset_type"],
166+
}
167+
146168
for component in price_account.price_components:
147169
pub = self.publishers.get(component.publisher_key.key, None)
148170
publisher_name = (
149171
(pub.name if pub else "")
150172
+ f" ({component.publisher_key.key})"
151173
).strip()
152-
states.append(
153-
PublisherState(
154-
publisher_name=publisher_name,
155-
symbol=product.attrs["symbol"],
156-
asset_type=product.attrs["asset_type"],
157-
schedule=MarketSchedule(product.attrs["schedule"]),
158-
public_key=component.publisher_key,
159-
confidence_interval=component.latest_price_info.confidence_interval,
160-
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
161-
price=component.latest_price_info.price,
162-
price_aggregate=price_account.aggregate_price_info.price,
163-
slot=component.latest_price_info.pub_slot,
164-
aggregate_slot=price_account.last_slot,
165-
# this is the solana block slot when price account was fetched
166-
latest_block_slot=latest_block_slot,
167-
status=component.latest_price_info.price_status,
168-
aggregate_status=price_account.aggregate_price_status,
169-
)
174+
175+
publisher_state = PublisherState(
176+
publisher_name=publisher_name,
177+
symbol=product.attrs["symbol"],
178+
asset_type=product.attrs["asset_type"],
179+
schedule=MarketSchedule(product.attrs["schedule"]),
180+
public_key=component.publisher_key,
181+
confidence_interval=component.latest_price_info.confidence_interval,
182+
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
183+
price=component.latest_price_info.price,
184+
price_aggregate=price_account.aggregate_price_info.price,
185+
slot=component.latest_price_info.pub_slot,
186+
aggregate_slot=price_account.last_slot,
187+
# this is the solana block slot when price account was fetched
188+
latest_block_slot=latest_block_slot,
189+
status=component.latest_price_info.price_status,
190+
aggregate_status=price_account.aggregate_price_status,
170191
)
171192

172-
await self.dispatch.run(states)
193+
states.append(publisher_state)
194+
active_publishers_by_symbol[symbol]["count"] += 1
195+
196+
metrics.price_feeds_processed.set(processed_feeds)
197+
198+
for symbol, info in active_publishers_by_symbol.items():
199+
metrics.publishers_active.labels(
200+
symbol=symbol, asset_type=info["asset_type"]
201+
).set(info["count"])
202+
203+
await self.dispatch.run(states)
204+
173205
except Exception as e:
174206
logger.error(f"Error in run loop: {e}")
175207
health_server.observer_ready = False
208+
metrics.observer_ready = 0
209+
metrics.loop_errors_total.labels(error_type=type(e).__name__).inc()
176210

177-
logger.debug("Sleeping...")
211+
metrics.observer_ready = 0
178212
await asyncio.sleep(5)
179213

180214
async def get_pyth_products(self) -> List[PythProductAccount]:
    """Refresh and return the Pyth product accounts.

    The call to ``self.pyth_client.refresh_products()`` runs under
    ``self.pyth_throttler`` and is timed with
    ``metrics.api_request_duration``. ``metrics.api_request_total`` is
    incremented with ``status="success"`` when the call returns and with
    ``status="error"`` when anything inside the ``try`` raises; the
    exception is then re-raised unchanged.
    """
    logger.debug("Fetching Pyth product accounts...")

    # Labels shared by the duration metric and the request counter.
    request_labels = {"service": "pyth", "endpoint": "products"}

    try:
        async with self.pyth_throttler:
            with metrics.time_operation(
                metrics.api_request_duration, **request_labels
            ):
                products = await self.pyth_client.refresh_products()
                metrics.api_request_total.labels(
                    **request_labels, status="success"
                ).inc()
                return products
    except Exception:
        # Count the failure, then let the caller see the original error.
        metrics.api_request_total.labels(**request_labels, status="error").inc()
        raise
185232

186233
async def get_pyth_prices(
    self, product: PythProductAccount
) -> Dict[PythPriceType, PythPriceAccount]:
    """Refresh and return the price accounts of ``product``.

    ``product.refresh_prices()`` runs under ``self.pyth_throttler`` and is
    timed with ``metrics.api_request_duration``.
    ``metrics.api_request_total`` is incremented with ``status="success"``
    when the call returns and with ``status="error"`` when anything inside
    the ``try`` raises; the exception is then re-raised unchanged.
    """
    logger.debug("Fetching Pyth price accounts...")

    # Labels shared by the duration metric and the request counter.
    request_labels = {"service": "pyth", "endpoint": "prices"}

    try:
        async with self.pyth_throttler:
            with metrics.time_operation(
                metrics.api_request_duration, **request_labels
            ):
                price_accounts = await product.refresh_prices()
                metrics.api_request_total.labels(
                    **request_labels, status="success"
                ).inc()
                return price_accounts
    except Exception:
        # Count the failure, then let the caller see the original error.
        metrics.api_request_total.labels(**request_labels, status="error").inc()
        raise
193253

194254
async def get_coingecko_prices(self):
195255
logger.debug("Fetching CoinGecko prices...")
196256

197-
data = await get_coingecko_prices(self.coingecko_mapping)
257+
try:
258+
with metrics.time_operation(
259+
metrics.api_request_duration, service="coingecko", endpoint="prices"
260+
):
261+
data = await get_coingecko_prices(self.coingecko_mapping)
262+
metrics.api_request_total.labels(
263+
service="coingecko", endpoint="prices", status="success"
264+
).inc()
265+
except Exception:
266+
metrics.api_request_total.labels(
267+
service="coingecko", endpoint="prices", status="error"
268+
).inc()
269+
raise
270+
198271
prices: Dict[str, float] = {}
199272
updates: Dict[str, int] = {} # Unix timestamps
200273

@@ -205,4 +278,17 @@ async def get_coingecko_prices(self):
205278
return (prices, updates)
206279

207280
async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]:
    """Fetch and return the cross-chain prices from ``self.crosschain``.

    The call is timed with ``metrics.api_request_duration``.
    ``metrics.api_request_total`` is incremented with ``status="success"``
    when the call returns and with ``status="error"`` when anything inside
    the ``try`` raises; the exception is then re-raised unchanged.
    """
    # Labels shared by the duration metric and the request counter.
    request_labels = {"service": "crosschain", "endpoint": "prices"}

    try:
        with metrics.time_operation(
            metrics.api_request_duration, **request_labels
        ):
            crosschain_prices = await self.crosschain.get_crosschain_prices()
            metrics.api_request_total.labels(
                **request_labels, status="success"
            ).inc()
            return crosschain_prices
    except Exception:
        # Count the failure, then let the caller see the original error.
        metrics.api_request_total.labels(**request_labels, status="error").inc()
        raise

pyth_observer/dispatch.py

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import Any, Awaitable, Dict, List
77

88
from loguru import logger
9-
from prometheus_client import Gauge
109

1110
from pyth_observer.check import Check, State
1211
from pyth_observer.check.price_feed import PRICE_FEED_CHECKS, PriceFeedState
@@ -15,6 +14,7 @@
1514
from pyth_observer.event import LogEvent # Used dynamically
1615
from pyth_observer.event import TelegramEvent # Used dynamically
1716
from pyth_observer.event import Context, Event, ZendutyEvent
17+
from pyth_observer.metrics import metrics
1818
from pyth_observer.zenduty import send_zenduty_alert
1919

2020
assert DatadogEvent
@@ -32,16 +32,6 @@ class Dispatch:
3232
def __init__(self, config, publishers):
3333
self.config = config
3434
self.publishers = publishers
35-
self.price_feed_check_gauge = Gauge(
36-
"price_feed_check_failed",
37-
"Price feed check failure status",
38-
["check", "symbol"],
39-
)
40-
self.publisher_check_gauge = Gauge(
41-
"publisher_check_failed",
42-
"Publisher check failure status",
43-
["check", "symbol", "publisher"],
44-
)
4535
if "ZendutyEvent" in self.config["events"]:
4636
self.open_alerts_file = os.environ["OPEN_ALERTS_FILE"]
4737
self.open_alerts = self.load_alerts()
@@ -98,48 +88,70 @@ async def run(self, states: List[State]):
9888
sent_events.append(event.send())
9989

10090
await asyncio.gather(*sent_events)
91+
92+
metrics.update_alert_metrics(self.open_alerts)
93+
10194
if "ZendutyEvent" in self.config["events"]:
10295
await self.process_zenduty_events(current_time)
10396

10497
def check_price_feed(self, state: PriceFeedState) -> List[Check]:
    """Run every enabled price-feed check against ``state``.

    Each class in ``PRICE_FEED_CHECKS`` is configured through
    ``self.load_config`` for ``state.symbol``. Checks whose config has a
    falsy ``"enable"`` entry are skipped and not instantiated. Each executed
    check is timed with ``metrics.check_execution_duration``.

    When at least one check ran, the passed/ran ratio is written to
    ``metrics.check_success_rate`` under ``check_type="price_feed"`` and the
    state's symbol.

    Returns:
        The check instances whose ``run()`` returned a falsy value.
    """
    failures: List[Check] = []
    enabled_count = 0
    pass_count = 0

    for check_cls in PRICE_FEED_CHECKS:
        check_name = check_cls.__name__
        check_config = self.load_config(check_name, state.symbol)
        if not check_config["enable"]:
            continue

        enabled_count += 1
        check = check_cls(state, check_config)

        with metrics.time_operation(
            metrics.check_execution_duration, check_type=check_name
        ):
            passed = check.run()

        if passed:
            pass_count += 1
        else:
            failures.append(check)

    # Leave the metric untouched when no check was enabled; this also
    # avoids dividing by zero.
    if enabled_count > 0:
        metrics.check_success_rate.labels(
            check_type="price_feed", symbol=state.symbol
        ).set(pass_count / enabled_count)

    return failures
123126

124127
def check_publisher(self, state: PublisherState) -> List[Check]:
    """Run every enabled publisher check against ``state``.

    Each class in ``PUBLISHER_CHECKS`` is configured through
    ``self.load_config`` for ``state.symbol``. Checks whose config has a
    falsy ``"enable"`` entry are skipped and not instantiated. Each executed
    check is timed with ``metrics.check_execution_duration``.

    When at least one check ran, the passed/ran ratio is written to
    ``metrics.check_success_rate`` under ``check_type="publisher"`` and the
    state's symbol.

    Returns:
        The check instances whose ``run()`` returned a falsy value.
    """
    failures: List[Check] = []
    enabled_count = 0
    pass_count = 0

    for check_cls in PUBLISHER_CHECKS:
        check_name = check_cls.__name__
        check_config = self.load_config(check_name, state.symbol)
        if not check_config["enable"]:
            continue

        enabled_count += 1
        check = check_cls(state, check_config)

        with metrics.time_operation(
            metrics.check_execution_duration, check_type=check_name
        ):
            passed = check.run()

        if passed:
            pass_count += 1
        else:
            failures.append(check)

    # Leave the metric untouched when no check was enabled; this also
    # avoids dividing by zero.
    if enabled_count > 0:
        # NOTE(review): the label set carries only the symbol, not the
        # publisher, so states for different publishers of the same symbol
        # presumably write to the same series -- confirm this is intended.
        metrics.check_success_rate.labels(
            check_type="publisher", symbol=state.symbol
        ).set(pass_count / enabled_count)

    return failures
144156

145157
def load_config(self, check_name: str, symbol: str) -> Dict[str, Any]:
@@ -187,12 +199,16 @@ async def process_zenduty_events(self, current_time):
187199
):
188200
logger.debug(f"Resolving Zenduty alert {identifier}")
189201
resolved = True
202+
190203
if info["sent"]:
191204
response = await send_zenduty_alert(
192205
identifier, identifier, resolved=True
193206
)
194207
if response and 200 <= response.status < 300:
195208
to_remove.append(identifier)
209+
metrics.alerts_sent_total.labels(
210+
alert_type=info["type"], channel="zenduty"
211+
).inc()
196212
else:
197213
to_remove.append(identifier)
198214
# Raise alert if failed > $threshold times within the last 5m window
@@ -216,6 +232,10 @@ async def process_zenduty_events(self, current_time):
216232
event = self.delayed_events.get(key)
217233
if event:
218234
to_alert.append(event.send())
235+
metrics.alerts_sent_total.labels(
236+
alert_type=info["type"],
237+
channel=event_type.lower().replace("event", ""),
238+
).inc()
219239

220240
# Send the alerts that were delayed due to thresholds
221241
await asyncio.gather(*to_alert)
@@ -229,5 +249,7 @@ async def process_zenduty_events(self, current_time):
229249
if self.delayed_events.get(key):
230250
del self.delayed_events[key]
231251

252+
metrics.update_alert_metrics(self.open_alerts)
253+
232254
with open(self.open_alerts_file, "w") as file:
233255
json.dump(self.open_alerts, file)

0 commit comments

Comments
 (0)