Skip to content

Commit 01d327f

Browse files
authored
Merge pull request #1157 from pipermerriam/piper/service-context-api
Use shared ProcessPoolExecutor across all services
2 parents ae207a6 + dd10c24 commit 01d327f

File tree

10 files changed

+113 additions, -54 deletions (lines changed)

10 files changed

+113 additions, -54 deletions (lines changed)

p2p/executor.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

p2p/peer.py

Lines changed: 22 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -135,9 +135,18 @@ async def handshake(remote: Node,
135135
except (ConnectionRefusedError, OSError) as e:
136136
raise UnreachablePeer() from e
137137
peer = peer_class(
138-
remote=remote, privkey=privkey, reader=reader, writer=writer,
139-
aes_secret=aes_secret, mac_secret=mac_secret, egress_mac=egress_mac,
140-
ingress_mac=ingress_mac, headerdb=headerdb, network_id=network_id)
138+
remote=remote,
139+
privkey=privkey,
140+
reader=reader,
141+
writer=writer,
142+
aes_secret=aes_secret,
143+
mac_secret=mac_secret,
144+
egress_mac=egress_mac,
145+
ingress_mac=ingress_mac,
146+
headerdb=headerdb,
147+
network_id=network_id,
148+
token=token,
149+
)
141150
await peer.do_p2p_handshake()
142151
await peer.do_sub_proto_handshake()
143152
return peer
@@ -167,8 +176,9 @@ def __init__(self,
167176
headerdb: 'BaseAsyncHeaderDB',
168177
network_id: int,
169178
inbound: bool = False,
179+
token: CancelToken = None,
170180
) -> None:
171-
super().__init__()
181+
super().__init__(token)
172182
self.remote = remote
173183
self.privkey = privkey
174184
self.reader = reader
@@ -1027,8 +1037,15 @@ def _test() -> None:
10271037
network_id = RopstenChain.network_id
10281038
loop = asyncio.get_event_loop()
10291039
nodes = [Node.from_uri(args.enode)]
1040+
10301041
peer_pool = PeerPool(
1031-
peer_class, headerdb, network_id, ecies.generate_privkey(), ROPSTEN_VM_CONFIGURATION)
1042+
peer_class,
1043+
headerdb,
1044+
network_id,
1045+
ecies.generate_privkey(),
1046+
ROPSTEN_VM_CONFIGURATION,
1047+
)
1048+
10321049
asyncio.ensure_future(connect_to_peers_loop(peer_pool, nodes))
10331050

10341051
async def request_stuff() -> None:

p2p/service.py

Lines changed: 31 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
from cancel_token import CancelToken, OperationCancelled
1616

1717
from p2p.cancellable import CancellableMixin
18+
from p2p.utils import get_asyncio_executor
1819

1920

2021
class ServiceEvents:
@@ -34,26 +35,44 @@ class BaseService(ABC, CancellableMixin):
3435
_wait_until_finished_timeout = 5
3536

3637
# the custom event loop to run in, or None if the default loop should be used
37-
loop: asyncio.AbstractEventLoop = None
38+
_loop: asyncio.AbstractEventLoop = None
3839

39-
def __init__(self, token: CancelToken=None, loop: asyncio.AbstractEventLoop = None) -> None:
40-
if self.logger is None:
41-
self.logger = cast(
42-
TraceLogger, logging.getLogger(self.__module__ + '.' + self.__class__.__name__))
40+
_logger: TraceLogger = None
4341

44-
self._run_lock = asyncio.Lock()
42+
def __init__(self,
43+
token: CancelToken=None,
44+
loop: asyncio.AbstractEventLoop = None) -> None:
4545
self.events = ServiceEvents()
46+
self._run_lock = asyncio.Lock()
4647
self._child_services = []
4748
self._finished_callbacks = []
4849

49-
self.loop = loop
50+
self._loop = loop
51+
5052
base_token = CancelToken(type(self).__name__, loop=loop)
5153

5254
if token is None:
5355
self.cancel_token = base_token
5456
else:
5557
self.cancel_token = base_token.chain(token)
5658

59+
self._executor = get_asyncio_executor()
60+
61+
@property
62+
def logger(self) -> TraceLogger:
63+
if self._logger is None:
64+
self._logger = cast(
65+
TraceLogger,
66+
logging.getLogger(self.__module__ + '.' + self.__class__.__name__)
67+
)
68+
return self._logger
69+
70+
def get_event_loop(self) -> asyncio.AbstractEventLoop:
71+
if self._loop is None:
72+
return asyncio.get_event_loop()
73+
else:
74+
return self._loop
75+
5776
async def run(
5877
self,
5978
finished_callback: Optional[Callable[['BaseService'], None]] = None) -> None:
@@ -102,6 +121,10 @@ def run_child_service(self, child_service: 'BaseService') -> 'asyncio.Future[Any
102121
self._child_services.append(child_service)
103122
return asyncio.ensure_future(child_service.run())
104123

