Skip to content

Commit 7d501ca

Browse files
Izzetteauvipy
andauthored
fix(worker): continue to attempt to bind other queues after a native delayed delivery binding failure has occurred (celery#9959)
* Fix native delayed delivery binding failures * Continue to attempt to bind other queues after a native delayed delivery binding failure has occurred. * Add test for native delayed delivery retries * Native delayed deliveries retries should bypass exception grouping and raise retriable exception immediately. * Update celery/worker/consumer/delayed_delivery.py * Replace ExceptionGroup in DelayedDelivery * Use agronholm's exceptiongroup backport * Update t/unit/worker/test_native_delayed_delivery.py Co-authored-by: Isabelle COWAN-BERGMAN <[email protected]> * Update celery/worker/consumer/delayed_delivery.py Co-authored-by: Isabelle COWAN-BERGMAN <[email protected]> * Add integration test for native delayed delivery bindings --------- Co-authored-by: Asif Saif Uddin {"Auvi":"অভি"} <[email protected]>
1 parent 2f60642 commit 7d501ca

File tree

6 files changed

+381
-5
lines changed

6 files changed

+381
-5
lines changed

.github/workflows/integration-tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ jobs:
3737
REDIS_HOST: localhost
3838
REDIS_PORT: 6379
3939
rabbitmq:
40-
image: rabbitmq
40+
image: rabbitmq:management
4141
ports:
4242
- 5672:5672
43+
- 15672:15672
4344
env:
4445
RABBITMQ_DEFAULT_USER: guest
4546
RABBITMQ_DEFAULT_PASS: guest

celery/worker/consumer/delayed_delivery.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
"""
66
from typing import Iterator, List, Optional, Set, Union, ValuesView
77

8+
# Backport of PEP 654 for Python versions < 3.11
9+
# In Python 3.11+, exceptiongroup uses the built-in ExceptionGroup
10+
from exceptiongroup import ExceptionGroup
811
from kombu import Connection, Queue
912
from kombu.transport.native_delayed_delivery import (bind_queue_to_native_delayed_delivery_exchange,
1013
declare_native_delayed_delivery_exchanges_and_queues)
@@ -23,7 +26,7 @@
2326
# Default retry settings
2427
RETRY_INTERVAL = 1.0 # seconds between retries
2528
MAX_RETRIES = 3 # maximum number of retries
26-
29+
RETRIED_EXCEPTIONS = (ConnectionRefusedError, OSError)
2730

2831
# Valid queue types for delayed delivery
2932
VALID_QUEUE_TYPES = {'classic', 'quorum'}
@@ -84,7 +87,7 @@ def start(self, c: Consumer) -> None:
8487
retry_over_time(
8588
self._setup_delayed_delivery,
8689
args=(c, broker_url),
87-
catch=(ConnectionRefusedError, OSError),
90+
catch=RETRIED_EXCEPTIONS,
8891
errback=self._on_retry,
8992
interval_start=RETRY_INTERVAL,
9093
max_retries=MAX_RETRIES,
@@ -157,6 +160,7 @@ def _bind_queues(self, app: Celery, connection: Connection) -> None:
157160
logger.warning("No queues found to bind for delayed delivery")
158161
return
159162

163+
exceptions: list[Exception] = []
160164
for queue in queues:
161165
try:
162166
logger.debug("Binding queue %r to delayed delivery exchange", queue.name)
@@ -166,7 +170,27 @@ def _bind_queues(self, app: Celery, connection: Connection) -> None:
166170
"Failed to bind queue %r: %s",
167171
queue.name, str(e)
168172
)
169-
raise
173+
174+
# We must re-raise on retried exceptions to ensure they are
175+
# caught with the outer retry_over_time mechanism.
176+
#
177+
# This could be removed if one of:
178+
# * The minimum python version for Celery and Kombu is
179+
# increased to 3.11. Kombu updated to use the `except*`
180+
# clause to catch specific exceptions from an ExceptionGroup.
181+
# * Kombu's retry_over_time utility is updated to use the
182+
# catch utility from agronholm's exceptiongroup backport.
183+
if isinstance(e, RETRIED_EXCEPTIONS):
184+
raise
185+
186+
exceptions.append(e)
187+
188+
if exceptions:
189+
raise ExceptionGroup(
190+
("One or more failures occurred while binding queues to "
191+
"delayed delivery exchanges"),
192+
exceptions,
193+
)
170194

171195
def _on_retry(self, exc: Exception, interval_range: Iterator[float], intervals_count: int) -> float:
172196
"""Callback for retry attempts.

docker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ services:
2525
- azurite
2626

2727
rabbit:
28-
image: rabbitmq:latest
28+
image: rabbitmq:management
2929

3030
redis:
3131
image: redis:latest

requirements/default.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ click-didyoumean>=0.3.0
66
click-repl>=0.2.0
77
click-plugins>=1.1.1
88
python-dateutil>=2.8.2
9+
exceptiongroup>=1.3.0
910
tzlocal
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
"""Integration tests for native delayed delivery queue binding.
2+
3+
Tests that verify queue bindings are created correctly for native delayed
4+
delivery, especially when some queues in task_queues fail to bind.
5+
"""
6+
import os
7+
import uuid
8+
from urllib.parse import quote
9+
10+
import pytest
11+
import requests
12+
from kombu import Exchange, Queue
13+
from requests.auth import HTTPBasicAuth
14+
15+
from celery import Celery
16+
from celery.contrib.testing.worker import start_worker
17+
18+
19+
def get_rabbitmq_credentials():
20+
"""Get RabbitMQ credentials from environment."""
21+
user = os.environ.get("RABBITMQ_DEFAULT_USER", "guest")
22+
password = os.environ.get("RABBITMQ_DEFAULT_PASSWORD", "guest")
23+
return user, password
24+
25+
26+
def get_rabbitmq_url():
27+
"""Get RabbitMQ broker URL from environment."""
28+
user, password = get_rabbitmq_credentials()
29+
return os.environ.get(
30+
"TEST_BROKER", f"pyamqp://{user}:{password}@localhost:5672//")
31+
32+
33+
def get_management_api_url():
34+
"""Get RabbitMQ Management API base URL."""
35+
return "http://localhost:15672/api"
36+
37+
38+
def get_bindings_for_exchange(exchange_name, vhost='/'):
39+
"""Fetch bindings where the given exchange is the source.
40+
41+
Args:
42+
exchange_name: Name of the exchange
43+
vhost: Virtual host (default '/')
44+
45+
Returns:
46+
List of binding dictionaries
47+
"""
48+
user, password = get_rabbitmq_credentials()
49+
vhost_encoded = quote(vhost, safe='')
50+
exchange_encoded = quote(exchange_name, safe='')
51+
api_url = (
52+
f"{get_management_api_url()}/exchanges/{vhost_encoded}/"
53+
f"{exchange_encoded}/bindings/source"
54+
)
55+
response = requests.get(api_url, auth=HTTPBasicAuth(user, password))
56+
response.raise_for_status()
57+
return response.json()
58+
59+
60+
def get_bindings_for_queue(queue_name, vhost='/'):
61+
"""Fetch bindings for a specific queue.
62+
63+
Args:
64+
queue_name: Name of the queue
65+
vhost: Virtual host (default '/')
66+
67+
Returns:
68+
List of binding dictionaries
69+
"""
70+
user, password = get_rabbitmq_credentials()
71+
vhost_encoded = quote(vhost, safe='')
72+
queue_encoded = quote(queue_name, safe='')
73+
api_url = (
74+
f"{get_management_api_url()}/queues/{vhost_encoded}/{queue_encoded}/"
75+
"bindings"
76+
)
77+
response = requests.get(api_url, auth=HTTPBasicAuth(user, password))
78+
response.raise_for_status()
79+
return response.json()
80+
81+
82+
def create_test_app(unique_id):
83+
"""Create Celery app configured for native delayed delivery testing.
84+
85+
Args:
86+
unique_id: Unique identifier to ensure queue/exchange names don't
87+
conflict
88+
89+
Returns:
90+
Tuple of (app, exchange_name, queue_a_name, queue_b_name)
91+
"""
92+
broker_url = get_rabbitmq_url()
93+
94+
# Get Redis backend URL from environment
95+
redis_host = os.environ.get("REDIS_HOST", "localhost")
96+
redis_port = os.environ.get("REDIS_PORT", "6379")
97+
backend_url = os.environ.get(
98+
"TEST_BACKEND", f"redis://{redis_host}:{redis_port}/0")
99+
100+
app = Celery(
101+
"test_native_delayed_delivery_binding",
102+
broker=broker_url,
103+
backend=backend_url,
104+
)
105+
106+
# Configure topic exchange with unique name
107+
exchange_name = f'celery.topic_{unique_id}'
108+
default_exchange = Exchange(exchange_name, type='topic')
109+
110+
# Define task queues with queue-a first, queue-b second
111+
queue_a_name = f'queue-a_{unique_id}'
112+
queue_b_name = f'queue-b_{unique_id}'
113+
app.conf.task_queues = [
114+
Queue(queue_a_name, exchange=default_exchange,
115+
routing_key=queue_a_name,
116+
queue_arguments={'x-queue-type': 'quorum'}),
117+
Queue(queue_b_name, exchange=default_exchange,
118+
routing_key=queue_b_name,
119+
queue_arguments={'x-queue-type': 'quorum'}),
120+
]
121+
122+
# Recommended setting for using celery with quorum queues
123+
app.conf.broker_transport_options = {"confirm_publish": True}
124+
125+
# Enable quorum queue detection to disable global QoS
126+
app.conf.worker_detect_quorum_queues = True
127+
128+
return app, exchange_name, queue_a_name, queue_b_name
129+
130+
131+
@pytest.mark.amqp
132+
@pytest.mark.timeout(90)
133+
def test_worker_binds_consumed_queue_despite_earlier_queue_failure():
134+
"""Test that queue binding continues even when earlier queues fail to bind.
135+
136+
This test reproduces the scenario from
137+
https://github.com/celery/celery/issues/9960
138+
"""
139+
unique_id = uuid.uuid4().hex
140+
app, exchange_name, queue_a_name, queue_b_name = create_test_app(unique_id)
141+
142+
# Set default queue to queue-b so the start_worker ping task is received
143+
# by our worker
144+
app.conf.task_default_queue = queue_b_name
145+
146+
# Start worker that only consumes from queue-b
147+
# queue-a is NOT consumed, so it won't be declared by this worker
148+
with start_worker(
149+
app,
150+
queues=[queue_b_name],
151+
loglevel="INFO",
152+
perform_ping_check=True,
153+
shutdown_timeout=15,
154+
):
155+
# Check celery_delayed_delivery → exchange bindings
156+
delayed_delivery_bindings = \
157+
get_bindings_for_exchange('celery_delayed_delivery')
158+
queue_b_delayed_binding = [
159+
b for b in delayed_delivery_bindings
160+
if b.get('destination') == exchange_name
161+
and b.get('routing_key') == f'#.{queue_b_name}'
162+
]
163+
assert len(queue_b_delayed_binding) >= 1, (
164+
f"Expected delayed delivery binding for {queue_b_name!r}, but "
165+
f"got bindings: {delayed_delivery_bindings!r}"
166+
)
167+
168+
# Check celery.topic → queue-b bindings
169+
# Should have bindings from the topic exchange to queue-b for both
170+
# immediate and delayed delivery
171+
queue_b_bindings = get_bindings_for_queue(queue_b_name)
172+
topic_to_queue_bindings = [
173+
b for b in queue_b_bindings
174+
if b.get('source') == exchange_name
175+
]
176+
topic_to_queue_routing_keys = {
177+
b.get('routing_key') for b in topic_to_queue_bindings
178+
}
179+
180+
# Check the routing key for immediate delivery
181+
assert queue_b_name in topic_to_queue_routing_keys, (
182+
f"Expected routing key {queue_b_name!r} in bindings, but got: "
183+
f"{topic_to_queue_bindings!r}"
184+
)
185+
186+
# Check the routing key for delayed delivery
187+
assert f"#.{queue_b_name}" in topic_to_queue_routing_keys, (
188+
f"Expected at least one binding from {exchange_name!r} to "
189+
f"{queue_b_name!r}, but got: {topic_to_queue_bindings!r}"
190+
)

0 commit comments

Comments
 (0)