Skip to content

Commit 060a96b

Browse files
authored
Merge branch 'master' into save_safety
2 parents 2db33e8 + d4764ba commit 060a96b

File tree

22 files changed

+323
-298
lines changed

22 files changed

+323
-298
lines changed

docs/changelog.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## v4.0.8 🌈
4+
5+
### 🐛 Bug Fixes
6+
7+
- Fix: Scheduled jobs fail to execute: {'scheduled_time': ['Scheduled time must be in the future']} #297
8+
- Fix: Error when deserializing JobModel due to multi-processing #291
9+
310
## v4.0.7 🌈
411

512
### 🧰 Maintenance

docs/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
mkdocs==1.6.1
2-
mkdocs-material==9.6.19
2+
mkdocs-material==9.6.20

pyproject.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "django-tasks-scheduler"
7-
version = "4.0.7"
7+
version = "4.0.8"
88
description = "An async job scheduler for django using redis/valkey brokers"
99
authors = [{ name = "Daniel Moran", email = "[email protected]" }]
1010
requires-python = ">=3.10"
@@ -56,12 +56,12 @@ Funding = "https://github.com/sponsors/cunla"
5656

5757
[dependency-groups]
5858
dev = [
59-
"time-machine>=2.16.0",
60-
"ruff>=0.11",
61-
"coverage>=7.6",
59+
"time-machine>=2.19",
60+
"ruff>=0.13",
61+
"coverage[toml]>=7.10",
6262
"fakeredis>=2.28",
63-
"pyyaml>=6,<7",
64-
"mypy>=1.16.0",
63+
"pyyaml>=6",
64+
"mypy>=1.18",
6565
"types-croniter>=6.0.0.20250411",
6666
"beautifulsoup4>=4.13.4"
6767
]
@@ -92,9 +92,9 @@ line-ending = "auto"
9292
[tool.mypy]
9393
packages = ['scheduler', ]
9494
exclude = ["scheduler/tests/.*\\.py",
95-
"scheduler/migrations/.*\\.py",
96-
"testproject/.*\\.py",
97-
"testproject/tests/.*\\.py"]
95+
"scheduler/migrations/.*\\.py",
96+
"testproject/.*\\.py",
97+
"testproject/tests/.*\\.py"]
9898
strict = true
9999
follow_imports = "silent"
100100
ignore_missing_imports = true

scheduler/helpers/queues/getters.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ def _get_connection(config: QueueConfiguration, use_strict_broker: bool = False)
4646
)
4747

4848

49+
def refresh_queue_connection(queue: Queue) -> None:
50+
"""Refreshes the connection of a given Queue"""
51+
queue_settings = get_queue_configuration(queue.name)
52+
connection = _get_connection(queue_settings)
53+
queue.refresh_connection(connection)
54+
55+
4956
def get_queue(name: str = "default") -> Queue:
5057
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
5158
queue_settings = get_queue_configuration(name)

scheduler/helpers/queues/queue_logic.py

Lines changed: 37 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,16 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
4444

4545
try:
4646
result = job_model.func(*job_model.args, **job_model.kwargs)
47+
job_model.save(connection=connection, save_all=True)
4748
if asyncio.iscoroutine(result):
4849
loop = asyncio.new_event_loop()
4950
coro_result = loop.run_until_complete(result)
5051
result = coro_result
51-
if job_model.success_callback:
52-
job_model.success_callback(job_model, connection, result)
52+
job_model.call_success_callback(job_model, connection, result)
5353
return result
5454
except Exception as e:
5555
logger.error(f"Job {job_model.name} failed with exception: {e}", exc_info=True)
56-
if job_model.failure_callback:
57-
job_model.failure_callback(job_model, connection, *sys.exc_info())
56+
job_model.call_failure_callback(job_model, connection, *sys.exc_info())
5857
raise
5958
finally:
6059
assert job_model is _job_stack.pop()
@@ -90,6 +89,15 @@ def __init__(self, connection: ConnectionType, name: str, is_async: bool = True)
9089
self.scheduled_job_registry = ScheduledJobRegistry(connection=self.connection, name=self.name)
9190
self.canceled_job_registry = CanceledJobRegistry(connection=self.connection, name=self.name)
9291

92+
def refresh_connection(self, connection: ConnectionType) -> None:
93+
self.connection = connection
94+
self.queued_job_registry.connection = connection
95+
self.active_job_registry.connection = connection
96+
self.failed_job_registry.connection = connection
97+
self.finished_job_registry.connection = connection
98+
self.scheduled_job_registry.connection = connection
99+
self.canceled_job_registry.connection = connection
100+
93101
def __len__(self) -> int:
94102
return self.count
95103

@@ -111,43 +119,27 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
111119
self.connection, before_score
112120
)
113121

