Skip to content

Commit c5ca3aa

Browse files
authored
Merge pull request #91 from rsocket/quic_transport
enable frame header for quic and http3 transports
2 parents beb3fee + 92fa3aa commit c5ca3aa

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

rsocket/transports/aioquic_transport.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from aioquic.quic.events import QuicEvent, StreamDataReceived, ConnectionTerminated
77

88
from rsocket.exceptions import RSocketTransportError
9-
from rsocket.frame import Frame
9+
from rsocket.frame import Frame, serialize_with_frame_size_header
1010
from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists
1111
from rsocket.logger import logger
1212
from rsocket.rsocket_server import RSocketServer
@@ -63,7 +63,7 @@ def __init__(self, *args, **kwargs):
6363
self._stream_id = self._quic.get_next_available_stream_id()
6464

6565
async def query(self, frame: Frame) -> None:
66-
data = frame.serialize()
66+
data = serialize_with_frame_size_header(frame)
6767
self._quic.send_stream_data(self._stream_id, data, end_stream=False)
6868
self.transmit()
6969

@@ -103,7 +103,7 @@ async def incoming_data_listener(self):
103103
self._incoming_frame_queue.put_nowait(data)
104104
return
105105
else:
106-
async for frame in self._frame_parser.receive_data(data, 0):
106+
async for frame in self._frame_parser.receive_data(data):
107107
self._incoming_frame_queue.put_nowait(frame)
108108

109109
except asyncio.CancelledError:
@@ -116,3 +116,6 @@ async def close(self):
116116
self._quic_protocol.close()
117117

118118
await self._quic_protocol.wait_closed()
119+
120+
def requires_length_header(self) -> bool:
121+
return True

rsocket/transports/http3_transport.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from starlette.websockets import WebSocket, WebSocketDisconnect
1818

1919
from rsocket.exceptions import RSocketTransportError
20-
from rsocket.frame import Frame
20+
from rsocket.frame import Frame, serialize_with_frame_size_header
2121
from rsocket.helpers import wrap_transport_exception, cancel_if_task_exists
2222
from rsocket.logger import logger
2323
from rsocket.transports.abstract_messaging import AbstractMessagingTransport
@@ -159,7 +159,8 @@ def __init__(self, websocket: Union[WebSocket, ClientWebSocket]):
159159
async def send_frame(self, frame: Frame):
160160
with wrap_transport_exception():
161161
try:
162-
await self._websocket.send_bytes(frame.serialize())
162+
data = serialize_with_frame_size_header(frame)
163+
await self._websocket.send_bytes(data)
163164
except WebSocketDisconnect:
164165
self._disconnect_event.set()
165166

@@ -177,7 +178,7 @@ async def incoming_data_listener(self):
177178
self._disconnect_event.set()
178179
break
179180

180-
async for frame in self._frame_parser.receive_data(data, 0):
181+
async for frame in self._frame_parser.receive_data(data):
181182
self._incoming_frame_queue.put_nowait(frame)
182183

183184
except asyncio.CancelledError:
@@ -189,3 +190,6 @@ async def incoming_data_listener(self):
189190

190191
async def wait_for_disconnect(self):
191192
await self._disconnect_event.wait()
193+
194+
def requires_length_header(self) -> bool:
195+
return True

0 commit comments

Comments
 (0)