Skip to content

Commit 432cfd2

Browse files
authored
Merge pull request ethereum#620 from carver/bundled-cleanups
Bundled cleanups
2 parents ccd6647 + 0c8250d commit 432cfd2

File tree

5 files changed

+93
-29
lines changed

5 files changed

+93
-29
lines changed

p2p/service.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,26 @@ async def cleanup(self) -> None:
265265
for child_service in self._child_services])
266266
self.logger.debug("All child services finished")
267267
if self._tasks:
268-
self.logger.debug("Waiting for tasks: %s", list(self._tasks))
268+
self._log_tasks("Waiting for tasks")
269269
await asyncio.gather(*self._tasks)
270270
self.logger.debug("All tasks finished")
271271

272272
await self._cleanup()
273273
self.events.cleaned_up.set()
274274

275+
def _log_tasks(self, message: str) -> None:
276+
MAX_DISPLAY_TASKS = 50
277+
task_list = list(self._tasks)
278+
if len(self._tasks) > MAX_DISPLAY_TASKS:
279+
task_display = ''.join(map(str, [
280+
task_list[:MAX_DISPLAY_TASKS // 2],
281+
'...',
282+
task_list[-1 * MAX_DISPLAY_TASKS // 2:],
283+
]))
284+
else:
285+
task_display = str(task_list)
286+
self.logger.debug("%s (%d): %s", message, len(self._tasks), task_display)
287+
275288
def cancel_nowait(self) -> None:
276289
if self.is_cancelled:
277290
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
@@ -295,7 +308,7 @@ async def cancel(self) -> None:
295308
"Timed out waiting for %s to finish its cleanup, forcibly cancelling pending "
296309
"tasks and exiting anyway", self)
297310
if self._tasks:
298-
self.logger.debug("Pending tasks: %s", list(self._tasks))
311+
self._log_tasks("Pending tasks")
299312
if self._child_services:
300313
self.logger.debug("Pending child services: %s", list(self._child_services))
301314
self._forcibly_cancel_all_tasks()

trinity/protocol/eth/commands.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@
1818
from trinity.rlp.sedes import HashOrNumber
1919

2020

21+
hash_sedes = sedes.Binary(min_length=32, max_length=32)
22+
23+
2124
class Status(Command):
2225
_cmd_id = 0
2326
structure = (
2427
('protocol_version', sedes.big_endian_int),
2528
('network_id', sedes.big_endian_int),
2629
('td', sedes.big_endian_int),
27-
('best_hash', sedes.binary),
28-
('genesis_hash', sedes.binary),
30+
('best_hash', hash_sedes),
31+
('genesis_hash', hash_sedes),
2932
)
3033

3134

3235
class NewBlockHashes(Command):
3336
_cmd_id = 1
34-
structure = sedes.CountableList(sedes.List([sedes.binary, sedes.big_endian_int]))
37+
structure = sedes.CountableList(sedes.List([hash_sedes, sedes.big_endian_int]))
3538

3639

3740
class Transactions(Command):
@@ -79,7 +82,7 @@ class NewBlock(Command):
7982

8083
class GetNodeData(Command):
8184
_cmd_id = 13
82-
structure = sedes.CountableList(sedes.binary)
85+
structure = sedes.CountableList(hash_sedes)
8386

8487

8588
class NodeData(Command):
@@ -89,7 +92,7 @@ class NodeData(Command):
8992

9093
class GetReceipts(Command):
9194
_cmd_id = 15
92-
structure = sedes.CountableList(sedes.binary)
95+
structure = sedes.CountableList(hash_sedes)
9396

9497

9598
class Receipts(Command):

trinity/protocol/eth/servers.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
BlockIdentifier,
1818
Hash32,
1919
)
20-
2120
from eth_utils import (
2221
to_hex,
2322
)
23+
from trie.exceptions import MissingTrieNode
2424

