Skip to content

Commit a532900

Browse files
authored
Fix self connection leak (#43)
* Fix self connection leak * Increase sleep to improve performance * Add load test for unfinished blocks * Fix flake8 lint
1 parent 55192f2 commit a532900

File tree

8 files changed

+186
-73
lines changed

8 files changed

+186
-73
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pip-delete-this-directory.txt
4242

4343
# pyenv
4444
.python-version
45-
45+
.eggs
4646
.venv
4747

4848
# mypy

src/full_node.py

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from src.util.ints import uint32, uint64
4444

4545
log = logging.getLogger(__name__)
46+
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
4647

4748

4849
class FullNode:
@@ -61,7 +62,7 @@ def _set_server(self, server: ChiaServer):
6162

6263
async def _send_tips_to_farmers(
6364
self, delivery: Delivery = Delivery.BROADCAST
64-
) -> AsyncGenerator[OutboundMessage, None]:
65+
) -> OutboundMessageGenerator:
6566
"""
6667
Sends all of the current heads to all farmer peers. Also sends the latest
6768
estimated proof of time rate, so farmer can calulate which proofs are good.
@@ -99,7 +100,7 @@ async def _send_tips_to_farmers(
99100

100101
async def _send_challenges_to_timelords(
101102
self, delivery: Delivery = Delivery.BROADCAST
102-
) -> AsyncGenerator[OutboundMessage, None]:
103+
) -> OutboundMessageGenerator:
103104
"""
104105
Sends all of the current heads (as well as Pos infos) to all timelord peers.
105106
"""
@@ -137,7 +138,7 @@ async def _send_challenges_to_timelords(
137138
delivery,
138139
)
139140

140-
async def _on_connect(self) -> AsyncGenerator[OutboundMessage, None]:
141+
async def _on_connect(self) -> OutboundMessageGenerator:
141142
"""
142143
Whenever we connect to another node, send them our current heads. Also send heads to farmers
143144
and challenges to timelords.
@@ -162,7 +163,7 @@ async def _on_connect(self) -> AsyncGenerator[OutboundMessage, None]:
162163
async for msg in self._send_tips_to_farmers(Delivery.RESPOND):
163164
yield msg
164165

165-
def _num_needed_peers(self):
166+
def _num_needed_peers(self) -> int:
166167
diff = self.config["target_peer_count"] - len(
167168
self.server.global_connections.get_full_node_connections()
168169
)
@@ -177,7 +178,7 @@ def _start_bg_tasks(self):
177178
introducer_peerinfo = PeerInfo(introducer["host"], introducer["port"])
178179

179180
async def introducer_client():
180-
async def on_connect():
181+
async def on_connect() -> OutboundMessageGenerator:
181182
msg = Message("request_peers", peer_protocol.RequestPeers())
182183
yield OutboundMessage(NodeType.INTRODUCER, msg, Delivery.RESPOND)
183184

@@ -196,7 +197,7 @@ async def on_connect():
196197
def _shutdown(self):
197198
self._shut_down = True
198199

