Skip to content

Commit 4531855

Browse files
committed
Major refactor of Request/Response cycle
- Get a small, simple request loop definition - Component methods have access to a lot less object state - Unfortunately it split things even a bit more, into: - Exchanges - Normalizers - Validators - Get proper type checking on the call
1 parent 7508476 commit 4531855

36 files changed

+1073
-800
lines changed

p2p/peer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ def decrypt_header(self, data: bytes) -> bytes:
478478
self.ingress_mac.update(sxor(aes, header_ciphertext))
479479
expected_header_mac = self.ingress_mac.digest()[:HEADER_LEN]
480480
if not bytes_eq(expected_header_mac, header_mac):
481-
raise DecryptionError('Invalid header mac: expected %s, got %s'.format(
481+
raise DecryptionError('Invalid header mac: expected {}, got {}'.format(
482482
expected_header_mac, header_mac))
483483
return self.aes_dec.update(header_ciphertext)
484484

@@ -1066,8 +1066,8 @@ async def request_stuff() -> None:
10661066
hashes = tuple(header.hash for header in headers)
10671067
if peer_class == ETHPeer:
10681068
peer = cast(ETHPeer, peer)
1069-
peer.sub_proto._send_get_block_bodies(hashes)
1070-
peer.sub_proto._send_get_receipts(hashes)
1069+
peer.sub_proto.send_get_block_bodies(hashes)
1070+
peer.sub_proto.send_get_receipts(hashes)
10711071
else:
10721072
peer = cast(LESPeer, peer)
10731073
request_id = 1

p2p/protocol.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
from abc import ABC, abstractmethod
12
import logging
23
import struct
34
from typing import (
45
Any,
56
Dict,
7+
Generic,
68
List,
79
Tuple,
810
Type,
11+
TypeVar,
912
TYPE_CHECKING,
1013
Union,
1114
)
@@ -123,6 +126,27 @@ def encode(self, data: _DecodedMsgType) -> Tuple[bytes, bytes]:
123126
return header, body
124127

125128

129+
TCommandPayload = TypeVar('TCommandPayload', bound=_DecodedMsgType)
130+
131+
132+
class BaseRequest(ABC, Generic[TCommandPayload]):
133+
"""
134+
Must define command_payload during init. This is the data that will
135+
be sent to the peer with the request command.
136+
"""
137+
command_payload: TCommandPayload
138+
139+
@property
140+
@abstractmethod
141+
def cmd_type(self) -> Type[Command]:
142+
raise NotImplementedError
143+
144+
@property
145+
@abstractmethod
146+
def response_type(self) -> Type[Command]:
147+
raise NotImplementedError
148+
149+
126150
class Protocol:
127151
logger = logging.getLogger("p2p.protocol.Protocol")
128152
name: str = None
@@ -135,11 +159,20 @@ def __init__(self, peer: 'BasePeer', cmd_id_offset: int) -> None:
135159
self.peer = peer
136160
self.cmd_id_offset = cmd_id_offset
137161
self.commands = [cmd_class(cmd_id_offset) for cmd_class in self._commands]
162+
self.cmd_by_type = {cmd_class: cmd_class(cmd_id_offset) for cmd_class in self._commands}
138163
self.cmd_by_id = dict((cmd.cmd_id, cmd) for cmd in self.commands)
139164

140165
def send(self, header: bytes, body: bytes) -> None:
141166
self.peer.send(header, body)
142167

168+
def send_request(self, request: BaseRequest[_DecodedMsgType]) -> None:
169+
command = self.cmd_by_type[request.cmd_type]
170+
header, body = command.encode(request.command_payload)
171+
self.send(header, body)
172+
173+
def supports_command(self, cmd_type: Type[Command]) -> bool:
174+
return cmd_type in self.cmd_by_type
175+
143176
def __repr__(self) -> str:
144177
return "(%s, %d)" % (self.name, self.version)
145178

tests/p2p/test_peer_collect_sub_proto_msgs.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55

66
from trinity.protocol.eth.peer import ETHPeer
77
from trinity.protocol.eth.commands import GetBlockHeaders, GetNodeData
8-
from trinity.protocol.eth.requests import (
9-
HeaderRequest,
10-
NodeDataRequest,
11-
)
128

139
from tests.trinity.core.peer_helpers import (
1410
get_directly_linked_peers,
@@ -30,11 +26,11 @@ async def test_peer_subscriber_filters_messages(request, event_loop):
3026

3127
with peer.collect_sub_proto_messages() as collector:
3228
assert collector in peer._subscribers
33-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
34-
remote.sub_proto.send_get_block_headers(HeaderRequest(0, 1, 0, False))
35-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
36-
remote.sub_proto.send_get_block_headers(HeaderRequest(1, 1, 0, False))
37-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
29+
remote.sub_proto.send_get_node_data([b'\x00' * 32])
30+
remote.sub_proto.send_get_block_headers(0, 1, 0, False)
31+
remote.sub_proto.send_get_node_data([b'\x00' * 32])
32+
remote.sub_proto.send_get_block_headers(1, 1, 0, False)
33+
remote.sub_proto.send_get_node_data([b'\x00' * 32])
3834
await asyncio.sleep(0.01)
3935

4036
assert collector not in peer._subscribers
@@ -51,7 +47,7 @@ async def test_peer_subscriber_filters_messages(request, event_loop):
5147
assert isinstance(all_messages[4][1], GetNodeData)
5248

5349
# make sure it isn't still collecting
54-
remote.sub_proto.send_get_block_headers(HeaderRequest(1, 1, 0, False))
50+
remote.sub_proto.send_get_block_headers(1, 1, 0, False)
5551

5652
await asyncio.sleep(0.01)
5753

tests/p2p/test_peer_subscriber.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88

99
from trinity.protocol.eth.peer import ETHPeer
1010
from trinity.protocol.eth.commands import GetBlockHeaders
11-
from trinity.protocol.eth.requests import (
12-
HeaderRequest,
13-
NodeDataRequest,
14-
)
1511

1612
from tests.trinity.core.peer_helpers import (
1713
get_directly_linked_peers,
@@ -48,11 +44,11 @@ async def test_peer_subscriber_filters_messages(request, event_loop):
4844
peer.add_subscriber(header_subscriber)
4945
peer.add_subscriber(all_subscriber)
5046

51-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
52-
remote.sub_proto.send_get_block_headers(HeaderRequest(0, 1, 0, False))
53-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
54-
remote.sub_proto.send_get_block_headers(HeaderRequest(1, 1, 0, False))
55-
remote.sub_proto.send_get_node_data(NodeDataRequest([b'\x00' * 32]))
47+
remote.sub_proto.send_get_node_data(tuple([b'\x00' * 32]))
48+
remote.sub_proto.send_get_block_headers(0, 1, 0, False)
49+
remote.sub_proto.send_get_node_data(tuple([b'\x00' * 32]))
50+
remote.sub_proto.send_get_block_headers(1, 1, 0, False)
51+
remote.sub_proto.send_get_node_data(tuple([b'\x00' * 32]))
5652

5753
# yeild to let remote and peer transmit.
5854
await asyncio.sleep(0.01)

tests/trinity/core/p2p-proto/test_block_bodies_request_object.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from eth.rlp.transactions import BaseTransactionFields
2020

2121
from trinity.rlp.block_body import BlockBody
22-
from trinity.protocol.eth.requests import BlockBodiesRequest
22+
from trinity.protocol.eth.validators import GetBlockBodiesValidator
2323

2424

2525
def mk_uncle(block_number):
@@ -76,32 +76,29 @@ def mk_headers(*counts):
7676
def test_block_bodies_request_empty_response_is_valid():
7777
headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0))
7878
headers, _, _, _, _ = zip(*headers_bundle)
79-
request = BlockBodiesRequest(headers)
80-
request.validate_response(tuple(), tuple())
79+
request = GetBlockBodiesValidator(headers)
80+
request.validate_result(tuple())
8181

8282

8383
def test_block_bodies_request_valid_with_full_response():
8484
headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0))
8585
headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle)
8686
transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts))
8787
bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes))
88-
request = BlockBodiesRequest(headers)
89-
request.validate_response(bodies, bodies_bundle)
88+
request = GetBlockBodiesValidator(headers)
89+
request.validate_result(bodies_bundle)
9090

