Skip to content

Commit 202fc1c

Browse files
committed
reset(p2p) - Update 1613 to 1614, without check entrypoints
1 parent 735f4e0 commit 202fc1c

File tree

11 files changed

+738
-30
lines changed

11 files changed

+738
-30
lines changed

hathor/p2p/connection_slot.py

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
# Copyright 2021 Hathor Labs
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import deque
16+
from dataclasses import dataclass
17+
18+
from typing_extensions import assert_never
19+
20+
from hathor.conf.settings import HathorSettings
21+
from hathor.p2p.protocol import HathorProtocol, PeerEndpoint
22+
23+
24+
@dataclass
25+
class ConnectionAllowed:
26+
confirmation: str
27+
28+
29+
@dataclass
30+
class ConnectionRejected:
31+
reason: str
32+
33+
34+
@dataclass
35+
class ConnectionRemoved:
36+
reason: str
37+
entrypoint: PeerEndpoint | None
38+
39+
40+
@dataclass
41+
class ConnectionNotRemoved:
42+
reason: str
43+
44+
45+
@dataclass
46+
class LockSlot:
47+
"""Struct for reserving a spot in the check_ep slot for a specific entrypoint.
48+
This is done so to avoid that a connection made with an entrypoint popped from the queue
49+
loses its spot in the slot to another arriving connection which cuts the line.
50+
51+
We reserve on place on the slot spefically for this entrypoint.
52+
53+
If, however, many attempts are made and the correct entrypoint connection has not
54+
arrived still, the reserve is unlocked by the increase in counter. """
55+
56+
is_spot_reserved: bool
57+
key_entrypoint: PeerEndpoint | None
58+
attempts: int
59+
attemp_limit: int = 3
60+
61+
62+
AddToSlotResult = ConnectionAllowed | ConnectionRejected
63+
RemoveFromSlotResult = ConnectionRemoved | ConnectionNotRemoved
64+
65+
66+
class ConnectionSlots:
67+
""" Class of a connection pool slot - outgoing, incoming, discovered connections. """
68+
connection_slot: set[HathorProtocol]
69+
type: HathorProtocol.ConnectionType
70+
max_slot_connections: int
71+
72+
def __init__(self, type: HathorProtocol.ConnectionType, max_connections: int):
73+
74+
if max_connections <= 0:
75+
raise ValueError("Slot max number must allow at least one connection")
76+
77+
self.type = type
78+
self.connection_slot = set()
79+
self.max_slot_connections = max_connections
80+
81+
def add_connection(self, protocol: HathorProtocol) -> AddToSlotResult:
82+
""" Adds connection protocol to the slot. Checks whether the slot is full or not. If full,
83+
disconnects the protocol. If the type is 'check_entrypoints', the returns peers of it
84+
may go to a queue."""
85+
86+
assert self.type == protocol.connection_type
87+
88+
if protocol in self.connection_slot:
89+
return ConnectionRejected("Protocol already in Slot.")
90+
91+
if self.is_full():
92+
return ConnectionRejected(f"Slot {self.type} is full")
93+
94+
self.connection_slot.add(protocol)
95+
96+
return ConnectionAllowed(f"Added to slot {self.type}.")
97+
98+
def remove_connection(self, protocol: HathorProtocol) -> RemoveFromSlotResult:
99+
""" Removes from given instance the protocol passed. Returns protocol from queue
100+
when disconnection leads to free space in slot."""
101+
102+
# Discard does nothing if protocol not in connection_slot.
103+
self.connection_slot.discard(protocol)
104+
105+
def is_full(self) -> bool:
106+
return len(self.connection_slot) >= self.max_slot_connections
107+
108+
def is_in_slot(self, protocol: HathorSettings) -> bool:
109+
return protocol in self.connection_slot
110+
111+
112+
@dataclass
113+
class SlotsManagerSettings:
114+
max_outgoing: int
115+
max_incoming: int
116+
max_bootstrap: int
117+
max_check_ep: int
118+
119+
120+
class SlotsManager:
121+
"""Manager of slot connections - selects the slot to which must we send the
122+
arriving protocol. Three protocol slots: OUTGOING, INCOMING, DISCOVERED.
123+
"""
124+
outgoing_slot: ConnectionSlots
125+
incoming_slot: ConnectionSlots
126+
bootstrap_slot: ConnectionSlots
127+
check_ep_slot: ConnectionSlots
128+
entrypoints_queue: deque[PeerEndpoint | None]
129+
seen_entrypoints: set[PeerEndpoint | None]
130+
untrustworthy_entrypoints: set[PeerEndpoint | None]
131+
spot_locked: LockSlot
132+
133+
types_allowed: dict[str, HathorProtocol.ConnectionType] = {
134+
'outgoing': HathorProtocol.ConnectionType.OUTGOING,
135+
'incoming': HathorProtocol.ConnectionType.INCOMING,
136+
'bootstrap': HathorProtocol.ConnectionType.BOOTSTRAP,
137+
'check_ep': HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS,
138+
}
139+
140+
states_allowed: dict[str, HathorProtocol.ConnectionState] = {
141+
'created': HathorProtocol.ConnectionState.CREATED,
142+
'connecting': HathorProtocol.ConnectionState.CONNECTING,
143+
'ready': HathorProtocol.ConnectionState.READY,
144+
}
145+
146+
def __init__(self, settings: SlotsManagerSettings) -> None:
147+
types = self.types_allowed
148+
self.outgoing_slot = ConnectionSlots(types['outgoing'], settings.max_outgoing)
149+
self.incoming_slot = ConnectionSlots(types['incoming'], settings.max_incoming)
150+
self.bootstrap_slot = ConnectionSlots(types['bootstrap'], settings.max_bootstrap)
151+
self.check_ep_slot = ConnectionSlots(types['check_ep'], settings.max_check_ep)
152+
self.entrypoints_queue = deque()
153+
self.seen_entrypoints = set()
154+
self.untrustworthy_entrypoints = set()
155+
self.spot_locked = LockSlot(is_spot_reserved=False, key_entrypoint=None, attempts=0)
156+
157+
def add_to_slot(self, protocol: HathorProtocol) -> AddToSlotResult:
158+
"""Add received protocol to one of the slots.
159+
If slot is full, protocol is not added. """
160+
161+
conn_type = protocol.connection_type
162+
types = self.types_allowed
163+
164+
assert conn_type in types
165+
166+
slot: ConnectionSlots | None = None
167+
match conn_type:
168+
case HathorProtocol.ConnectionType.OUTGOING:
169+
slot = self.outgoing_slot
170+
if slot.is_full():
171+
protocol.connection_type = types['check_ep']
172+
return self.add_to_slot(protocol)
173+
case HathorProtocol.ConnectionType.INCOMING:
174+
slot = self.incoming_slot
175+
case HathorProtocol.ConnectionType.BOOTSTRAP:
176+
slot = self.bootstrap_slot
177+
case HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS:
178+
slot = self.check_ep_slot
179+
locked = self.spot_locked.is_spot_reserved
180+
if locked:
181+
self.spot_locked.attempts += 1
182+
unlocked = self.unlock_the_spot(slot, protocol.entrypoint)
183+
if not unlocked:
184+
return ConnectionRejected('Check Entrypoints Slot is locked.')
185+
186+
case _:
187+
assert_never()
188+
189+
if self.should_queue_entrypoint(slot):
190+
self.put_on_queue(protocol)
191+
192+
status = slot.add_connection(protocol)
193+
194+
return status
195+
196+
def remove_from_slot(self, protocol: HathorProtocol) -> RemoveFromSlotResult:
197+
""" Removes protocol from slot of same type.
198+
If OUTGOING, INCOMING or BOOTSTRAP, simply remove from slot and disconnect.
199+
Should be called by manager when disconnecting a protocol."""
200+
201+
conn_type = protocol.connection_type
202+
assert conn_type in self.types_allowed
203+
204+
slot: ConnectionSlots | None = None
205+
match conn_type:
206+
case HathorProtocol.ConnectionType.OUTGOING:
207+
slot = self.outgoing_slot
208+
case HathorProtocol.ConnectionType.INCOMING:
209+
slot = self.incoming_slot
210+
case HathorProtocol.ConnectionType.BOOTSTRAP:
211+
slot = self.bootstrap_slot
212+
case HathorProtocol.ConnectionType.CHECK_ENTRYPOINTS:
213+
slot = self.check_ep_slot
214+
case _:
215+
assert_never()
216+
217+
if protocol not in slot.connection_slot:
218+
return ConnectionNotRemoved(reason='Protocol not in slot.')
219+
220+
types = self.types_allowed
221+
if slot.type != types['check_ep']:
222+
slot.remove_connection(protocol)
223+
return ConnectionRemoved(reason=f'Connection on slot {slot.type} removed.', entrypoint=None)
224+
225+
# From now on, we're dealing solely with the check_entrypoints slot.
226+
227+
entrypoint = protocol.entrypoint
228+
connection_state = protocol.connection_state
229+
states = self.states_allowed
230+
231+
# If disconnected due to a time-out, we don't trust the entrypoint.
232+
if connection_state != states['ready']:
233+
self.untrustworthy_entrypoints.add(entrypoint)
234+
235+
# Check if the protocol has its entrypoint being one we already saw before.
236+
# If so, grab the entrypoints and queue them.
237+
238+
if not self.has_been_seen(entrypoint) and connection_state == states['ready']:
239+
peer_entrypoints = protocol.peer.info.entrypoints
240+
241+
for peer_entrypoint in peer_entrypoints:
242+
if not self.has_been_seen(peer_entrypoint):
243+
self.put_on_queue(protocol)
244+
245+
self.seen_entrypoints.add(entrypoint)
246+
slot.remove_connection(protocol)
247+
248+
new_entrypoint = self.entrypoints_queue.pop()
249+
250+
if self.should_lock_the_spot(slot, new_entrypoint):
251+
self.lock_the_spot(slot, new_entrypoint)
252+
253+
return ConnectionRemoved(reason=f'Connection on slot {slot.type} removed', entrypoint=new_entrypoint)
254+
255+
def should_queue_entrypoint(self, slot: ConnectionSlots) -> bool:
256+
"""See if the protocol should have its entrypoint thrown into the queue."""
257+
types = self.types_allowed
258+
conn_type = slot.type
259+
locked = self.spot_locked.is_spot_reserved
260+
slot_closed = slot.is_full() or locked
261+
262+
return slot_closed and conn_type == types['check_ep']
263+
264+
def put_on_queue(self, protocol: HathorProtocol) -> None:
265+
"""Put on queue the entrypoint of the protocol, for later connection attempt."""
266+
entrypoint = protocol.entrypoint
267+
queue = self.entrypoints_queue
268+
269+
queue.appendleft(entrypoint)
270+
271+
def has_been_seen(self, entrypoint: PeerEndpoint) -> bool:
272+
"""If an entrypoint has been seen before, regardless of being considered trustworthy."""
273+
274+
return entrypoint in self.seen_entrypoints
275+
276+
def should_lock_the_spot(self, slot: ConnectionSlots, entrypoint: PeerEndpoint | None) -> bool:
277+
""" Reserve one spot in the slot for a pending connection.
278+
When pulling an entrypoint from the queue, we create a protocol
279+
with that entrypoint and connect to it. Eventually the protocol
280+
will attempt to connect, but if some other protocol takes its
281+
place, we'll not be able to check the entrypoint we dequeued.
282+
283+
For this reason, we lock the spot when necessary. """
284+
285+
types = self.types_allowed
286+
max_length = slot.max_slot_connections
287+
288+
if slot.type != types['check_ep']:
289+
return False
290+
291+
if len(slot.connection_slot) != max_length - 1:
292+
return False
293+
294+
if entrypoint is None:
295+
return False
296+
297+
return True
298+
299+
def lock_the_spot(self, slot: ConnectionSlots, entrypoint: PeerEndpoint) -> None:
300+
""" Guarantee a reserved spot for a protocol in the check entrypoints slot.
301+
This is done so we can connect to the entrypoint we popped from the queue, as
302+
amidst the connection attempt one may try to connect as well. """
303+
if not self.should_lock_the_spot(slot):
304+
return
305+
306+
assert entrypoint is not None
307+
308+
self.spot_locked.is_spot_reserved = True
309+
self.spot_locked.key_entrypoint = entrypoint
310+
self.spot_locked.attempts = 0
311+
312+
def unlock_the_spot(self, slot: ConnectionSlots, entrypoint: PeerEndpoint) -> bool:
313+
""" Called if the check_ep slot reserves a spot for an expected protocol, and we wish to attempt to unlock it.
314+
If the entrypoint provided matches the entrypoint we previously popped from the queue, we can unlock the slot.
315+
316+
If, however, after some attempts, the counter reaches the limit, we can unlock it regardless."""
317+
assert slot.type == self.types_allowed['check_ep']
318+
assert entrypoint is not None
319+
320+
lockspot = self.spot_locked
321+
counter = lockspot.attempts
322+
limit = lockspot.attemp_limit
323+
324+
if lockspot.key_entrypoint != entrypoint and counter < limit:
325+
return False
326+
327+
lockspot.is_spot_reserved = False
328+
lockspot.key_entrypoint = None
329+
lockspot.attempts = 0
330+
331+
return True
332+
333+
def slot_number(self, slot: ConnectionSlots) -> int:
334+
return len(slot.connection_slot)
335+
336+
def slot_size(self, slot: ConnectionSlots) -> int:
337+
return slot.max_slot_connections
338+
339+
340+
"Still needs:"
341+
"1. Connect to entrypoints in queue"
342+
"2. When connection arrives, it outgoing is not full, it will become an outgoing connection."
343+
"3. Check perfectly the dequeuing mechanism."

