Description
Using:
channels_redis=4.1.0
redis=5.2.1
We use this to run async tasks in our project (with channels workers).
We seem to be dropping already awaited tasks when an exception occurs in a running task.
We use SyncConsumers for task definitions.
To reproduce:
Have a simple task that logs the argument it was passed:
    def test(self, event):
        import time

        time.sleep(0.1)  # simulate slow work
        data = event["data"]
        if data == 50:
            raise ValueError()  # fail on one specific message
        logger.info(data)
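Set up a channel layer with a high capacity (I tested with 2k).
Without any worker running, fill the channel to capacity with tasks:
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer

    channel_layer = get_channel_layer()  # default layer from CHANNEL_LAYERS
    capacity = 2000  # the channel layer capacity configured above

    for i in range(capacity):
        async_to_sync(channel_layer.send)(
            "mychannel",
            {
                "type": "test",
                "data": i,
            },
        )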
I can see the ZCOUNT on Redis going up to 2k:
    watch -n 1 redis-cli ZCOUNT asgimychannel -inf +inf
Now when I start my only worker, I can see the count dropping much faster than the tasks are run (based on the logs posted).
As I understand it (though not clearly enough), the Redis queue is drained and an internal in-memory queue is filled with the awaited tasks.
By the time we reach the 50th task, the ZCOUNT is 0.
Once task 50 is reached, we get the exception in the worker logs, but no further logs are posted.
My understanding is that tasks 51 to 1999 are lost.
Is my understanding of what is happening correct? I would love to understand the de-queuing a bit better, especially when more than one worker is available.
Also, is this the expected behaviour?
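To make the behaviour I think I am seeing concrete, here is a toy model using only plain asyncio. It is not the actual channels or channels_redis code: `broker`, `consumer`, and `run_worker` are made-up names, the list stands in for the Redis sorted set, and the assumption (unconfirmed) is that the worker moves messages into an in-memory queue faster than the consumer handles them.

```python
import asyncio


async def consumer(inbox, processed):
    """Handle buffered messages one at a time; any exception ends the task."""
    while True:
        data = await inbox.get()
        await asyncio.sleep(0)  # stand-in for the slow handler
        if data == 50:
            raise ValueError("boom")
        processed.append(data)


async def run_worker(broker):
    inbox = asyncio.Queue()  # hypothetical in-memory buffer inside the worker
    processed = []
    task = asyncio.create_task(consumer(inbox, processed))

    # Drain the broker into the in-memory buffer as fast as possible.
    while broker:
        inbox.put_nowait(broker.pop(0))
    # The broker is now empty although nothing has been handled yet.

    try:
        await task
    except ValueError:
        pass  # the consumer died; its buffer is never read again
    return processed, inbox.qsize()


broker = list(range(100))  # stand-in for the Redis sorted set
processed, stranded = asyncio.run(run_worker(broker))
print(len(broker), len(processed), stranded)  # 0 50 49
```

In this model the broker count reaches 0 before the first message is handled, messages 0 to 49 are processed, and messages 51 to 99 are stranded in a buffer that nobody reads any more, which matches what I observe.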
I fixed this issue by using the following consumer:
    import logging

    from channels.consumer import SyncConsumer, get_handler_name
    from channels.db import database_sync_to_async

    logger = logging.getLogger(__name__)


    class SafeSyncConsumer(SyncConsumer):
        @database_sync_to_async
        def dispatch(self, message):
            """
            Dispatches incoming messages to type-based handlers asynchronously.
            Catches any exception raised in the handler and logs it to avoid
            having the worker dump already awaited tasks.
            """
            # Get and execute the handler
            handler = getattr(self, get_handler_name(message), None)
            if handler:
                try:
                    handler(message)
                except Exception:
                    logger.exception(
                        "Exception in consumer method (%s.%s)",
                        self.__class__.__name__,
                        handler.__name__,
                    )
            else:
                # No exception is active here, so log with error(), not exception()
                logger.error("No handler for message type %s", message["type"])
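The effect of catching the exception around the handler call can be checked in isolation with a small asyncio sketch. This is only an illustration of the pattern, not channels code: `handler`, `safe_consumer`, and the in-memory `inbox` are hypothetical names, and message 5 plays the role of the failing task.

```python
import asyncio
import logging

logger = logging.getLogger("toy_worker")


def handler(data, processed):
    """Toy handler that fails on one specific message."""
    if data == 5:
        raise ValueError("boom")
    processed.append(data)


async def safe_consumer(inbox, processed):
    """Process every buffered message, logging failures instead of dying."""
    while not inbox.empty():
        data = await inbox.get()
        try:
            handler(data, processed)
        except Exception:
            logger.exception("Handler failed for %r", data)


async def main():
    inbox = asyncio.Queue()
    for i in range(10):
        inbox.put_nowait(i)
    processed = []
    await safe_consumer(inbox, processed)
    return processed


safe_processed = asyncio.run(main())
print(safe_processed)  # [0, 1, 2, 3, 4, 6, 7, 8, 9]
```

Only the failing message is skipped; everything buffered after it still runs, at the cost of the failure being visible only in the logs.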