2525
from p2p import protocol
2626
from p2p.peer import BasePeer
@@ -83,8 +83,18 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: Sequence[Ha
8383
"%s asked for a block we don't have: %s", peer, to_hex(block_hash)
8484
)
8585
continue
86-
transactions = await self.wait(
87-
self.db.coro_get_block_transactions(header, BaseTransactionFields))
86+
try:
87+
transactions = await self.wait(
88+
self.db.coro_get_block_transactions(header, BaseTransactionFields))
89+
except MissingTrieNode as exc:
90+
self.logger.debug(
91+
"%s asked for block transactions we don't have: %s, "
92+
"due to %r",
93+
peer,
94+
to_hex(block_hash),
95+
exc,
96+
)
97+
continue
8898
uncles = await self.wait(self.db.coro_get_block_uncles(header.uncles_hash))
8999
bodies.append(BlockBody(transactions, uncles))
90100
self.logger.debug2("Replying to %s with %d block bodies", peer, len(bodies))
@@ -104,7 +114,17 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: Sequence[Hash32
104114
"%s asked receipts for a block we don't have: %s", peer, to_hex(block_hash)
105115
)
106116
continue
107-
block_receipts = await self.wait(self.db.coro_get_receipts(header, Receipt))
117+
try:
118+
block_receipts = await self.wait(self.db.coro_get_receipts(header, Receipt))
119+
except MissingTrieNode as exc:
120+
self.logger.debug(
121+
"%s asked for block receipts we don't have: %s, "
122+
"due to %r",
123+
peer,
124+
to_hex(block_hash),
125+
exc,
126+
)
127+
continue
108128
receipts.append(block_receipts)
109129
self.logger.debug2("Replying to %s with receipts for %d blocks", peer, len(receipts))
110130
peer.sub_proto.send_receipts(receipts)

trinity/sync/full/chain.py

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
Awaitable,
1212
Callable,
1313
Dict,
14+
Iterable,
1415
List,
1516
NamedTuple,
1617
FrozenSet,
@@ -24,6 +25,7 @@
2425
from eth_utils import (
2526
humanize_hash,
2627
humanize_seconds,
28+
to_tuple,
2729
ValidationError,
2830
)
2931
from eth_utils.toolz import (
@@ -39,6 +41,7 @@
3941
EMPTY_UNCLE_HASH,
4042
)
4143
from eth.exceptions import HeaderNotFound
44+
from eth.rlp.blocks import BaseBlock
4245
from eth.rlp.headers import BlockHeader
4346
from eth.rlp.receipts import Receipt
4447
from eth.rlp.transactions import BaseTransaction
@@ -963,12 +966,16 @@ def __init__(self,
963966
)
964967
self._block_importer = block_importer
965968

969+
# Track if any headers have been received yet
970+
self._got_first_header = asyncio.Event()
971+
966972
async def _run(self) -> None:
967973
head = await self.wait(self.db.coro_get_canonical_head())
968974
self._block_import_tracker.set_finished_dependency(head)
969975
self.run_daemon_task(self._launch_prerequisite_tasks())
970976
self.run_daemon_task(self._assign_body_download_to_peers())
971977
self.run_daemon_task(self._import_ready_blocks())
978+
self.run_daemon_task(self._display_stats())
972979
await super()._run()
973980

