Skip to content

Commit 14aec04

Browse files
authored
Merge pull request ethereum#1205 from carver/exchange-patch
ExchangeHandler stats fix
2 parents 2ed5b14 + 1839fb1 commit 14aec04

File tree

10 files changed

+181
-34
lines changed

10 files changed

+181
-34
lines changed

p2p/protocol.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from abc import ABC, abstractmethod
1+
from abc import ABC
22
import logging
33
import struct
44
from typing import (
@@ -135,17 +135,14 @@ class BaseRequest(ABC, Generic[TRequestPayload]):
135135
Must define command_payload during init. This is the data that will
136136
be sent to the peer with the request command.
137137
"""
138+
# Defined at init time, with specific parameters:
138139
command_payload: TRequestPayload
139140

140-
@property
141-
@abstractmethod
142-
def cmd_type(self) -> Type[Command]:
143-
raise NotImplementedError
144-
145-
@property
146-
@abstractmethod
147-
def response_type(self) -> Type[Command]:
148-
raise NotImplementedError
141+
# Defined as class attributes in subclasses
142+
# outbound command type
143+
cmd_type: Type[Command]
144+
# response command type
145+
response_type: Type[Command]
149146

150147

151148
class Protocol:
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import asyncio
2+
3+
from eth_utils import to_tuple
4+
from eth.rlp.headers import BlockHeader
5+
from p2p.peer import (
6+
PeerSubscriber,
7+
)
8+
import pytest
9+
10+
from trinity.protocol.les.commands import GetBlockHeaders
11+
from trinity.protocol.eth.peer import ETHPeer
12+
from trinity.protocol.les.peer import LESPeer
13+
14+
from tests.trinity.core.peer_helpers import (
15+
get_directly_linked_peers,
16+
)
17+
18+
19+
class RequestIDMonitor(PeerSubscriber):
20+
subscription_msg_types = {GetBlockHeaders}
21+
msg_queue_maxsize = 100
22+
23+
async def next_request_id(self):
24+
msg = await self.msg_queue.get()
25+
return msg.payload['request_id']
26+
27+
28+
@to_tuple
29+
def mk_header_chain(length):
30+
assert length >= 1
31+
genesis = BlockHeader(difficulty=100, block_number=0, gas_limit=3000000)
32+
yield genesis
33+
parent = genesis
34+
if length == 1:
35+
return
36+
37+
for i in range(length - 1):
38+
header = BlockHeader(
39+
difficulty=100,
40+
block_number=parent.block_number + 1,
41+
parent_hash=parent.hash,
42+
gas_limit=3000000,
43+
)
44+
yield header
45+
parent = header
46+
47+
48+
@pytest.fixture
49+
async def eth_peer_and_remote(request, event_loop):
50+
peer, remote = await get_directly_linked_peers(
51+
request,
52+
event_loop,
53+
peer1_class=ETHPeer,
54+
peer2_class=ETHPeer,
55+
)
56+
return peer, remote
57+
58+
59+
@pytest.fixture
60+
async def les_peer_and_remote(request, event_loop):
61+
peer, remote = await get_directly_linked_peers(
62+
request,
63+
event_loop,
64+
peer1_class=LESPeer,
65+
peer2_class=LESPeer,
66+
)
67+
return peer, remote
68+
69+
70+
@pytest.mark.asyncio
71+
async def test_eth_get_headers_empty_stats(eth_peer_and_remote):
72+
peer, remote = eth_peer_and_remote
73+
stats = peer.requests.get_stats()
74+
assert all(status == 'Uninitialized' for status in stats.values())
75+
assert 'BlockHeaders' in stats.keys()
76+
77+
78+
@pytest.mark.asyncio
79+
async def test_eth_get_headers_stats(eth_peer_and_remote):
80+
peer, remote = eth_peer_and_remote
81+
82+
async def send_headers():
83+
remote.sub_proto.send_block_headers(mk_header_chain(1))
84+
85+
for idx in range(1, 5):
86+
get_headers_task = asyncio.ensure_future(peer.requests.get_block_headers(0, 1, 0, False))
87+
asyncio.ensure_future(send_headers())
88+
89+
await get_headers_task
90+
91+
stats = peer.requests.get_stats()
92+
93+
assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
94+
assert stats['BlockHeaders'].endswith(', timeouts=0')
95+
96+
97+
@pytest.mark.asyncio
98+
async def test_les_get_headers_stats(les_peer_and_remote):
99+
peer, remote = les_peer_and_remote
100+
101+
request_id_monitor = RequestIDMonitor()
102+
103+
for idx in range(1, 5):
104+
with request_id_monitor.subscribe_peer(remote):
105+
get_headers_task = asyncio.ensure_future(
106+
peer.requests.get_block_headers(0, 1, 0, False)
107+
)
108+
request_id = await request_id_monitor.next_request_id()
109+
110+
remote.sub_proto.send_block_headers(mk_header_chain(1), 0, request_id)
111+
112+
await get_headers_task
113+
114+
stats = peer.requests.get_stats()
115+
116+
assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
117+
assert stats['BlockHeaders'].endswith(', timeouts=0')

trinity/protocol/common/exchanges.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
Any,
55
Callable,
66
Generic,
7+
Type,
78
)
89

910
from p2p.protocol import (
1011
BaseRequest,
12+
Command,
1113
TRequestPayload,
1214
)
1315

16+
from trinity.utils.decorators import classproperty
1417
from .managers import ExchangeManager
1518
from .normalizers import BaseNormalizer
1619
from .types import (
@@ -38,6 +41,8 @@ class BaseExchange(ABC, Generic[TRequestPayload, TResponsePayload, TResult]):
3841
TResult is the response data after normalization
3942
"""
4043

44+
request_class: Type[BaseRequest[TRequestPayload]]
45+
4146
def __init__(self, mgr: ExchangeManager[TRequestPayload, TResponsePayload, TResult]) -> None:
4247
self._manager = mgr
4348

@@ -56,7 +61,7 @@ async def get_result(
5661
- the payload validator is primed with the request payload
5762
"""
5863
if not self._manager.is_running:
59-
await self._manager.launch_service(request.response_type)
64+
await self._manager.launch_service()
6065

6166
# bind the outbound request payload to the payload validator
6267
message_validator = partial(payload_validator, request.command_payload)
@@ -69,9 +74,13 @@ async def get_result(
6974
timeout=timeout
7075
)
7176

77+
@classproperty
78+
def response_cmd_type(cls) -> Type[Command]:
79+
return cls.request_class.response_type
80+
7281
@abstractmethod
7382
async def __call__(self, *args: Any, **kwargs: Any) -> None:
7483
"""
7584
Issue the request to the peer for the desired data
7685
"""
77-
raise NotImplementedError()
86+
raise NotImplementedError('__call__ must be defined on every Exchange')

trinity/protocol/common/handlers.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import (
33
Any,
44
Dict,
5-
List,
5+
Set,
66
Type,
77
)
88

@@ -17,24 +17,28 @@
1717

1818

1919
class BaseExchangeHandler:
20+
_exchange_managers: Set[ExchangeManager[Any, Any, Any]]
21+
2022
@property
2123
@abstractmethod
2224
def _exchanges(self) -> Dict[str, Type[BaseExchange[Any, Any, Any]]]:
2325
pass
2426

2527
def __init__(self, peer: BasePeer) -> None:
2628
self._peer = peer
29+
self._exchange_managers = set()
2730

2831
for attr, exchange_cls in self._exchanges.items():
2932
if hasattr(self, attr):
3033
raise AttributeError(
3134
"Unable to set manager on attribute `{0}` which is already "
3235
"present on the class: {1}".format(attr, getattr(self, attr))
3336
)
34-
manager: ExchangeManager[Any, Any, Any] = ExchangeManager(self._peer, peer.cancel_token)
37+
manager: ExchangeManager[Any, Any, Any]
38+
manager = ExchangeManager(self._peer, exchange_cls.response_cmd_type, peer.cancel_token)
39+
self._exchange_managers.add(manager)
3540
exchange = exchange_cls(manager)
3641
setattr(self, attr, exchange)
3742

38-
def get_stats(self) -> List[str]:
39-
manager_attrs = self._exchanges.keys()
40-
return [getattr(self, attr).get_stats() for attr in manager_attrs]
43+
def get_stats(self) -> Dict[str, str]:
44+
return dict(exchange_manager.get_stats() for exchange_manager in self._exchange_managers)

trinity/protocol/common/managers.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ResponseCandidateStream(
6969
#
7070
@property
7171
def subscription_msg_types(self) -> Set[Type[Command]]:
72-
return {self._response_msg_type}
72+
return {self.response_msg_type}
7373

7474
msg_queue_maxsize = 100
7575

@@ -87,7 +87,7 @@ def __init__(
8787
super().__init__(token)
8888
self._peer = peer
8989
self.response_times = ResponseTimeTracker()
90-
self._response_msg_type = response_msg_type
90+
self.response_msg_type = response_msg_type
9191

9292
async def payload_candidates(
9393
self,
@@ -105,7 +105,7 @@ async def payload_candidates(
105105

106106
@property
107107
def response_msg_name(self) -> str:
108-
return self._response_msg_type.__name__
108+
return self.response_msg_type.__name__
109109

110110
def complete_request(self, item_count: int) -> None:
111111
self.pending_request = None
@@ -123,7 +123,7 @@ async def _run(self) -> None:
123123
if peer != self._peer:
124124
self.logger.error("Unexpected peer: %s expected: %s", peer, self._peer)
125125
continue
126-
elif isinstance(cmd, self._response_msg_type):
126+
elif isinstance(cmd, self.response_msg_type):
127127
await self._handle_msg(cast(TResponsePayload, msg))
128128
else:
129129
self.logger.warning("Unexpected payload type: %s", cmd.__class__.__name__)
@@ -176,8 +176,8 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None:
176176
def _is_pending(self) -> bool:
177177
return self.pending_request is not None
178178

179-
def get_stats(self) -> str:
180-
return '%s: %s' % (self.response_msg_name, self.response_times.get_stats())
179+
def get_stats(self) -> Tuple[str, str]:
180+
return (self.response_msg_name, self.response_times.get_stats())
181181

182182

183183
class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
@@ -186,14 +186,16 @@ class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
186186
def __init__(
187187
self,
188188
peer: BasePeer,
189+
listening_for: Type[Command],
189190
cancel_token: CancelToken) -> None:
190191
self._peer = peer
191192
self._cancel_token = cancel_token
193+
self._response_command_type = listening_for
192194

193-
async def launch_service(self, listening_for: Type[Command]) -> None:
195+
async def launch_service(self) -> None:
194196
self._response_stream = ResponseCandidateStream(
195197
self._peer,
196-
listening_for,
198+
self._response_command_type,
197199
self._cancel_token,
198200
)
199201
self._peer.run_daemon(self._response_stream)
@@ -248,5 +250,8 @@ def service(self) -> BaseService:
248250
"""
249251
return self._response_stream
250252

251-
def get_stats(self) -> str:
252-
return self._response_stream.get_stats()
253+
def get_stats(self) -> Tuple[str, str]:
254+
if self._response_stream is None:
255+
return (self._response_command_type.__name__, 'Uninitialized')
256+
else:
257+
return self._response_stream.get_stats()

trinity/protocol/eth/exchanges.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454

5555
class GetBlockHeadersExchange(BaseGetBlockHeadersExchange):
5656
_normalizer = NoopNormalizer[Tuple[BlockHeader, ...]]()
57+
request_class = GetBlockHeadersRequest
5758

5859
async def __call__( # type: ignore
5960
self,
@@ -65,7 +66,7 @@ async def __call__( # type: ignore
6566

6667
original_request_args = (block_number_or_hash, max_headers, skip, reverse)
6768
validator = GetBlockHeadersValidator(*original_request_args)
68-
request = GetBlockHeadersRequest(*original_request_args)
69+
request = self.request_class(*original_request_args)
6970

7071
return await self.get_result(
7172
request,
@@ -81,21 +82,23 @@ async def __call__( # type: ignore
8182

8283
class GetNodeDataExchange(BaseNodeDataExchange):
8384
_normalizer = GetNodeDataNormalizer()
85+
request_class = GetNodeDataRequest
8486

8587
async def __call__(self, node_hashes: Tuple[Hash32, ...]) -> NodeDataBundles: # type: ignore
8688
validator = GetNodeDataValidator(node_hashes)
87-
request = GetNodeDataRequest(node_hashes)
89+
request = self.request_class(node_hashes)
8890
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)
8991

9092

9193
class GetReceiptsExchange(BaseExchange[Tuple[Hash32, ...], ReceiptsByBlock, ReceiptsBundles]):
9294
_normalizer = ReceiptsNormalizer()
95+
request_class = GetReceiptsRequest
9396

9497
async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles: # type: ignore
9598
validator = ReceiptsValidator(headers)
9699

97100
block_hashes = tuple(header.hash for header in headers)
98-
request = GetReceiptsRequest(block_hashes)
101+
request = self.request_class(block_hashes)
99102

100103
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)
101104

@@ -109,11 +112,12 @@ async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles:
109112

110113
class GetBlockBodiesExchange(BaseGetBlockBodiesExchange):
111114
_normalizer = GetBlockBodiesNormalizer()
115+
request_class = GetBlockBodiesRequest
112116

113117
async def __call__(self, headers: Tuple[BlockHeader, ...]) -> BlockBodyBundles: # type: ignore
114118
validator = GetBlockBodiesValidator(headers)
115119

116120
block_hashes = tuple(header.hash for header in headers)
117-
request = GetBlockBodiesRequest(block_hashes)
121+
request = self.request_class(block_hashes)
118122

119123
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)

trinity/protocol/eth/peer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class ETHPeer(BasePeer):
3131
_requests: ETHExchangeHandler = None
3232

3333
def get_extra_stats(self) -> List[str]:
34-
return self.requests.get_stats()
34+
stats_pairs = self.requests.get_stats().items()
35+
return ['%s: %s' % (cmd_name, stats) for cmd_name, stats in stats_pairs]
3536

3637
@property
3738
def requests(self) -> ETHExchangeHandler:

0 commit comments

Comments
 (0)