114-
with self.connection.pipeline() as pipeline:
115-
for job_name, job_score in started_jobs:
116-
job = JobModel.get(job_name, connection=self.connection)
117-
if job is None or job.failure_callback is None or job_score + job.timeout > before_score:
118-
continue
122+
for job_name, job_score in started_jobs:
123+
job = JobModel.get(job_name, connection=self.connection)
124+
if job is None or not job.has_failure_callback or job_score + job.timeout > before_score:
125+
continue
119126

120-
logger.debug(f"Running failure callbacks for {job.name}")
121-
try:
122-
job.failure_callback(job, self.connection, traceback.extract_stack())
123-
except Exception: # noqa
124-
logger.exception(f"Job {self.name}: error while executing failure callback")
125-
raise
127+
logger.debug(f"Running failure callbacks for {job.name}")
128+
try:
129+
job.call_failure_callback(job, self.connection, traceback.extract_stack())
130+
except Exception: # noqa
131+
logger.exception(f"Job {self.name}: error while executing failure callback")
132+
raise
126133

127-
else:
128-
logger.warning(
129-
f"Queue cleanup: Moving job to {self.failed_job_registry.key} (due to AbandonedJobError)"
130-
)
131-
exc_string = (
132-
f"Moved to {self.failed_job_registry.key}, due to AbandonedJobError, at {datetime.now()}"
133-
)
134-
job.status = JobStatus.FAILED
135-
score = current_timestamp() + SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL
136-
Result.create(
137-
connection=pipeline,
138-
job_name=job.name,
139-
worker_name=job.worker_name,
140-
_type=ResultType.FAILED,
141-
ttl=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
142-
exc_string=exc_string,
143-
)
144-
self.failed_job_registry.add(pipeline, job.name, score)
145-
job.expire(connection=pipeline, ttl=SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL)
146-
job.save(connection=pipeline)
134+
else:
135+
logger.warning(
136+
f"Queue cleanup: Moving job to {self.failed_job_registry.key} (due to AbandonedJobError)"
137+
)
138+
exc_string = f"Moved to {self.failed_job_registry.key}, due to AbandonedJobError, at {datetime.now()}"
139+
self.job_handle_failure(JobStatus.FAILED, job, exc_string)
147140

148141
for registry in self.REGISTRIES.values():
149142
getattr(self, registry).cleanup(connection=self.connection, timestamp=before_score)
150-
pipeline.execute()
151143

