Skip to content

Commit 6dbecb1

Browse files
fixed quic reconnect flow and added test.
added readme.md link to pypi
1 parent 99c3106 commit 6dbecb1

File tree

5 files changed

+91
-17
lines changed

5 files changed

+91
-17
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Python implementation of [RSocket](http://rsocket.io)
44

55
# Installation
66

7-
The pypi stable package (version 0.2) is very old (and barely implements anything).
7+
The [pypi](https://pypi.org/project/rsocket/) stable package (version 0.2) is very old (and barely implements anything).
88

99
Currently, the pre-release package can be installed using:
1010

rsocket/transports/abstract_messaging.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def __init__(self):
1212
async def next_frame_generator(self):
1313
frame = await self._incoming_frame_queue.get()
1414

15+
self._incoming_frame_queue.task_done()
16+
1517
if isinstance(frame, Exception):
1618
raise frame
1719

rsocket/transports/aiohttp_websocket.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def handle_incoming_ws_messages(self):
6262
async for frame in self._frame_parser.receive_data(msg.data, 0):
6363
self._incoming_frame_queue.put_nowait(frame)
6464
except asyncio.CancelledError:
65-
pass
65+
logger().debug('Asyncio task canceled: incoming_data_listener')
6666
except Exception:
6767
self._incoming_frame_queue.put_nowait(RSocketTransportError())
6868

rsocket/transports/aioquic_transport.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
from aioquic.asyncio import QuicConnectionProtocol, connect, serve
55
from aioquic.quic.configuration import QuicConfiguration
6-
from aioquic.quic.events import QuicEvent, StreamDataReceived
6+
from aioquic.quic.events import QuicEvent, StreamDataReceived, ConnectionTerminated
77

8+
from rsocket.exceptions import RSocketTransportError
89
from rsocket.frame import Frame
910
from rsocket.helpers import wrap_transport_exception
1011
from rsocket.logger import logger
@@ -67,7 +68,12 @@ async def query(self, frame: Frame) -> None:
6768
self.transmit()
6869

6970
def quic_event_received(self, event: QuicEvent) -> None:
70-
if isinstance(event, StreamDataReceived):
71+
logger().debug('Quic event received: %s', event)
72+
73+
if isinstance(event, ConnectionTerminated):
74+
self.frame_queue.put_nowait(RSocketTransportError())
75+
76+
elif isinstance(event, StreamDataReceived):
7177
self.frame_queue.put_nowait(event.data)
7278

7379

@@ -87,10 +93,17 @@ async def incoming_data_listener(self):
8793
while True:
8894
data = await self._incoming_bytes_queue.get()
8995

90-
async for frame in self._frame_parser.receive_data(data, 0):
91-
self._incoming_frame_queue.put_nowait(frame)
96+
if isinstance(data, Exception):
97+
self._incoming_frame_queue.put_nowait(data)
98+
else:
99+
async for frame in self._frame_parser.receive_data(data, 0):
100+
self._incoming_frame_queue.put_nowait(frame)
101+
102+
self._incoming_bytes_queue.task_done()
92103
except asyncio.CancelledError:
93104
logger().debug('Asyncio task canceled: incoming_data_listener')
105+
except Exception:
106+
self._incoming_frame_queue.put_nowait(RSocketTransportError())
94107

95108
async def close(self):
96109
self._listener.cancel()

tests/rsocket/test_connection_lost.py

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import pytest
88
from aiohttp.test_utils import RawTestServer
9+
from aioquic.quic.configuration import QuicConfiguration
910
from asyncstdlib import sync
11+
from cryptography.hazmat.primitives import serialization
1012

1113
from reactivestreams.publisher import Publisher
1214
from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
@@ -20,6 +22,7 @@
2022
from rsocket.rsocket_server import RSocketServer
2123
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
2224
from rsocket.transports.aiohttp_websocket import websocket_handler_factory, TransportAioHttpClient
25+
from rsocket.transports.aioquic_transport import rsocket_connect, rsocket_serve
2326
from rsocket.transports.tcp import TransportTCP
2427
from rsocket.transports.transport import Transport
2528
from tests.rsocket.helpers import future_from_payload, IdentifiedHandlerFactory, \
@@ -107,7 +110,7 @@ async def transport_provider():
107110
service.close()
108111

109112

110-
class FailingTransportTCP(Transport):
113+
class FailingTransport(Transport):
111114

112115
async def connect(self):
113116
raise Exception
@@ -152,7 +155,7 @@ async def transport_provider():
152155
client_connection = await asyncio.open_connection(host, port)
153156
yield TransportTCP(*client_connection)
154157

155-
yield FailingTransportTCP()
158+
yield FailingTransport()
156159

157160
client_connection = await asyncio.open_connection(host, port)
158161
yield TransportTCP(*client_connection)
@@ -198,7 +201,7 @@ async def on_connection_lost(self, rsocket, exception: Exception):
198201
await rsocket.reconnect()
199202

200203

201-
async def start_tcp_service(waiter: asyncio.Event, container, port: int):
204+
async def start_tcp_service(waiter: asyncio.Event, container, port: int, generate_test_certificates):
202205
index_iterator = iter(range(1, 3))
203206

204207
def session(*connection):
@@ -213,13 +216,13 @@ def session(*connection):
213216
return sync(service.close)
214217

215218

216-
async def start_tcp_client(port: int) -> RSocketClient:
219+
async def start_tcp_client(port: int, generate_test_certificates) -> RSocketClient:
217220
async def transport_provider():
218221
try:
219222
client_connection = await asyncio.open_connection('localhost', port)
220223
yield TransportTCP(*client_connection)
221224

222-
yield FailingTransportTCP()
225+
yield FailingTransport()
223226

224227
client_connection = await asyncio.open_connection('localhost', port)
225228
yield TransportTCP(*client_connection)
@@ -230,7 +233,7 @@ async def transport_provider():
230233
return RSocketClient(transport_provider(), handler_factory=ClientHandler)
231234

232235

233-
async def start_websocket_service(waiter: asyncio.Event, container, port: int):
236+
async def start_websocket_service(waiter: asyncio.Event, container, port: int, generate_test_certificates):
234237
index_iterator = iter(range(1, 3))
235238

236239
def handler_factory(*args, **kwargs):
@@ -250,14 +253,14 @@ def on_server_create(server):
250253
return server.close
251254

252255

253-
async def start_websocket_client(port: int) -> RSocketClient:
256+
async def start_websocket_client(port: int, generate_test_certificates) -> RSocketClient:
254257
url = 'http://localhost:{}'.format(port)
255258

256259
async def transport_provider():
257260
try:
258261
yield TransportAioHttpClient(url)
259262

260-
yield FailingTransportTCP()
263+
yield FailingTransport()
261264

262265
yield TransportAioHttpClient(url)
263266
except Exception:
@@ -267,20 +270,76 @@ async def transport_provider():
267270
return RSocketClient(transport_provider(), handler_factory=ClientHandler)
268271

269272

273+
async def start_quic_service(waiter: asyncio.Event, container, port: int, generate_test_certificates):
274+
index_iterator = iter(range(1, 3))
275+
certificate, private_key = generate_test_certificates
276+
server_configuration = QuicConfiguration(
277+
certificate=certificate,
278+
private_key=private_key,
279+
is_client=False
280+
)
281+
282+
def handler_factory(*args, **kwargs):
283+
return IdentifiedHandlerFactory(
284+
next(index_iterator),
285+
ServerHandler,
286+
delay=timedelta(seconds=1)).factory(*args, **kwargs)
287+
288+
def on_server_create(server):
289+
container.server = server
290+
container.transport = server._transport
291+
waiter.set()
292+
293+
quic_server = await rsocket_serve(host='localhost',
294+
port=port,
295+
configuration=server_configuration,
296+
on_server_create=on_server_create,
297+
handler_factory=handler_factory)
298+
return sync(quic_server.close)
299+
300+
301+
async def start_quic_client(port: int, generate_test_certificates) -> RSocketClient:
302+
certificate, private_key = generate_test_certificates
303+
client_configuration = QuicConfiguration(
304+
is_client=True
305+
)
306+
ca_data = certificate.public_bytes(serialization.Encoding.PEM)
307+
client_configuration.load_verify_locations(cadata=ca_data, cafile=None)
308+
309+
async def transport_provider():
310+
try:
311+
async with rsocket_connect('localhost', port,
312+
configuration=client_configuration) as transport:
313+
yield transport
314+
315+
yield FailingTransport()
316+
317+
async with rsocket_connect('localhost', port,
318+
configuration=client_configuration) as transport:
319+
yield transport
320+
except Exception:
321+
logger().error('Client connection error', exc_info=True)
322+
raise
323+
324+
return RSocketClient(transport_provider(), handler_factory=ClientHandler)
325+
326+
270327
@pytest.mark.allow_error_log(regex_filter='Connection error')
271328
@pytest.mark.parametrize(
272329
'start_service, start_client',
273330
(
274331
(start_tcp_service, start_tcp_client),
275332
(start_websocket_service, start_websocket_client),
333+
(start_quic_service, start_quic_client),
276334
)
277335
)
278-
async def test_connection_failure_during_stream(unused_tcp_port, start_service, start_client):
336+
async def test_connection_failure_during_stream(unused_tcp_port, generate_test_certificates,
337+
start_service, start_client):
279338
server_container = ServerContainer()
280339
wait_for_server = Event()
281340

282-
service_closer = await start_service(wait_for_server, server_container, unused_tcp_port)
283-
client = await start_client(unused_tcp_port)
341+
service_closer = await start_service(wait_for_server, server_container, unused_tcp_port, generate_test_certificates)
342+
client = await start_client(unused_tcp_port, generate_test_certificates)
284343

285344
try:
286345
async with AwaitableRSocket(client) as a_client:

0 commit comments

Comments
 (0)