Skip to content

Commit 33a149e

Browse files
committed
fix(observer): fix exact stall, add test
1 parent fdb4397 commit 33a149e

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

pyth_observer/check/publisher.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,13 +293,17 @@ def run(self) -> bool:
293293
current_time = int(time.time())
294294

295295
publisher_key = (self.__state.publisher_name, self.__state.symbol)
296-
PUBLISHER_CACHE[publisher_key].append(
297-
PriceUpdate(current_time, self.__state.price)
298-
)
299296
updates = PUBLISHER_CACHE[publisher_key]
300297

298+
# Only cache new prices, let repeated prices grow stale.
299+
# These will be caught as an exact stall in the detector.
300+
is_repeated_price = updates and updates[-1].price == self.__state.price
301+
cur_update = PriceUpdate(current_time, self.__state.price)
302+
if not is_repeated_price:
303+
PUBLISHER_CACHE[publisher_key].append(cur_update)
304+
301305
# Analyze for stalls
302-
result = self.__detector.analyze_updates(list(updates))
306+
result = self.__detector.analyze_updates(list(updates), cur_update)
303307
logger.debug(f"Stall detection result: {result}")
304308

305309
self.__last_analysis = result # For error logging

pyth_observer/check/stall_detection.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,33 +78,36 @@ def __init__(
7878
self.noise_threshold = noise_threshold
7979
self.min_noise_samples = min_noise_samples
8080

81-
def analyze_updates(self, updates: List[PriceUpdate]) -> StallDetectionResult:
81+
def analyze_updates(
82+
self, updates: List[PriceUpdate], cur_update: PriceUpdate
83+
) -> StallDetectionResult:
8284
"""
8385
Assumes that the cache has been recently updated since it takes the latest
8486
cached timestamp as the current time.
8587
8688
Args:
8789
updates: List of price updates to analyze
90+
cur_update: The update currently being processed. If it's a repeated price,
91+
the update won't be in `updates`, so we need it as a separate parameter.
8892
8993
Returns:
9094
StallDetectionResult with detection details
9195
"""
92-
# Need at least 2 samples
93-
if not updates or len(updates) < 2:
96+
# Need at least 1 sample
97+
if not updates:
9498
return StallDetectionResult.no_stall()
9599

96100
## Check for exact stall
97101

98102
# The latest 2 updates are sufficient to detect an exact stall
99-
latest_updates = updates[-2:]
100-
duration = latest_updates[1].timestamp - latest_updates[0].timestamp
103+
duration = cur_update.timestamp - updates[-1].timestamp
101104
if duration <= self.stall_time_limit:
102105
return StallDetectionResult.no_stall()
103-
elif latest_updates[1].price == latest_updates[0].price:
106+
elif cur_update.price == updates[-1].price:
104107
return StallDetectionResult(
105108
is_stalled=True,
106109
stall_type="exact",
107-
base_price=latest_updates[1].price,
110+
base_price=cur_update.price,
108111
noise_magnitude=0.0,
109112
duration=duration,
110113
confidence=1.0,

tests/test_checks_publisher.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,24 @@ def test_exact_stall_fails_check(self):
114114
check_b = self.setup_check(state_b, stall_time_limit=5)
115115
self.run_check(check_b, 6, False) # Should fail as it exceeds the limit
116116

117+
PUBLISHER_CACHE.clear()
118+
state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0)
119+
check_c = self.setup_check(state_c, stall_time_limit=5)
120+
self.run_check(check_c, 2, True) # Initial check should pass
121+
state_c.price = 105.0 # Change the price
122+
self.run_check(check_c, 3, True) # Should pass as price changes
123+
state_c.price = 100.0 # Change back to original price
124+
# Simulate a stall -- send the same price repeatedly.
125+
self.run_check(check_c, 2, True)
126+
state_c.price = 100.0
127+
self.run_check(check_c, 2, True)
128+
state_c.price = 100.0
129+
self.run_check(check_c, 2, True)
130+
state_c.price = 100.0
131+
self.run_check(
132+
check_c, 2, False
133+
) # Should fail since we breached the stall time limit
134+
117135
PUBLISHER_CACHE.clear()
118136
state_c = make_publisher_state(1, 100.0, 2.0, 1, 100.0, 1.0)
119137
check_c = self.setup_check(state_c, stall_time_limit=5)

0 commit comments

Comments
 (0)