Skip to content

Commit 0eff378

Browse files
committed
return dict from get_stats, new get_stats test
also, a bugfix for getting stats before any requests are made
1 parent e45c722 commit 0eff378

File tree

9 files changed

+173
-23
lines changed

9 files changed

+173
-23
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import asyncio
2+
3+
from eth_utils import to_tuple
4+
from eth.rlp.headers import BlockHeader
5+
from p2p.peer import (
6+
PeerSubscriber,
7+
)
8+
import pytest
9+
10+
from trinity.protocol.les.commands import GetBlockHeaders
11+
from trinity.protocol.eth.peer import ETHPeer
12+
from trinity.protocol.les.peer import LESPeer
13+
14+
from tests.trinity.core.peer_helpers import (
15+
get_directly_linked_peers,
16+
)
17+
18+
19+
class RequestIDMonitor(PeerSubscriber):
20+
subscription_msg_types = {GetBlockHeaders}
21+
msg_queue_maxsize = 100
22+
23+
async def next_request_id(self):
24+
msg = await self.msg_queue.get()
25+
return msg.payload['request_id']
26+
27+
28+
@to_tuple
29+
def mk_header_chain(length):
30+
assert length >= 1
31+
genesis = BlockHeader(difficulty=100, block_number=0, gas_limit=3000000)
32+
yield genesis
33+
parent = genesis
34+
if length == 1:
35+
return
36+
37+
for i in range(length - 1):
38+
header = BlockHeader(
39+
difficulty=100,
40+
block_number=parent.block_number + 1,
41+
parent_hash=parent.hash,
42+
gas_limit=3000000,
43+
)
44+
yield header
45+
parent = header
46+
47+
48+
@pytest.fixture
49+
async def eth_peer_and_remote(request, event_loop):
50+
peer, remote = await get_directly_linked_peers(
51+
request,
52+
event_loop,
53+
peer1_class=ETHPeer,
54+
peer2_class=ETHPeer,
55+
)
56+
return peer, remote
57+
58+
59+
@pytest.fixture
60+
async def les_peer_and_remote(request, event_loop):
61+
peer, remote = await get_directly_linked_peers(
62+
request,
63+
event_loop,
64+
peer1_class=LESPeer,
65+
peer2_class=LESPeer,
66+
)
67+
return peer, remote
68+
69+
70+
@pytest.mark.asyncio
71+
async def test_eth_get_headers_empty_stats(eth_peer_and_remote):
72+
peer, remote = eth_peer_and_remote
73+
stats = peer.requests.get_stats()
74+
assert all(status == 'Uninitialized' for status in stats.values())
75+
assert 'BlockHeaders' in stats.keys()
76+
77+
78+
@pytest.mark.asyncio
79+
async def test_eth_get_headers_stats(eth_peer_and_remote):
80+
peer, remote = eth_peer_and_remote
81+
82+
async def send_headers():
83+
remote.sub_proto.send_block_headers(mk_header_chain(1))
84+
85+
for idx in range(1, 5):
86+
get_headers_task = asyncio.ensure_future(peer.requests.get_block_headers(0, 1, 0, False))
87+
asyncio.ensure_future(send_headers())
88+
89+
await get_headers_task
90+
91+
stats = peer.requests.get_stats()
92+
93+
assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
94+
assert stats['BlockHeaders'].endswith(', timeouts=0')
95+
96+
97+
@pytest.mark.asyncio
98+
async def test_les_get_headers_stats(les_peer_and_remote):
99+
peer, remote = les_peer_and_remote
100+
101+
request_id_monitor = RequestIDMonitor()
102+
103+
for idx in range(1, 5):
104+
with request_id_monitor.subscribe_peer(remote):
105+
get_headers_task = asyncio.ensure_future(
106+
peer.requests.get_block_headers(0, 1, 0, False)
107+
)
108+
request_id = await request_id_monitor.next_request_id()
109+
110+
remote.sub_proto.send_block_headers(mk_header_chain(1), 0, request_id)
111+
112+
await get_headers_task
113+
114+
stats = peer.requests.get_stats()
115+
116+
assert stats['BlockHeaders'].startswith('count={0}, items={0}, avg_rtt='.format(idx))
117+
assert stats['BlockHeaders'].endswith(', timeouts=0')

trinity/protocol/common/exchanges.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
Any,
55
Callable,
66
Generic,
7+
Type,
78
)
89

910
from p2p.protocol import (
1011
BaseRequest,
12+
Command,
1113
TRequestPayload,
1214
)
1315