124+
async def _run_in_executor(self, callback: Callable[..., Any], *args: Any) -> Any:
125+
loop = self.get_event_loop()
126+
return await self.wait(loop.run_in_executor(self._executor, callback, *args))
127+
105128
async def cleanup(self) -> None:
106129
"""
107130
Run the ``_cleanup()`` coroutine and set the ``cleaned_up`` event after the service as
@@ -146,7 +169,7 @@ async def threadsafe_cancel(self) -> None:
146169
147170
:param poll_period: how many seconds to wait in between each check for service cleanup
148171
"""
149-
asyncio.run_coroutine_threadsafe(self.cancel(), loop=self.loop)
172+
asyncio.run_coroutine_threadsafe(self.cancel(), loop=self.get_event_loop())
150173
await asyncio.wait_for(
151174
self.events.cleaned_up.wait(),
152175
timeout=self._wait_until_finished_timeout,

p2p/utils.py

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,7 @@
11
import datetime
2+
from concurrent.futures import Executor, ProcessPoolExecutor
3+
import logging
4+
import os
25
from typing import Tuple
36

47
import rlp
@@ -32,3 +35,44 @@ def time_since(start_time: datetime.datetime) -> Tuple[int, int, int, int]:
3235
hours, remainder = divmod(delta.seconds, 3600)
3336
minutes, seconds = divmod(remainder, 60)
3437
return delta.days, hours, minutes, seconds
38+
39+
40+
CPU_EMPTY_VALUES = {None, 0}
41+
42+
43+
_executor: Executor = None
44+
45+
46+
def get_asyncio_executor(cpu_count: int=None) -> Executor:
47+
"""
48+
Returns a global `ProcessPoolExecutor` instance.
49+
50+
NOTE: We use the ProcessPoolExecutor to offload CPU intensive tasks to
51+
separate processes to ensure we don't block the main networking process.
52+
This pattern will only work correctly if used within a single process. If
53+
multiple processes use this executor API we'll end up with more workers
54+
than there are CPU cores at which point the networking process will be
55+
competing with all the worker processes for CPU resources. At the point
56+
where we need this in more than one process we will need to come up with a
57+
different solution
58+
"""
59+
global _executor
60+
61+
if _executor is None:
62+
# Use CPU_COUNT - 1 processes to make sure we always leave one CPU idle
63+
# so that it can run asyncio's event loop.
64+
if cpu_count is None:
65+
os_cpu_count = os.cpu_count()
66+
if os_cpu_count in CPU_EMPTY_VALUES:
67+
# Need this because os.cpu_count() returns None when the # of
68+
# CPUs is indeterminable.
69+
logger = logging.getLogger('p2p')
70+
logger.warning(
71+
"Could not determine number of CPUs, defaulting to 1 instead of %s",
72+
os_cpu_count,
73+
)
74+
cpu_count = 1
75+
else:
76+
cpu_count = max(1, os_cpu_count - 1)
77+
_executor = ProcessPoolExecutor(cpu_count)
78+
return _executor

tests/trinity/core/p2p-proto/test_state_sync.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,10 @@ def make_random_trie(random):
4747

4848

4949
@given(random=strategies.randoms())
50-
@settings(max_examples=10)
50+
@settings(
51+
max_examples=10,
52+
deadline=4000,
53+
)
5154
@example(random=RandomWithSeed(EXAMPLE_37968))
5255
@example(random=RandomWithSeed(EXAMPLE_809368))
5356
def test_trie_sync(random, event_loop):

trinity/main.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -275,7 +275,7 @@ def exit_because_ambigious_filesystem(logger: logging.Logger) -> None:
275275

276276

277277
async def exit_on_signal(service_to_exit: BaseService) -> None:
278-
loop = asyncio.get_event_loop()
278+
loop = service_to_exit.get_event_loop()
279279
sigint_received = asyncio.Event()
280280
for sig in [signal.SIGINT, signal.SIGTERM]:
281281
# TODO also support Windows
@@ -318,9 +318,9 @@ def display_launch_logs(chain_config: ChainConfig) -> None:
318318

319319

320320
def run_service_until_quit(service: BaseService) -> None:
321-
loop = asyncio.get_event_loop()
322-
asyncio.ensure_future(exit_on_signal(service))
323-
asyncio.ensure_future(service.run())
321+
loop = service.get_event_loop()
322+
asyncio.ensure_future(exit_on_signal(service), loop=loop)
323+
asyncio.ensure_future(service.run(), loop=loop)
324324
loop.run_forever()
325325
loop.close()
326326

trinity/plugins/builtin/tx_pool/pool.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@
3030

3131
from trinity.protocol.eth.peer import ETHPeer
3232
from trinity.protocol.eth.commands import (
33-
Transactions
33+
Transactions,
3434
)
3535

3636

trinity/rpc/ipc.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@
2020
)
2121

