Skip to content

Commit 868346d

Browse files
authored
Track and report peer response size/time (#1166)
Closes: #1128
1 parent 856da17 commit 868346d

File tree

9 files changed

+97
-31
lines changed

9 files changed

+97
-31
lines changed

p2p/peer.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ def __init__(self,
201201
mac_cipher = Cipher(algorithms.AES(mac_secret), modes.ECB(), default_backend())
202202
self.mac_enc = mac_cipher.encryptor().update
203203

204+
def get_extra_stats(self) -> List[str]:
205+
return []
206+
204207
@abstractmethod
205208
async def send_sub_proto_handshake(self) -> None:
206209
raise NotImplementedError("Must be implemented by subclasses")
@@ -916,12 +919,18 @@ async def _periodically_report_stats(self) -> None:
916919

917920
self.logger.debug("== Peer details == ")
918921
for peer in self.connected_nodes.values():
922+
if not peer.is_running:
923+
self.logger.warning(
924+
"%s is no longer alive but has not been removed from pool", peer)
925+
continue
919926
most_received_type, count = max(
920927
peer.received_msgs.items(), key=operator.itemgetter(1))
921928
self.logger.debug(
922-
"%s: running=%s, uptime=%s, received_msgs=%d, most_received=%s(%d)",
923-
peer, peer.is_running, peer.uptime, peer.received_msgs_count,
929+
"%s: uptime=%s, received_msgs=%d, most_received=%s(%d)",
930+
peer, peer.uptime, peer.received_msgs_count,
924931
most_received_type, count)
932+
for line in peer.get_extra_stats():
933+
self.logger.debug(" %s", line)
925934
self.logger.debug("== End peer details == ")
926935
try:
927936
await self.wait(asyncio.sleep(self._report_interval))
@@ -1018,9 +1027,7 @@ def _test() -> None:
10181027
from eth.db.backends.memory import MemoryDB
10191028
from eth.tools.logging import TRACE_LEVEL_NUM
10201029
from trinity.protocol.eth.peer import ETHPeer
1021-
from trinity.protocol.eth.requests import HeaderRequest as ETHHeaderRequest
10221030
from trinity.protocol.les.peer import LESPeer
1023-
from trinity.protocol.les.requests import HeaderRequest as LESHeaderRequest
10241031
from tests.trinity.core.integration_test_helpers import FakeAsyncHeaderDB, connect_to_peers_loop
10251032
logging.basicConfig(level=TRACE_LEVEL_NUM, format='%(asctime)s %(levelname)s: %(message)s')
10261033

@@ -1056,21 +1063,17 @@ async def request_stuff() -> None:
10561063
peer_pool.logger.info("Waiting for peer connection...")
10571064
await asyncio.sleep(0.2)
10581065
peer = peer_pool.highest_td_peer
1059-
block_hash = decode_hex(
1060-
'0x59af08ab31822c992bb3dad92ddb68d820aa4c69e9560f07081fa53f1009b152')
1066+
headers = await peer.requests.get_block_headers(2440319, max_headers=100) # type: ignore
1067+
hashes = [header.hash for header in headers]
10611068
if peer_class == ETHPeer:
10621069
peer = cast(ETHPeer, peer)
1063-
peer.sub_proto.send_get_block_headers(ETHHeaderRequest(block_hash, 1, 0, False))
1064-
peer.sub_proto.send_get_block_bodies([block_hash])
1065-
peer.sub_proto.send_get_receipts([block_hash])
1070+
peer.sub_proto.send_get_block_bodies(hashes)
1071+
peer.sub_proto.send_get_receipts(hashes)
10661072
else:
10671073
peer = cast(LESPeer, peer)
10681074
request_id = 1
1069-
peer.sub_proto.send_get_block_headers(
1070-
LESHeaderRequest(block_hash, 1, 0, False, request_id)
1071-
)
1072-
peer.sub_proto.send_get_block_bodies([block_hash], request_id + 1)
1073-
peer.sub_proto.send_get_receipts(block_hash, request_id + 2)
1075+
peer.sub_proto.send_get_block_bodies(hashes, request_id + 1)
1076+
peer.sub_proto.send_get_receipts(hashes[0], request_id + 2)
10741077

10751078
sigint_received = asyncio.Event()
10761079
for sig in [signal.SIGINT, signal.SIGTERM]:

p2p/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def decode_payload(self, rlp_data: bytes) -> _DecodedMsgType:
8383
decoder = sedes.List(
8484
[type_ for _, type_ in self.structure], strict=self.decode_strict)
8585
try:
86-
data = rlp.decode(rlp_data, sedes=decoder)
86+
data = rlp.decode(rlp_data, sedes=decoder, recursive_cache=True)
8787
except rlp.DecodingError as err:
8888
raise MalformedMessage(
8989
"Malformed %s message: %r".format(type(self).__name__, err)

trinity/protocol/common/handlers.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import (
33
Any,
44
Dict,
5+
List,
56
Type,
67
)
78

@@ -39,3 +40,7 @@ async def _run(self) -> None:
3940

4041
async def _cleanup(self) -> None:
4142
pass
43+
44+
def get_stats(self) -> List[str]:
45+
manager_attrs = self._managers.keys()
46+
return [getattr(self, attr).get_stats() for attr in manager_attrs]

trinity/protocol/common/managers.py

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import abstractmethod
22
import asyncio
3+
import time
34
from typing import (
45
cast,
56
Generic,
@@ -33,6 +34,31 @@
3334
TReturn = TypeVar('TReturn')
3435

3536

37+
class ResponseTimeTracker:
38+
39+
def __init__(self) -> None:
40+
self.total_msgs = 0
41+
self.total_items = 0
42+
self.total_timeouts = 0
43+
self.total_response_time = 0.0
44+
45+
def get_stats(self) -> str:
46+
if not self.total_msgs:
47+
return 'None'
48+
avg_rtt = self.total_response_time / self.total_msgs
49+
if not self.total_items:
50+
per_item_rtt = 0.0
51+
else:
52+
per_item_rtt = self.total_response_time / self.total_items
53+
return 'count=%d, items=%d, avg_rtt=%.2f, avg_time_per_item=%.5f, timeouts=%d' % (
54+
self.total_msgs, self.total_items, avg_rtt, per_item_rtt, self.total_timeouts)
55+
56+
def add(self, time: float, size: int) -> None:
57+
self.total_msgs += 1
58+
self.total_items += size
59+
self.total_response_time += time
60+
61+
3662
class BaseRequestManager(PeerSubscriber, BaseService, Generic[TPeer, TRequest, TResponse, TReturn]): # noqa: E501
3763
#
3864
# PeerSubscriber
@@ -51,6 +77,7 @@ def subscription_msg_types(self) -> Set[Type[Command]]:
5177

5278
def __init__(self, peer: TPeer, token: CancelToken) -> None:
5379
self._peer = peer
80+
self.response_times = ResponseTimeTracker()
5481
super().__init__(token)
5582

5683
#
@@ -61,8 +88,7 @@ async def _run(self) -> None:
6188

6289
with self.subscribe_peer(self._peer):
6390
while self.is_running:
64-
peer, cmd, msg = await self.wait(
65-
self.msg_queue.get(), token=self.cancel_token)
91+
peer, cmd, msg = await self.wait(self.msg_queue.get())
6692
if peer != self._peer:
6793
self.logger.error("Unexpected peer: %s expected: %s", peer, self._peer)
6894
continue
@@ -81,12 +107,15 @@ async def _handle_msg(self, msg: TResponse) -> None:
81107
)
82108
return
83109

110+
self.response_times.add(
111+
time.time() - self._pending_request_start, self._get_item_count(msg))
112+
84113
request, future = self.pending_request
85114

86115
try:
87116
response = await self._normalize_response(msg)
88117
except MalformedMessage as err:
89-
self.logger.warn(
118+
self.logger.warning(
90119
"Malformed response for pending %s request from peer %s, disconnecting: %s",
91120
self.response_msg_name,
92121
self._peer,
@@ -112,6 +141,10 @@ async def _handle_msg(self, msg: TResponse) -> None:
112141
async def _normalize_response(self, msg: TResponse) -> TReturn:
113142
pass
114143

144+
@abstractmethod
145+
def _get_item_count(self, msg: TResponse) -> int:
146+
pass
147+
115148
@abstractmethod
116149
def __call__(self) -> TReturn:
117150
"""
@@ -141,7 +174,23 @@ def _send_sub_proto_request(self, request: TRequest) -> None:
141174

142175
async def _wait_for_response(self,
143176
request: TRequest,
144-
timeout: int = None) -> TReturn:
177+
timeout: int) -> TReturn:
178+
future: 'asyncio.Future[TReturn]' = asyncio.Future()
179+
self._pending_request_start = time.time()
180+
self.pending_request = (request, future)
181+
182+
try:
183+
response = await self.wait(future, timeout=timeout)
184+
except TimeoutError:
185+
self.response_times.total_timeouts += 1
186+
raise
187+
finally:
188+
# Always ensure that we reset the `pending_request` to `None` on exit.
189+
self.pending_request = None
190+
191+
return response
192+
193+
async def _request_and_wait(self, request: TRequest, timeout: int=None) -> TReturn:
145194
if self.pending_request is not None:
146195
self.logger.error(
147196
"Already waiting for response to %s for peer: %s",
@@ -155,19 +204,10 @@ async def _wait_for_response(self,
155204
)
156205
)
157206

158-
future: 'asyncio.Future[TReturn]' = asyncio.Future()
159-
self.pending_request = (request, future)
160-
161-
try:
162-
response = await self.wait(future, timeout=timeout)
163-
finally:
164-
# Always ensure that we reset the `pending_request` to `None` on exit.
165-
self.pending_request = None
166-
167-
return response
168-
169-
async def _request_and_wait(self, request: TRequest, timeout: int=None) -> TReturn:
170207
if timeout is None:
171208
timeout = self.response_timout
172209
self._send_sub_proto_request(request)
173210
return await self._wait_for_response(request, timeout=timeout)
211+
212+
def get_stats(self) -> str:
213+
return '%s: %s' % (self.response_msg_name, self.response_times.get_stats())

trinity/protocol/eth/handlers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ class ETHRequestResponseHandler(BaseRequestResponseHandler):
1414
'get_node_data': GetNodeDataRequestManager,
1515
}
1616

17+
# These are needed only to please mypy.
1718
get_block_headers: GetBlockHeadersRequestManager
1819
get_node_data: GetNodeDataRequestManager

trinity/protocol/eth/managers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ async def _normalize_response(self,
7373
) -> Tuple[BlockHeader, ...]:
7474
return msg
7575

76+
def _get_item_count(self, msg: Tuple[BlockHeader, ...]) -> int:
77+
return len(msg)
78+
7679

7780
BaseGetNodeDataRequestManager = BaseRequestManager[
7881
'ETHPeer',
@@ -106,3 +109,6 @@ async def _normalize_response(self,
106109

107110
node_keys = await self._run_in_executor(tuple, map(keccak, msg))
108111
return tuple(zip(node_keys, msg))
112+
113+
def _get_item_count(self, msg: Tuple[bytes, ...]) -> int:
114+
return len(msg)

trinity/protocol/eth/peer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Any,
33
cast,
44
Dict,
5+
List,
56
)
67

78
from eth_utils import encode_hex
@@ -29,6 +30,9 @@ class ETHPeer(BasePeer):
2930

3031
_requests: ETHRequestResponseHandler = None
3132

33+
def get_extra_stats(self) -> List[str]:
34+
return self.requests.get_stats()
35+
3236
@property
3337
def requests(self) -> ETHRequestResponseHandler:
3438
if self._requests is None:

trinity/protocol/les/managers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,6 @@ async def _normalize_response(self,
8181
)
8282

8383
return msg['headers']
84+
85+
def _get_item_count(self, msg: Dict[str, Any]) -> int:
86+
return len(msg['headers'])

trinity/protocol/les/peer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Any,
33
cast,
44
Dict,
5+
List,
56
)
67

78
from eth_utils import encode_hex
@@ -40,6 +41,9 @@ class LESPeer(BasePeer):
4041

4142
_requests: LESRequestResponseHandler = None
4243

44+
def get_extra_stats(self) -> List[str]:
45+
return self.requests.get_stats()
46+
4347
@property
4448
def requests(self) -> LESRequestResponseHandler:
4549
if self._requests is None:

0 commit comments

Comments
 (0)