diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f363b3d4..2a4fcc45 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,10 @@ Changelog --------- +v0.4.19 +======= +- Added Django Channels transport support for WebSocket communication + v0.4.18 ======= - Fixed Stalette/FastAPI implementation and added example using FastAPI server diff --git a/README.md b/README.md index 23bbebdc..3052bff6 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ You may also install using some **extras**: | reactivex | [ReactiveX](https://reactivex.io/) ([v4](https://pypi.org/project/reactivex/)) integration | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/reactivex) | | aiohttp | [aiohttp](https://docs.aiohttp.org/en/stable/) Websocket transport (server/client) | [Tutorial](https://rsocket.io/guides/rsocket-py/tutorial/websocket) | | fastapi | [fastapi](https://github.com/fastapi/fastapi) Websocket transport (server/client) | | +| channels | Websocket transport (server only) using channels (django) | | | quart | [Quart](https://pgjones.gitlab.io/quart/) Websocket transport (server only) | | | quic | [QUIC](https://github.com/aiortc/aioquic) transport | | | websockets | [Websockets](https://github.com/python-websockets/websockets) transport (server only) | | diff --git a/docs/conf.py b/docs/conf.py index d5ad3ebd..6c6fb826 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -24,7 +24,7 @@ author = 'jellofishi@pm.me' # The full version, including alpha/beta/rc tags -release = '0.4.18' +release = '0.4.19' # -- General configuration --------------------------------------------------- diff --git a/examples/django_channels/README.md b/examples/django_channels/README.md new file mode 100644 index 00000000..e01b6f95 --- /dev/null +++ b/examples/django_channels/README.md @@ -0,0 +1,111 @@ +# RSocket with Django Channels + +This example demonstrates how to use RSocket with Django Channels, allowing you to implement RSocket protocol support in your Django applications. + +## Overview + +Django Channels extends Django to handle WebSockets, and this integration allows you to use the RSocket protocol over those WebSocket connections. This enables: + +- Reactive streaming capabilities in Django applications +- Support for all RSocket interaction models (request-response, fire-and-forget, request-stream, request-channel) +- Bidirectional communication between client and server + +## Requirements + +- Django 3.0+ +- Channels 4.0+ +- An ASGI server like Daphne or Uvicorn + +## Installation + +1. Install Django Channels: + ```bash + pip install channels + ``` + +2. Install an ASGI server: + ```bash + pip install daphne + ``` + +3. Configure your Django project to use Channels (see Django Channels documentation) + +## Server Setup + +1. Create a request handler for RSocket: + ```python + from rsocket.payload import Payload + from rsocket.request_handler import BaseRequestHandler + + class Handler(BaseRequestHandler): + async def request_response(self, payload: Payload): + return Payload(b'Echo: ' + payload.data) + ``` + +2. Create an RSocket consumer using the factory: + ```python + from rsocket.transports.channels_transport import rsocket_consumer_factory + + RSocketConsumer = rsocket_consumer_factory(handler_factory=Handler) + ``` + +3. Add the consumer to your routing configuration: + ```python + from django.urls import path + from channels.routing import ProtocolTypeRouter, URLRouter + + application = ProtocolTypeRouter({ + 'websocket': URLRouter([ + path('rsocket', RSocketConsumer.as_asgi()), + ]), + }) + ``` + +## Client Usage + +You can connect to your Django Channels RSocket server using any RSocket client. Here's an example using the Python client: + +```python +import asyncio +import websockets +from rsocket.helpers import single_transport_provider +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.websockets_transport import WebsocketsTransport + +async def main(): + async with websockets.connect('ws://localhost:8000/rsocket') as websocket: + transport = WebsocketsTransport() + handler_task = asyncio.create_task(transport.handler(websocket)) + + try: + async with RSocketClient(single_transport_provider(transport)) as client: + response = await client.request_response(Payload(b'Hello')) + print(f"Received: {response.data.decode()}") + finally: + handler_task.cancel() + try: + await handler_task + except asyncio.CancelledError: + pass + +if __name__ == '__main__': + asyncio.run(main()) +``` + +## Advanced Usage + +The Django Channels transport supports all RSocket interaction models: + +- **Request-Response**: Simple request with a single response +- **Fire-and-Forget**: One-way message with no response +- **Request-Stream**: Request that receives a stream of responses +- **Request-Channel**: Bi-directional stream of messages + +See the server_example.py and client_example.py files for more detailed examples. + +## Security Considerations + +- Use secure WebSockets (wss://) in production +- Implement proper authentication and authorization in your Django application +- Consider using RSocket's authentication and authorization extensions for additional security \ No newline at end of file diff --git a/examples/django_channels/__init__.py b/examples/django_channels/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/django_channels/client_example.py b/examples/django_channels/client_example.py new file mode 100644 index 00000000..2b6ad6a2 --- /dev/null +++ b/examples/django_channels/client_example.py @@ -0,0 +1,78 @@ +""" +Example of a client connecting to a Django Channels RSocket server. + +This example shows how to create a client that connects to a Django Channels +RSocket server and performs a request-response interaction. +""" + +import asyncio +import logging +import ssl +from typing import Optional + +import asyncclick as click +import websockets + +from rsocket.helpers import single_transport_provider +from rsocket.payload import Payload +from rsocket.rsocket_client import RSocketClient +from rsocket.transports.websockets_transport import WebsocketsTransport + + +async def connect_to_django_channels(url: str, ssl_context: Optional[ssl.SSLContext] = None): + """ + Connect to a Django Channels RSocket server using websockets. + + :param url: WebSocket URL (e.g., 'ws://localhost:8000/rsocket') + :param ssl_context: Optional SSL context for secure connections + """ + async with websockets.connect(url, ssl=ssl_context) as websocket: + # Create a transport using the websocket connection + transport = WebsocketsTransport() + + # Start the transport handler + handler_task = asyncio.create_task(transport.handler(websocket)) + + try: + # Create an RSocket client using the transport + async with RSocketClient(single_transport_provider(transport)) as client: + # Send a request-response + payload = Payload(b'Hello from RSocket client') + response = await client.request_response(payload) + + print(f"Received response: {response.data.decode()}") + + # You can add more interactions here + + finally: + # Clean up the handler task + handler_task.cancel() + try: + await handler_task + except asyncio.CancelledError: + pass + + +@click.command() +@click.option('--url', default='ws://localhost:8000/rsocket', help='WebSocket URL') +@click.option('--secure', is_flag=True, help='Use secure WebSocket (wss://)') +async def main(url: str, secure: bool): + """ + Connect to a Django Channels RSocket server. + """ + logging.basicConfig(level=logging.INFO) + + if secure and not url.startswith('wss://'): + url = 'wss://' + url.removeprefix('ws://') + ssl_context = ssl.create_default_context() + # Disable certificate verification for testing + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + else: + ssl_context = None + + await connect_to_django_channels(url, ssl_context) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/examples/django_channels/django_rsocket/.gitignore b/examples/django_channels/django_rsocket/.gitignore new file mode 100644 index 00000000..49ef2557 --- /dev/null +++ b/examples/django_channels/django_rsocket/.gitignore @@ -0,0 +1 @@ +db.sqlite3 diff --git a/examples/django_channels/django_rsocket/__init__.py b/examples/django_channels/django_rsocket/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/django_channels/django_rsocket/django_rsocket/__init__.py b/examples/django_channels/django_rsocket/django_rsocket/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/django_channels/django_rsocket/django_rsocket/asgi.py b/examples/django_channels/django_rsocket/django_rsocket/asgi.py new file mode 100644 index 00000000..a665edc0 --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/asgi.py @@ -0,0 +1,22 @@ +""" +ASGI config for django_rsocket project. + +It exposes the ASGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/ +""" + +import os + +from django.core.asgi import get_asgi_application + +from channels.routing import ProtocolTypeRouter, URLRouter +from .routing import websocket_urlpatterns + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_rsocket.settings') + +application = ProtocolTypeRouter({ + "http": get_asgi_application(), + "websocket": URLRouter(websocket_urlpatterns), +}) diff --git a/examples/django_channels/django_rsocket/django_rsocket/consumers.py b/examples/django_channels/django_rsocket/django_rsocket/consumers.py new file mode 100644 index 00000000..2e6b0a4a --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/consumers.py @@ -0,0 +1,28 @@ +import logging + +from channels.routing import ProtocolTypeRouter, URLRouter +from django.urls import path + +from rsocket.helpers import create_future +from rsocket.payload import Payload +from rsocket.request_handler import BaseRequestHandler +from rsocket.transports.channels_transport import rsocket_consumer_factory + + +# Define a request handler for RSocket +class Handler(BaseRequestHandler): + async def request_response(self, payload: Payload): + logging.info(payload.data) + + return create_future(Payload(b'Echo: ' + payload.data)) + + +# Create a consumer using the factory +RSocketConsumer = rsocket_consumer_factory(handler_factory=Handler) + +# Django Channels routing configuration +application = ProtocolTypeRouter({ + 'websocket': URLRouter([ + path('rsocket', RSocketConsumer.as_asgi()), + ]), +}) diff --git a/examples/django_channels/django_rsocket/django_rsocket/routing.py b/examples/django_channels/django_rsocket/django_rsocket/routing.py new file mode 100644 index 00000000..226fb4a3 --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/routing.py @@ -0,0 +1,6 @@ +from django.urls import path +from .consumers import RSocketConsumer + +websocket_urlpatterns = [ + path('rsocket', RSocketConsumer.as_asgi()), +] \ No newline at end of file diff --git a/examples/django_channels/django_rsocket/django_rsocket/settings.py b/examples/django_channels/django_rsocket/django_rsocket/settings.py new file mode 100644 index 00000000..3bbd0a73 --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/settings.py @@ -0,0 +1,166 @@ +""" +Django settings for django_rsocket project. + +Generated by 'django-admin startproject' using Django 5.2. + +For more information on this file, see +https://docs.djangoproject.com/en/5.2/topics/settings/ + +For the full list of settings and their values, see +https://docs.djangoproject.com/en/5.2/ref/settings/ +""" + +from pathlib import Path + +# Build paths inside the project like this: BASE_DIR / 'subdir'. +BASE_DIR = Path(__file__).resolve().parent.parent + + +# Quick-start development settings - unsuitable for production +# See https://docs.djangoproject.com/en/5.2/howto/deployment/checklist/ + +# SECURITY WARNING: keep the secret key used in production secret! +SECRET_KEY = 'django-insecure-j^+8$nsrc!xg(5nib5&838l%0^g%g8!(hwi17(lm8@fq^d_@tu' + +# SECURITY WARNING: don't run with debug turned on in production! +DEBUG = True + +ALLOWED_HOSTS = [] + + +# Application definition + +INSTALLED_APPS = [ + 'daphne', + 'django.contrib.admin', + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.messages', + 'django.contrib.staticfiles', + +] + +MIDDLEWARE = [ + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', +] + +ROOT_URLCONF = 'django_rsocket.urls' + +TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, +] + +WSGI_APPLICATION = 'django_rsocket.wsgi.application' + +ASGI_APPLICATION = 'django_rsocket.asgi.application' + +# Database +# https://docs.djangoproject.com/en/5.2/ref/settings/#databases + +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': BASE_DIR / 'db.sqlite3', + } +} + + +# Password validation +# https://docs.djangoproject.com/en/5.2/ref/settings/#auth-password-validators + +AUTH_PASSWORD_VALIDATORS = [ + { + 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', + }, + { + 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', + }, +] + + +# Internationalization +# https://docs.djangoproject.com/en/5.2/topics/i18n/ + +LANGUAGE_CODE = 'en-us' + +TIME_ZONE = 'UTC' + +USE_I18N = True + +USE_TZ = True + + +# Static files (CSS, JavaScript, Images) +# https://docs.djangoproject.com/en/5.2/howto/static-files/ + +STATIC_URL = 'static/' + +# Default primary key field type +# https://docs.djangoproject.com/en/5.2/ref/settings/#default-auto-field + +DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' + + +# Logging Configuration +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'verbose': { + 'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}', + 'style': '{', + }, + 'simple': { + 'format': '{levelname} {message}', + 'style': '{', + }, + }, + 'handlers': { + 'console': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'formatter': 'verbose', + }, + }, + 'root': { + 'handlers': ['console'], + 'level': 'DEBUG', + }, + 'loggers': { + 'django': { + 'handlers': ['console'], + 'level': 'DEBUG', + 'propagate': False, + }, + # Add your app-specific loggers here + # 'your_app_name': { + # 'handlers': ['console'], + # 'level': 'DEBUG', + # 'propagate': False, + # }, + }, +} diff --git a/examples/django_channels/django_rsocket/django_rsocket/urls.py b/examples/django_channels/django_rsocket/django_rsocket/urls.py new file mode 100644 index 00000000..dae37f40 --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/urls.py @@ -0,0 +1,22 @@ +""" +URL configuration for django_rsocket project. + +The `urlpatterns` list routes URLs to views. For more information please see: + https://docs.djangoproject.com/en/5.2/topics/http/urls/ +Examples: +Function views + 1. Add an import: from my_app import views + 2. Add a URL to urlpatterns: path('', views.home, name='home') +Class-based views + 1. Add an import: from other_app.views import Home + 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') +Including another URLconf + 1. Import the include() function: from django.urls import include, path + 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) +""" +from django.contrib import admin +from django.urls import path + +urlpatterns = [ + path('admin/', admin.site.urls), +] diff --git a/examples/django_channels/django_rsocket/django_rsocket/wsgi.py b/examples/django_channels/django_rsocket/django_rsocket/wsgi.py new file mode 100644 index 00000000..96715fdb --- /dev/null +++ b/examples/django_channels/django_rsocket/django_rsocket/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for django_rsocket project. + +It exposes the WSGI callable as a module-level variable named ``application``. + +For more information on this file, see +https://docs.djangoproject.com/en/5.2/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_rsocket.settings') + +application = get_wsgi_application() diff --git a/examples/django_channels/django_rsocket/manage.py b/examples/django_channels/django_rsocket/manage.py new file mode 100755 index 00000000..c04faec4 --- /dev/null +++ b/examples/django_channels/django_rsocket/manage.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +"""Django's command-line utility for administrative tasks.""" +import os +import sys + + +def main(): + """Run administrative tasks.""" + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_rsocket.settings') + try: + from django.core.management import execute_from_command_line + except ImportError as exc: + raise ImportError( + "Couldn't import Django. Are you sure it's installed and " + "available on your PYTHONPATH environment variable? Did you " + "forget to activate a virtual environment?" + ) from exc + execute_from_command_line(sys.argv) + + +if __name__ == '__main__': + main() diff --git a/examples/django_channels/server_example.py b/examples/django_channels/server_example.py new file mode 100644 index 00000000..b2e8a7f8 --- /dev/null +++ b/examples/django_channels/server_example.py @@ -0,0 +1,79 @@ +""" +Example of using RSocket with Django Channels. + +This file demonstrates how to set up a Django Channels application with RSocket support. +It shows the routing configuration and consumer setup. + +To run this example, you would need to: +1. Create a Django project +2. Install channels and daphne +3. Configure the project to use channels +4. Add this consumer to your routing configuration + +This is a template that shows the key components, not a complete runnable example. +""" + +from channels.routing import ProtocolTypeRouter, URLRouter +from django.urls import path + +from rsocket.payload import Payload +from rsocket.request_handler import BaseRequestHandler +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.channels_transport import AsyncRSocketConsumer, rsocket_consumer_factory + + +# Define a request handler for RSocket +class Handler(BaseRequestHandler): + async def request_response(self, payload: Payload): + # Echo the request data back to the client + return Payload(b'Echo: ' + payload.data) + + +# Create a consumer using the factory +RSocketConsumer = rsocket_consumer_factory(handler_factory=Handler) + + +# Django Channels routing configuration +application = ProtocolTypeRouter({ + 'websocket': URLRouter([ + path('rsocket', RSocketConsumer.as_asgi()), + ]), +}) + + +""" +To use this in a Django project: +$ python3 -c 'import channels; import daphne; print(channels.__version__, daphne.__version__)' + +1. In your Django settings.py: + +INSTALLED_APPS = [ + # ... other apps + 'channels', +] + +ASGI_APPLICATION = 'your_project.asgi.application' + +2. In your project's asgi.py: + +import os +from django.core.asgi import get_asgi_application +from channels.routing import ProtocolTypeRouter, URLRouter +from your_app.routing import websocket_urlpatterns + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings') + +application = ProtocolTypeRouter({ + "http": get_asgi_application(), + "websocket": URLRouter(websocket_urlpatterns), +}) + +3. In your app's routing.py: + +from django.urls import path +from .consumers import RSocketConsumer + +websocket_urlpatterns = [ + path('rsocket', RSocketConsumer.as_asgi()), +] +""" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 9ee86336..7d425704 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,4 +56,7 @@ websockets==13.1; python_version == "3.8" asyncwebsockets==0.9.4 -fastapi==0.115.11 \ No newline at end of file +fastapi==0.115.11 + +channels==4.0.0 +daphne==4.1.2 \ No newline at end of file diff --git a/rsocket/__init__.py b/rsocket/__init__.py index d0ed2341..0c4fce05 100644 --- a/rsocket/__init__.py +++ b/rsocket/__init__.py @@ -1 +1 @@ -__version__ = '0.4.18' +__version__ = '0.4.19' diff --git a/rsocket/transports/channels_transport.py b/rsocket/transports/channels_transport.py new file mode 100644 index 00000000..83e0e52f --- /dev/null +++ b/rsocket/transports/channels_transport.py @@ -0,0 +1,129 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import Optional, Dict, Any, Callable + +from channels.generic.websocket import AsyncWebsocketConsumer + +from rsocket.exceptions import RSocketTransportError +from rsocket.frame import Frame +from rsocket.helpers import wrap_transport_exception, single_transport_provider +from rsocket.logger import logger +from rsocket.rsocket_client import RSocketClient +from rsocket.rsocket_server import RSocketServer +from rsocket.transports.abstract_messaging import AbstractMessagingTransport + + +class AsyncRSocketConsumer(AsyncWebsocketConsumer): + """ + Django Channels AsyncWebsocketConsumer for RSocket protocol. + + This consumer handles binary RSocket frames and passes them to the RSocket server. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.transport = None + self.server = None + self.server_factory_kwargs = {} + self.on_server_create = None + + async def connect(self): + """Accept the WebSocket connection.""" + await self.accept() + self.transport = ChannelsTransport(self) + self.server = RSocketServer(self.transport, **self.server_factory_kwargs) + + if self.on_server_create is not None: + self.on_server_create(self.server) + + async def disconnect(self, close_code): + """Handle WebSocket disconnect.""" + if self.transport: + await self.transport.close() + + async def receive(self, text_data=None, bytes_data=None): + """Handle incoming WebSocket messages.""" + if bytes_data: + async for frame in self.transport._frame_parser.receive_data(bytes_data, 0): + self.transport._incoming_frame_queue.put_nowait(frame) + + +def rsocket_consumer_factory(on_server_create: Optional[Callable[[RSocketServer], None]] = None, + **kwargs) -> type: + """ + Factory function to create an AsyncRSocketConsumer class with specific settings. + + :param on_server_create: Optional callback when server is created + :param kwargs: Parameters passed to RSocketServer constructor + :return: AsyncRSocketConsumer class + """ + class CustomAsyncRSocketConsumer(AsyncRSocketConsumer): + def __init__(self, *args, **consumer_kwargs): + super().__init__(*args, **consumer_kwargs) + self.server_factory_kwargs = kwargs + self.on_server_create = on_server_create + + return CustomAsyncRSocketConsumer + + +@asynccontextmanager +async def channels_client(consumer, **kwargs) -> RSocketClient: + """ + Helper method to instantiate an RSocket client using a Django Channels consumer. + + :param consumer: Django Channels consumer instance + :param kwargs: Parameters passed to the client + """ + async with RSocketClient(single_transport_provider(ChannelsTransport(consumer)), + **kwargs) as client: + yield client + + +class ChannelsTransport(AbstractMessagingTransport): + """ + RSocket transport over Django Channels WebSocket. + + :param consumer: Django Channels WebSocket consumer + """ + + def __init__(self, consumer): + super().__init__() + self._consumer = consumer + self._outgoing_frame_queue = asyncio.Queue() + self._message_handler = asyncio.create_task(self._producer_handler()) + + async def _producer_handler(self): + """Handle outgoing messages by sending them through the WebSocket.""" + try: + while True: + frame = await self._outgoing_frame_queue.get() + try: + # Use the correct method to send binary data through Django Channels + await self._consumer.send(type='websocket.send', bytes_data=frame.serialize()) + except Exception as e: + logger().error(f'Error sending frame through Django Channels: {e}') + # Try alternative send method if the first one fails + try: + await self._consumer.send(bytes_data=frame.serialize()) + except Exception as e2: + logger().error(f'Error in alternative send method: {e2}') + raise + except asyncio.CancelledError: + logger().debug('Asyncio task canceled: channels_producer_handler') + except Exception as e: + logger().error(f'Error in channels producer handler: {e}') + self._incoming_frame_queue.put_nowait(RSocketTransportError()) + + async def send_frame(self, frame: Frame): + """Send a frame through the WebSocket.""" + with wrap_transport_exception(): + await self._outgoing_frame_queue.put(frame) + + async def close(self): + """Close the transport.""" + if self._message_handler: + self._message_handler.cancel() + try: + await self._message_handler + except asyncio.CancelledError: + pass diff --git a/setup.cfg b/setup.cfg index 0d69c860..6f1f9fa9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,6 +64,9 @@ graphql = graphql-core>=3.2.0 gql>=3.4.0 websockets = websockets>=11.0.0 +channels = + channels>=4.0.0 + daphne>=4.1.2 asyncwebsockets = asyncwebsockets>=0.9.4 [options.entry_points]