Skip to content

Commit fb401c7

Browse files
committed
Establish repeatable pattern for request response handling
1 parent bf1d3c2 commit fb401c7

32 files changed

+417
-207
lines changed

p2p/peer.py

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,6 @@ class BasePeer(BaseService):
155155
head_td: int = None
156156
head_hash: Hash32 = None
157157

158-
# TODO: Instead of a fixed timeout, we should instead monitor response
159-
# times for the peer and adjust our timeout accordingly
160-
_response_timeout = 60
161-
pending_requests: Dict[
162-
Type[protocol.Command],
163-
Tuple['BaseRequest', 'asyncio.Future[protocol._DecodedMsgType]'],
164-
]
165-
166158
def __init__(self,
167159
remote: Node,
168160
privkey: datatypes.PrivateKey,
@@ -189,8 +181,6 @@ def __init__(self,
189181
self.start_time = datetime.datetime.now()
190182
self.received_msgs: Dict[protocol.Command, int] = collections.defaultdict(int)
191183

192-
self.pending_requests = {}
193-
194184
self.egress_mac = egress_mac
195185
self.ingress_mac = ingress_mac
196186
# FIXME: Yes, the encryption is insecure, see: https://github.com/ethereum/devp2p/issues/32
@@ -400,22 +390,6 @@ def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgT
400390
else:
401391
self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd)
402392

403-
if cmd_type in self.pending_requests:
404-
request, future = self.pending_requests[cmd_type]
405-
try:
406-
request.validate_response(msg)
407-
except ValidationError as err:
408-
self.logger.debug(
409-
"Response validation failure for pending %s request from peer %s: %s",
410-
cmd_type.__name__,
411-
self,
412-
err,
413-
)
414-
pass
415-
else:
416-
future.set_result(msg)
417-
self.pending_requests.pop(cmd_type)
418-
419393
def process_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
420394
if cmd.is_base_protocol:
421395
self.handle_p2p_msg(cmd, msg)
@@ -640,6 +614,14 @@ def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]:
640614
finally:
641615
peer_pool.unsubscribe(self)
642616

617+
@contextlib.contextmanager
618+
def subscribe_peer(self, peer: BasePeer) -> Iterator[None]:
619+
peer.add_subscriber(self)
620+
try:
621+
yield
622+
finally:
623+
peer.remove_subscriber(self)
624+
643625

644626
class PeerPool(BaseService, AsyncIterable[BasePeer]):
645627
"""
@@ -794,7 +776,7 @@ async def ensure_same_side_on_dao_fork(
794776
wait for that we may receive other messages from the peer, which are returned so that they
795777
can be re-added to our subscribers' queues when the peer is finally added to the pool.
796778
"""
797-
from trinity.protocol.base_block_headers import BaseBlockHeaders
779+
from trinity.protocol.common.commands import BaseBlockHeaders
798780
msgs = []
799781
for start_block, vm_class in self.vm_configuration:
800782
if not issubclass(vm_class, HomesteadVM):
@@ -1008,7 +990,9 @@ def _test() -> None:
1008990
from eth.chains.ropsten import RopstenChain, ROPSTEN_GENESIS_HEADER, ROPSTEN_VM_CONFIGURATION
1009991
from eth.db.backends.memory import MemoryDB
1010992
from trinity.protocol.eth.peer import ETHPeer
993+
from trinity.protocol.eth.requests import HeaderRequest as ETHHeaderRequest
1011994
from trinity.protocol.les.peer import LESPeer
995+
from trinity.protocol.les.requests import HeaderRequest as LESHeaderRequest
1012996
from tests.p2p.integration_test_helpers import FakeAsyncHeaderDB, connect_to_peers_loop
1013997
logging.basicConfig(level=TRACE_LEVEL_NUM, format='%(asctime)s %(levelname)s: %(message)s')
1014998

@@ -1041,13 +1025,15 @@ async def request_stuff() -> None:
10411025
'0x59af08ab31822c992bb3dad92ddb68d820aa4c69e9560f07081fa53f1009b152')
10421026
if peer_class == ETHPeer:
10431027
peer = cast(ETHPeer, peer)
1044-
peer.sub_proto.send_get_block_headers(block_hash, 1, 0, False)
1028+
peer.sub_proto.send_get_block_headers(ETHHeaderRequest(block_hash, 1, 0, False))
10451029
peer.sub_proto.send_get_block_bodies([block_hash])
10461030
peer.sub_proto.send_get_receipts([block_hash])
10471031
else:
10481032
peer = cast(LESPeer, peer)
10491033
request_id = 1
1050-
peer.sub_proto.send_get_block_headers(block_hash, 1, 0, False, request_id)
1034+
peer.sub_proto.send_get_block_headers(
1035+
LESHeaderRequest(block_hash, 1, 0, False, request_id)
1036+
)
10511037
peer.sub_proto.send_get_block_bodies([block_hash], request_id + 1)
10521038
peer.sub_proto.send_get_receipts(block_hash, request_id + 2)
10531039