hathor/p2p/factory.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525

2626
class _HathorLineReceiverFactory(ABC, protocol.Factory):
27-
inbound: bool
27+
connection_type: HathorLineReceiver.ConnectionType
2828

2929
def __init__(
3030
self,
@@ -45,7 +45,7 @@ def buildProtocol(self, addr: IAddress) -> HathorLineReceiver:
4545
my_peer=self.my_peer,
4646
p2p_manager=self.p2p_manager,
4747
use_ssl=self.use_ssl,
48-
inbound=self.inbound,
48+
connection_type=self.connection_type,
4949
settings=self._settings
5050
)
5151
p.factory = self
@@ -55,10 +55,18 @@ def buildProtocol(self, addr: IAddress) -> HathorLineReceiver:
5555
class HathorServerFactory(_HathorLineReceiverFactory, protocol.ServerFactory):
5656
""" HathorServerFactory is used to generate HathorProtocol objects when a new connection arrives.
5757
"""
58-
inbound = True
58+
connection_type = HathorLineReceiver.ConnectionType.INCOMING
5959

6060

6161
class HathorClientFactory(_HathorLineReceiverFactory, protocol.ClientFactory):
6262
""" HathorClientFactory is used to generate HathorProtocol objects when we connected to another peer.
6363
"""
64-
inbound = False
64+
connection_type = HathorLineReceiver.ConnectionType.OUTGOING
65+
66+
67+
class HathorDiscoveredFactory(_HathorLineReceiverFactory, protocol.ClientFactory):
68+
"""
69+
HathorDiscoveredFactory is the same as a HathorClientFactory, but the type of connection is set to
70+
discovered, for connection pool slotting.
71+
"""
72+
connection_type = HathorLineReceiver.ConnectionType.BOOTSTRAP

0 commit comments

Comments
 (0)