Skip to content

Commit 17b6fdc

Browse files
Deprecate the dramatiq_group_callback_barrier_ttl environment variable (#775)
* Deprecate the `dramatiq_group_callback_barrier_ttl` environment variable * Use the new `barrier_ttl` argument instead #775 --------- Co-authored-by: LincolnPuzey <18750802+LincolnPuzey@users.noreply.github.com>
1 parent 143aaa2 commit 17b6fdc

File tree

3 files changed

+19
-11
lines changed

3 files changed

+19
-11
lines changed

docs/source/reference.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ These are the environment variables that dramatiq reads
293293
- The maximum amount of time a message can be in the dead letter queue for the RabbitMQ Broker (in milliseconds).
294294
* - ``dramatiq_group_callback_barrier_ttl``
295295
- 86400000 (One day)
296-
-
296+
- Deprecated. Use the `barrier_ttl` parameter of |GroupCallbacks| middleware instead.
297297
* - ``dramatiq_prom_db``
298298
- tempfile.gettempdir()/dramatiq-prometheus
299299
- The path to store the prometheus database files. See :ref:`gotchas-with-prometheus`.

dramatiq/composition.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -266,15 +266,12 @@ def run(self, *, delay=None):
266266
group: This same group.
267267
"""
268268
if self.completion_callbacks:
269-
from .middleware.group_callbacks import (
270-
GROUP_CALLBACK_BARRIER_TTL,
271-
GroupCallbacks,
272-
)
269+
from .middleware.group_callbacks import GroupCallbacks
273270

274-
rate_limiter_backend = None
275271
for middleware in self.broker.middleware:
276272
if isinstance(middleware, GroupCallbacks):
277273
rate_limiter_backend = middleware.rate_limiter_backend
274+
barrier_ttl = middleware.barrier_ttl
278275
break
279276
else:
280277
raise RuntimeError(
@@ -287,7 +284,7 @@ def run(self, *, delay=None):
287284
# group is re-run, the barriers are all separate.
288285
# Re-using a barrier's name is an unsafe operation.
289286
completion_uuid = str(uuid4())
290-
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
287+
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=barrier_ttl)
291288
completion_barrier.create(len(self.children))
292289

293290
children = []

dramatiq/middleware/group_callbacks.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,30 @@
1818
from __future__ import annotations
1919

2020
import os
21+
import warnings
2122

2223
from ..rate_limits import Barrier, RateLimiterBackend
2324
from .middleware import Middleware
2425

25-
GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))
26-
2726

2827
class GroupCallbacks(Middleware):
2928
"""Middleware that enables adding completion callbacks to |Groups|."""
3029

31-
def __init__(self, rate_limiter_backend: RateLimiterBackend) -> None:
30+
def __init__(self, rate_limiter_backend: RateLimiterBackend, *, barrier_ttl: int = 86400 * 1000) -> None:
3231
self.rate_limiter_backend = rate_limiter_backend
3332

33+
_barrier_ttl_env = os.getenv("dramatiq_group_callback_barrier_ttl", None)
34+
if _barrier_ttl_env is not None:
35+
warnings.warn(
36+
"Configuring the barrier TTL via the 'dramatiq_group_callback_barrier_ttl' environment variable is deprecated; "
37+
"use the `barrier_ttl` argument of the `GroupCallbacks` middleware instead.",
38+
FutureWarning,
39+
stacklevel=2,
40+
)
41+
self.barrier_ttl = int(_barrier_ttl_env)
42+
else:
43+
self.barrier_ttl = barrier_ttl
44+
3445
def after_process_message(self, broker, message, *, result=None, exception=None):
3546
from ..message import Message
3647

@@ -41,7 +52,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
4152
barrier = Barrier(
4253
self.rate_limiter_backend,
4354
group_completion_uuid,
44-
ttl=GROUP_CALLBACK_BARRIER_TTL,
55+
ttl=self.barrier_ttl,
4556
)
4657
if barrier.wait(block=False):
4758
for message in group_completion_callbacks:

0 commit comments

Comments
 (0)