Skip to content

Commit bf26d84

Browse files
committed
feat(observer): improve stall detection check
1 parent 1c4d802 commit bf26d84

14 files changed

+427
-96
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ __pycache__/
66
.envrc
77
.coverage
88

9-
.env
9+
.env
10+
.vscode/

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.10

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data.
66

77
Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer
88

9-
To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run:
9+
To run Observer locally, you will need:
10+
- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file).
11+
- [Poetry](https://python-poetry.org), which handles package and virtualenv management.
1012

13+
Install dependencies and run the service:
1114
```sh
15+
$ poetry env use $(which python) # point Poetry to the pyenv python shim
1216
$ poetry install
1317
$ poetry run pyth-observer
1418
```

poetry.lock

Lines changed: 67 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ throttler = "1.2.1"
2727
types-pyyaml = "^6.0.12"
2828
types-pytz = "^2022.4.0.0"
2929
python-dotenv = "^1.0.1"
30+
numpy = "^2.1.3"
3031

3132

3233
[tool.poetry.group.dev.dependencies]

pyth_observer/__init__.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ async def run(self):
7777
coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
7878
crosschain_prices = await self.get_crosschain_prices()
7979

80+
failed_checks = []
8081
for product in products:
8182
# Skip tombstone accounts with blank metadata
8283
if "base" not in product.attrs:
@@ -159,10 +160,16 @@ async def run(self):
159160
)
160161
)
161162

162-
await self.dispatch.run(states)
163+
cur_failed_checks = await self.dispatch.run(states)
164+
if cur_failed_checks:
165+
failed_checks.extend(cur_failed_checks)
163166

167+
if failed_checks:
168+
logger.error(f"Failed checks: {len(failed_checks)}")
169+
else:
170+
logger.info("All checks passed")
164171
logger.debug("Sleeping...")
165-
await asyncio.sleep(5)
172+
await asyncio.sleep(30)
166173

167174
async def get_pyth_products(self) -> List[PythProductAccount]:
168175
logger.debug("Fetching Pyth product accounts...")

pyth_observer/check/price_feed.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,13 @@ class PriceFeedState:
3232

3333
@runtime_checkable
3434
class PriceFeedCheck(Protocol):
35-
def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig):
36-
...
35+
def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): ...
3736

38-
def state(self) -> PriceFeedState:
39-
...
37+
def state(self) -> PriceFeedState: ...
4038

41-
def run(self) -> bool:
42-
...
39+
def run(self) -> bool: ...
4340

44-
def error_message(self) -> dict:
45-
...
41+
def error_message(self) -> dict: ...
4642

4743

4844
class PriceFeedOfflineCheck(PriceFeedCheck):
@@ -281,8 +277,8 @@ def error_message(self) -> dict:
281277

282278

283279
PRICE_FEED_CHECKS = [
284-
PriceFeedCoinGeckoCheck,
285-
PriceFeedCrossChainDeviationCheck,
286-
PriceFeedCrossChainOnlineCheck,
287-
PriceFeedOfflineCheck,
280+
# PriceFeedCoinGeckoCheck,
281+
# PriceFeedCrossChainDeviationCheck,
282+
# PriceFeedCrossChainOnlineCheck,
283+
# PriceFeedOfflineCheck,
288284
]

pyth_observer/check/publisher.py

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
1+
from collections import defaultdict, deque
12
import time
2-
from dataclasses import dataclass
3+
from dataclasses import asdict, dataclass
34
from datetime import datetime
4-
from typing import Dict, Protocol, runtime_checkable
5+
from typing import Dict, List, Optional, Protocol, runtime_checkable
56
from zoneinfo import ZoneInfo
6-
7+
from loguru import logger
78
from pythclient.calendar import is_market_open
89
from pythclient.pythaccounts import PythPriceStatus
910
from pythclient.solana import SolanaPublicKey
1011

12+
13+
@dataclass
14+
class PriceUpdate:
15+
"""Represents a single price with its timestamp (epoch seconds)."""
16+
17+
timestamp: int
18+
price: float
19+
20+
1121
PUBLISHER_EXCLUSION_DISTANCE = 25
22+
PUBLISHER_CACHE_MAX_LEN = 30
23+
"""Roughly 30 mins of updates, since the check runs about once a minute"""
1224

