Skip to content

Commit c324a9f

Browse files
committed
Deprecate the dramatiq_group_callback_barrier_ttl environment variable
1 parent 73969e8 commit c324a9f

File tree

4 files changed

+27
-11
lines changed

4 files changed

+27
-11
lines changed

docs/source/changelog.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,15 @@ Changed
160160
.. _#738: https://github.com/Bogdanp/dramatiq/issues/738
161161
.. _#764: https://github.com/Bogdanp/dramatiq/pull/764
162162

163+
Deprecated
164+
^^^^^^^^^^
165+
166+
* The ``dramatiq_group_callback_barrier_ttl`` environment variable has been deprecated.
167+
Instead, use the ``barrier_ttl`` parameter of the |GroupCallbacks| middleware.
168+
(`#775`_, `@mikeroll`_)
169+
170+
.. _#775: https://github.com/Bogdanp/dramatiq/pull/775
171+
163172
Removed
164173
^^^^^^^
165174

docs/source/reference.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ These are the environment variables that dramatiq reads
293293
- The maximum amount of time a message can be in the dead letter queue for the RabbitMQ Broker (in milliseconds).
294294
* - ``dramatiq_group_callback_barrier_ttl``
295295
- 86400000 (One day)
296-
-
296+
- Deprecated. Use `barrier_ttl` parameter of `GroupCallbacks` middleware.
297297
* - ``dramatiq_prom_db``
298298
- tempfile.gettempdir()/dramatiq-prometheus
299299
- The path to store the prometheus database files. See :ref:`gotchas-with-prometheus`.

dramatiq/composition.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,12 @@ def run(self, *, delay=None):
264264
group: This same group.
265265
"""
266266
if self.completion_callbacks:
267-
from .middleware.group_callbacks import (
268-
GROUP_CALLBACK_BARRIER_TTL,
269-
GroupCallbacks,
270-
)
267+
from .middleware.group_callbacks import GroupCallbacks
271268

272-
rate_limiter_backend = None
273269
for middleware in self.broker.middleware:
274270
if isinstance(middleware, GroupCallbacks):
275271
rate_limiter_backend = middleware.rate_limiter_backend
272+
barrier_ttl = middleware.barrier_ttl
276273
break
277274
else:
278275
raise RuntimeError(
@@ -285,7 +282,7 @@ def run(self, *, delay=None):
285282
# group is re-run, the barriers are all separate.
286283
# Re-using a barrier's name is an unsafe operation.
287284
completion_uuid = str(uuid4())
288-
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
285+
completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=barrier_ttl)
289286
completion_barrier.create(len(self.children))
290287

291288
children = []

dramatiq/middleware/group_callbacks.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,29 @@
1818
from __future__ import annotations
1919

2020
import os
21+
import warnings
2122

2223
from ..rate_limits import Barrier, RateLimiterBackend
2324
from .middleware import Middleware
2425

25-
GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))
26-
2726

2827
class GroupCallbacks(Middleware):
2928
"""Middleware that enables adding completion callbacks to |Groups|."""
3029

31-
def __init__(self, rate_limiter_backend: RateLimiterBackend) -> None:
30+
def __init__(self, rate_limiter_backend: RateLimiterBackend, *, barrier_ttl: int = 86400 * 1000) -> None:
3231
self.rate_limiter_backend = rate_limiter_backend
3332

33+
_barrier_ttl_env = os.getenv("dramatiq_group_callback_barrier_ttl", None)
34+
if _barrier_ttl_env is not None:
35+
warnings.warn(
36+
"Configuring the barrier TTL via the environment variable is deprecated; "
37+
"use the `barrier_ttl` argument instead.",
38+
FutureWarning,
39+
)
40+
self.barrier_ttl = int(_barrier_ttl_env)
41+
else:
42+
self.barrier_ttl = barrier_ttl
43+
3444
def after_process_message(self, broker, message, *, result=None, exception=None):
3545
from ..message import Message
3646

@@ -41,7 +51,7 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
4151
barrier = Barrier(
4252
self.rate_limiter_backend,
4353
group_completion_uuid,
44-
ttl=GROUP_CALLBACK_BARRIER_TTL,
54+
ttl=self.barrier_ttl,
4555
)
4656
if barrier.wait(block=False):
4757
for message in group_completion_callbacks:

0 commit comments

Comments
 (0)