Skip to content

Commit c7afdf6

Browse files
committed
initial commit asyncwebsockets transport
1 parent 3bc542b commit c7afdf6

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ pydantic==1.10.13
2323
Werkzeug==3.0.0
2424
graphql-core==3.2.3
2525
gql==3.4.1
26-
websockets==11.0.3
26+
websockets==11.0.3
27+
asyncwebsockets==0.9.4
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import asyncio
2+
from contextlib import asynccontextmanager
3+
4+
from rsocket.exceptions import RSocketTransportError
5+
from rsocket.frame import Frame
6+
from rsocket.helpers import wrap_transport_exception, single_transport_provider
7+
from rsocket.logger import logger
8+
from rsocket.rsocket_client import RSocketClient
9+
from rsocket.transports.abstract_messaging import AbstractMessagingTransport
10+
11+
12+
@asynccontextmanager
13+
async def websocket_client(url: str,
14+
**kwargs) -> RSocketClient:
15+
"""
16+
Helper method to instantiate an RSocket client using a websocket url over asyncwebsockets client.
17+
"""
18+
from asyncwebsockets import open_websocket
19+
async with open_websocket(url) as websocket:
20+
async with RSocketClient(single_transport_provider(TransportAsyncWebsocketsClient(websocket)),
21+
**kwargs) as client:
22+
yield client
23+
24+
25+
class TransportAsyncWebsocketsClient(AbstractMessagingTransport):
26+
"""
27+
RSocket transport over client side asyncwebsockets.
28+
"""
29+
30+
def __init__(self, websocket):
31+
super().__init__()
32+
self._ws = websocket
33+
self._message_handler = None
34+
35+
async def connect(self):
36+
self._message_handler = asyncio.create_task(self.handle_incoming_ws_messages())
37+
38+
async def handle_incoming_ws_messages(self):
39+
from wsproto.events import BytesMessage
40+
try:
41+
async for message in self._ws:
42+
if isinstance(message, BytesMessage):
43+
async for frame in self._frame_parser.receive_data(message.data, 0):
44+
self._incoming_frame_queue.put_nowait(frame)
45+
except asyncio.CancelledError:
46+
logger().debug('Asyncio task canceled: incoming_data_listener')
47+
except Exception:
48+
self._incoming_frame_queue.put_nowait(RSocketTransportError())
49+
50+
async def send_frame(self, frame: Frame):
51+
with wrap_transport_exception():
52+
await self._ws.send(frame.serialize())
53+
54+
async def close(self):
55+
self._message_handler.cancel()
56+
await self._message_handler

0 commit comments

Comments
 (0)