199-
async def _sync(self):
200+
async def _sync(self) -> OutboundMessageGenerator:
200201
"""
201202
Performs a full sync of the blockchain.
202203
- Check which are the heaviest tips
@@ -222,13 +223,20 @@ async def _sync(self):
222223
Tuple[bytes32, FullBlock]
223224
] = await self.store.get_potential_tips_tuples()
224225
log.info(f"Have collected {len(potential_tips)} potential tips")
225-
for header_hash, block in potential_tips:
226-
if block.header_block.challenge is None:
227-
raise ValueError(f"Invalid tip block {block.header_hash} received")
228-
if block.header_block.challenge.total_weight > highest_weight:
229-
highest_weight = block.header_block.challenge.total_weight
230-
tip_block = block
231-
tip_height = block.header_block.challenge.height
226+
for header_hash, potential_tip_block in potential_tips:
227+
if potential_tip_block.header_block.challenge is None:
228+
raise ValueError(
229+
f"Invalid tip block {potential_tip_block.header_hash} received"
230+
)
231+
if (
232+
potential_tip_block.header_block.challenge.total_weight
233+
> highest_weight
234+
):
235+
highest_weight = (
236+
potential_tip_block.header_block.challenge.total_weight
237+
)
238+
tip_block = potential_tip_block
239+
tip_height = potential_tip_block.header_block.challenge.height
232240
if highest_weight <= max(
233241
[t.weight for t in self.blockchain.get_current_tips()]
234242
):
@@ -474,7 +482,9 @@ async def _sync(self):
474482
for height in range(height_checkpoint, end_height):
475483
if self._shut_down:
476484
return
477-
block = await self.store.get_potential_block(uint32(height))
485+
block: Optional[FullBlock] = await self.store.get_potential_block(
486+
uint32(height)
487+
)
478488
assert block is not None
479489
start = time.time()
480490
async with self.store.lock:
@@ -498,7 +508,7 @@ async def _sync(self):
498508
assert max([h.height for h in self.blockchain.get_current_tips()]) == tip_height
499509
log.info(f"Finished sync up to height {tip_height}")
500510

501-
async def _finish_sync(self):
511+
async def _finish_sync(self) -> OutboundMessageGenerator:
502512
"""
503513
Finalize sync by setting sync mode to False, clearing all sync information, and adding any final
504514
blocks that we have finalized recently.
@@ -525,7 +535,7 @@ async def _finish_sync(self):
525535
@api_request
526536
async def request_all_header_hashes(
527537
self, request: peer_protocol.RequestAllHeaderHashes
528-
) -> AsyncGenerator[OutboundMessage, None]:
538+
) -> OutboundMessageGenerator:
529539
try:
530540
header_hashes = self.blockchain.get_header_hashes(request.tip_header_hash)
531541
message = Message(
@@ -538,7 +548,7 @@ async def request_all_header_hashes(
538548
@api_request
539549
async def all_header_hashes(
540550
self, all_header_hashes: peer_protocol.AllHeaderHashes
541-
) -> AsyncGenerator[OutboundMessage, None]:
551+
) -> OutboundMessageGenerator:
542552
assert len(all_header_hashes.header_hashes) > 0
543553
async with self.store.lock:
544554
self.store.set_potential_hashes(all_header_hashes.header_hashes)
@@ -549,7 +559,7 @@ async def all_header_hashes(
549559
@api_request
550560
async def request_header_blocks(
551561
self, request: peer_protocol.RequestHeaderBlocks
552-
) -> AsyncGenerator[OutboundMessage, None]:
562+
) -> OutboundMessageGenerator:
553563
"""
554564
A peer requests a list of header blocks, by height. Used for syncing or light clients.
555565
"""
@@ -579,7 +589,7 @@ async def request_header_blocks(
579589
@api_request
580590
async def header_blocks(
581591
self, request: peer_protocol.HeaderBlocks
582-
) -> AsyncGenerator[OutboundMessage, None]:
592+
) -> OutboundMessageGenerator:
583593
"""
584594
Receive header blocks from a peer.
585595
"""
@@ -597,7 +607,7 @@ async def header_blocks(
597607
@api_request
598608
async def request_sync_blocks(
599609
self, request: peer_protocol.RequestSyncBlocks
600-
) -> AsyncGenerator[OutboundMessage, None]:
610+
) -> OutboundMessageGenerator:
601611
"""
602612
Responsd to a peers request for syncing blocks.
603613
"""
@@ -642,7 +652,7 @@ async def request_sync_blocks(
642652
@api_request
643653
async def sync_blocks(
644654
self, request: peer_protocol.SyncBlocks
645-
) -> AsyncGenerator[OutboundMessage, None]:
655+
) -> OutboundMessageGenerator:
646656
"""
647657
We have received the blocks that we needed for syncing. Add them to processing queue.
648658
"""
@@ -662,7 +672,7 @@ async def sync_blocks(
662672
@api_request
663673
async def request_header_hash(
664674
self, request: farmer_protocol.RequestHeaderHash
665-
) -> AsyncGenerator[OutboundMessage, None]:
675+
) -> OutboundMessageGenerator:
666676
"""
667677
Creates a block body and header, with the proof of space, coinbase, and fee targets provided
668678
by the farmer, and sends the hash of the header data back to the farmer.
@@ -747,7 +757,7 @@ async def request_header_hash(
747757
@api_request
748758
async def header_signature(
749759
self, header_signature: farmer_protocol.HeaderSignature
750-
) -> AsyncGenerator[OutboundMessage, None]:
760+
) -> OutboundMessageGenerator:
751761
"""
752762
Signature of header hash, by the harvester. This is enough to create an unfinished
753763
block, which only needs a Proof of Time to be finished. If the signature is valid,
@@ -783,7 +793,7 @@ async def header_signature(
783793
@api_request
784794
async def proof_of_time_finished(
785795
self, request: timelord_protocol.ProofOfTimeFinished
786-
) -> AsyncGenerator[OutboundMessage, None]:
796+
) -> OutboundMessageGenerator:
787797
"""
788798
A proof of time, received by a peer timelord. We can use this to complete a block,
789799
and call the block routine (which handles propagation and verification of blocks).
@@ -846,7 +856,7 @@ async def proof_of_time_finished(
846856
@api_request
847857
async def new_proof_of_time(
848858
self, new_proof_of_time: peer_protocol.NewProofOfTime
849-
) -> AsyncGenerator[OutboundMessage, None]:
859+
) -> OutboundMessageGenerator:
850860
"""
851861
A proof of time, received by a peer full node. If we have the rest of the block,
852862
we can complete it. Otherwise, we just verify and propagate the proof.
@@ -878,7 +888,7 @@ async def new_proof_of_time(
878888
@api_request
879889
async def unfinished_block(
880890
self, unfinished_block: peer_protocol.UnfinishedBlock
881-
) -> AsyncGenerator[OutboundMessage, None]:
891+
) -> OutboundMessageGenerator:
882892
"""
883893
We have received an unfinished block, either created by us, or from another peer.
884894
We can validate it and if it's a good block, propagate it to other peers and
@@ -930,7 +940,7 @@ async def unfinished_block(
930940
leader: Tuple[uint32, uint64] = self.store.get_unfinished_block_leader()
931941
if leader is None or unfinished_block.block.height > leader[0]:
932942
log.info(
933-
f"This is the first block at height {unfinished_block.block.height}, so propagate."
943+
f"This is the first unfinished block at height {unfinished_block.block.height}, so propagate."
934944
)
935945
# If this is the first block we see at this height, propagate
936946
self.store.set_unfinished_block_leader(
@@ -974,9 +984,7 @@ async def unfinished_block(
974984
)
975985

976986
@api_request
977-
async def block(
978-
self, block: peer_protocol.Block
979-
) -> AsyncGenerator[OutboundMessage, None]:
987+
async def block(self, block: peer_protocol.Block) -> OutboundMessageGenerator:
980988
"""
981989
Receive a full block from a peer full node (or ourselves).
982990
"""
@@ -1020,15 +1028,15 @@ async def block(
10201028
)
10211029
try:
10221030
# Performs sync, and catch exceptions so we don't close the connection
1023-
async for msg in self._sync():
1024-
yield msg
1031+
async for ret_msg in self._sync():
1032+
yield ret_msg
10251033
except asyncio.CancelledError:
10261034
log.warning("Syncing failed, CancelledError")
10271035
except BaseException as e:
10281036
log.warning(f"Error {type(e)}{e} with syncing")
10291037
finally:
1030-
async for msg in self._finish_sync():
1031-
yield msg
1038+
async for ret_msg in self._finish_sync():
1039+
yield ret_msg
10321040

10331041
elif block.block.height >= tip_height - 3:
10341042
log.info(
@@ -1062,7 +1070,7 @@ async def block(
10621070
ips_changed = True
10631071
if ips_changed:
10641072
rate_update = farmer_protocol.ProofOfTimeRate(next_vdf_ips)
1065-
log.error(f"Sending proof of time rate {next_vdf_ips}")
1073+
log.info(f"Sending proof of time rate {next_vdf_ips}")
10661074
yield OutboundMessage(
10671075
NodeType.FARMER,
10681076
Message("proof_of_time_rate", rate_update),
@@ -1123,8 +1131,8 @@ async def block(
11231131
FullBlock
11241132
] = await self.store.get_disconnected_block_by_prev(block.block.header_hash)
11251133
if next_block is not None:
1126-
async for msg in self.block(peer_protocol.Block(next_block)):
1127-
yield msg
1134+
async for ret_msg in self.block(peer_protocol.Block(next_block)):
1135+
yield ret_msg
11281136

11291137
async with self.store.lock:
11301138
# Removes all temporary data for old blocks
@@ -1137,7 +1145,7 @@ async def block(
11371145
@api_request
11381146
async def request_block(
11391147
self, request_block: peer_protocol.RequestBlock
1140-
) -> AsyncGenerator[OutboundMessage, None]:
1148+
) -> OutboundMessageGenerator:
11411149
block: Optional[FullBlock] = await self.store.get_block(
11421150
request_block.header_hash
11431151
)
@@ -1149,9 +1157,7 @@ async def request_block(
11491157
)
11501158

11511159
@api_request
1152-
async def peers(
1153-
self, request: peer_protocol.Peers
1154-
) -> AsyncGenerator[OutboundMessage, None]:
1160+
async def peers(self, request: peer_protocol.Peers) -> OutboundMessageGenerator:
11551161
conns = self.server.global_connections
11561162
for peer in request.peer_list:
11571163
conns.peers.add(peer)

src/introducer.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import asyncio
1414

1515
import logging
16+
1617
log = logging.getLogger(__name__)
1718

1819

@@ -31,27 +32,23 @@ async def request_peers(
3132
) -> AsyncGenerator[OutboundMessage, None]:
3233
max_peers = self.config["max_peers_to_send"]
3334
rawpeers = self.server.global_connections.peers.get_peers(
34-
max_peers*2, True, self.config["recent_peer_threshold"]
35+
max_peers * 2, True, self.config["recent_peer_threshold"]
3536
)
3637

3738
peers = []
3839

3940
for peer in rawpeers:
4041
if peer.get_hash() not in self.vetted:
4142
try:
42-
r, w = await asyncio.open_connection(
43-
peer.host, int(peer.port)
44-
)
43+
r, w = await asyncio.open_connection(peer.host, int(peer.port))
4544
w.close()
4645
except (
4746
ConnectionRefusedError,
4847
TimeoutError,
4948
OSError,
5049
asyncio.TimeoutError,
5150
) as e:
52-
log.warning(
53-
f"Could not vet {peer}. {type(e)}{str(e)}"
54-
)
51+
log.warning(f"Could not vet {peer}. {type(e)}{str(e)}")
5552
self.vetted[peer.get_hash()] = False
5653
continue
5754

src/server/connection.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ def __init__(
4747
self.on_connect = on_connect
4848

4949
# Connection metrics
50-
self.handshake_finished = False
5150
self.creation_type = time.time()
5251
self.bytes_read = 0
5352
self.bytes_written = 0

0 commit comments

Comments
 (0)