9191

9292
def test_block_bodies_request_valid_with_partial_response():
9393
headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0))
9494
headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle)
9595
transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts))
9696
bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes))
97-
request = BlockBodiesRequest(headers)
97+
request = GetBlockBodiesValidator(headers)
9898

99-
request.validate_response(bodies[:2], bodies_bundle[:2])
100-
request.validate_response(bodies[2:], bodies_bundle[2:])
101-
request.validate_response(
102-
(bodies[0], bodies[2], bodies[3]),
103-
(bodies_bundle[0], bodies_bundle[2], bodies_bundle[3]),
104-
)
99+
request.validate_result(bodies_bundle[:2])
100+
request.validate_result(bodies_bundle[2:])
101+
request.validate_result((bodies_bundle[0], bodies_bundle[2], bodies_bundle[3]))
105102

106103

107104
def test_block_bodies_request_with_fully_invalid_response():
@@ -115,17 +112,17 @@ def test_block_bodies_request_with_fully_invalid_response():
115112
w_transactions_bundles = tuple(zip(w_transactions_roots, w_trie_data_dicts))
116113
w_bodies_bundle = tuple(zip(w_bodies, w_transactions_bundles, w_uncles_hashes))
117114

118-
request = BlockBodiesRequest(headers)
115+
request = GetBlockBodiesValidator(headers)
119116
with pytest.raises(ValidationError):
120-
request.validate_response(w_bodies, w_bodies_bundle)
117+
request.validate_result(w_bodies_bundle)
121118