16+
from trinity.utils.decorators import classproperty
1417
from .managers import ExchangeManager
1518
from .normalizers import BaseNormalizer
1619
from .types import (
@@ -56,7 +59,7 @@ async def get_result(
5659
- the payload validator is primed with the request payload
5760
"""
5861
if not self._manager.is_running:
59-
await self._manager.launch_service(request.response_type)
62+
await self._manager.launch_service()
6063

6164
# bind the outbound request payload to the payload validator
6265
message_validator = partial(payload_validator, request.command_payload)
@@ -69,9 +72,19 @@ async def get_result(
6972
timeout=timeout
7073
)
7174

75+
@property
76+
@abstractmethod
77+
def request_class(cls) -> Type[BaseRequest[TRequestPayload]]:
78+
raise NotImplementedError('request_class must be defined on every Exchange')
79+
80+
@classproperty
81+
def response_cmd_type(cls) -> Type[Command]:
82+
# mypy is confused about the "abstract class property"
83+
return cls.request_class.response_type # type: ignore
84+
7285
@abstractmethod
7386
async def __call__(self, *args: Any, **kwargs: Any) -> None:
7487
"""
7588
Issue the request to the peer for the desired data
7689
"""
77-
raise NotImplementedError()
90+
raise NotImplementedError('__call__ must be defined on every Exchange')

trinity/protocol/common/handlers.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from typing import (
33
Any,
44
Dict,
5-
List,
65
Set,
76
Type,
87
)
@@ -35,10 +34,11 @@ def __init__(self, peer: BasePeer) -> None:
3534
"Unable to set manager on attribute `{0}` which is already "
3635
"present on the class: {1}".format(attr, getattr(self, attr))
3736
)
38-
manager: ExchangeManager[Any, Any, Any] = ExchangeManager(self._peer, peer.cancel_token)
37+
manager: ExchangeManager[Any, Any, Any]
38+
manager = ExchangeManager(self._peer, exchange_cls.response_cmd_type, peer.cancel_token)
3939
self._exchange_managers.add(manager)
4040
exchange = exchange_cls(manager)
4141
setattr(self, attr, exchange)
4242

43-
def get_stats(self) -> List[str]:
44-
return [exchange_manager.get_stats() for exchange_manager in self._exchange_managers]
43+
def get_stats(self) -> Dict[str, str]:
44+
return dict(exchange_manager.get_stats() for exchange_manager in self._exchange_managers)

trinity/protocol/common/managers.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ResponseCandidateStream(
6969
#
7070
@property
7171
def subscription_msg_types(self) -> Set[Type[Command]]:
72-
return {self._response_msg_type}
72+
return {self.response_msg_type}
7373

7474
msg_queue_maxsize = 100
7575

@@ -87,7 +87,7 @@ def __init__(
8787
super().__init__(token)
8888
self._peer = peer
8989
self.response_times = ResponseTimeTracker()
90-
self._response_msg_type = response_msg_type
90+
self.response_msg_type = response_msg_type
9191

9292
async def payload_candidates(
9393
self,
@@ -105,7 +105,7 @@ async def payload_candidates(
105105

106106
@property
107107
def response_msg_name(self) -> str:
108-
return self._response_msg_type.__name__
108+
return self.response_msg_type.__name__
109109

110110
def complete_request(self, item_count: int) -> None:
111111
self.pending_request = None
@@ -123,7 +123,7 @@ async def _run(self) -> None:
123123
if peer != self._peer:
124124
self.logger.error("Unexpected peer: %s expected: %s", peer, self._peer)
125125
continue
126-
elif isinstance(cmd, self._response_msg_type):
126+
elif isinstance(cmd, self.response_msg_type):
127127
await self._handle_msg(cast(TResponsePayload, msg))
128128
else:
129129
self.logger.warning("Unexpected payload type: %s", cmd.__class__.__name__)
@@ -176,8 +176,8 @@ def _request(self, request: BaseRequest[TRequestPayload]) -> None:
176176
def _is_pending(self) -> bool:
177177
return self.pending_request is not None
178178

179-
def get_stats(self) -> str:
180-
return '%s: %s' % (self.response_msg_name, self.response_times.get_stats())
179+
def get_stats(self) -> Tuple[str, str]:
180+
return (self.response_msg_name, self.response_times.get_stats())
181181

182182

183183
class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
@@ -186,14 +186,16 @@ class ExchangeManager(Generic[TRequestPayload, TResponsePayload, TResult]):
186186
def __init__(
187187
self,
188188
peer: BasePeer,
189+
listening_for: Type[Command],
189190
cancel_token: CancelToken) -> None:
190191
self._peer = peer
191192
self._cancel_token = cancel_token
193+
self._response_command_type = listening_for
192194

193-
async def launch_service(self, listening_for: Type[Command]) -> None:
195+
async def launch_service(self) -> None:
194196
self._response_stream = ResponseCandidateStream(
195197
self._peer,
196-
listening_for,
198+
self._response_command_type,
197199
self._cancel_token,
198200
)
199201
self._peer.run_daemon(self._response_stream)
@@ -248,5 +250,8 @@ def service(self) -> BaseService:
248250
"""
249251
return self._response_stream
250252

251-
def get_stats(self) -> str:
252-
return self._response_stream.get_stats()
253+
def get_stats(self) -> Tuple[str, str]:
254+
if self._response_stream is None:
255+
return (self._response_command_type.__name__, 'Uninitialized')
256+
else:
257+
return self._response_stream.get_stats()

trinity/protocol/eth/exchanges.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454

5555
class GetBlockHeadersExchange(BaseGetBlockHeadersExchange):
5656
_normalizer = NoopNormalizer[Tuple[BlockHeader, ...]]()
57+
request_class = GetBlockHeadersRequest
5758

5859
async def __call__( # type: ignore
5960
self,
@@ -65,7 +66,7 @@ async def __call__( # type: ignore
6566

6667
original_request_args = (block_number_or_hash, max_headers, skip, reverse)
6768
validator = GetBlockHeadersValidator(*original_request_args)
68-
request = GetBlockHeadersRequest(*original_request_args)
69+
request = self.request_class(*original_request_args)
6970

7071
return await self.get_result(
7172
request,
@@ -81,21 +82,23 @@ async def __call__( # type: ignore
8182

8283
class GetNodeDataExchange(BaseNodeDataExchange):
8384
_normalizer = GetNodeDataNormalizer()
85+
request_class = GetNodeDataRequest
8486

8587
async def __call__(self, node_hashes: Tuple[Hash32, ...]) -> NodeDataBundles: # type: ignore
8688
validator = GetNodeDataValidator(node_hashes)
87-
request = GetNodeDataRequest(node_hashes)
89+
request = self.request_class(node_hashes)
8890
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)
8991

9092

9193
class GetReceiptsExchange(BaseExchange[Tuple[Hash32, ...], ReceiptsByBlock, ReceiptsBundles]):
9294
_normalizer = ReceiptsNormalizer()
95+
request_class = GetReceiptsRequest
9396

9497
async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles: # type: ignore
9598
validator = ReceiptsValidator(headers)
9699

97100
block_hashes = tuple(header.hash for header in headers)
98-
request = GetReceiptsRequest(block_hashes)
101+
request = self.request_class(block_hashes)
99102

100103
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)
101104

@@ -109,11 +112,12 @@ async def __call__(self, headers: Tuple[BlockHeader, ...]) -> ReceiptsBundles:
109112

110113
class GetBlockBodiesExchange(BaseGetBlockBodiesExchange):
111114
_normalizer = GetBlockBodiesNormalizer()
115+
request_class = GetBlockBodiesRequest
112116

113117
async def __call__(self, headers: Tuple[BlockHeader, ...]) -> BlockBodyBundles: # type: ignore
114118
validator = GetBlockBodiesValidator(headers)
115119

116120
block_hashes = tuple(header.hash for header in headers)
117-
request = GetBlockBodiesRequest(block_hashes)
121+
request = self.request_class(block_hashes)
118122

119123
return await self.get_result(request, self._normalizer, validator, noop_payload_validator)

trinity/protocol/eth/peer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class ETHPeer(BasePeer):
3131
_requests: ETHExchangeHandler = None
3232

3333
def get_extra_stats(self) -> List[str]:
34-
return self.requests.get_stats()
34+
stats_pairs = self.requests.get_stats().items()
35+
return ['%s: %s' % (cmd_name, stats) for cmd_name, stats in stats_pairs]
3536

3637
@property
3738
def requests(self) -> ETHExchangeHandler:

trinity/protocol/les/exchanges.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
class GetBlockHeadersExchange(LESExchange[Tuple[BlockHeader, ...]]):
3636
_normalizer = BlockHeadersNormalizer()
37+
request_class = GetBlockHeadersRequest
3738

3839
async def __call__( # type: ignore
3940
self,
@@ -47,7 +48,7 @@ async def __call__( # type: ignore
4748
validator = GetBlockHeadersValidator(*original_request_args)
4849

4950
command_args = original_request_args + (gen_request_id(),)
50-
request = GetBlockHeadersRequest(*command_args)
51+
request = self.request_class(*command_args)
5152

5253
return await self.get_result(
5354
request,

trinity/protocol/les/peer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class LESPeer(BasePeer):
4242
_requests: LESExchangeHandler = None
4343

4444
def get_extra_stats(self) -> List[str]:
45-
return self.requests.get_stats()
45+
stats_pairs = self.requests.get_stats().items()
46+
return ['%s: %s' % (cmd_name, stats) for cmd_name, stats in stats_pairs]
4647

4748
@property
4849
def requests(self) -> LESExchangeHandler:

trinity/utils/decorators.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from typing import (
2+
Any,
3+
)
4+
5+
6+
class classproperty(property):
7+
def __get__(self, obj: Any, objtype: Any = None) -> Any:
8+
return super().__get__(objtype)

0 commit comments

Comments
 (0)