Skip to content

Commit 7d085bd

Browse files
authored
Use 99th percentile and standard deviation to dynamically tune peer timeout (#1553)
* implement percentile tracking for peer round trip request/response times * Use dynamic timeout values for peer requests based on historical performance * linting * Convert Percentile and StandardDeviation to operate on a historical window * PR feedback
1 parent d053f8b commit 7d085bd

File tree

7 files changed

+176
-19
lines changed

7 files changed

+176
-19
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pytest
2+
3+
from trinity.utils.percentile import Percentile
4+
5+
6+
@pytest.mark.parametrize(
    'data,percentile,window_size,expected',
    (
        (range(6), 0.2, 6, 1),
        (range(11), 0.4, 11, 4),
        (range(11), 0.2, 6, 6),
    ),
)
def test_percentile_class(data, percentile, window_size, expected):
    """
    Feed every sample of `data` into a `Percentile` tracker and check that the
    reported percentile over the retained window equals `expected`.
    """
    # Use a distinct local name rather than rebinding the `percentile`
    # parameter to the tracker object.
    tracker = Percentile(window_size=window_size, percentile=percentile)

    for sample in data:
        tracker.update(sample)

    observed = tracker.value
    assert observed == expected
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import pytest
2+
3+
from trinity.utils.stddev import StandardDeviation
4+
5+
6+
@pytest.mark.parametrize(
    "data,expected",
    (
        ((4, 2, 5, 8, 6), 2.23606),
        ((1.5, 1.8, 7, 1.2, 1.35), 2.4863),
        ((2, 2, 2, 2, 2), 0),
        ((1, 3, 5, 7, 9), 3.1622),
        ((100, 200, 300, 400, 500, 1, 3, 5, 7, 9), 3.1622),
    ),
)
def test_standard_deviation(data, expected):
    """
    Push `data` through a `StandardDeviation` tracker with a window of five
    samples and check the reported value is within 0.01 of `expected`.

    The last case has ten samples, so only its final five should contribute.
    """
    tracker = StandardDeviation(window_size=5)

    for sample in data:
        tracker.update(sample)

    error = abs(tracker.value - expected)
    assert error < 0.01

trinity/protocol/common/managers.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,29 +79,45 @@ async def payload_candidates(
7979
To mark a response as valid, use `complete_request`. After that call, payload
8080
candidates will stop arriving.
8181
"""
82-
if timeout is None:
83-
timeout = self.response_timeout
82+
outer_timeout = self.response_timeout if timeout is None else timeout
8483

8584
start_at = time.perf_counter()
8685

8786
# The _lock ensures that we never have two concurrent requests to a
8887
# single peer for a single command pair in flight.
8988
try:
90-
await self.wait(self._lock.acquire(), timeout=timeout)
89+
await self.wait(self._lock.acquire(), timeout=outer_timeout)
9190
except TimeoutError:
9291
raise AlreadyWaiting(
9392
f"Timed out waiting for {self.response_msg_name} request lock "
9493
f"or peer: {self._peer}"
9594
)
9695

96+
if timeout is not None or tracker.total_msgs < 20:
97+
inner_timeout = outer_timeout
98+
else:
99+
# We compute a timeout based on the historical performance
100+
# of the peer defined as three standard deviations above
101+
# the response time for the 99th percentile of requests.
102+
try:
103+
rtt_99th = tracker.round_trip_99th.value
104+
rtt_stddev = tracker.round_trip_stddev.value
105+
except ValueError:
106+
inner_timeout = outer_timeout
107+
else:
108+
inner_timeout = rtt_99th + 3 * rtt_stddev
109+
97110
try:
98111
self._request(request)
99112
while self._is_pending():
100-
timeout_remaining = max(0, timeout - (time.perf_counter() - start_at))
113+
timeout_remaining = max(0, outer_timeout - (time.perf_counter() - start_at))
114+
115+
payload_timeout = min(inner_timeout, timeout_remaining)
116+
101117
try:
102-
yield await self._get_payload(timeout_remaining)
118+
yield await self._get_payload(payload_timeout)
103119
except TimeoutError:
104-
tracker.record_timeout(timeout)
120+
tracker.record_timeout()
105121
raise
106122
finally:
107123
self._lock.release()

trinity/protocol/common/trackers.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
from trinity.utils.ema import EMA
1515
from trinity.utils.logging import HasTraceLogger
16+
from trinity.utils.percentile import Percentile
17+
from trinity.utils.stddev import StandardDeviation
1618
from .constants import ROUND_TRIP_TIMEOUT
1719
from .types import (
1820
TResult,
@@ -35,8 +37,10 @@ def __init__(self) -> None:
3537
# empty responses.
3638
self.response_quality_ema = EMA(initial_value=0, smoothing_factor=0.05)
3739

38-
# an EMA of the round trip request/response time
40+
# Metrics for the round trip request/response time
3941
self.round_trip_ema = EMA(initial_value=ROUND_TRIP_TIMEOUT, smoothing_factor=0.05)
42+
self.round_trip_99th = Percentile(percentile=0.99, window_size=200)
43+
self.round_trip_stddev = StandardDeviation(window_size=200)
4044

4145
# an EMA of the items per second
4246
self.items_per_second_ema = EMA(initial_value=0, smoothing_factor=0.05)
@@ -76,39 +80,43 @@ def get_stats(self) -> str:
7680
"""
7781
if not self.total_msgs:
7882
return 'None'
79-
avg_rtt = self.total_response_time / self.total_msgs
80-
if not self.total_response_time:
81-
items_per_second = 0.0
82-
else:
83-
items_per_second = self.total_items / self.total_response_time
83+
84+
try:
85+
rt99 = self.round_trip_99th.value
86+
except ValueError:
87+
rt99 = 0
88+
89+
try:
90+
rt_stddev = self.round_trip_stddev.value
91+
except ValueError:
92+
rt_stddev = 0
8493

8594
# msgs: total number of messages
8695
# items: total number of items
87-
# rtt: round-trip-time (avg/ema)
88-
# ips: items-per-second (avg/ema)
96+
# rtt: round-trip-time (ema/99th/stddev)
97+
# ips: items-per-second (ema)
8998
# timeouts: total number of timeouts
9099
# missing: total number of missing response items
91100
# quality: 0-100 for how complete responses are
92101
return (
93-
'msgs=%d items=%d rtt=%.2f/%.2f ips=%.5f/%.5f '
102+
'msgs=%d items=%d rtt=%.2f/%.2f/%.2f ips=%.5f '
94103
'timeouts=%d quality=%d'
95104
) % (
96105
self.total_msgs,
97106
self.total_items,
98-
avg_rtt,
99107
self.round_trip_ema.value,
100-
items_per_second,
108+
rt99,
109+
rt_stddev,
101110
self.items_per_second_ema.value,
102111
self.total_timeouts,
103112
int(self.response_quality_ema.value),
104113
)
105114

106-
def record_timeout(self, timeout: float) -> None:
115+
def record_timeout(self) -> None:
107116
self.total_msgs += 1
108117
self.total_timeouts += 1
109118
self.response_quality_ema.update(0)
110119
self.items_per_second_ema.update(0)
111-
self.round_trip_ema.update(timeout)
112120

113121
def record_response(self,
114122
elapsed: float,
@@ -148,7 +156,10 @@ def record_response(self,
148156

149157
self.total_items += num_items
150158
self.total_response_time += elapsed
159+
151160
self.round_trip_ema.update(elapsed)
161+
self.round_trip_99th.update(elapsed)
162+
self.round_trip_stddev.update(elapsed)
152163

153164
if elapsed > 0:
154165
throughput = num_items / elapsed

trinity/sync/full/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
# How old (in seconds) must our local head be to cause us to start with a
# fast-sync before we switch to regular-sync.
#
# NOTE: this constant must be assigned exactly once.  A second assignment
# (`FAST_SYNC_CUTOFF = 60`) that silently overrode the 24-hour value below was
# removed; it looked like a leftover local debugging override.
FAST_SYNC_CUTOFF = 60 * 60 * 24

trinity/utils/percentile.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import bisect
2+
import collections
3+
import math
4+
from typing import List, Union, Deque
5+
6+
7+
class Percentile:
    """
    Track a specific percentile across a window of recent data.

    Only the most recent ``window_size`` samples are retained.  The value is
    computed with linear interpolation between the two closest ranks of the
    retained samples.

    https://en.wikipedia.org/wiki/Percentile
    """
    def __init__(self, percentile: float, window_size: int) -> None:
        """
        :param percentile: the percentile to track, as a fraction in [0, 1]
        :param window_size: number of most recent samples to retain (>= 1)
        :raises ValueError: if either argument is outside its valid range
        """
        if percentile < 0 or percentile > 1:
            raise ValueError("Invalid: percentile must be in the range [0, 1]")
        if window_size < 1:
            # A non-positive window would discard every sample as soon as it
            # is recorded, so `value` could never be computed.
            raise ValueError("Invalid: window_size must be at least 1")

        # The retained samples, kept in ascending order for rank lookups.
        self.window: List[Union[int, float]] = []
        # The same samples in arrival order, so the oldest can be evicted.
        self.history: Deque[Union[int, float]] = collections.deque()
        self.percentile = percentile
        self.window_size = window_size

    @property
    def value(self) -> float:
        """
        The current approximation for the tracked percentile.

        :raises ValueError: if no samples have been recorded yet
        """
        if not self.window:
            raise ValueError("No data for percentile calculation")

        # Fractional rank of the percentile within the sorted window.
        idx = (len(self.window) - 1) * self.percentile

        left = math.floor(idx)
        if left == idx:
            # The rank lands exactly on a sample; no interpolation needed.
            return self.window[left]

        right = math.ceil(idx)

        # Weight each neighbour by its closeness to the fractional rank.
        left_part = self.window[left] * (right - idx)
        right_part = self.window[right] * (idx - left)

        return left_part + right_part

    def update(self, value: Union[int, float]) -> None:
        """
        Record a new sample, evicting the oldest one(s) once the window is
        full.

        :raises ValueError: if the evicted sample cannot be located in the
            sorted window (presumably only possible for values without a
            consistent ordering, e.g. NaN)
        """
        bisect.insort(self.window, value)
        self.history.append(value)

        while len(self.history) > self.window_size:
            to_discard = self.history.popleft()
            # Locate the evicted sample in the sorted list; the leftmost
            # insertion point is the index of its first equal element.
            window_idx = bisect.bisect_left(self.window, to_discard)
            discarded = self.window.pop(window_idx)
            if discarded != to_discard:
                raise ValueError(
                    "The value popped from the `window` does not match the "
                    "expected value"
                )

trinity/utils/stddev.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import collections
2+
import math
3+
from typing import Union, Deque
4+
5+
6+
class StandardDeviation:
    """
    Tracks the sample standard deviation over a window of recent data.

    Only the most recent ``window_size`` samples are retained.

    Background on streaming standard deviation:
    https://stackoverflow.com/questions/5543651/computing-standard-deviation-in-a-stream
    """
    def __init__(self, window_size: int) -> None:
        """
        :param window_size: number of most recent samples to retain
        """
        self.window: Deque[Union[int, float]] = collections.deque()
        self.window_size = window_size

    def update(self, value: Union[int, float]) -> None:
        """
        Record a new sample, dropping the oldest one(s) once the window is
        full.
        """
        self.window.append(value)

        while len(self.window) > self.window_size:
            self.window.popleft()

    @property
    def value(self) -> float:
        """
        The sample standard deviation (n - 1 denominator) of the window.

        :raises ValueError: if fewer than two samples have been recorded
        """
        num_values = len(self.window)

        if num_values < 2:
            raise ValueError("No data")

        # Two-pass computation: subtracting the mean before squaring avoids
        # the catastrophic cancellation of `n * sum(x^2) - sum(x)^2`, which
        # for samples with a large mean and small spread can be wildly
        # inaccurate or even negative (making `math.sqrt` raise).
        mean = math.fsum(self.window) / num_values
        sum_of_squared_deviations = math.fsum(
            (item - mean) ** 2 for item in self.window
        )

        return math.sqrt(sum_of_squared_deviations / (num_values - 1))

0 commit comments

Comments
 (0)