13-
PUBLISHER_CACHE = {}
25+
PUBLISHER_CACHE: Dict[tuple[str, str], List[PriceUpdate]] = defaultdict(
26+
lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN)
27+
)
28+
"""
29+
Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in.
30+
Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned.
31+
Used by the PublisherStalledCheck to detect stalls in prices.
32+
"""
1433

1534

1635
@dataclass
@@ -35,17 +54,13 @@ class PublisherState:
3554

3655
@runtime_checkable
3756
class PublisherCheck(Protocol):
38-
def __init__(self, state: PublisherState, config: PublisherCheckConfig):
39-
...
57+
def __init__(self, state: PublisherState, config: PublisherCheckConfig): ...
4058

41-
def state(self) -> PublisherState:
42-
...
59+
def state(self) -> PublisherState: ...
4360

44-
def run(self) -> bool:
45-
...
61+
def run(self) -> bool: ...
4662

47-
def error_message(self) -> dict:
48-
...
63+
def error_message(self) -> dict: ...
4964

5065

5166
class PublisherWithinAggregateConfidenceCheck(PublisherCheck):
@@ -240,6 +255,20 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig):
240255
self.__abandoned_time_limit: int = int(config["abandoned_time_limit"])
241256
self.__max_slot_distance: int = int(config["max_slot_distance"])
242257

258+
from pyth_observer.check.stall_detection import (
259+
StallDetectionResult,
260+
StallDetector,
261+
) # noqa: deferred import to avoid circular import
262+
263+
self.__detector = StallDetector(
264+
stall_time_limit=self.__stall_time_limit,
265+
noise_threshold=float(config.get("noise_threshold")),
266+
min_noise_samples=int(config.get("min_noise_samples")),
267+
)
268+
269+
# Keep track of last analysis for error reporting
270+
self.__last_analysis: Optional[StallDetectionResult] = None
271+
243272
def state(self) -> PublisherState:
244273
return self.__state
245274

@@ -254,36 +283,46 @@ def run(self) -> bool:
254283

255284
distance = self.__state.latest_block_slot - self.__state.slot
256285

286+
# Pass for redemption rates because they are expected to be static for long periods
287+
if self.__state.asset_type == "Crypto Redemption Rate":
288+
logger.info(f"Redemption rate: Skipping {self.__state.symbol}")
289+
return True
290+
257291
# Pass when publisher is offline because PublisherOfflineCheck will be triggered
258292
if distance >= self.__max_slot_distance:
259293
return True
260294

261-
publisher_key = (self.__state.publisher_name, self.__state.symbol)
262295
current_time = int(time.time())
263-
previous_price, last_change_time = PUBLISHER_CACHE.get(
264-
publisher_key, (None, None)
265-
)
266296

267-
if previous_price is None or self.__state.price != previous_price:
268-
PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
269-
return True
297+
publisher_key = (self.__state.publisher_name, self.__state.symbol)
298+
PUBLISHER_CACHE[publisher_key].append(
299+
PriceUpdate(current_time, self.__state.price)
300+
),
301+
updates = PUBLISHER_CACHE[publisher_key]
270302

271-
time_since_last_change = current_time - last_change_time
272-
if time_since_last_change > self.__stall_time_limit:
273-
if time_since_last_change > self.__abandoned_time_limit:
274-
return True # Abandon this check after the abandoned time limit
275-
return False
303+
# Analyze for stalls
304+
result = self.__detector.analyze_updates(list(updates))
305+
logger.debug(f"Stall detection result: {result}")
306+
307+
self.__last_analysis = result # For error logging
308+
309+
# If we've been stalled for too long, abandon this check
310+
if result.is_stalled and result.duration > self.__abandoned_time_limit:
311+
return True
276312

277-
return True
313+
return not result.is_stalled
278314

279315
def error_message(self) -> dict:
316+
stall_duration = f"{self.__last_analysis.duration:.1f} seconds"
280317
return {
281-
"msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
318+
"msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}",
282319
"type": "PublisherStalledCheck",
283320
"publisher": self.__state.publisher_name,
284321
"symbol": self.__state.symbol,
285322
"price": self.__state.price,
286-
"stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds",
323+
"stall_type": self.__last_analysis.stall_type,
324+
"stall_duration": stall_duration,
325+
"analysis": asdict(self.__last_analysis),
287326
}
288327

289328

0 commit comments

Comments
 (0)