Skip to content

Commit 201c4a8

Browse files
committed
feat(reliability) Add /live and /ready probes
1 parent 7328160 commit 201c4a8

File tree

6 files changed

+310
-150
lines changed

6 files changed

+310
-150
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
__pycache__/
22
*.py[cod]
33
*$py.class
4+
poetry.toml
45

56
.DS_Store
67
.envrc
78
.coverage
89

910
.env
10-
.vscode/
11+
.vscode/
12+
.idea/

poetry.lock

Lines changed: 185 additions & 71 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ types-pyyaml = "^6.0.12"
2828
types-pytz = "^2022.4.0.0"
2929
python-dotenv = "^1.0.1"
3030
numpy = "^2.1.3"
31+
cffi = "^1.17"
3132

3233

3334
[tool.poetry.group.dev.dependencies]

pyth_observer/__init__.py

Lines changed: 85 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import os
3+
import pprint
34
from typing import Any, Dict, List, Tuple
45

56
from base58 import b58decode
@@ -24,6 +25,7 @@
2425
from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
2526
from pyth_observer.dispatch import Dispatch
2627
from pyth_observer.models import Publisher
28+
import pyth_observer.health_server as health_server
2729

2830
PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
2931
PYTHTEST_WS_ENDPOINT = "wss://api.pythtest.pyth.network/"
@@ -72,98 +74,104 @@ def __init__(
7274

7375
async def run(self):
    """Poll price data forever and dispatch checks for every product.

    Each cycle fetches the Pyth products, the CoinGecko prices/updates and
    the cross-chain prices, then, per product, builds one ``PriceFeedState``
    for every eligible price account and one ``PublisherState`` for every
    price component of that account, and passes the list to
    ``self.dispatch.run``.

    Any ``Exception`` raised during a cycle is logged together with its
    traceback, the health server's readiness flag is cleared, and the loop
    carries on after the usual 5 second sleep.
    """
    while True:
        try:
            logger.info("Running checks")

            products = await self.get_pyth_products()
            coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
            crosschain_prices = await self.get_crosschain_prices()

            # All three upstream fetches succeeded, so report ready.
            health_server.observer_ready = True

            for product in products:
                # Skip tombstone accounts with blank metadata
                if "base" not in product.attrs:
                    continue

                if not product.first_price_account_key:
                    continue

                # For each product, we build a list of price feed states (one
                # for each price account) and a list of publisher states (one
                # for each publisher).
                states = []
                price_accounts = await self.get_pyth_prices(product)

                crosschain_price = crosschain_prices.get(
                    b58decode(product.first_price_account_key.key).hex(), None
                )

                for price_account in price_accounts.values():
                    # Handle potential None for min_publishers
                    if (
                        price_account.min_publishers is None
                        # When min_publishers is high it means that the price is not production-ready
                        # yet and it is still being tested. We need no alerting for these prices.
                        or price_account.min_publishers >= 10
                    ):
                        continue

                    # Ensure latest_block_slot is not None or provide a default value
                    latest_block_slot = (
                        price_account.slot if price_account.slot is not None else -1
                    )

                    if not price_account.aggregate_price_status:
                        raise RuntimeError("Price account status is missing")

                    if not price_account.aggregate_price_info:
                        raise RuntimeError("Aggregate price info is missing")

                    states.append(
                        PriceFeedState(
                            symbol=product.attrs["symbol"],
                            asset_type=product.attrs["asset_type"],
                            schedule=MarketSchedule(product.attrs["schedule"]),
                            public_key=price_account.key,
                            status=price_account.aggregate_price_status,
                            # this is the solana block slot when price account was fetched
                            latest_block_slot=latest_block_slot,
                            latest_trading_slot=price_account.last_slot,
                            price_aggregate=price_account.aggregate_price_info.price,
                            confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
                            coingecko_price=coingecko_prices.get(product.attrs["base"]),
                            coingecko_update=coingecko_updates.get(
                                product.attrs["base"]
                            ),
                            crosschain_price=crosschain_price,
                        )
                    )

                    for component in price_account.price_components:
                        pub = self.publishers.get(component.publisher_key.key, None)
                        publisher_name = (
                            (pub.name if pub else "")
                            + f" ({component.publisher_key.key})"
                        ).strip()
                        states.append(
                            PublisherState(
                                publisher_name=publisher_name,
                                symbol=product.attrs["symbol"],
                                asset_type=product.attrs["asset_type"],
                                schedule=MarketSchedule(product.attrs["schedule"]),
                                public_key=component.publisher_key,
                                confidence_interval=component.latest_price_info.confidence_interval,
                                confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
                                price=component.latest_price_info.price,
                                price_aggregate=price_account.aggregate_price_info.price,
                                slot=component.latest_price_info.pub_slot,
                                aggregate_slot=price_account.last_slot,
                                # this is the solana block slot when price account was fetched
                                latest_block_slot=latest_block_slot,
                                status=component.latest_price_info.price_status,
                                aggregate_status=price_account.aggregate_price_status,
                            )
                        )

                await self.dispatch.run(states)
        except Exception as e:
            # logger.exception keeps the traceback; the message alone is
            # rarely enough to diagnose a failure that repeats every cycle.
            logger.exception(f"Error in run loop: {e}")
            health_server.observer_ready = False

        logger.debug("Sleeping...")
        await asyncio.sleep(5)

pyth_observer/cli.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from pyth_observer import Observer, Publisher
1111
from pyth_observer.models import ContactInfo
12+
from pyth_observer.health_server import start_health_server
1213

1314

1415
@click.command()
@@ -61,7 +62,15 @@ def run(config, publishers, coingecko_mapping, prometheus_port):
6162

6263
start_http_server(int(prometheus_port))
6364

64-
asyncio.run(observer.run())
65+
async def main():
    """Run the health server and the observer side by side.

    Both coroutines are expected to run forever. They are awaited together
    so that a failure in either one (for example the health server being
    unable to bind its port) propagates immediately, rather than sitting
    unobserved in a background task while the observer keeps looping.
    """
    await asyncio.gather(start_health_server(), observer.run())
72+
73+
asyncio.run(main())
6574

6675

6776
logger.remove()

pyth_observer/health_server.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
from aiohttp import web
3+
4+
observer_ready = False
5+
6+
async def live_handler(request):
    """Liveness probe: reply with a plain-text "OK" response.

    The incoming request is not inspected.
    """
    response = web.Response(text="OK")
    return response
8+
9+
async def ready_handler(request):
    """Readiness probe driven by the module-level ``observer_ready`` flag.

    Replies with a 503 "Not Ready" response while the flag is falsy and
    with an "OK" response once it is truthy.
    """
    if not observer_ready:
        return web.Response(status=503, text="Not Ready")
    return web.Response(text="OK")
14+
15+
async def start_health_server(port=8080):
    """Serve the ``/live`` and ``/ready`` probe endpoints until cancelled.

    Args:
        port: TCP port to listen on; the site binds to "0.0.0.0".

    This coroutine does not return on its own. If it is cancelled, or if
    starting the site fails, the runner is cleaned up so the listening
    socket is released instead of leaking.
    """
    app = web.Application()
    app.router.add_get("/live", live_handler)
    app.router.add_get("/ready", ready_handler)
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
        # Park here without periodic wake-ups; only cancellation ends the wait.
        await asyncio.Event().wait()
    finally:
        await runner.cleanup()
26+

0 commit comments

Comments
 (0)