From ebe697560c963a752b71c95794a95e077eeb0df2 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 2 Sep 2020 13:34:41 +0300 Subject: [PATCH 01/18] feature: PeriodicTask by signature --- django_celery_beat/admin.py | 16 ++++--- .../0015_periodictask_task_signature.py | 18 ++++++++ django_celery_beat/models.py | 12 +++++ django_celery_beat/schedulers.py | 44 +++++++++++++++++-- requirements/default.txt | 1 + requirements/runtime.txt | 1 + 6 files changed, 83 insertions(+), 9 deletions(-) create mode 100644 django_celery_beat/migrations/0015_periodictask_task_signature.py diff --git a/django_celery_beat/admin.py b/django_celery_beat/admin.py index 31ac06b2..fac3aaee 100644 --- a/django_celery_beat/admin.py +++ b/django_celery_beat/admin.py @@ -203,11 +203,14 @@ def toggle_tasks(self, request, queryset): def run_tasks(self, request, queryset): self.celery_app.loader.import_default_modules() - tasks = [(self.celery_app.tasks.get(task.task), - loads(task.args), - loads(task.kwargs), - task.queue) - for task in queryset] + tasks = [ + ( + task.get_task_signature() if task.task_signature is not None else self.celery_app.tasks.get(task.task), + loads(task.args), + loads(task.kwargs), + task.queue + ) for task in queryset + ] if any(t[0] is None for t in tasks): for i, t in enumerate(tasks): @@ -215,7 +218,8 @@ def run_tasks(self, request, queryset): break # variable "i" will be set because list "tasks" is not empty - not_found_task_name = queryset[i].task + not_found_task_name = queryset[i].task_signature.name if queryset[i].task_signature is not None \ + else queryset[i].task self.message_user( request, diff --git a/django_celery_beat/migrations/0015_periodictask_task_signature.py b/django_celery_beat/migrations/0015_periodictask_task_signature.py new file mode 100644 index 00000000..32d8a1e5 --- /dev/null +++ b/django_celery_beat/migrations/0015_periodictask_task_signature.py @@ -0,0 +1,18 @@ +# Generated by Django 2.2.16 on 2020-09-01 10:17 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0014_remove_clockedschedule_enabled'), + ] + + operations = [ + migrations.AddField( + model_name='periodictask', + name='task_signature', + field=models.BinaryField(help_text='Serialized signature objects of task (or chain, group, etc.) got by https://pypi.org/project/dill/', null=True), + ), + ] diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index d82b6aa4..6c0fc183 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -1,6 +1,7 @@ """Database models.""" from datetime import timedelta +import dill import timezone_field from celery import schedules, current_app from django.conf import settings @@ -388,6 +389,11 @@ class PeriodicTask(models.Model): help_text=_('The Name of the Celery Task that Should be Run. ' '(Example: "proj.tasks.import_contacts")'), ) + task_signature = models.BinaryField( + null=True, + help_text='Serialized signature objects of task (or chain, group, ' + 'etc.) got by https://pypi.org/project/dill/' + ) # todo: add sign of a serialized task # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey @@ -578,6 +584,12 @@ def _clean_expires(self): _('Only one can be set, in expires and expire_seconds') ) + def get_task_signature(self): + if self.task_signature is None: + return None + # todo: add sign check + return dill.loads(bytes(self.task_signature)) + @property def expires_(self): return self.expires or self.expire_seconds diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index b9f332dc..4265a06e 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -1,22 +1,32 @@ """Beat Scheduler Implementation.""" +from __future__ import absolute_import, unicode_literals + import datetime import logging import math - -from multiprocessing.util import Finalize +import sys from celery import current_app from celery import schedules -from celery.beat import Scheduler, ScheduleEntry +# noinspection PyProtectedMember +from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc +# noinspection PyUnresolvedReferences +from celery.five import ( + items, monotonic, python_2_unicode_compatible, + reraise, values +) from celery.utils.encoding import safe_str, safe_repr from celery.utils.log import get_logger from celery.utils.time import maybe_make_aware from kombu.utils.json import dumps, loads from django.conf import settings +# noinspection PyProtectedMember from django.db import transaction, close_old_connections from django.db.utils import DatabaseError, InterfaceError from django.core.exceptions import ObjectDoesNotExist +# noinspection PyUnresolvedReferences +from multiprocessing.util import Finalize from .models import ( PeriodicTask, PeriodicTasks, @@ -60,6 +70,7 @@ def __init__(self, model, app=None): self.app = app or current_app._get_current_object() self.name = model.name self.task = model.task + self.task_signature = model.get_task_signature() try: self.schedule = model.schedule except model.DoesNotExist: @@ -372,3 +383,30 @@ def schedule(self): repr(entry) for entry in self._schedule.values()), ) return self._schedule + + def apply_async(self, entry, producer=None, advance=True, **kwargs): + # Update time-stamps and run counts before we actually execute, + # so we have that done if an exception is raised (doesn't schedule + # forever.) + entry = self.reserve(entry) if advance else entry + task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task) + + try: + entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])] + entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()} + if task: + return task.apply_async(entry_args, entry_kwargs, + producer=producer, + **entry.options) + else: + return self.send_task(entry.task, entry_args, entry_kwargs, + producer=producer, + **entry.options) + except Exception as exc: # pylint: disable=broad-except + reraise(SchedulingError, SchedulingError( + "Couldn't apply scheduled task {0.name}: {exc}".format( + entry, exc=exc)), sys.exc_info()[2]) + finally: + self._tasks_since_sync += 1 + if self.should_sync(): + self._do_sync() diff --git a/requirements/default.txt b/requirements/default.txt index 8e00ce47..c15e9e13 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,2 +1,3 @@ django-timezone-field>=4.0,<5.0 python-crontab>=2.3.4 +dill diff --git a/requirements/runtime.txt b/requirements/runtime.txt index 0764f4cd..a54d0225 100644 --- a/requirements/runtime.txt +++ b/requirements/runtime.txt @@ -1,2 +1,3 @@ celery +# celery>=4.4.7 Django>=1.11.17 From 0b91eca42038a9fa8c7fae601f26b68668ddd553 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 3 Sep 2020 11:35:16 +0300 Subject: [PATCH 02/18] feature: Sign field --- django_celery_beat/admin.py | 12 ++++-- .../0015_periodictask_task_signature.py | 7 +++- django_celery_beat/models.py | 38 ++++++++++++++++--- django_celery_beat/schedulers.py | 3 +- django_celery_beat/utils.py | 38 +++++++++++++++++++ requirements/default.txt | 2 + 6 files changed, 89 insertions(+), 11 deletions(-) diff --git a/django_celery_beat/admin.py b/django_celery_beat/admin.py index fac3aaee..231e34c3 100644 --- a/django_celery_beat/admin.py +++ b/django_celery_beat/admin.py @@ -74,6 +74,7 @@ class PeriodicTaskForm(forms.ModelForm): required=False, max_length=200, ) + # todo: add field for task_signature class Meta: """Form metadata.""" @@ -205,7 +206,9 @@ def run_tasks(self, request, queryset): self.celery_app.loader.import_default_modules() tasks = [ ( - task.get_task_signature() if task.task_signature is not None else self.celery_app.tasks.get(task.task), + task.get_verified_task_signature(raise_exceptions=False) + if task.task_signature is not None + else self.celery_app.tasks.get(task.task), loads(task.args), loads(task.kwargs), task.queue @@ -218,8 +221,9 @@ def run_tasks(self, request, queryset): break # variable "i" will be set because list "tasks" is not empty - not_found_task_name = queryset[i].task_signature.name if queryset[i].task_signature is not None \ - else queryset[i].task + not_found_task_name = queryset[i].get_verified_task_signature(raise_exceptions=False).name \ + if queryset[i].task_signature is not None and queryset[i].get_verified_task_signature( + raise_exceptions=False) is not None else queryset[i].task self.message_user( request, @@ -231,7 +235,7 @@ def run_tasks(self, request, queryset): task_ids = [task.apply_async(args=args, kwargs=kwargs, queue=queue) if queue and len(queue) else task.apply_async(args=args, kwargs=kwargs) - for task, args, kwargs, queue in tasks] + for task, args, kwargs, queue in tasks if task is not None] tasks_run = len(task_ids) self.message_user( request, diff --git a/django_celery_beat/migrations/0015_periodictask_task_signature.py b/django_celery_beat/migrations/0015_periodictask_task_signature.py index 32d8a1e5..301d24c0 100644 --- a/django_celery_beat/migrations/0015_periodictask_task_signature.py +++ b/django_celery_beat/migrations/0015_periodictask_task_signature.py @@ -13,6 +13,11 @@ class Migration(migrations.Migration): migrations.AddField( model_name='periodictask', name='task_signature', - field=models.BinaryField(help_text='Serialized signature objects of task (or chain, group, etc.) got by https://pypi.org/project/dill/', null=True), + field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's object of task (or chain, group, etc.) got by https://pypi.org/project/dill/", null=True), + ), + migrations.AddField( + model_name='periodictask', + name='task_signature_sign', + field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's object (see task_signature field)", max_length=1028, null=True), ), ] diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index 6c0fc183..c3e2acc3 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -13,9 +13,11 @@ from . import managers, validators from .tzcrontab import TzAwareCrontab -from .utils import make_aware, now +from .utils import make_aware, now, verify from .clockedschedule import clocked +from celery.utils.log import get_logger +logger = get_logger(__name__) DAYS = 'days' HOURS = 'hours' @@ -391,9 +393,14 @@ class PeriodicTask(models.Model): ) task_signature = models.BinaryField( null=True, - help_text='Serialized signature objects of task (or chain, group, ' + help_text='Serialized `celery.canvas.Signature` type\'s object of task (or chain, group, ' 'etc.) got by https://pypi.org/project/dill/' - ) # todo: add sign of a serialized task + ) + task_signature_sign = models.CharField( + null=True, + max_length=1028, + help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s object (see task_signature field)' + ) # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey @@ -584,10 +591,31 @@ def _clean_expires(self): _('Only one can be set, in expires and expire_seconds') ) - def get_task_signature(self): + def get_verified_task_signature(self, raise_exceptions=True): if self.task_signature is None: return None - # todo: add sign check + + if self.task_signature_sign is None: + err = 'Not found `task_signature_sign` for `{}` (use django_celery_be' \ + 'at.utils.sign to sign). Task disabled.'.format(self) + self.enabled = False + self.save(update_fields=['enabled']) + logger.error(err) + if raise_exceptions: + raise ValueError(err) + return None + + task_signature_sign = int(self.task_signature_sign, 16) + + if not verify(bytes(self.task_signature), (task_signature_sign,)): + err = 'Wrong sign for `{}`. Task disabled.'.format(self) + self.enabled = False + self.save(update_fields=['enabled']) + logger.error(err) + if raise_exceptions: + raise ValueError(err) + return None + return dill.loads(bytes(self.task_signature)) @property diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 4265a06e..fafec7d1 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -70,7 +70,8 @@ def __init__(self, model, app=None): self.app = app or current_app._get_current_object() self.name = model.name self.task = model.task - self.task_signature = model.get_task_signature() + self.task_signature = model.get_verified_task_signature() + try: self.schedule = model.schedule except model.DoesNotExist: diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index c19f4edb..ca67ac3c 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -1,4 +1,7 @@ """Utilities.""" +import os + +import Crypto.PublicKey.RSA as RSA # -- XXX This module must not use translation as that causes # -- a recursive loader import! from django.conf import settings @@ -12,6 +15,28 @@ now_localtime = getattr(timezone, 'template_localtime', timezone.localtime) +def _load_keys(): + private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa') + public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') + + if os.path.exists(private_key_path): + with open(private_key_path, 'r') as id_rsa: + private_key = RSA.importKey(id_rsa.read()) + public_key = private_key.publickey() + else: + private_key = RSA.generate(4096, os.urandom) + public_key = private_key.publickey() + + os.chmod(private_key_path, 0o600) + with open(private_key_path, 'wb+') as id_rsa: + id_rsa.write(private_key.exportKey()) + + with open(public_key_path, 'wb') as id_rsa_pub: + id_rsa_pub.write(public_key.exportKey()) + + return private_key, public_key + + def make_aware(value): """Force datatime to have timezone information.""" if getattr(settings, 'USE_TZ', False): @@ -46,3 +71,16 @@ def is_database_scheduler(scheduler): scheduler == 'django' or issubclass(symbol_by_name(scheduler), DatabaseScheduler) ) + + +def sign(data): + """Sign the data to protect against database changes and return signature in hex""" + return hex(_private_key.sign(data, '')[0]) + + +def verify(data, signature): + """Check the signature and return True if it is correct for the specified data""" + return _public_key.verify(data, signature) + + +_private_key, _public_key = _load_keys() diff --git a/requirements/default.txt b/requirements/default.txt index c15e9e13..93acedc6 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,3 +1,5 @@ django-timezone-field>=4.0,<5.0 python-crontab>=2.3.4 dill +pycrypto +django-appconf From 967f200a0533ca048a4993fa81c7025937430a74 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 3 Sep 2020 14:52:47 +0300 Subject: [PATCH 03/18] Bugfix: id_rsa.pubpermissions & key files creating --- django_celery_beat/models.py | 4 +--- django_celery_beat/utils.py | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index c3e2acc3..fd920735 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -605,9 +605,7 @@ def get_verified_task_signature(self, raise_exceptions=True): raise ValueError(err) return None - task_signature_sign = int(self.task_signature_sign, 16) - - if not verify(bytes(self.task_signature), (task_signature_sign,)): + if not verify(bytes(self.task_signature), self.task_signature_sign): err = 'Wrong sign for `{}`. Task disabled.'.format(self) self.enabled = False self.save(update_fields=['enabled']) diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index ca67ac3c..5a2cfcbc 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -20,17 +20,26 @@ def _load_keys(): public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') if os.path.exists(private_key_path): - with open(private_key_path, 'r') as id_rsa: + with open(private_key_path, 'rb') as id_rsa: private_key = RSA.importKey(id_rsa.read()) public_key = private_key.publickey() + + if not os.path.exists(public_key_path): + open(private_key_path, 'wb').close() + os.chmod(public_key_path, 0o644) + with open(public_key_path, 'wb') as id_rsa_pub: + id_rsa_pub.write(public_key.exportKey()) else: private_key = RSA.generate(4096, os.urandom) public_key = private_key.publickey() + open(private_key_path, 'wb').close() os.chmod(private_key_path, 0o600) - with open(private_key_path, 'wb+') as id_rsa: + with open(private_key_path, 'wb') as id_rsa: id_rsa.write(private_key.exportKey()) + open(public_key_path, 'wb').close() + os.chmod(public_key_path, 0o644) with open(public_key_path, 'wb') as id_rsa_pub: id_rsa_pub.write(public_key.exportKey()) @@ -74,13 +83,14 @@ def is_database_scheduler(scheduler): def sign(data): - """Sign the data to protect against database changes and return signature in hex""" + """Sign the bytes data to protect against database changes and return signature in hex""" + assert isinstance(data, bytes), ValueError('Data must be bytes') return hex(_private_key.sign(data, '')[0]) def verify(data, signature): """Check the signature and return True if it is correct for the specified data""" - return _public_key.verify(data, signature) + return _public_key.verify(data, (int(signature, 16),)) _private_key, _public_key = _load_keys() From 93431fb9f88aaf354ca685c9c52499b88328b89e Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 3 Sep 2020 17:21:19 +0300 Subject: [PATCH 04/18] Feature: callback in periodic task (support for option) --- .../migrations/0016_auto_20200903_1356.py | 23 ++++++++++ django_celery_beat/models.py | 43 ++++++++++++++++--- django_celery_beat/schedulers.py | 5 ++- 3 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 django_celery_beat/migrations/0016_auto_20200903_1356.py diff --git a/django_celery_beat/migrations/0016_auto_20200903_1356.py b/django_celery_beat/migrations/0016_auto_20200903_1356.py new file mode 100644 index 00000000..82d49a17 --- /dev/null +++ b/django_celery_beat/migrations/0016_auto_20200903_1356.py @@ -0,0 +1,23 @@ +# Generated by Django 2.2.16 on 2020-09-03 13:56 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0015_periodictask_task_signature'), + ] + + operations = [ + migrations.AddField( + model_name='periodictask', + name='callback_signature', + field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's callback task got by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)", null=True), + ), + migrations.AddField( + model_name='periodictask', + name='callback_signature_sign', + field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's callback task (see callback_signature field)", max_length=1028, null=True), + ), + ] diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index fd920735..b0eaf60f 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -396,11 +396,22 @@ class PeriodicTask(models.Model): help_text='Serialized `celery.canvas.Signature` type\'s object of task (or chain, group, ' 'etc.) got by https://pypi.org/project/dill/' ) + callback_signature = models.BinaryField( + null=True, + help_text='Serialized `celery.canvas.Signature` type\'s callback task got ' + 'by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)' + ) # todo: add support for error_callback (link_error option) task_signature_sign = models.CharField( null=True, max_length=1028, help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s object (see task_signature field)' ) + callback_signature_sign = models.CharField( + null=True, + max_length=1028, + help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s callback ' + 'task (see callback_signature field)' + ) # You can only set ONE of the following schedule FK's # TODO: Redo this as a GenericForeignKey @@ -592,12 +603,32 @@ def _clean_expires(self): ) def get_verified_task_signature(self, raise_exceptions=True): - if self.task_signature is None: + try: + self.get_verified_callback_signature() + except ValueError as e: + err = 'Wrong callback: {} [{}]'.format(e, self) + logger.error(err) + if raise_exceptions: + raise ValueError(err) + return None + + return self._get_verified_obj_signature('task', raise_exceptions) + + def get_verified_callback_signature(self, raise_exceptions=True): + return self._get_verified_obj_signature('callback', raise_exceptions) + + def _get_verified_obj_signature(self, object_name, raise_exceptions): + assert object_name in ('task', 'callback'), ValueError('Unknown object_name') + + obj_signarute = getattr(self, '{}_signature'.format(object_name), None) + obj_signarute_sign = getattr(self, '{}_signature_sign'.format(object_name), None) + + if obj_signarute is None: return None - if self.task_signature_sign is None: - err = 'Not found `task_signature_sign` for `{}` (use django_celery_be' \ - 'at.utils.sign to sign). Task disabled.'.format(self) + if obj_signarute_sign is None: + err = 'Not found `{}_signature_sign` for `{}` (use django_celery_be' \ + 'at.utils.sign to sign). Task disabled.'.format(object_name, self) self.enabled = False self.save(update_fields=['enabled']) logger.error(err) @@ -605,7 +636,7 @@ def get_verified_task_signature(self, raise_exceptions=True): raise ValueError(err) return None - if not verify(bytes(self.task_signature), self.task_signature_sign): + if not verify(bytes(obj_signarute), obj_signarute_sign): err = 'Wrong sign for `{}`. Task disabled.'.format(self) self.enabled = False self.save(update_fields=['enabled']) @@ -614,7 +645,7 @@ def get_verified_task_signature(self, raise_exceptions=True): raise ValueError(err) return None - return dill.loads(bytes(self.task_signature)) + return dill.loads(bytes(obj_signarute)) @property def expires_(self): diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index fafec7d1..e8753230 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -90,7 +90,10 @@ def __init__(self, model, app=None): ) self._disable(model) - self.options = {} + self.options = { + 'link': model.get_verified_callback_signature() + } + for option in ['queue', 'exchange', 'routing_key', 'priority']: value = getattr(model, option) if value is None: From f8e3f1719f44d5d77ada5d8fba9147d94ab3d545 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Mon, 7 Sep 2020 19:01:07 +0300 Subject: [PATCH 05/18] Fix: sign hash of serialized task signature --- django_celery_beat/models.py | 16 +++++++++------- django_celery_beat/utils.py | 11 ++++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index b0eaf60f..809084b5 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -4,6 +4,7 @@ import dill import timezone_field from celery import schedules, current_app +from celery.utils.log import get_logger from django.conf import settings from django.core.exceptions import MultipleObjectsReturned, ValidationError from django.core.validators import MaxValueValidator, MinValueValidator @@ -12,10 +13,9 @@ from django.utils.translation import gettext_lazy as _ from . import managers, validators -from .tzcrontab import TzAwareCrontab -from .utils import make_aware, now, verify from .clockedschedule import clocked -from celery.utils.log import get_logger +from .tzcrontab import TzAwareCrontab +from .utils import make_aware, now, verify_task_signature logger = get_logger(__name__) @@ -572,8 +572,8 @@ def validate_unique(self, *args, **kwargs): 'must be set.' ) - err_msg = 'Only one of clocked, interval, crontab, '\ - 'or solar must be set' + err_msg = 'Only one of clocked, interval, crontab, ' \ + 'or solar must be set' if len(selected_schedule_types) > 1: error_info = {} for selected_schedule_type in selected_schedule_types: @@ -636,7 +636,9 @@ def _get_verified_obj_signature(self, object_name, raise_exceptions): raise ValueError(err) return None - if not verify(bytes(obj_signarute), obj_signarute_sign): + obj_signarute = bytes(obj_signarute) + + if not verify_task_signature(obj_signarute, obj_signarute_sign): err = 'Wrong sign for `{}`. Task disabled.'.format(self) self.enabled = False self.save(update_fields=['enabled']) @@ -645,7 +647,7 @@ def _get_verified_obj_signature(self, object_name, raise_exceptions): raise ValueError(err) return None - return dill.loads(bytes(obj_signarute)) + return dill.loads(obj_signarute) @property def expires_(self): diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index 5a2cfcbc..becd3590 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -1,5 +1,6 @@ """Utilities.""" import os +from hashlib import sha256 import Crypto.PublicKey.RSA as RSA # -- XXX This module must not use translation as that causes @@ -82,15 +83,15 @@ def is_database_scheduler(scheduler): ) -def sign(data): +def sign_task_signature(serialized_task_signature): """Sign the bytes data to protect against database changes and return signature in hex""" - assert isinstance(data, bytes), ValueError('Data must be bytes') - return hex(_private_key.sign(data, '')[0]) + assert isinstance(serialized_task_signature, bytes), ValueError('Data must be bytes') + return hex(_private_key.sign(sha256(serialized_task_signature).hexdigest().encode(), '')[0]) -def verify(data, signature): +def verify_task_signature(serialized_task_signature, sign_in_hex): """Check the signature and return True if it is correct for the specified data""" - return _public_key.verify(data, (int(signature, 16),)) + return _public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),)) _private_key, _public_key = _load_keys() From 6388db429fe659b272475f614e44b016f4f24482 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Tue, 8 Sep 2020 21:03:52 +0300 Subject: [PATCH 06/18] added tests of periodic tasks with task signatures --- django_celery_beat/utils.py | 49 ++++++++++++++++------------ t/unit/test_models.py | 65 ++++++++++++++++++++++++++++++++++--- t/unit/test_utils.py | 43 ++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 25 deletions(-) create mode 100644 t/unit/test_utils.py diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index becd3590..0415df43 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -5,6 +5,7 @@ import Crypto.PublicKey.RSA as RSA # -- XXX This module must not use translation as that causes # -- a recursive loader import! +from celery.utils.log import get_logger from django.conf import settings from django.utils import timezone @@ -14,37 +15,44 @@ # see Issue #222 now_localtime = getattr(timezone, 'template_localtime', timezone.localtime) +_private_key = _public_key = None + +logger = get_logger(__name__) def _load_keys(): + global _private_key, _public_key + + if _private_key is not None and _public_key is not None: + return + private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa') public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') if os.path.exists(private_key_path): with open(private_key_path, 'rb') as id_rsa: - private_key = RSA.importKey(id_rsa.read()) - public_key = private_key.publickey() - - if not os.path.exists(public_key_path): - open(private_key_path, 'wb').close() - os.chmod(public_key_path, 0o644) - with open(public_key_path, 'wb') as id_rsa_pub: - id_rsa_pub.write(public_key.exportKey()) + _private_key = RSA.importKey(id_rsa.read()) + _public_key = _private_key.publickey() else: - private_key = RSA.generate(4096, os.urandom) - public_key = private_key.publickey() + logger.info( + 'Keys not found. Generating new RSA keys... [{},{}]'.format( + public_key_path, + private_key_path + ) + ) + + _private_key = RSA.generate(4096, os.urandom) + _public_key = _private_key.publickey() open(private_key_path, 'wb').close() os.chmod(private_key_path, 0o600) with open(private_key_path, 'wb') as id_rsa: - id_rsa.write(private_key.exportKey()) - - open(public_key_path, 'wb').close() - os.chmod(public_key_path, 0o644) - with open(public_key_path, 'wb') as id_rsa_pub: - id_rsa_pub.write(public_key.exportKey()) + id_rsa.write(_private_key.exportKey()) - return private_key, public_key + open(public_key_path, 'wb').close() + os.chmod(public_key_path, 0o644) + with open(public_key_path, 'wb') as id_rsa_pub: + id_rsa_pub.write(_public_key.exportKey()) def make_aware(value): @@ -85,13 +93,14 @@ def is_database_scheduler(scheduler): def sign_task_signature(serialized_task_signature): """Sign the bytes data to protect against database changes and return signature in hex""" + _load_keys() + assert isinstance(serialized_task_signature, bytes), ValueError('Data must be bytes') return hex(_private_key.sign(sha256(serialized_task_signature).hexdigest().encode(), '')[0]) def verify_task_signature(serialized_task_signature, sign_in_hex): """Check the signature and return True if it is correct for the specified data""" - return _public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),)) + _load_keys() - -_private_key, _public_key = _load_keys() + return _public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),)) diff --git a/t/unit/test_models.py b/t/unit/test_models.py index 5951a8ad..83c20cde 100644 --- a/t/unit/test_models.py +++ b/t/unit/test_models.py @@ -1,16 +1,20 @@ import os +import random +import string -from django.test import TestCase, override_settings +import dill +import timezone_field +from celery.canvas import Signature from django.apps import apps -from django.db.migrations.state import ProjectState from django.db.migrations.autodetector import MigrationAutodetector from django.db.migrations.loader import MigrationLoader from django.db.migrations.questioner import NonInteractiveMigrationQuestioner - -import timezone_field +from django.db.migrations.state import ProjectState +from django.test import TestCase, override_settings from django_celery_beat import migrations as beat_migrations -from django_celery_beat.models import crontab_schedule_celery_timezone +from django_celery_beat.models import crontab_schedule_celery_timezone, PeriodicTask, IntervalSchedule +from django_celery_beat.utils import _load_keys, sign_task_signature class MigrationTests(TestCase): @@ -68,3 +72,54 @@ def test_default_timezone_without_settings_config(self): @override_settings(CELERY_TIMEZONE=FIRST_VALID_TIMEZONE) def test_default_timezone_with_settings_config(self): assert crontab_schedule_celery_timezone() == self.FIRST_VALID_TIMEZONE + + +class PeriodicTaskSignatureTestCase(TestCase): + test_private_key_path = './test_id_rsa' + test_public_key_path = './test_id_rsa.pub' + + @classmethod + def setUpClass(cls): + super(PeriodicTaskSignatureTestCase, cls).setUpClass() + + os.environ.update({ + 'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path, + 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, + }) + + _load_keys() + + def test_periodic_task_with_signatures(self): + empty_task_signature = Signature(task='empty_task') + + serialized_empty_task = dill.dumps(empty_task_signature) + s = sign_task_signature(serialized_empty_task) + + interval, _ = IntervalSchedule.objects.get_or_create( + every=2, + period=IntervalSchedule.MINUTES + ) + periodic_task = PeriodicTask.objects.create( + name='test-' + ''.join(random.choices(string.ascii_letters, k=20)), + task_signature=serialized_empty_task, + task_signature_sign=s, + callback_signature=serialized_empty_task, + callback_signature_sign=s, + interval=interval, + ) + + task_signature = periodic_task.get_verified_callback_signature(raise_exceptions=False) + callback_signature = periodic_task.get_verified_callback_signature(raise_exceptions=False) + + self.assertEqual(empty_task_signature, task_signature) + self.assertEqual(empty_task_signature, callback_signature) + + @classmethod + def tearDownClass(cls) -> None: + super(PeriodicTaskSignatureTestCase, cls).tearDownClass() + + if os.path.exists(cls.test_private_key_path): + os.remove(cls.test_private_key_path) + + if os.path.exists(cls.test_public_key_path): + os.remove(cls.test_public_key_path) diff --git a/t/unit/test_utils.py b/t/unit/test_utils.py new file mode 100644 index 00000000..aad1b6c0 --- /dev/null +++ b/t/unit/test_utils.py @@ -0,0 +1,43 @@ +import os +from unittest import TestCase + +import dill +from celery.canvas import Signature + +from django_celery_beat.utils import sign_task_signature, _load_keys, verify_task_signature + + +class UtilsTests(TestCase): + test_private_key_path = './test_id_rsa' + test_public_key_path = './test_id_rsa.pub' + + @classmethod + def setUpClass(cls) -> None: + super(UtilsTests, cls).setUpClass() + + os.environ.update({ + 'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path, + 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, + }) + + _load_keys() + + def test_sign_verify_task_signature(self): + empty_task_signature = Signature() + + serialized_empty_task = dill.dumps(empty_task_signature) + s = sign_task_signature(serialized_empty_task) + + is_valid = verify_task_signature(serialized_empty_task, s) + + self.assertTrue(is_valid) + + @classmethod + def tearDownClass(cls) -> None: + super(UtilsTests, cls).tearDownClass() + + if os.path.exists(cls.test_private_key_path): + os.remove(cls.test_private_key_path) + + if os.path.exists(cls.test_public_key_path): + os.remove(cls.test_public_key_path) From 7b02ec0b7ab8155a9a2a9762a55c5514d8d3abba Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 10 Sep 2020 12:42:32 +0300 Subject: [PATCH 07/18] refactor of keys loading --- django_celery_beat/utils.py | 72 +++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index 0415df43..90069b28 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -8,6 +8,7 @@ from celery.utils.log import get_logger from django.conf import settings from django.utils import timezone +from functools import lru_cache is_aware = timezone.is_aware # celery schedstate return None will make it not work @@ -15,44 +16,61 @@ # see Issue #222 now_localtime = getattr(timezone, 'template_localtime', timezone.localtime) -_private_key = _public_key = None logger = get_logger(__name__) -def _load_keys(): - global _private_key, _public_key - - if _private_key is not None and _public_key is not None: - return - +@lru_cache(maxsize=None) +def _load_private_key(): private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa') - public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') if os.path.exists(private_key_path): with open(private_key_path, 'rb') as id_rsa: - _private_key = RSA.importKey(id_rsa.read()) - _public_key = _private_key.publickey() - else: - logger.info( - 'Keys not found. Generating new RSA keys... [{},{}]'.format( - public_key_path, - private_key_path - ) + private_key = RSA.importKey(id_rsa.read()) + return private_key + + public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') + logger.warning( + 'Keys not found. Generating new RSA keys... [{},{}]'.format( + public_key_path, + private_key_path ) + ) - _private_key = RSA.generate(4096, os.urandom) - _public_key = _private_key.publickey() + private_key = RSA.generate(4096, os.urandom) + public_key = private_key.publickey() - open(private_key_path, 'wb').close() - os.chmod(private_key_path, 0o600) - with open(private_key_path, 'wb') as id_rsa: - id_rsa.write(_private_key.exportKey()) + open(private_key_path, 'wb').close() + os.chmod(private_key_path, 0o600) + with open(private_key_path, 'wb') as id_rsa: + id_rsa.write(private_key.exportKey()) open(public_key_path, 'wb').close() os.chmod(public_key_path, 0o644) with open(public_key_path, 'wb') as id_rsa_pub: - id_rsa_pub.write(_public_key.exportKey()) + id_rsa_pub.write(public_key.exportKey()) + + return private_key + +@lru_cache(maxsize=None) +def _load_public_key(): + public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') + + if os.path.exists(public_key_path): + with open(public_key_path, 'rb') as id_rsa_pub: + _private_key = RSA.importKey(id_rsa_pub.read()) + _public_key = _private_key.publickey() + return _public_key + + raise FileNotFoundError( + 'Failed, public key not found. [{}]'.format( + public_key_path + ) + ) + + +def _load_keys(): + return _load_private_key(), _load_public_key() def make_aware(value): @@ -93,14 +111,14 @@ def is_database_scheduler(scheduler): def sign_task_signature(serialized_task_signature): """Sign the bytes data to protect against database changes and return signature in hex""" - _load_keys() + private_key = _load_private_key() assert isinstance(serialized_task_signature, bytes), ValueError('Data must be bytes') - return hex(_private_key.sign(sha256(serialized_task_signature).hexdigest().encode(), '')[0]) + return hex(private_key.sign(sha256(serialized_task_signature).hexdigest().encode(), '')[0]) def verify_task_signature(serialized_task_signature, sign_in_hex): """Check the signature and return True if it is correct for the specified data""" - _load_keys() + public_key = _load_public_key() - return _public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),)) + return public_key.verify(sha256(serialized_task_signature).hexdigest().encode(), (int(sign_in_hex, 16),)) From 1d96917d0977903c0dfeab327e1537bfa741c91a Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 10 Sep 2020 13:25:42 +0300 Subject: [PATCH 08/18] Fix deprication warnings in tests and refactor key generating & loading --- django_celery_beat/utils.py | 51 +++++++++++++++++++++---------------- t/unit/test_admin.py | 6 ++--- t/unit/test_models.py | 7 +++-- t/unit/test_utils.py | 7 +++-- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index 90069b28..a9985e0b 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -20,25 +20,20 @@ logger = get_logger(__name__) -@lru_cache(maxsize=None) -def _load_private_key(): - private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa') +def generate_keys( + private_key_path=os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa'), + public_key_path=os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') +): + private_key = RSA.generate(4096, os.urandom) + public_key = private_key.publickey() if os.path.exists(private_key_path): - with open(private_key_path, 'rb') as id_rsa: - private_key = RSA.importKey(id_rsa.read()) - return private_key - - public_key_path = os.environ.get('DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH', './id_rsa.pub') - logger.warning( - 'Keys not found. Generating new RSA keys... [{},{}]'.format( - public_key_path, - private_key_path - ) - ) + if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(private_key_path)) != 'y': + return - private_key = RSA.generate(4096, os.urandom) - public_key = private_key.publickey() + if os.path.exists(public_key_path): + if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(public_key_path)) != 'y': + return open(private_key_path, 'wb').close() os.chmod(private_key_path, 0o600) @@ -50,7 +45,21 @@ def _load_private_key(): with open(public_key_path, 'wb') as id_rsa_pub: id_rsa_pub.write(public_key.exportKey()) - return private_key + +@lru_cache(maxsize=None) +def _load_private_key(): + private_key_path = os.environ.get('DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH', './id_rsa') + + if os.path.exists(private_key_path): + with open(private_key_path, 'rb') as id_rsa: + private_key = RSA.importKey(id_rsa.read()) + return private_key + + raise FileNotFoundError( + 'Private key not found. Use `django_celery_beat.utils.generate_keys` ' + 'to generate new RSA keys... [{}]'.format(private_key_path) + ) + @lru_cache(maxsize=None) def _load_public_key(): @@ -58,14 +67,12 @@ def _load_public_key(): if os.path.exists(public_key_path): with open(public_key_path, 'rb') as id_rsa_pub: - _private_key = RSA.importKey(id_rsa_pub.read()) - _public_key = _private_key.publickey() + _public_key = RSA.importKey(id_rsa_pub.read()) return _public_key raise FileNotFoundError( - 'Failed, public key not found. [{}]'.format( - public_key_path - ) + 'Private key not found. Use `django_celery_beat.utils.generate_keys` ' + 'to generate new RSA keys... [{}]'.format(public_key_path) ) diff --git a/t/unit/test_admin.py b/t/unit/test_admin.py index e3dabc19..d92d18bc 100644 --- a/t/unit/test_admin.py +++ b/t/unit/test_admin.py @@ -81,7 +81,7 @@ class ValidateUniqueTests(TestCase): def test_validate_unique_raises_if_schedule_not_set(self): with self.assertRaises(ValidationError) as cm: PeriodicTask(name='task0').validate_unique() - self.assertEquals( + self.assertEqual( cm.exception.args[0], 'One of clocked, interval, crontab, or solar must be set.', ) @@ -103,9 +103,9 @@ def test_validate_unique_raises_for_multiple_schedules(self): with self.assertRaises(ValidationError) as cm: PeriodicTask(name=name, **options_dict).validate_unique() errors = cm.exception.args[0] - self.assertEquals(errors.keys(), options_dict.keys()) + self.assertEqual(errors.keys(), options_dict.keys()) for error_msg in errors.values(): - self.assertEquals(error_msg, [expected_error_msg]) + self.assertEqual(error_msg, [expected_error_msg]) def test_validate_unique_not_raises(self): PeriodicTask(crontab=CrontabSchedule()).validate_unique() diff --git a/t/unit/test_models.py b/t/unit/test_models.py index 83c20cde..8fd7bbb7 100644 --- a/t/unit/test_models.py +++ b/t/unit/test_models.py @@ -14,7 +14,7 @@ from django_celery_beat import migrations as beat_migrations from django_celery_beat.models import crontab_schedule_celery_timezone, PeriodicTask, IntervalSchedule -from django_celery_beat.utils import _load_keys, sign_task_signature +from django_celery_beat.utils import sign_task_signature, generate_keys class MigrationTests(TestCase): @@ -87,7 +87,10 @@ def setUpClass(cls): 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, }) - _load_keys() + generate_keys( + private_key_path=cls.test_private_key_path, + public_key_path=cls.test_public_key_path + ) def test_periodic_task_with_signatures(self): empty_task_signature = Signature(task='empty_task') diff --git a/t/unit/test_utils.py b/t/unit/test_utils.py index aad1b6c0..7b8dcc12 100644 --- a/t/unit/test_utils.py +++ b/t/unit/test_utils.py @@ -4,7 +4,7 @@ import dill from celery.canvas import Signature -from django_celery_beat.utils import sign_task_signature, _load_keys, verify_task_signature +from django_celery_beat.utils import sign_task_signature, verify_task_signature, generate_keys class UtilsTests(TestCase): @@ -20,7 +20,10 @@ def setUpClass(cls) -> None: 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, }) - _load_keys() + generate_keys( + private_key_path=cls.test_private_key_path, + public_key_path=cls.test_public_key_path + ) def test_sign_verify_task_signature(self): empty_task_signature = Signature() From 834a4329baaef11539daed84f67b393dd35b6cd7 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 10 Sep 2020 14:41:10 +0300 Subject: [PATCH 09/18] Update authors file --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index ef975283..a4b7ebbc 100644 --- a/AUTHORS +++ b/AUTHORS @@ -92,3 +92,4 @@ Wes Winham Williams Mendez WoLpH dongweiming +SunnyCapt From 89cb908afc3981f297394dab90e60ade4c9e9787 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Mon, 26 Oct 2020 17:49:21 +0300 Subject: [PATCH 10/18] fix imports --- django_celery_beat/schedulers.py | 20 +++++++------------- requirements/runtime.txt | 4 ++-- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 87222d87..27abafda 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -10,31 +10,24 @@ from celery import schedules # noinspection PyProtectedMember from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc -# noinspection PyUnresolvedReferences -from celery.five import ( - items, monotonic, python_2_unicode_compatible, - reraise, values -) -from celery.utils.encoding import safe_str, safe_repr +from celery.exceptions import reraise +# from celery.utils.encoding import safe_str, safe_repr from celery.utils.log import get_logger from celery.utils.time import maybe_make_aware -from kombu.utils.encoding import safe_str, safe_repr -from kombu.utils.json import dumps, loads - from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist # noinspection PyProtectedMember from django.db import transaction, close_old_connections from django.db.utils import DatabaseError, InterfaceError -from django.core.exceptions import ObjectDoesNotExist -# noinspection PyUnresolvedReferences -from multiprocessing.util import Finalize +from kombu.utils.encoding import safe_str, safe_repr +from kombu.utils.json import dumps, loads +from .clockedschedule import clocked from .models import ( PeriodicTask, PeriodicTasks, CrontabSchedule, IntervalSchedule, SolarSchedule, ClockedSchedule ) -from .clockedschedule import clocked from .utils import NEVER_CHECK_TIMEOUT # This scheduler must wake up more frequently than the @@ -244,6 +237,7 @@ def __init__(self, *args, **kwargs): """Initialize the database scheduler.""" self._dirty = set() Scheduler.__init__(self, *args, **kwargs) + # noinspection PyUnresolvedReferences self._finalize = Finalize(self, self.sync, exitpriority=5) self.max_interval = ( kwargs.get('max_interval') diff --git a/requirements/runtime.txt b/requirements/runtime.txt index 64a9f498..b595453d 100644 --- a/requirements/runtime.txt +++ b/requirements/runtime.txt @@ -1,2 +1,2 @@ -celery>=4.4.7,<6.0 -Django>=2.2 \ No newline at end of file +celery>=4.4,<6.0 +Django>=2.2 From e7fdac4ed152851759d2a3aad92d8bf9cffa78a9 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Mon, 26 Oct 2020 19:43:35 +0300 Subject: [PATCH 11/18] useless commit --- django_celery_beat/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 27abafda..6c753884 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -8,7 +8,7 @@ from celery import current_app from celery import schedules -# noinspection PyProtectedMember +# noinspection PyProtectedMember from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc from celery.exceptions import reraise # from celery.utils.encoding import safe_str, safe_repr From 7c9479c4416b07b30acda4f91be0384bb69652ba Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Tue, 20 Apr 2021 20:43:23 +0300 Subject: [PATCH 12/18] call some functions before calling real apply_async --- django_celery_beat/schedulers.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 6c753884..93099b3e 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals import datetime +import importlib import logging import math import sys @@ -385,6 +386,14 @@ def apply_async(self, entry, producer=None, advance=True, **kwargs): entry = self.reserve(entry) if advance else entry task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task) + if hasattr(self.app.conf, 'call_before_run_periodic_task'): + for func_ref in self.app.conf.call_before_run_periodic_task: + func_ref = func_ref.split('.') + callback = importlib.import_module( + '.'.join(func_ref[:-1]) + ).__getattribute__(func_ref[-1]) + callback(task=task, entry=entry, producer=producer, advance=advance, **kwargs) + try: entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])] entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()} From 28d63911156219d00d061fadefdf4f814b7f2231 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 21 Apr 2021 13:00:11 +0300 Subject: [PATCH 13/18] added comments about app.conf.call_before_run_periodic_task --- django_celery_beat/schedulers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 93099b3e..2d8e2867 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -9,7 +9,7 @@ from celery import current_app from celery import schedules -# noinspection PyProtectedMember +# noinspection PyProtectedMember from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc from celery.exceptions import reraise # from celery.utils.encoding import safe_str, safe_repr @@ -387,6 +387,8 @@ def apply_async(self, entry, producer=None, advance=True, **kwargs): task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task) if hasattr(self.app.conf, 'call_before_run_periodic_task'): + # if app.conf has a field call_before_run_periodic_task + # then we try to import and run all the specified functions for func_ref in self.app.conf.call_before_run_periodic_task: func_ref = func_ref.split('.') callback = importlib.import_module( From c85b77de955e9a70c700202939b7c3f57c091484 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 21 Apr 2021 14:31:29 +0300 Subject: [PATCH 14/18] django_celery_beat.schedulers.DatabaseScheduler.apply_async refactoring --- django_celery_beat/schedulers.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 2d8e2867..6501ea4f 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -11,7 +11,6 @@ from celery import schedules # noinspection PyProtectedMember from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc -from celery.exceptions import reraise # from celery.utils.encoding import safe_str, safe_repr from celery.utils.log import get_logger from celery.utils.time import maybe_make_aware @@ -380,11 +379,11 @@ def schedule(self): return self._schedule def apply_async(self, entry, producer=None, advance=True, **kwargs): - # Update time-stamps and run counts before we actually execute, - # so we have that done if an exception is raised (doesn't schedule - # forever.) + if entry.task_signature is None: + super(DatabaseScheduler, self).apply_async(entry, producer=producer, advance=advance, **kwargs) + entry = self.reserve(entry) if advance else entry - task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task) + task = entry.task_signature if hasattr(self.app.conf, 'call_before_run_periodic_task'): # if app.conf has a field call_before_run_periodic_task @@ -397,20 +396,13 @@ def apply_async(self, entry, producer=None, advance=True, **kwargs): callback(task=task, entry=entry, producer=producer, advance=advance, **kwargs) try: - entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])] - entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()} - if task: - return task.apply_async(entry_args, entry_kwargs, - producer=producer, - **entry.options) - else: - return self.send_task(entry.task, entry_args, entry_kwargs, - producer=producer, - **entry.options) + return task.apply_async(producer=producer, **entry.options) except Exception as exc: # pylint: disable=broad-except - reraise(SchedulingError, SchedulingError( - "Couldn't apply scheduled task {0.name}: {exc}".format( - entry, exc=exc)), sys.exc_info()[2]) + e = SchedulingError( + "Couldn't apply scheduled task {0.name}: {exc}".format(entry, exc=exc) + ) + raise e.with_traceback(sys.exc_info()[2]) + finally: self._tasks_since_sync += 1 if self.should_sync(): From 7bfca12160360ad6ad255d135ce7bbaee678afca Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 21 Apr 2021 14:58:30 +0300 Subject: [PATCH 15/18] fix schedulers.py imports --- django_celery_beat/schedulers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 16c8feea..89a7151b 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -10,8 +10,7 @@ from celery import current_app from celery import schedules # noinspection PyProtectedMember -from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc -# from celery.utils.encoding import safe_str, safe_repr +from celery.beat import Scheduler, ScheduleEntry, SchedulingError from celery.utils.log import get_logger from celery.utils.time import maybe_make_aware from django.conf import settings @@ -21,6 +20,8 @@ from django.db.utils import DatabaseError, InterfaceError from kombu.utils.encoding import safe_str, safe_repr from kombu.utils.json import dumps, loads +# noinspection PyUnresolvedReferences +from multiprocessing.util import Finalize from .clockedschedule import clocked from .models import ( From 02778de8a9fff45032099b400bdd2f3befce6dee Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 21 Apr 2021 15:18:36 +0300 Subject: [PATCH 16/18] fix performing an action before starting a periodic task --- django_celery_beat/schedulers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/django_celery_beat/schedulers.py b/django_celery_beat/schedulers.py index 89a7151b..5fee37fc 100644 --- a/django_celery_beat/schedulers.py +++ b/django_celery_beat/schedulers.py @@ -380,9 +380,6 @@ def schedule(self): return self._schedule def apply_async(self, entry, producer=None, advance=True, **kwargs): - if entry.task_signature is None: - super(DatabaseScheduler, self).apply_async(entry, producer=producer, advance=advance, **kwargs) - entry = self.reserve(entry) if advance else entry task = entry.task_signature @@ -396,6 +393,9 @@ def apply_async(self, entry, producer=None, advance=True, **kwargs): ).__getattribute__(func_ref[-1]) callback(task=task, entry=entry, producer=producer, advance=advance, **kwargs) + if entry.task_signature is None: + return super(DatabaseScheduler, self).apply_async(entry, producer=producer, advance=advance, **kwargs) + try: return task.apply_async(producer=producer, **entry.options) except Exception as exc: # pylint: disable=broad-except From 2604ab97c4fd11f691a1d0a47f8e04d0f61ce3ab Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Wed, 21 Apr 2021 16:49:22 +0300 Subject: [PATCH 17/18] fix tests --- .gitignore | 2 ++ .../migrations/0017_merge_20210421_1344.py | 14 ++++++++++ django_celery_beat/utils.py | 6 ++--- t/proj/settings.py | 18 +++++++++++++ t/unit/test_models.py | 26 +----------------- t/unit/test_utils.py | 27 +------------------ 6 files changed, 38 insertions(+), 55 deletions(-) create mode 100644 django_celery_beat/migrations/0017_merge_20210421_1344.py diff --git a/.gitignore b/.gitignore index d6be2b0a..9de12852 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,5 @@ coverage.xml .eggs/ .python-version venv +test_id_rsa +test_id_rsa.pub diff --git a/django_celery_beat/migrations/0017_merge_20210421_1344.py b/django_celery_beat/migrations/0017_merge_20210421_1344.py new file mode 100644 index 00000000..54b58d2a --- /dev/null +++ b/django_celery_beat/migrations/0017_merge_20210421_1344.py @@ -0,0 +1,14 @@ +# Generated by Django 3.2 on 2021-04-21 13:44 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_celery_beat', '0015_edit_solarschedule_events_choices'), + ('django_celery_beat', '0016_auto_20200903_1356'), + ] + + operations = [ + ] diff --git a/django_celery_beat/utils.py b/django_celery_beat/utils.py index a9985e0b..ae308eb5 100644 --- a/django_celery_beat/utils.py +++ b/django_celery_beat/utils.py @@ -28,12 +28,10 @@ def generate_keys( public_key = private_key.publickey() if os.path.exists(private_key_path): - if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(private_key_path)) != 'y': - return + raise FileExistsError(private_key_path) if os.path.exists(public_key_path): - if input('Do you realy want to rewrite `{}` key file? [y/n]: '.format(public_key_path)) != 'y': - return + raise FileExistsError(public_key_path) open(private_key_path, 'wb').close() os.chmod(private_key_path, 0o600) diff --git a/t/proj/settings.py b/t/proj/settings.py index a09c4e33..e8d9fa5f 100644 --- a/t/proj/settings.py +++ b/t/proj/settings.py @@ -12,6 +12,8 @@ import os import sys +from django_celery_beat.utils import generate_keys + CELERY_DEFAULT_EXCHANGE = 'testcelery' CELERY_DEFAULT_ROUTING_KEY = 'testcelery' CELERY_DEFAULT_QUEUE = 'testcelery' @@ -122,3 +124,19 @@ STATIC_URL = '/static/' DJANGO_CELERY_BEAT_TZ_AWARE = True + +PRIVATE_KEY_PATH = './test_id_rsa' +PUBLIC_KEY_PATH = './test_id_rsa.pub' + +os.environ.update({ + 'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': PRIVATE_KEY_PATH, + 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': PUBLIC_KEY_PATH, +}) + +try: + generate_keys( + private_key_path=PRIVATE_KEY_PATH, + public_key_path=PUBLIC_KEY_PATH + ) +except FileExistsError: + pass diff --git a/t/unit/test_models.py b/t/unit/test_models.py index f9f631d7..bbc1370c 100644 --- a/t/unit/test_models.py +++ b/t/unit/test_models.py @@ -23,7 +23,7 @@ PeriodicTask, IntervalSchedule, ) -from django_celery_beat.utils import sign_task_signature, generate_keys +from django_celery_beat.utils import sign_task_signature class MigrationTests(TestCase): @@ -157,20 +157,6 @@ class PeriodicTaskSignatureTestCase(TestCase): test_private_key_path = './test_id_rsa' test_public_key_path = './test_id_rsa.pub' - @classmethod - def setUpClass(cls): - super(PeriodicTaskSignatureTestCase, cls).setUpClass() - - os.environ.update({ - 'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path, - 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, - }) - - generate_keys( - private_key_path=cls.test_private_key_path, - public_key_path=cls.test_public_key_path - ) - def test_periodic_task_with_signatures(self): empty_task_signature = Signature(task='empty_task') @@ -195,13 +181,3 @@ def test_periodic_task_with_signatures(self): self.assertEqual(empty_task_signature, task_signature) self.assertEqual(empty_task_signature, callback_signature) - - @classmethod - def tearDownClass(cls) -> None: - super(PeriodicTaskSignatureTestCase, cls).tearDownClass() - - if os.path.exists(cls.test_private_key_path): - os.remove(cls.test_private_key_path) - - if os.path.exists(cls.test_public_key_path): - os.remove(cls.test_public_key_path) diff --git a/t/unit/test_utils.py b/t/unit/test_utils.py index 7b8dcc12..61f4af16 100644 --- a/t/unit/test_utils.py +++ b/t/unit/test_utils.py @@ -1,30 +1,15 @@ -import os from unittest import TestCase import dill from celery.canvas import Signature -from django_celery_beat.utils import sign_task_signature, verify_task_signature, generate_keys +from django_celery_beat.utils import sign_task_signature, verify_task_signature class UtilsTests(TestCase): test_private_key_path = './test_id_rsa' test_public_key_path = './test_id_rsa.pub' - @classmethod - def setUpClass(cls) -> None: - super(UtilsTests, cls).setUpClass() - - os.environ.update({ - 'DJANGO_CELERY_BEAT_PRIVATE_KEY_PATH': cls.test_private_key_path, - 'DJANGO_CELERY_BEAT_PUBLIC_KEY_PATH': cls.test_public_key_path, - }) - - generate_keys( - private_key_path=cls.test_private_key_path, - public_key_path=cls.test_public_key_path - ) - def test_sign_verify_task_signature(self): empty_task_signature = Signature() @@ -34,13 +19,3 @@ def test_sign_verify_task_signature(self): is_valid = verify_task_signature(serialized_empty_task, s) self.assertTrue(is_valid) - - @classmethod - def tearDownClass(cls) -> None: - super(UtilsTests, cls).tearDownClass() - - if os.path.exists(cls.test_private_key_path): - os.remove(cls.test_private_key_path) - - if os.path.exists(cls.test_public_key_path): - os.remove(cls.test_public_key_path) From f2ec316bbad5318e97aca78407b79c299bc1363e Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Thu, 22 Apr 2021 15:59:21 +0300 Subject: [PATCH 18/18] added readable info about serialized task --- django_celery_beat/admin.py | 3 +-- django_celery_beat/models.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/django_celery_beat/admin.py b/django_celery_beat/admin.py index ac4de14b..d1d2243d 100644 --- a/django_celery_beat/admin.py +++ b/django_celery_beat/admin.py @@ -69,7 +69,6 @@ class PeriodicTaskForm(forms.ModelForm): required=False, max_length=200, ) - # todo: add field for task_signature class Meta: """Form metadata.""" @@ -82,7 +81,7 @@ def clean(self): regtask = data.get('regtask') if regtask: data['task'] = regtask - if not data['task']: + if not data['task'] and not (self.instance and self.instance.task_signature): exc = forms.ValidationError(_('Need name of task')) self._errors['task'] = self.error_class(exc.messages) raise exc diff --git a/django_celery_beat/models.py b/django_celery_beat/models.py index fa97273c..08e81623 100644 --- a/django_celery_beat/models.py +++ b/django_celery_beat/models.py @@ -602,6 +602,17 @@ def save(self, *args, **kwargs): self.last_run_at = None self._clean_expires() self.validate_unique() + + if self.task_signature: + task = self.get_verified_task_signature().__repr__() + pattern = '' + max_length = PeriodicTask.task.field.max_length - len(pattern) + 2 - 3 + + if len(task) > max_length: + task = pattern.format(task[:max_length] + '...') + + self.task = task + super().save(*args, **kwargs) def _clean_expires(self):