
Important delay when sending event between async consumers #1612

@Pyvonix

Description

Thank you for maintaining this great package. Once I have the time and a better understanding of how it works, I will try to help out. The library behaves strangely when I send an event from one consumer to another.

  • Your OS and runtime environment, and browser if applicable

Debian 10, Python 3.7.3 and Firefox dev 85.0b9.

  • A pip freeze output showing your package versions
asgiref==3.2.10
async-timeout==3.0.1
channels==3.0.3
channels-redis==3.2.0
websockets==8.1
daphne==3.0.1
Django==3.1.5
  • How you're running Channels (runserver? daphne/runworker? Nginx/Apache in front?)

Running via: python manage.py runserver

  • What you expected to happen vs. what actually happened

When I send an event message from an async context in MyConsumer2 (a "Chat"-like consumer) to MyConsumer1 (a "Notification"-like consumer), I see a significant delay (between 30 s and 1 min) before messages and notifications are received (case 1).

I found a workaround: when I send the event from consumer2 to consumer1 from inside a sync function (case 2), I don't see any delay.

  • Console logs and full tracebacks of any errors
    No errors.
    I send 10 messages (1 per second) and print each one with its full timestamp as it is received.

Case 1 with an important delay

App1/consumers.py:

from channels.generic.websocket import AsyncJsonWebsocketConsumer

class MyConsumer1(AsyncJsonWebsocketConsumer):
    # Channels adds every connection to these groups on connect.
    groups = ["broadcast_layer"]

    async def broadcast_event(self, event) -> None:
        # Handler for "broadcast_event" messages: forward the payload to the client.
        await self.send_json({"data": event["data"]})

    async def receive_json(self, json_data):
        await self.channel_layer.group_send(
            "broadcast_layer",
            {
                "type": "broadcast_event",
                "data": json_data["data"],
            },
        )

App2/consumers.py:

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.utils.timezone import localtime

class MyConsumer2(AsyncJsonWebsocketConsumer):

    async def send_message(self, event) -> None:
        # Handler for "send_message" messages: forward the chat message to the client.
        await self.send_json({"message": event["message"]})

    async def receive_json(self, json_data):
        print(localtime(), 'RECEIVE', json_data)
        await self.channel_layer.group_send(
            "chat-room",
            {
                "type": "send_message",
                "message": json_data["message"],
            },
        )
        # Notify MyConsumer1 directly from the async context (this is the slow path).
        await self.channel_layer.group_send(
            "broadcast_layer",
            {
                "type": "broadcast_event",
                "data": {"new": True},
            },
        )
        #channel_layer = get_channel_layer()    # Same behavior
        #await channel_layer.group_send(
        #      "broadcast_layer", {
        #          "type": "broadcast_event",
        #          "data": {"new": True}
        #})

Console / Output :

2021-01-17 11:47:06.572844 RECEIVE {'message': 'm1'}
2021-01-17 11:47:07.691746 RECEIVE {'message': 'm2'}
2021-01-17 11:48:03.063369 RECEIVE {'message': 'm3'}
2021-01-17 11:48:58.416112 RECEIVE {'message': 'm4'}
2021-01-17 11:49:53.793862 RECEIVE {'message': 'm5'}
2021-01-17 11:50:49.202751 RECEIVE {'message': 'm6'}
2021-01-17 11:50:49.237903 RECEIVE {'message': 'm7'}
2021-01-17 11:51:44.566912 RECEIVE {'message': 'm8'}
2021-01-17 11:52:39.912585 RECEIVE {'message': 'm9'}
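
To make the delay easier to see, here is a small sketch that computes the gap between consecutive RECEIVE lines. The timestamps are copied from the Case 1 output above; the names CASE_1_TIMESTAMPS and gaps_in_seconds are made up for this illustration. It only measures the log and does not explain the cause. Most of the gaps come out at roughly 55 seconds.

```python
from datetime import datetime

# Timestamps copied verbatim from the Case 1 console output.
CASE_1_TIMESTAMPS = [
    "2021-01-17 11:47:06.572844",
    "2021-01-17 11:47:07.691746",
    "2021-01-17 11:48:03.063369",
    "2021-01-17 11:48:58.416112",
    "2021-01-17 11:49:53.793862",
    "2021-01-17 11:50:49.202751",
    "2021-01-17 11:50:49.237903",
    "2021-01-17 11:51:44.566912",
    "2021-01-17 11:52:39.912585",
]


def gaps_in_seconds(timestamps):
    """Return the elapsed seconds between each pair of consecutive timestamps."""
    parsed = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f") for t in timestamps]
    return [(later - earlier).total_seconds() for earlier, later in zip(parsed, parsed[1:])]


gaps = gaps_in_seconds(CASE_1_TIMESTAMPS)
for index, gap in enumerate(gaps, start=2):
    # index is the message number (m2, m3, ...) that arrived after this gap.
    print(f"m{index} arrived {gap:.3f} s after the previous message")
```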

Case 2 without any delay

App1/consumers.py:

No change.

App2/consumers.py:

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.db import database_sync_to_async
from channels.layers import get_channel_layer
from django.utils.timezone import localtime
from asgiref.sync import async_to_sync

class MyConsumer2(AsyncJsonWebsocketConsumer):

    async def send_message(self, event) -> None:
        # Handler for "send_message" messages: forward the chat message to the client.
        await self.send_json({"message": event["message"]})

    async def receive_json(self, json_data):
        print(localtime(), 'RECEIVE', json_data)
        await self.channel_layer.group_send(
            "chat-room",
            {
                "type": "send_message",
                "message": json_data["message"],
            },
        )
        await self.database_operation()

    @database_sync_to_async
    def database_operation(self):
        ...                                        # some process
        # Runs in a worker thread, so the async group_send is wrapped with async_to_sync.
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "broadcast_layer",
            {
                "type": "broadcast_event",
                "data": {"new": True},
            },
        )

Console / Output :

2021-01-17 13:01:10.473165 RECEIVE {'message': 'a1'}
2021-01-17 13:01:11.298534 RECEIVE {'message': 'a2'}
2021-01-17 13:01:12.034638 RECEIVE {'message': 'a3'}
2021-01-17 13:01:13.051258 RECEIVE {'message': 'a4'}
2021-01-17 13:01:13.791766 RECEIVE {'message': 'a5'}
2021-01-17 13:01:15.282752 RECEIVE {'message': 'a7'}
2021-01-17 13:01:16.922638 RECEIVE {'message': 'a8'}
2021-01-17 13:01:17.722775 RECEIVE {'message': 'a9'}
