Skip to content

[pull] master from celery:master #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
eee9975
Fix typo
sondrelg Nov 8, 2022
6f1691b
fix typos in optional tests (#7876)
hsk17 Nov 9, 2022
eabd701
Enhanced doc for canvas.Signature class (#7891)
Nusnus Nov 9, 2022
dc7cdc2
Fix revoke by headers tests stability (#7892)
Nusnus Nov 10, 2022
41e79a9
feat: add global keyprefix for backend result keys (#7620)
kaustavb12 Nov 10, 2022
1bdd5e4
Canvas.py doc enhancement (#7897)
Nusnus Nov 10, 2022
d96bf9b
update sqlalchemy 1.0.14 to 1.2.18
chncaption Nov 11, 2022
4e2280b
Canvas.py doc enhancement (#7902)
Nusnus Nov 14, 2022
a6b16c5
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Nov 14, 2022
b5bc40f
Fix test warnings (#7906)
ShaheedHaque Nov 15, 2022
706ebb6
Support for out-of-tree worker pool implementations (#7880)
ShaheedHaque Nov 15, 2022
577eee6
Canvas.py doc enhancement (#7907)
Nusnus Nov 16, 2022
145aae8
Use bound task in base task example. Closes #7909
WilliamDEdwards Nov 16, 2022
1392936
Allow the stamping visitor itself to set the stamp value type instead…
Nusnus Nov 20, 2022
570c4a6
Stamping a task left the task properties dirty (#7916)
Nusnus Nov 22, 2022
bfd8587
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Nov 21, 2022
87613c7
Fixed bug when chaining a chord with a group (#7919)
Nusnus Nov 27, 2022
c918a6d
Fixed bug in the stamping visitor mechanism where the request was lac…
Nusnus Nov 28, 2022
2960b89
Fixed bug in task_accepted() where the request was not added to the `…
Nusnus Nov 29, 2022
cd3486d
Fix bug in TraceInfo._log_error() where the real exception obj was hi…
Nusnus Nov 29, 2022
788dfe4
Added integration test: test_all_tasks_of_canvas_are_stamped() for va…
Nusnus Nov 29, 2022
5c70357
Added new example for the stamping mechanism: examples/stamping (#7933)
Nusnus Nov 30, 2022
b2f456b
Fixed a bug where replacing a stamped task and stamping it again
Nusnus Dec 1, 2022
5eaa6ac
The bugfix in PR #7934 created a new bug with nested group stamping o…
Nusnus Dec 1, 2022
aad5ff1
Added test_stamping_example_canvas to validate the new stamping examp…
Nusnus Dec 3, 2022
49334bd
Fixed a bug in losing chain links (not error links though) when uncha…
Nusnus Dec 4, 2022
8b7e9f5
Removing as not mandatory
auvipy Nov 8, 2022
3983484
Housekeeping for Canvas.py (#7942)
Nusnus Dec 11, 2022
ae73d5d
[pre-commit.ci] pre-commit autoupdate (#7927)
pre-commit-ci[bot] Dec 13, 2022
736c8a8
Scheduled weekly dependency update for week 50 (#7954)
pyup-bot Dec 13, 2022
8bba7f9
try pypy 3.9 in CI (#7956)
auvipy Dec 14, 2022
c2315e5
sqlalchemy==1.4.45 (#7943)
auvipy Dec 14, 2022
a8c2a1e
billiard>=4.1.0,<5.0 (#7957)
auvipy Dec 14, 2022
dd811b3
feat(typecheck): allow changing type check behavior on the app level;
Dec 10, 2022
0d5abd7
Add broker_channel_error_retry option (#7951)
nkns165 Dec 15, 2022
8a92e0f
Add beat_cron_starting_deadline_seconds to prevent unwanted cron runs…
asnoeyink Dec 15, 2022
795a8e2
[pre-commit.ci] pre-commit autoupdate
pre-commit-ci[bot] Dec 19, 2022
f3a2cf4
Scheduled weekly dependency update for week 51 (#7965)
pyup-bot Dec 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions .github/workflows/post_release_to_hacker_news.yml

This file was deleted.

6 changes: 3 additions & 3 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', 'pypy-3.7', 'pypy-3.8']
python-version: ['3.7', '3.8', '3.9', '3.10', 'pypy-3.9', 'pypy-3.8']
os: ["ubuntu-latest", "windows-latest"]
exclude:
- python-version: 'pypy-3.7'
- python-version: 'pypy-3.9'
os: "windows-latest"
- python-version: 'pypy-3.8'
os: "windows-latest"
Expand Down Expand Up @@ -120,7 +120,7 @@ jobs:
run: |
echo "::set-output name=dir::$(pip cache dir)"
- name: Install tox
run: python -m pip install tox
run: python -m pip install --upgrade pip tox tox-gh-actions
- name: >
Run tox for
"${{ matrix.python-version }}-integration-${{ matrix.toxenv }}"
Expand Down
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
repos:
- repo: https://github.com/asottile/pyupgrade
rev: v3.2.0
rev: v3.3.1
hooks:
- id: pyupgrade
args: ["--py37-plus"]

- repo: https://github.com/PyCQA/flake8
rev: 5.0.4
rev: 6.0.0
hooks:
- id: flake8

Expand All @@ -16,20 +16,20 @@ repos:
- id: yesqa

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
rev: v4.4.0
hooks:
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
- id: mixed-line-ending

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: v5.11.3
hooks:
- id: isort

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.982
rev: v0.991
hooks:
- id: mypy
pass_filenames: false
2 changes: 2 additions & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,5 @@ Gabor Boros, 2021/11/09
Tizian Seehaus, 2022/02/09
Oleh Romanovskyi, 2022/06/09
JoonHwan Kim, 2022/08/01
Kaustav Banerjee, 2022/11/10
Austin Snoeyink 2022/12/06
3 changes: 1 addition & 2 deletions celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ def debug_import(name, locals=None, globals=None,
from celery.app.base import Celery
from celery.app.task import Task
from celery.app.utils import bugreport
from celery.canvas import (chain, chord, chunks, group, maybe_signature, signature, subtask, xmap, # noqa
xstarmap)
from celery.canvas import chain, chord, chunks, group, maybe_signature, signature, subtask, xmap, xstarmap
from celery.utils import uuid

# Eventlet/gevent patching must happen before importing
Expand Down
2 changes: 1 addition & 1 deletion celery/app/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def as_task_v2(self, task_id, name, args=None, kwargs=None,
if not root_id: # empty root_id defaults to task_id
root_id = task_id

stamps = {header: maybe_list(options[header]) for header in stamped_headers or []}
stamps = {header: options[header] for header in stamped_headers or []}
headers = {
'lang': 'py',
'task': name,
Expand Down
12 changes: 9 additions & 3 deletions celery/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
from celery.utils.time import maybe_make_aware, timezone, to_utc

# Load all builtin tasks
from . import builtins # noqa
from . import backends
from . import backends, builtins
from .annotations import prepare as prepare_annotations
from .autoretry import add_autoretry_behaviour
from .defaults import DEFAULT_SECURITY_DIGEST, find_deprecated_settings
Expand Down Expand Up @@ -458,6 +457,9 @@ def cons(app):
sum([len(args), len(opts)])))
return inner_create_task_cls(**opts)

def type_checker(self, fun, bound=False):
return staticmethod(head_from_fun(fun, bound=bound))

def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
if not self.finalized and not self.autofinalize:
raise RuntimeError('Contract breach: app not finalized')
Expand All @@ -474,7 +476,7 @@ def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
'__doc__': fun.__doc__,
'__module__': fun.__module__,
'__annotations__': fun.__annotations__,
'__header__': staticmethod(head_from_fun(fun, bound=bind)),
'__header__': self.type_checker(fun, bound=bind),
'__wrapped__': run}, **options))()
# for some reason __qualname__ cannot be set in type()
# so we have to set it here.
Expand Down Expand Up @@ -778,6 +780,10 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
**options
)

stamped_headers = options.pop('stamped_headers', [])
for stamp in stamped_headers:
options.pop(stamp)

if connection:
producer = amqp.Producer(connection, auto_declare=False)

Expand Down
2 changes: 2 additions & 0 deletions celery/app/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __repr__(self):
scheduler=Option('celery.beat:PersistentScheduler'),
schedule_filename=Option('celerybeat-schedule'),
sync_every=Option(0, type='int'),
cron_starting_deadline=Option(None, type=int)
),
broker=Namespace(
url=Option(None, type='string'),
Expand All @@ -89,6 +90,7 @@ def __repr__(self):
connection_retry=Option(True, type='bool'),
connection_retry_on_startup=Option(None, type='bool'),
connection_max_retries=Option(100, type='int'),
channel_error_retry=Option(False, type='bool'),
failover_strategy=Option(None, type='string'),
heartbeat=Option(120, type='int'),
heartbeat_checkrate=Option(3.0, type='int'),
Expand Down
12 changes: 10 additions & 2 deletions celery/app/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,18 @@ def replace(self, sig):
for t in reversed(self.request.chain or []):
sig |= signature(t, app=self.app)
# Stamping sig with parents groups
stamped_headers = self.request.stamped_headers
if self.request.stamps:
groups = self.request.stamps.get("groups")
sig.stamp(visitor=GroupStampingVisitor(groups=groups, stamped_headers=stamped_headers))
sig.stamp(visitor=GroupStampingVisitor(groups=groups, stamped_headers=self.request.stamped_headers))
stamped_headers = self.request.stamped_headers.copy()
stamps = self.request.stamps.copy()
stamped_headers.extend(sig.options.get('stamped_headers', []))
stamps.update({
stamp: value
for stamp, value in sig.options.items() if stamp in sig.options.get('stamped_headers', [])
})
sig.options['stamped_headers'] = stamped_headers
sig.options.update(stamps)

return self.on_replace(sig)

Expand Down
4 changes: 3 additions & 1 deletion celery/app/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import namedtuple
from warnings import warn

from billiard.einfo import ExceptionInfo
from billiard.einfo import ExceptionInfo, ExceptionWithTraceback
from kombu.exceptions import EncodeError
from kombu.serialization import loads as loads_message
from kombu.serialization import prepare_accept_content
Expand Down Expand Up @@ -238,6 +238,8 @@ def handle_failure(self, task, req, store_errors=True, call_errbacks=True):

def _log_error(self, task, req, einfo):
eobj = einfo.exception = get_pickled_exception(einfo.exception)
if isinstance(eobj, ExceptionWithTraceback):
eobj = einfo.exception = eobj.exc
exception, traceback, exc_info, sargs, skwargs = (
safe_repr(eobj),
safe_str(einfo.traceback),
Expand Down
18 changes: 16 additions & 2 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def exception_to_python(self, exc):
exc = cls(*exc_msg)
else:
exc = cls(exc_msg)
except Exception as err: # noqa
except Exception as err:
exc = Exception(f'{cls}({exc_msg})')

return exc
Expand Down Expand Up @@ -817,11 +817,25 @@ class BaseKeyValueStoreBackend(Backend):
def __init__(self, *args, **kwargs):
if hasattr(self.key_t, '__func__'): # pragma: no cover
self.key_t = self.key_t.__func__ # remove binding
self._encode_prefixes()
super().__init__(*args, **kwargs)
self._add_global_keyprefix()
self._encode_prefixes()
if self.implements_incr:
self.apply_chord = self._apply_chord_incr

def _add_global_keyprefix(self):
"""
This method prepends the global keyprefix to the existing keyprefixes.

This method checks if a global keyprefix is configured in `result_backend_transport_options` using the
`global_keyprefix` key. If so, then it is prepended to the task, group and chord key prefixes.
"""
global_keyprefix = self.app.conf.get('result_backend_transport_options', {}).get("global_keyprefix", None)
if global_keyprefix:
self.task_keyprefix = f"{global_keyprefix}_{self.task_keyprefix}"
self.group_keyprefix = f"{global_keyprefix}_{self.group_keyprefix}"
self.chord_keyprefix = f"{global_keyprefix}_{self.chord_keyprefix}"

def _encode_prefixes(self):
self.task_keyprefix = self.key_t(self.task_keyprefix)
self.group_keyprefix = self.key_t(self.group_keyprefix)
Expand Down
2 changes: 1 addition & 1 deletion celery/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SchedulingError(Exception):


class BeatLazyFunc:
"""An lazy function declared in 'beat_schedule' and called before sending to worker.
"""A lazy function declared in 'beat_schedule' and called before sending to worker.

Example:

Expand Down
4 changes: 2 additions & 2 deletions celery/bin/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def _no_ipython(self): # pragma: no cover

def _invoke_default_shell(locals):
try:
import IPython # noqa
import IPython
except ImportError:
try:
import bpython # noqa
import bpython
except ImportError:
_invoke_fallback_shell(locals)
else:
Expand Down
Loading