Skip to content

Commit 87819a1

Browse files
authored
Merge pull request #1100 from pipermerriam/piper/use-cancel-token-library
convert to use CancelToken from libary
2 parents 4abc075 + ed1cff1 commit 87819a1

25 files changed

+116
-401
lines changed

p2p/DEVELOPMENT.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ package, but for now it's just a collection of random notes/recommendations.
77
## Task cancellation
88

99
In order to make sure we stop all pending asyncio tasks upon exit (or when a service terminates),
10-
we use `CancelToken`s, which are heavily inspired by https://vorpus.org/blog/timeouts-and-cancellation-for-humans/
10+
we use `CancelToken`s from the
11+
[asyncio-cancel-token](https://asyncio-cancel-token.readthedocs.io/en/latest/index.html)
12+
library.
1113

1214
- A `CancelToken` must be available to all our async APIs. Either as an instance attribute or as an explicit argument.
13-
- When one of our async APIs `await` for stdlib/third-party coroutines, it must use `wait_with_token()` to ensure the scheduled task is cancelled when the token is triggered.
15+
- When one of our async APIs `await` for stdlib/third-party coroutines, it must use `CancelToken.cancellable_wait()` to ensure the scheduled task is cancelled when the token is triggered.
1416

1517

1618
## BaseService

p2p/auth.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
from eth_hash.auto import keccak
1919

20+
from cancel_token import CancelToken
21+
2022
from p2p import ecies
2123
from p2p import kademlia
22-
from p2p.cancel_token import CancelToken, wait_with_token
2324
from p2p.constants import REPLY_TIMEOUT
2425
from p2p.exceptions import (
2526
BadAckMessage,
@@ -74,9 +75,8 @@ async def _handshake(initiator: 'HandshakeInitiator', reader: asyncio.StreamRead
7475
auth_init = initiator.encrypt_auth_message(auth_msg)
7576
writer.write(auth_init)
7677

77-
auth_ack = await wait_with_token(
78+
auth_ack = await token.cancellable_wait(
7879
reader.read(ENCRYPTED_AUTH_ACK_LEN),
79-
token=token,
8080
timeout=REPLY_TIMEOUT)
8181

8282
if reader.at_eof():
@@ -118,9 +118,8 @@ def pubkey(self) -> datatypes.PublicKey:
118118
return self.privkey.public_key
119119

120120
async def connect(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
121-
return await wait_with_token(
121+
return await self.cancel_token.cancellable_wait(
122122
asyncio.open_connection(host=self.remote.address.ip, port=self.remote.address.tcp_port),
123-
token=self.cancel_token,
124123
timeout=REPLY_TIMEOUT)
125124

126125
def derive_secrets(self,

p2p/cancel_token.py

Lines changed: 0 additions & 161 deletions
This file was deleted.

p2p/cancellable.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from typing import (
2+
Awaitable,
3+
TypeVar,
4+
)
5+
6+
from cancel_token import CancelToken
7+
8+
9+
class CancellableMixin:
10+
cancel_token: CancelToken = None
11+
12+
_TReturn = TypeVar('_TReturn')
13+
14+
async def wait(self,
15+
awaitable: Awaitable[_TReturn],
16+
token: CancelToken = None,
17+
timeout: float = None) -> _TReturn:
18+
"""See wait_first()"""
19+
return await self.wait_first(awaitable, token=token, timeout=timeout)
20+
21+
async def wait_first(self,
22+
*awaitables: Awaitable[_TReturn],
23+
token: CancelToken = None,
24+
timeout: float = None) -> _TReturn:
25+
"""
26+
Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
27+
28+
The given token is chained with this service's token, so triggering either will cancel
29+
this.
30+
31+
Returns the result of the first one to complete.
32+
33+
Raises TimeoutError if we timeout or OperationCancelled if the token chain is triggered.
34+
35+
All pending futures are cancelled before returning.
36+
"""
37+
if token is None:
38+
token_chain = self.cancel_token
39+
else:
40+
token_chain = token.chain(self.cancel_token)
41+
return await token_chain.cancellable_wait(*awaitables, timeout=timeout)

p2p/chain.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
from eth_typing import BlockNumber, Hash32
2525

26+
from cancel_token import CancelToken
27+
2628
from eth.constants import (
2729
BLANK_ROOT_HASH, EMPTY_UNCLE_HASH, GENESIS_BLOCK_NUMBER, GENESIS_PARENT_HASH)
2830
from eth.chains import AsyncChain
@@ -36,7 +38,7 @@
3638
from p2p import protocol
3739
from p2p import eth
3840
from p2p import les
39-
from p2p.cancel_token import CancellableMixin, CancelToken
41+
from p2p.cancellable import CancellableMixin
4042
from p2p.constants import MAX_REORG_DEPTH, SEAL_CHECK_RANDOM_SAMPLE_RATE
4143
from p2p.exceptions import NoEligiblePeers, OperationCancelled
4244
from p2p.p2p_proto import DisconnectReason

p2p/discovery.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242

4343
from eth_hash.auto import keccak
4444

45-
from p2p.cancel_token import CancelToken
45+
from cancel_token import CancelToken
46+
4647
from p2p.exceptions import NoEligibleNodes, OperationCancelled
4748
from p2p import kademlia
4849
from p2p.peer import PeerPool

p2p/kademlia.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
from eth_hash.auto import keccak
4242

43-
from p2p.cancel_token import CancelToken, wait_with_token
43+
from cancel_token import CancelToken
4444

4545
# Workaround for import cycles caused by type annotations:
4646
# http://mypy.readthedocs.io/en/latest/common_issues.html#import-cycles
@@ -504,8 +504,8 @@ async def wait_ping(self, remote: Node, cancel_token: CancelToken) -> bool:
504504
with self.ping_callbacks.acquire(remote, event.set):
505505
got_ping = False
506506
try:
507-
got_ping = await wait_with_token(
508-
event.wait(), token=cancel_token, timeout=k_request_timeout)
507+
got_ping = await cancel_token.cancellable_wait(
508+
event.wait(), timeout=k_request_timeout)
509509
self.logger.debug('got expected ping from %s', remote)
510510
except TimeoutError:
511511
self.logger.debug('timed out waiting for ping from %s', remote)
@@ -525,8 +525,8 @@ async def wait_pong(self, remote: Node, token: bytes, cancel_token: CancelToken)
525525
with self.pong_callbacks.acquire(pingid, event.set):
526526
got_pong = False
527527
try:
528-
got_pong = await wait_with_token(
529-
event.wait(), token=cancel_token, timeout=k_request_timeout)
528+
got_pong = await cancel_token.cancellable_wait(
529+
event.wait(), timeout=k_request_timeout)
530530
self.logger.debug('got expected pong with token %s', encode_hex(token))
531531
except TimeoutError:
532532
self.logger.debug(
@@ -555,8 +555,8 @@ def process(response: List[Node]) -> None:
555555

556556
with self.neighbours_callbacks.acquire(remote, process):
557557
try:
558-
await wait_with_token(
559-
event.wait(), token=cancel_token, timeout=k_request_timeout)
558+
await cancel_token.cancellable_wait(
559+
event.wait(), timeout=k_request_timeout)
560560
self.logger.debug('got expected neighbours response from %s', remote)
561561
except TimeoutError:
562562
self.logger.debug('timed out waiting for neighbours response from %s', remote)

p2p/lightchain.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
from trie import HexaryTrie
3030
from trie.exceptions import BadTrieProof
3131

32+
from cancel_token import CancelToken
33+
3234
from eth.exceptions import (
3335
BlockNotFound,
3436
HeaderNotFound,
@@ -42,7 +44,6 @@
4244
NoConnectedPeers,
4345
NoEligiblePeers,
4446
)
45-
from p2p.cancel_token import CancelToken
4647
from p2p import protocol
4748
from p2p.constants import (
4849
COMPLETION_TIMEOUT,

p2p/nat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
)
1010
from urllib.parse import urlparse
1111

12-
from p2p.cancel_token import (
12+
from cancel_token import (
1313
CancelToken,
1414
)
1515
from p2p.exceptions import (

p2p/peer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
keys,
4949
)
5050

51+
from cancel_token import CancelToken
52+
5153
from eth.chains.mainnet import MAINNET_NETWORK_ID
5254
from eth.chains.ropsten import ROPSTEN_NETWORK_ID
5355
from eth.constants import GENESIS_BLOCK_NUMBER
@@ -75,7 +77,6 @@
7577
UnknownProtocolCommand,
7678
UnreachablePeer,
7779
)
78-
from p2p.cancel_token import CancelToken
7980
from p2p.service import BaseService
8081
from p2p.utils import (
8182
gen_request_id,

0 commit comments

Comments
 (0)