152144
def first_queued_job_name(self) -> Optional[str]:
153145
return self.queued_job_registry.get_first()
@@ -248,37 +240,35 @@ def create_and_enqueue_job(
248240
raise TypeError(f"Invalid type for when=`{when}`")
249241
return job_model
250242

251-
def job_handle_success(
252-
self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int, connection: ConnectionType
253-
) -> None:
243+
def job_handle_success(self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int) -> None:
254244
"""Saves and cleanup job after successful execution"""
255245
job.after_execution(
256246
job_info_ttl,
257247
JobStatus.FINISHED,
258248
prev_registry=self.active_job_registry,
259249
new_registry=self.finished_job_registry,
260-
connection=connection,
250+
connection=self.connection,
261251
)
262252
Result.create(
263-
connection,
253+
self.connection,
264254
job_name=job.name,
265255
worker_name=job.worker_name,
266256
_type=ResultType.SUCCESSFUL,
267257
return_value=result,
268258
ttl=result_ttl,
269259
)
270260

271-
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType) -> None:
261+
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str) -> None:
272262
# Does not set job status since the job might be stopped
273263
job.after_execution(
274264
SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
275265
status,
276266
prev_registry=self.active_job_registry,
277267
new_registry=self.failed_job_registry,
278-
connection=connection,
268+
connection=self.connection,
279269
)
280270
Result.create(
281-
connection,
271+
self.connection,
282272
job.name,
283273
job.worker_name,
284274
ResultType.FAILED,
@@ -291,19 +281,11 @@ def run_sync(self, job: JobModel) -> JobModel:
291281
job.prepare_for_execution("sync", self.active_job_registry, self.connection)
292282
try:
293283
result = perform_job(job, self.connection)
294-
295-
with self.connection.pipeline() as pipeline:
296-
self.job_handle_success(
297-
job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl, connection=pipeline
298-
)
299-
300-
pipeline.execute()
284+
self.job_handle_success(job, result=result, job_info_ttl=job.job_info_ttl, result_ttl=job.success_ttl)
301285
except Exception as e: # noqa
302286
logger.warning(f"Job {job.name} failed with exception: {e}")
303-
with self.connection.pipeline() as pipeline:
304-
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
305-
self.job_handle_failure(JobStatus.FAILED, job, exc_string, pipeline)
306-
pipeline.execute()
287+
exc_string = "".join(traceback.format_exception(*sys.exc_info()))
288+
self.job_handle_failure(JobStatus.FAILED, job, exc_string)
307289
return job
308290

309291
@classmethod

scheduler/models/task.py

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -289,20 +289,9 @@ def to_dict(self) -> Dict[str, Any]:
289289
model=str(self.task_type),
290290
name=self.name,
291291
callable=self.callable,
292-
callable_args=[
293-
dict(
294-
arg_type=arg.arg_type,
295-
val=arg.val,
296-
)
297-
for arg in self.callable_args.all()
298-
],
292+
callable_args=[dict(arg_type=arg.arg_type, val=arg.val) for arg in self.callable_args.all()],
299293
callable_kwargs=[
300-
dict(
301-
arg_type=arg.arg_type,
302-
key=arg.key,
303-
val=arg.val,
304-
)
305-
for arg in self.callable_kwargs.all()
294+
dict(arg_type=arg.arg_type, key=arg.key, val=arg.val) for arg in self.callable_kwargs.all()
306295
],
307296
enabled=self.enabled,
308297
queue=self.queue,
@@ -344,23 +333,18 @@ def _schedule(self) -> bool:
344333
logger.debug(f"Task {str(self)} scheduled time is in the past, not scheduling")
345334
return False
346335
kwargs = self._enqueue_args()
347-
job = self.rqueue.create_and_enqueue_job(
348-
run_task,
349-
args=(self.task_type, self.id),
350-
when=schedule_time,
351-
**kwargs,
352-
)
336+
job = self.rqueue.create_and_enqueue_job(run_task, args=(self.task_type, self.id), when=schedule_time, **kwargs)
353337
self.job_name = job.name
354338
return True
355339

356340
def save(self, **kwargs: Any) -> None:
357341
should_clean = kwargs.pop("clean", True)
342+
schedule_job = kwargs.pop("schedule_job", True)
358343
if should_clean:
359344
self.clean()
360345
update_fields = kwargs.get("update_fields", None)
361346
if update_fields is not None:
362347
kwargs["update_fields"] = set(update_fields).union({"updated_at"})
363-
schedule_job = kwargs.pop("schedule_job", True)
364348
super(Task, self).save(**kwargs)
365349
if schedule_job:
366350
self._schedule()

scheduler/redis_models/base.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ def deserialize(cls, data: Dict[str, Any]) -> Self:
108108
logger.warning(f"Unknown field {k} in {cls.__name__}")
109109
continue
110110
data[k] = _deserialize(data[k], types[k])
111-
return cls(**data)
111+
res = cls(**data)
112+
return res
112113

113114

114115
@dataclasses.dataclass(slots=True, kw_only=True)
@@ -118,6 +119,22 @@ class HashModel(BaseModel):
118119
_list_key: ClassVar[str] = ":list_all:"
119120
_children_key_template: ClassVar[str] = ":children:{}:"
120121

122+
def __post_init__(self):
123+
self._dirty_fields = set()
124+
self._save_all = True
125+
126+
def __setattr__(self, key, value):
127+
if not key.startswith("_") and hasattr(self, "_dirty_fields"):
128+
self._dirty_fields.add(key)
129+
super(HashModel, self).__setattr__(key, value)
130+
131+
@classmethod
132+
def deserialize(cls, data: Dict[str, Any]) -> Self:
133+
instance = super(HashModel, cls).deserialize(data)
134+
instance._dirty_fields = set()
135+
instance._save_all = False
136+
return instance
137+
121138
@property
122139
def _parent_key(self) -> Optional[str]:
123140
if self.parent is None:
@@ -168,29 +185,33 @@ def get_many(cls, names: Sequence[str], connection: ConnectionType) -> List[Opti
168185
values = pipeline.execute()
169186
return [(cls.deserialize(decode_dict(v, set())) if v else None) for v in values]
170187

171-
def save(self, connection: ConnectionType) -> None:
188+
def save(self, connection: ConnectionType, save_all: bool = False) -> None:
189+
save_all = save_all or self._save_all
172190
with connection.pipeline() as pipeline:
173191
pipeline.sadd(self._list_key, self.name)
174192
if self._parent_key is not None:
175193
pipeline.sadd(self._parent_key, self.name)
176194
mapping = self.serialize(with_nones=True)
195+
if not save_all:
196+
mapping = {k: v for k, v in mapping.items() if k in self._dirty_fields}
177197
none_values = {k for k, v in mapping.items() if v is None}
178198
if none_values:
179199
pipeline.hdel(self._key, *none_values)
180200
mapping = {k: v for k, v in mapping.items() if v is not None}
181201
if mapping:
182202
pipeline.hset(self._key, mapping=mapping)
183-
184203
pipeline.execute()
204+
self._dirty_fields = set()
205+
self._save_all = False
185206

186207
def delete(self, connection: ConnectionType) -> None:
187208
with connection.pipeline() as pipeline:
188209
pipeline.srem(self._list_key, self._key)
189210
if self._parent_key is not None:
190211
pipeline.srem(self._parent_key, 0, self._key)
191212
pipeline.delete(self._key)
192-
193213
pipeline.execute()
214+
self._save_all = True
194215

195216
@classmethod
196217
def count(cls, connection: ConnectionType, parent: Optional[str] = None) -> int:

0 commit comments

Comments
 (0)