Skip to content

Commit 9ce2fd6

Browse files
Add timezone support for cron jobs (#354)
* Test that next_cron preserves tzinfo * Add Worker.timezone attribute * Pass timezone aware now to run_cron * Make tests work in 3.7 Co-authored-by: Samuel Colvin <[email protected]>
1 parent 0474311 commit 9ce2fd6

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

arq/worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import signal
55
from dataclasses import dataclass
6-
from datetime import datetime, timedelta
6+
from datetime import datetime, timedelta, timezone
77
from functools import partial
88
from signal import Signals
99
from time import time
@@ -170,6 +170,8 @@ class Worker:
170170
:param job_deserializer: a function that deserializes bytes into Python objects, defaults to pickle.loads
171171
:param expires_extra_ms: the default length of time from when a job is expected to start
172172
after which the job expires, defaults to 1 day in ms.
173+
:param timezone: timezone used for evaluation of cron schedules,
174+
defaults to system timezone
173175
:param log_results: when set to true (default) results for successful jobs
174176
will be logged
175177
"""
@@ -204,6 +206,7 @@ def __init__(
204206
job_serializer: Optional[Serializer] = None,
205207
job_deserializer: Optional[Deserializer] = None,
206208
expires_extra_ms: int = expires_extra_ms,
209+
timezone: Optional[timezone] = None,
207210
log_results: bool = True,
208211
):
209212
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
@@ -271,6 +274,9 @@ def __init__(
271274
self.expires_extra_ms = expires_extra_ms
272275
self.log_results = log_results
273276

277+
# default to system timezone
278+
self.timezone = datetime.now().astimezone().tzinfo if timezone is None else timezone
279+
274280
def run(self) -> None:
275281
"""
276282
Sync function to run the worker, finally closes worker connections.
@@ -677,7 +683,7 @@ async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) ->
677683
await tr.execute()
678684

679685
async def heart_beat(self) -> None:
680-
now = datetime.now()
686+
now = datetime.now(tz=self.timezone)
681687
await self.record_health()
682688

683689
cron_window_size = max(self.poll_delay_s, 0.5) # Clamp the cron delay to 0.5

tests/test_cron.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from random import random
66

77
import pytest
8+
from pytest_mock import MockerFixture
89

910
import arq
1011
from arq import Worker
@@ -51,6 +52,12 @@ def test_next_cron(previous, expected, kwargs):
5152
print(f'{diff.total_seconds() * 1000:0.3f}ms')
5253

5354

55+
def test_next_cron_preserves_tzinfo():
56+
previous = datetime.fromisoformat('2016-06-01T12:10:10+02:00')
57+
assert previous.tzinfo is not None
58+
assert next_cron(previous, second=20).tzinfo is previous.tzinfo
59+
60+
5461
def test_next_cron_invalid():
5562
with pytest.raises(ValueError):
5663
next_cron(datetime(2001, 1, 1, 0, 0, 0), weekday='monday')
@@ -110,6 +117,16 @@ async def test_job_successful(worker, caplog, arq_redis, poll_delay):
110117
assert await arq_redis.pttl(keys[0]) > 0.0
111118

112119

120+
async def test_calculate_next_is_called_with_aware_datetime(worker, mocker: MockerFixture):
121+
worker: Worker = worker(cron_jobs=[cron(foobar, hour=1)])
122+
spy_run_cron = mocker.spy(worker, worker.run_cron.__name__)
123+
124+
await worker.main()
125+
126+
assert spy_run_cron.call_args[0][0].tzinfo is not None
127+
assert worker.cron_jobs[0].next_run.tzinfo is not None
128+
129+
113130
async def test_job_successful_on_specific_queue(worker, caplog):
114131
caplog.set_level(logging.INFO)
115132
worker: Worker = worker(

tests/test_worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,3 +940,9 @@ async def test(ctx):
940940
assert worker.jobs_failed == 0
941941
assert worker.jobs_retried == 0
942942
assert result['called'] == 2
943+
944+
945+
async def test_worker_timezone_defaults_to_system_timezone(worker):
946+
worker = worker(functions=[func(foobar)])
947+
assert worker.timezone is not None
948+
assert worker.timezone == datetime.now().astimezone().tzinfo

0 commit comments

Comments
 (0)