122119

123120
def test_block_bodies_request_with_extra_unrequested_bodies():
124121
headers_bundle = mk_headers((2, 3), (8, 4), (0, 1), (0, 0))
125122
headers, bodies, transactions_roots, trie_data_dicts, uncles_hashes = zip(*headers_bundle)
126123
transactions_bundles = tuple(zip(transactions_roots, trie_data_dicts))
127124
bodies_bundle = tuple(zip(bodies, transactions_bundles, uncles_hashes))
128-
request = BlockBodiesRequest(headers)
125+
request = GetBlockBodiesValidator(headers)
129126

130127
wrong_headers_bundle = mk_headers((3, 2), (4, 8), (1, 0), (0, 0))
131128
w_headers, w_bodies, w_transactions_roots, w_trie_data_dicts, w_uncles_hashes = zip(
@@ -134,9 +131,6 @@ def test_block_bodies_request_with_extra_unrequested_bodies():
134131
w_transactions_bundles = tuple(zip(w_transactions_roots, w_trie_data_dicts))
135132
w_bodies_bundle = tuple(zip(w_bodies, w_transactions_bundles, w_uncles_hashes))
136133

137-
request = BlockBodiesRequest(headers)
134+
request = GetBlockBodiesValidator(headers)
138135
with pytest.raises(ValidationError):
139-
request.validate_response(
140-
bodies + w_bodies,
141-
bodies_bundle + w_bodies_bundle,
142-
)
136+
request.validate_result(bodies_bundle + w_bodies_bundle)

tests/trinity/core/p2p-proto/test_header_request_object.py renamed to tests/trinity/core/p2p-proto/test_headers_request_validator.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
)
66

77
from trinity.protocol.common.requests import BaseHeaderRequest
8+
from trinity.protocol.common.validators import BaseBlockHeadersValidator
89

910