2222
from trinity.rpc.main import (
23-
RPCServer
23+
RPCServer,
2424
)
2525

2626
MAXIMUM_REQUEST_BYTES = 10000
@@ -144,7 +144,7 @@ async def _run(self) -> None:
144144
self.server = await asyncio.start_unix_server(
145145
connection_handler(self.rpc.execute, self.cancel_token),
146146
str(self.ipc_path),
147-
loop=self.loop,
147+
loop=self.get_event_loop(),
148148
limit=MAXIMUM_REQUEST_BYTES,
149149
)
150150
self.logger.info('IPC started at: %s', self.ipc_path.resolve())

trinity/sync/common/chain.py

Lines changed: 4 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
from typing import (
44
Any,
55
AsyncGenerator,
6-
Callable,
76
Tuple,
87
Union,
98
cast,
@@ -25,7 +24,6 @@
2524
from p2p.p2p_proto import DisconnectReason
2625
from p2p.peer import BasePeer, PeerPool, PeerSubscriber
2726
from p2p.service import BaseService
28-
from p2p.executor import get_asyncio_executor
2927

3028
from trinity.db.header import AsyncHeaderDB
3129
from trinity.p2p.handlers import PeerRequestHandler
@@ -65,7 +63,6 @@ def __init__(self,
6563
self._syncing = False
6664
self._sync_complete = asyncio.Event()
6765
self._sync_requests: asyncio.Queue[HeaderRequestingPeer] = asyncio.Queue()
68-
self._executor = get_asyncio_executor()
6966

7067
@property
7168
def msg_queue_maxsize(self) -> int:
@@ -105,8 +102,10 @@ async def _run(self) -> None:
105102
asyncio.ensure_future(self._handle_msg_loop())
106103
with self.subscribe(self.peer_pool):
107104
while True:
108-
peer_or_finished = await self.wait_first(
109-
self._sync_requests.get(), self._sync_complete.wait()) # type: Any
105+
peer_or_finished: Any = await self.wait_first(
106+
self._sync_requests.get(),
107+
self._sync_complete.wait()
108+
)
110109

111110
# In the case of a fast sync, we return once the sync is completed, and our caller
112111
# must then run the StateDownloader.
@@ -122,10 +121,6 @@ async def _cleanup(self) -> None:
122121
# run in the background notice the cancel token has been triggered and return.
123122
await asyncio.sleep(0)
124123

125-
async def _run_in_executor(self, callback: Callable[..., Any], *args: Any) -> Any:
126-
loop = asyncio.get_event_loop()
127-
return await self.wait(loop.run_in_executor(self._executor, callback, *args))
128-
129124
async def sync(self, peer: HeaderRequestingPeer) -> None:
130125
if self._syncing:
131126
self.logger.debug(

trinity/sync/full/state.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@
4646

4747
from p2p.exceptions import NoEligiblePeers, NoIdlePeers
4848
from p2p.peer import BasePeer, PeerPool, PeerSubscriber
49-
from p2p.executor import get_asyncio_executor
5049

5150
from trinity.db.base import AsyncBaseDB
5251
from trinity.db.chain import AsyncChainDB
@@ -87,7 +86,6 @@ def __init__(self,
8786
self._handler = PeerRequestHandler(self.chaindb, self.logger, self.cancel_token)
8887
self.request_tracker = TrieNodeRequestTracker(self._reply_timeout, self.logger)
8988
self._peer_missing_nodes: Dict[ETHPeer, Set[Hash32]] = collections.defaultdict(set)
90-
self._executor = get_asyncio_executor()
9189

9290
# Throughout the whole state sync our chain head is fixed, so it makes sense to ignore
9391
# messages related to new blocks/transactions, but we must handle requests for data from
@@ -182,8 +180,7 @@ async def _handle_msg(
182180
self.logger.debug("Got %d NodeData entries from %s", len(msg), peer)
183181
_, requested_node_keys = self.request_tracker.active_requests.pop(peer)
184182

185-
loop = asyncio.get_event_loop()
186-
node_keys = await loop.run_in_executor(self._executor, list, map(keccak, msg))
183+
node_keys = await self._run_in_executor(list, map(keccak, msg))
187184

188185
missing = set(requested_node_keys).difference(node_keys)
189186
self._peer_missing_nodes[peer].update(missing)

0 commit comments

Comments
 (0)