Skip to content

Commit 1a0582b

Browse files
Nkovaturient and acul71 authored
Fix QUIC stream direction, CID routing/promotion races (libp2p#1096)
* fix(quic): promotion/id routing, stream direction, and skip flaky concurrent streams test under xdist
* test: make tests/core/transport/quic/test_integration.py::test_yamux_stress_ping 100%
* Address PR review issues: fix error formatting, extract magic numbers, improve lock cleanup, add regression tests
  - Convert error messages from % formatting to f-strings in connection.py
  - Extract hard-coded 0.01 threshold to SLOW_NOTIFICATION_THRESHOLD_SECONDS constant
  - Add centralized _cleanup_promotion_lock() helper method in listener.py
  - Improve lock cleanup in _remove_connection, _remove_connection_by_object, and close()
  - Add regression tests for duplicate promotion prevention, handler invocation once-per-connection, and multiple CID routing under concurrent load
  - Fix test_connection_id_tracking_with_real_connection to compare by address instead of object identity
  - Add newsfragment for Issue libp2p#1081

---------

Co-authored-by: acul71 <[email protected]>
Co-authored-by: acul71 <[email protected]>
1 parent 568e241 commit 1a0582b

File tree

8 files changed: 731 additions (+731) and 232 deletions (-232)

libp2p/transport/quic/connection.py

Lines changed: 105 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
from .stream import QUICStream, StreamDirection
3737

3838
if TYPE_CHECKING:
39+
from .listener import QUICListener
3940
from .security import QUICTLSConfigManager
4041
from .transport import QUICTransport
4142

@@ -73,6 +74,8 @@ def __init__(
7374
security_manager: Optional["QUICTLSConfigManager"] = None,
7475
resource_scope: Any | None = None,
7576
listener_socket: trio.socket.SocketType | None = None,
77+
listener: "QUICListener | None" = None,
78+
listener_connection_id: bytes | None = None,
7679
):
7780
"""
7881
Initialize QUIC connection with security integration.
@@ -88,18 +91,32 @@ def __init__(
8891
security_manager: Security manager for TLS/certificate handling
8992
resource_scope: Resource manager scope for tracking
9093
listener_socket: Socket of listener to transmit data
94+
listener: Owning listener for server-side connections (enables O(1)
95+
Connection ID registration).
96+
listener_connection_id: Listener's primary Connection ID for this
97+
connection (used as the base CID when registering new CIDs).
9198
9299
"""
93100
self._quic = quic_connection
94101
self._remote_addr = remote_addr
95102
self._remote_peer_id = remote_peer_id
96103
self._local_peer_id = local_peer_id
97104
self.peer_id = remote_peer_id or local_peer_id
98-
self._is_initiator = is_initiator
105+
# Derive initiator/client role from aioquic configuration when available.
106+
# Only trust `is_client` when it's a real bool
107+
# (mocks may return truthy objects).
108+
is_client = getattr(
109+
getattr(quic_connection, "configuration", None), "is_client", None
110+
)
111+
self._is_initiator = is_client if isinstance(is_client, bool) else is_initiator
99112
self._maddr = maddr
100113
self._transport = transport
101114
self._security_manager = security_manager
102115
self._resource_scope = resource_scope
116+
# Owning listener context (server-side). Used to register newly-issued
117+
# Connection IDs in O(1) for correct packet routing.
118+
self._listener: "QUICListener | None" = listener
119+
self._listener_connection_id: bytes | None = listener_connection_id
103120

104121
# Trio networking - socket may be provided by listener
105122
self._socket = listener_socket if listener_socket else None
@@ -187,6 +204,9 @@ def __init__(
187204
self.MAX_CONCURRENT_STREAMS = self._transport._config.MAX_CONCURRENT_STREAMS
188205
self.STREAM_OPEN_TIMEOUT = self._transport._config.STREAM_OPEN_TIMEOUT
189206

207+
# Performance monitoring thresholds
208+
self.SLOW_NOTIFICATION_THRESHOLD_SECONDS = 0.01 # 10ms
209+
190210
# Performance and monitoring
191211
self._connection_start_time = time.time()
192212
self._stats = {
@@ -209,6 +229,17 @@ def __init__(
209229
f"security: {self._security_manager is not None})"
210230
)
211231

232+
def set_listener_context(
233+
self, listener: "QUICListener | None", listener_connection_id: bytes | None
234+
) -> None:
235+
"""
236+
Set owning listener context for O(1) Connection ID registration.
237+
238+
For server-side connections, the listener is responsible for packet routing.
239+
"""
240+
self._listener = listener
241+
self._listener_connection_id = listener_connection_id
242+
212243
# Resource manager integration
213244
def set_resource_scope(self, scope: Any) -> None:
214245
"""
@@ -846,11 +877,19 @@ async def _accept_stream_impl(self) -> QUICStream:
846877
logger.debug(f"Accepted inbound stream {stream.stream_id}")
847878
return stream
848879

880+
# If the event is already set but the queue is empty, reset it to avoid
881+
# a busy loop that can starve the QUIC event loop under load.
882+
if self._stream_accept_event.is_set():
883+
self._stream_accept_event = trio.Event()
884+
continue
885+
886+
accept_event = self._stream_accept_event
887+
849888
if self._closed:
850889
raise MuxedConnUnavailable("Connection closed while accepting stream")
851890

852891
# Wait for new streams indefinitely
853-
await self._stream_accept_event.wait()
892+
await accept_event.wait()
854893

855894
raise QUICConnectionError("Error occurred while waiting to accept stream")
856895

@@ -984,8 +1023,37 @@ async def _handle_stream_data_batch(
9841023
await self._transmit()
9851024
continue
9861025
else:
1026+
# Common benign case: we closed and removed a locally-initiated
1027+
# stream wrapper, then received a late FIN-only event.
1028+
fin_only = True
1029+
for e in stream_events:
1030+
data = getattr(e, "data", b"")
1031+
end_stream = getattr(e, "end_stream", False)
1032+
if data or not end_stream:
1033+
fin_only = False
1034+
break
1035+
if fin_only:
1036+
logger.debug(
1037+
"Ignoring late FIN on closed outbound stream %s "
1038+
"(is_initiator=%s, quic.is_client=%s)",
1039+
stream_id,
1040+
self._is_initiator,
1041+
getattr(
1042+
getattr(self._quic, "configuration", None),
1043+
"is_client",
1044+
None,
1045+
),
1046+
)
1047+
continue
1048+
is_client = getattr(
1049+
getattr(self._quic, "configuration", None), "is_client", None
1050+
)
1051+
parity = "even" if stream_id % 2 == 0 else "odd"
9871052
logger.error(
988-
f"Unexpected outbound stream {stream_id} in data event"
1053+
f"Unexpected outbound stream {stream_id} in data event "
1054+
f"(parity={parity}, is_initiator={self._is_initiator}, "
1055+
f"quic.is_client={is_client}, streams={len(self._streams)}, "
1056+
f"cached={stream_id in self._stream_cache})"
9891057
)
9901058
continue
9911059

@@ -1147,6 +1215,19 @@ async def _notify_listener_of_new_connection_id(
11471215
"""
11481216
notification_start = time.time()
11491217
try:
1218+
if self._listener and self._listener_connection_id:
1219+
await self._listener._registry.add_connection_id(
1220+
new_connection_id, self._listener_connection_id, sequence
1221+
)
1222+
notification_duration = time.time() - notification_start
1223+
if notification_duration > self.SLOW_NOTIFICATION_THRESHOLD_SECONDS:
1224+
logger.debug(
1225+
f"Slow Connection ID notification: "
1226+
f"{notification_duration * 1000:.2f}ms "
1227+
f"for Connection ID {new_connection_id.hex()[:8]}"
1228+
)
1229+
return
1230+
11501231
if not self._transport:
11511232
return
11521233

@@ -1298,8 +1379,28 @@ async def _handle_stream_data(self, event: events.StreamDataReceived) -> None:
12981379
logger.debug(f"Creating new incoming stream {stream_id}")
12991380
stream = await self._create_inbound_stream(stream_id)
13001381
else:
1382+
if not event.data and event.end_stream:
1383+
logger.debug(
1384+
"Ignoring late FIN on closed outbound stream %s "
1385+
"(is_initiator=%s, quic.is_client=%s)",
1386+
stream_id,
1387+
self._is_initiator,
1388+
getattr(
1389+
getattr(self._quic, "configuration", None),
1390+
"is_client",
1391+
None,
1392+
),
1393+
)
1394+
return
1395+
is_client = getattr(
1396+
getattr(self._quic, "configuration", None), "is_client", None
1397+
)
1398+
parity = "even" if stream_id % 2 == 0 else "odd"
13011399
logger.error(
1302-
f"Unexpected outbound stream {stream_id} in data event"
1400+
f"Unexpected outbound stream {stream_id} in data event "
1401+
f"(parity={parity}, is_initiator={self._is_initiator}, "
1402+
f"quic.is_client={is_client}, streams={len(self._streams)}, "
1403+
f"cached={stream_id in self._stream_cache})"
13031404
)
13041405
return
13051406

0 commit comments

Comments
 (0)