1011
FORWARD_0_to_5 = (0, 6, 0, False)
@@ -30,8 +31,9 @@ def __init__(self,
3031
self.skip = skip
3132
self.reverse = reverse
3233

33-
def validate_response(self, response):
34-
pass
34+
35+
class BlockHeadersValidator(BaseBlockHeadersValidator):
36+
protocol_max_request_size = 192
3537

3638

3739
@pytest.mark.parametrize(
@@ -127,10 +129,10 @@ def test_header_request_sequence_matching(
127129
params,
128130
sequence,
129131
is_match):
130-
request = HeaderRequest(*params)
132+
request = BlockHeadersValidator(*params)
131133

132134
if is_match:
133-
request.validate_sequence(sequence)
135+
request._validate_sequence(sequence)
134136
else:
135137
with pytest.raises(ValidationError):
136-
request.validate_sequence(sequence)
138+
request._validate_sequence(sequence)

tests/trinity/core/p2p-proto/test_node_data_request_object.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
ValidationError,
99
)
1010

11-
from trinity.protocol.eth.requests import NodeDataRequest
11+
from trinity.protocol.eth.validators import GetNodeDataValidator
1212

1313

1414
def mk_node():
@@ -27,55 +27,51 @@ def mk_node_data(n):
2727

2828
def test_node_data_request_empty_response_is_valid():
2929
node_keys, _ = mk_node_data(10)
30-
request = NodeDataRequest(node_keys)
30+
request = GetNodeDataValidator(node_keys)
3131

32-
request.validate_response(tuple(), tuple())
32+
request.validate_result(tuple())
3333

3434

3535
def test_node_data_request_with_full_response():
3636
node_keys, nodes = mk_node_data(10)
37-
request = NodeDataRequest(node_keys)
37+
request = GetNodeDataValidator(node_keys)
3838
node_data = tuple(zip(node_keys, nodes))
3939

40-
request.validate_response(nodes, node_data)
40+
request.validate_result(node_data)
4141

4242

4343
def test_node_data_request_with_partial_response():
4444
node_keys, nodes = mk_node_data(10)
45-
request = NodeDataRequest(node_keys)
45+
request = GetNodeDataValidator(node_keys)
4646
node_data = tuple(zip(node_keys, nodes))
4747

48-
request.validate_response(nodes[3:], node_data[3:])
49-
request.validate_response(nodes[:3], node_data[:3])
50-
request.validate_response(
51-
(nodes[1], nodes[8], nodes[4]),
52-
(node_data[1], node_data[8], node_data[4]),
53-
)
48+
request.validate_result(node_data[3:])
49+
50+
request.validate_result(node_data[:3])
51+
52+
request.validate_result((node_data[1], node_data[8], node_data[4]))
5453

5554

5655
def test_node_data_request_with_fully_invalid_response():
5756
node_keys, nodes = mk_node_data(10)
58-
request = NodeDataRequest(node_keys)
57+
request = GetNodeDataValidator(node_keys)
5958

6059
# construct a unique set of other nodes
6160
other_nodes = tuple(set(mk_node() for _ in range(10)).difference(nodes))
6261
other_node_data = tuple((keccak(node), node) for node in other_nodes)
6362

6463
with pytest.raises(ValidationError):
65-
request.validate_response(other_nodes, other_node_data)
64+
request.validate_result(other_node_data)
6665

6766

6867
def test_node_data_request_with_extra_unrequested_nodes():
6968
node_keys, nodes = mk_node_data(10)
70-
request = NodeDataRequest(node_keys)
69+
request = GetNodeDataValidator(node_keys)
7170
node_data = tuple(zip(node_keys, nodes))
7271

7372
# construct a unique set of other nodes
7473
other_nodes = tuple(set(mk_node() for _ in range(10)).difference(nodes))
7574
other_node_data = tuple((keccak(node), node) for node in other_nodes)
7675

7776
with pytest.raises(ValidationError):
78-
request.validate_response(
79-
nodes + other_nodes,
80-
node_data + other_node_data,
81-
)
77+
request.validate_result(node_data + other_node_data)

tests/trinity/core/p2p-proto/test_peer_block_header_request_and_response_api.py renamed to tests/trinity/core/p2p-proto/test_peer_block_header_validator_api.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from eth.rlp.headers import BlockHeader
88

9+
import trinity
910
from trinity.protocol.eth.peer import ETHPeer
1011
from trinity.protocol.les.peer import LESPeer
1112

@@ -94,11 +95,13 @@ async def send_headers():
9495
@pytest.mark.asyncio
9596
async def test_les_peer_get_headers_round_trip(les_peer_and_remote,
9697
params,
98+
monkeypatch,
9799
headers):
98100
peer, remote = les_peer_and_remote
101+
request_id = 9
102+
monkeypatch.setattr(trinity.protocol.les.exchanges, 'gen_request_id', lambda: request_id)
99103

100104
async def send_headers():
101-
request_id = peer.requests.get_block_headers.pending_request[0].request_id
102105
remote.sub_proto.send_block_headers(headers, 0, request_id)
103106
await asyncio.sleep(0)
104107

tests/trinity/core/p2p-proto/test_peer_node_data_request_and_response_api.py renamed to tests/trinity/core/p2p-proto/test_peer_node_data_validator_api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ async def test_eth_peer_get_node_data_round_trip(eth_peer_and_remote, node_keys,
6262
async def send_node_data():
6363
remote.sub_proto.send_node_data(nodes)
6464

65+
request = asyncio.ensure_future(peer.requests.get_node_data(node_keys))
6566
asyncio.ensure_future(send_node_data())
66-
response = await peer.requests.get_node_data(node_keys)
67+
response = await request
6768

6869
assert len(response) == len(node_keys)
6970
assert response == node_data

0 commit comments

Comments
 (0)