974981
def register_peer(self, peer: BasePeer) -> None:
@@ -1045,21 +1052,9 @@ async def _import_blocks(self, headers: Tuple[BlockHeader, ...]) -> None:
10451052
10461053
:param headers: headers that have the block bodies downloaded
10471054
"""
1048-
for header in headers:
1049-
vm_class = self.chain.get_vm_class(header)
1050-
block_class = vm_class.get_block_class()
1055+
unimported_blocks = self._headers_to_blocks(headers)
10511056

1052-
if _is_body_empty(header):
1053-
transactions: List[BaseTransaction] = []
1054-
uncles: List[BlockHeader] = []
1055-
else:
1056-
body = self._pending_bodies.pop(header)
1057-
tx_class = block_class.get_transaction_class()
1058-
transactions = [tx_class.from_base_transaction(tx)
1059-
for tx in body.transactions]
1060-
uncles = body.uncles
1061-
1062-
block = block_class(header, transactions, uncles)
1057+
for block in unimported_blocks:
10631058
timer = Timer()
10641059
_, new_canonical_blocks, old_canonical_blocks = await self.wait(
10651060
self._block_importer.import_block(block)
@@ -1068,24 +1063,57 @@ async def _import_blocks(self, headers: Tuple[BlockHeader, ...]) -> None:
10681063
if new_canonical_blocks == (block,):
10691064
# simple import of a single new block.
10701065
self.logger.info("Imported block %d (%d txs) in %.2f seconds",
1071-
block.number, len(transactions), timer.elapsed)
1066+
block.number, len(block.transactions), timer.elapsed)
10721067
elif not new_canonical_blocks:
10731068
# imported block from a fork.
10741069
self.logger.info("Imported non-canonical block %d (%d txs) in %.2f seconds",
1075-
block.number, len(transactions), timer.elapsed)
1070+
block.number, len(block.transactions), timer.elapsed)
10761071
elif old_canonical_blocks:
10771072
self.logger.info(
10781073
"Chain Reorganization: Imported block %d (%d txs) in %.2f "
10791074
"seconds, %d blocks discarded and %d new canonical blocks added",
10801075
block.number,
1081-
len(transactions),
1076+
len(block.transactions),
10821077
timer.elapsed,
10831078
len(old_canonical_blocks),
10841079
len(new_canonical_blocks),
10851080
)
10861081
else:
10871082
raise Exception("Invariant: unreachable code path")
10881083

1084+
@to_tuple
1085+
def _headers_to_blocks(self, headers: Iterable[BlockHeader]) -> Iterable[BaseBlock]:
1086+
for header in headers:
1087+
vm_class = self.chain.get_vm_class(header)
1088+
block_class = vm_class.get_block_class()
1089+
1090+
if _is_body_empty(header):
1091+
transactions: List[BaseTransaction] = []
1092+
uncles: List[BlockHeader] = []
1093+
else:
1094+
body = self._pending_bodies.pop(header)
1095+
tx_class = block_class.get_transaction_class()
1096+
transactions = [tx_class.from_base_transaction(tx)
1097+
for tx in body.transactions]
1098+
uncles = body.uncles
1099+
1100+
yield block_class(header, transactions, uncles)
1101+
1102+
async def _display_stats(self) -> None:
1103+
self.logger.debug("Regular sync waiting for first header to arrive")
1104+
await self.wait(self._got_first_header.wait())
1105+
self.logger.debug("Regular sync first header arrived")
1106+
1107+
while self.is_operational:
1108+
await self.sleep(5)
1109+
self.logger.debug(
1110+
"(in progress, queued, max size) of bodies, receipts: %r. Write capacity? %s",
1111+
[(q.num_in_progress(), len(q), q._maxsize) for q in (
1112+
self._block_body_tasks,
1113+
)],
1114+
"yes" if self._db_buffer_capacity.is_set() else "no",
1115+
)
1116+
10891117

10901118
def _is_body_empty(header: BlockHeader) -> bool:
10911119
return header.transaction_root == BLANK_ROOT_HASH and header.uncles_hash == EMPTY_UNCLE_HASH

trinity/sync/full/state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def __init__(self,
8686
self.root_hash = root_hash
8787
# We use a LevelDB instance for the nodes cache because a full state download, if run
8888
# uninterrupted will visit more than 180M nodes, making an in-memory cache unfeasible.
89-
self._nodes_cache_dir = tempfile.TemporaryDirectory(prefix="pyevm-state-sync-cache")
89+
self._nodes_cache_dir = tempfile.TemporaryDirectory(prefix="trinity-state-sync-cache")
9090

9191
# Allow the LevelDB instance to consume half of the entire file descriptor limit that
9292
# the OS permits. Let the other half be reserved for other db access, networking etc.

0 commit comments

Comments
 (0)