p2p/service.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ async def cleanup(self) -> None:
9494
The ``_cleanup()`` coroutine is invoked before the child services may have finished
9595
their cleanup.
9696
"""
97-
9897
await asyncio.gather(*[
9998
child_service.cleaned_up.wait()
10099
for child_service in self._child_services],

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def vm_logger(namespaces=LOGGING_NAMESPACES):
3333

3434
handler = logging.StreamHandler(sys.stdout)
3535

36-
# level = TRACE_LEVEL_NUM
36+
# level = 5 # TRACE
3737
# level = logging.DEBUG
3838
# level = logging.INFO
3939
level = logging.ERROR

tests/p2p/test_peer_subscriber.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from trinity.protocol.eth.peer import ETHPeer
1010
from trinity.protocol.eth.commands import GetBlockHeaders
11+
from trinity.protocol.eth.requests import HeaderRequest
1112

1213
from tests.trinity.core.peer_helpers import (
1314
get_directly_linked_peers,
@@ -45,9 +46,9 @@ async def test_peer_subscriber_filters_messages(request, event_loop):
4546
peer.add_subscriber(all_subscriber)
4647

4748
remote.sub_proto.send_get_node_data([b'\x00' * 32])
48-
remote.sub_proto.send_get_block_headers(0, 1, 0, False)
49+
remote.sub_proto.send_get_block_headers(HeaderRequest(0, 1, 0, False))
4950
remote.sub_proto.send_get_node_data([b'\x00' * 32])
50-
remote.sub_proto.send_get_block_headers(1, 1, 0, False)
51+
remote.sub_proto.send_get_block_headers(HeaderRequest(1, 1, 0, False))
5152
remote.sub_proto.send_get_node_data([b'\x00' * 32])
5253

5354
# yeild to let remote and peer transmit.

tests/trinity/core/p2p-proto/test_header_request_object.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from p2p.exceptions import ValidationError
44

5-
from trinity.protocol.base_request import BaseHeaderRequest
5+
from trinity.protocol.common.requests import BaseHeaderRequest
66

77

88
FORWARD_0_to_5 = (0, 6, 0, False)

tests/trinity/core/p2p-proto/test_peer_block_header_request_and_response_api.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,9 @@ async def test_eth_peer_get_headers_round_trip(eth_peer_and_remote,
7272

7373
async def send_headers():
7474
remote.sub_proto.send_block_headers(headers)
75-
await asyncio.sleep(0)
7675

7776
asyncio.ensure_future(send_headers())
78-
response = await peer.get_block_headers(*params)
77+
response = await peer.handler.get_block_headers(*params)
7978

8079
assert len(response) == len(headers)
8180
for expected, actual in zip(headers, response):
@@ -95,16 +94,14 @@ async def test_les_peer_get_headers_round_trip(les_peer_and_remote,
9594
params,
9695
headers):
9796
peer, remote = les_peer_and_remote
98-
request_id = 1234
99-
100-
peer.gen_request_id = lambda: request_id
10197

10298
async def send_headers():
99+
request_id = peer.handler.get_block_headers.pending_request[0].request_id
103100
remote.sub_proto.send_block_headers(headers, 0, request_id)
104101
await asyncio.sleep(0)
105102

106103
asyncio.ensure_future(send_headers())
107-
response = await peer.get_block_headers(*params)
104+
response = await peer.handler.get_block_headers(*params)
108105

109106
assert len(response) == len(headers)
110107
for expected, actual in zip(headers, response):
@@ -124,7 +121,7 @@ async def send_responses():
124121
await asyncio.sleep(0)
125122

126123
asyncio.ensure_future(send_responses())
127-
response = await peer.get_block_headers(0, 10, 0, False)
124+
response = await peer.handler.get_block_headers(0, 10, 0, False)
128125

129126
assert len(response) == len(headers)
130127
for expected, actual in zip(headers, response):
@@ -148,7 +145,7 @@ async def send_responses():
148145
await asyncio.sleep(0)
149146

150147
asyncio.ensure_future(send_responses())
151-
response = await peer.get_block_headers(0, 5, 0, False)
148+
response = await peer.handler.get_block_headers(0, 5, 0, False)
152149

153150
assert len(response) == len(headers)
154151
for expected, actual in zip(headers, response):

trinity/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,11 @@ class MissingPath(BaseTrinityError):
2222
def __init__(self, msg: str, path: pathlib.Path) -> None:
2323
super().__init__(msg)
2424
self.path = path
25+
26+
27+
class AlreadyWaiting(BaseTrinityError):
28+
"""
29+
Raised when an attempt is made to wait for a certain message type from a
30+
peer when there is already an active wait for that message type.
31+
"""
32+
pass

trinity/p2p/handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from trinity.db.chain import AsyncChainDB
2323
from trinity.db.header import AsyncHeaderDB
2424
from trinity.protocol.eth.peer import ETHPeer
25-
from trinity.protocol.base_request import BaseHeaderRequest
25+
from trinity.protocol.common.requests import BaseHeaderRequest
2626
from trinity.rlp.block_body import BlockBody
2727

2828

trinity/protocol/common/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)