diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9f9a9958..27c04c8c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
       - name: set up python
         uses: actions/setup-python@v5
         with:
-          python-version: '3.11'
+          python-version: '3.12'
 
       - run: pip install -r requirements/linting.txt -r requirements/pyproject.txt pre-commit
 
@@ -35,7 +35,7 @@ jobs:
       - name: set up python
         uses: actions/setup-python@v5
         with:
-          python-version: '3.11'
+          python-version: '3.12'
 
       - run: pip install -r requirements/docs.txt -r requirements/pyproject.txt
       - run: pip install .
@@ -54,16 +54,15 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu]
-        python: ['3.8', '3.9', '3.10', '3.11', '3.12']
-        redis: ['5']
+        python: ['3.9', '3.10', '3.11', '3.12', '3.13']
+        redis: ['8.0']
         include:
-          - python: '3.11'
-            redis: '6'
+          - python: '3.12'
+            redis: '7.2'
             os: 'ubuntu'
-          - python: '3.11'
-            redis: '7'
+          - python: '3.12'
+            redis: '7.4'
             os: 'ubuntu'
-
     env:
       PYTHON: ${{ matrix.python }}
       OS: ${{ matrix.os }}
@@ -124,7 +123,7 @@ jobs:
       - name: set up python
         uses: actions/setup-python@v5
         with:
-          python-version: '3.11'
+          python-version: '3.12'
 
       - name: install
         run: pip install -U build
diff --git a/HISTORY.rst b/HISTORY.rst
index 0e2f945f..f8d03465 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,12 @@
 History
 -------
 
+v0.26.4 (2025-XX-XX)
+....................
+
+* Extend support to redis-py 6.x, add Python 3.13 support and drop Python 3.8 support, by @nsteinmetz in #500
+
+
 v0.26.3 (2025-01-06)
 ....................
 
diff --git a/arq/connections.py b/arq/connections.py
index c1058890..2fdf83ab 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta
 from operator import attrgetter
-from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union
 from urllib.parse import parse_qs, urlparse
 from uuid import uuid4
 
@@ -28,7 +28,7 @@ class RedisSettings:
     Used by :func:`arq.connections.create_pool` and :class:`arq.worker.Worker`.
     """
 
-    host: Union[str, List[Tuple[str, int]]] = 'localhost'
+    host: Union[str, list[tuple[str, int]]] = 'localhost'
     port: int = 6379
     unix_socket_path: Optional[str] = None
     database: int = 0
@@ -49,8 +49,7 @@ class RedisSettings:
 
     sentinel: bool = False
     sentinel_master: str = 'mymaster'
-    retry_on_timeout: bool = False
-    retry_on_error: Optional[List[Exception]] = None
+    retry_on_error: Optional[list[Exception]] = None
     retry: Optional[Retry] = None
 
     @classmethod
@@ -101,7 +100,7 @@ class ArqRedis(BaseRedis):
 
     def __init__(
         self,
-        pool_or_conn: Optional[ConnectionPool] = None,
+        pool_or_conn: Optional[ConnectionPool] = None,  # type: ignore[type-arg]
         job_serializer: Optional[Serializer] = None,
         job_deserializer: Optional[Deserializer] = None,
         default_queue_name: str = default_queue_name,
@@ -189,7 +188,7 @@ async def _get_job_result(self, key: bytes) -> JobResult:
         r.job_id = job_id
         return r
 
-    async def all_job_results(self) -> List[JobResult]:
+    async def all_job_results(self) -> list[JobResult]:
         """
         Get results for all jobs in redis.
         """
@@ -207,7 +206,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
         jd.job_id = job_id.decode()
         return jd
 
-    async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]:
+    async def queued_jobs(self, *, queue_name: Optional[str] = None) -> list[JobDef]:
         """
         Get information about queued, mostly useful when testing.
         """
@@ -216,6 +215,9 @@ async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]
         jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1)
         return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs])
 
+    async def aclose(self) -> None:
+        await super().aclose()  # type: ignore[misc]
+
 
 async def create_pool(
     settings_: Optional[RedisSettings] = None,
@@ -241,12 +243,12 @@ async def create_pool(
         def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
             client = Sentinel(  # type: ignore[misc]
                 *args,
-                sentinels=settings.host,
+                sentinels=settings.host,  # type: ignore[arg-type]
                 ssl=settings.ssl,
                 **kwargs,
             )
             redis = client.master_for(settings.sentinel_master, redis_class=ArqRedis)
-            return cast(ArqRedis, redis)
+            return redis
 
     else:
         pool_factory = functools.partial(
@@ -263,7 +265,6 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
             ssl_ca_data=settings.ssl_ca_data,
             ssl_check_hostname=settings.ssl_check_hostname,
             retry=settings.retry,
-            retry_on_timeout=settings.retry_on_timeout,
             retry_on_error=settings.retry_on_error,
             max_connections=settings.max_connections,
         )
@@ -312,8 +313,5 @@ async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any])
     clients_connected = info_clients.get('connected_clients', '?')
 
     log_func(
-        f'redis_version={redis_version} '
-        f'mem_usage={mem_usage} '
-        f'clients_connected={clients_connected} '
-        f'db_keys={key_count}'
+        f'redis_version={redis_version} mem_usage={mem_usage} clients_connected={clients_connected} db_keys={key_count}'
     )
diff --git a/arq/jobs.py b/arq/jobs.py
index 15b7231e..3d3c639e 100644
--- a/arq/jobs.py
+++ b/arq/jobs.py
@@ -5,7 +5,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
-from typing import Any, Callable, Dict, Optional, Tuple
+from typing import Any, Callable, Optional
 
 from redis.asyncio import Redis
 
@@ -14,8 +14,8 @@
 
 logger = logging.getLogger('arq.jobs')
 
-Serializer = Callable[[Dict[str, Any]], bytes]
-Deserializer = Callable[[bytes], Dict[str, Any]]
+Serializer = Callable[[dict[str, Any]], bytes]
+Deserializer = Callable[[bytes], dict[str, Any]]
 
 
 class ResultNotFound(RuntimeError):
@@ -42,8 +42,8 @@ class JobStatus(str, Enum):
 @dataclass
 class JobDef:
     function: str
-    args: Tuple[Any, ...]
-    kwargs: Dict[str, Any]
+    args: tuple[Any, ...]
+    kwargs: dict[str, Any]
     job_try: int
     enqueue_time: datetime
     score: Optional[int]
@@ -210,8 +210,8 @@ class DeserializationError(SerializationError):
 
 def serialize_job(
     function_name: str,
-    args: Tuple[Any, ...],
-    kwargs: Dict[str, Any],
+    args: tuple[Any, ...],
+    kwargs: dict[str, Any],
     job_try: Optional[int],
     enqueue_time_ms: int,
     *,
@@ -228,8 +228,8 @@
 
 def serialize_result(
     function: str,
-    args: Tuple[Any, ...],
-    kwargs: Dict[str, Any],
+    args: tuple[Any, ...],
+    kwargs: dict[str, Any],
     job_try: int,
     enqueue_time_ms: int,
     success: bool,
@@ -291,7 +291,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) ->
 
 def deserialize_job_raw(
     r: bytes, *, deserializer: Optional[Deserializer] = None
-) -> Tuple[str, Tuple[Any, ...], Dict[str, Any], int, int]:
+) -> tuple[str, tuple[Any, ...], dict[str, Any], int, int]:
     if deserializer is None:
         deserializer = pickle.loads
     try:
diff --git a/arq/logs.py b/arq/logs.py
index 2231cbed..c57f627e 100644
--- a/arq/logs.py
+++ b/arq/logs.py
@@ -1,7 +1,7 @@
-from typing import Any, Dict
+from typing import Any
 
 
-def default_log_config(verbose: bool) -> Dict[str, Any]:
+def default_log_config(verbose: bool) -> dict[str, Any]:
     """
     Setup default config. for dictConfig.
 
diff --git a/arq/typing.py b/arq/typing.py
index 454cc5b6..e6ca76e4 100644
--- a/arq/typing.py
+++ b/arq/typing.py
@@ -1,5 +1,6 @@
+from collections.abc import Sequence
 from datetime import timedelta
-from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Protocol, Sequence, Set, Type, Union
+from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union
 
 __all__ = (
     'OptionType',
@@ -16,7 +17,7 @@
     from .cron import CronJob
     from .worker import Function
 
-OptionType = Union[None, Set[int], int]
+OptionType = Union[None, set[int], int]
 WEEKDAYS = 'mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun'
 WeekdayOptionType = Union[OptionType, Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun']]
 SecondsTimedelta = Union[int, float, timedelta]
@@ -25,14 +26,14 @@ class WorkerCoroutine(Protocol):
     __qualname__: str
 
-    async def __call__(self, ctx: Dict[Any, Any], *args: Any, **kwargs: Any) -> Any:  # pragma: no cover
+    async def __call__(self, ctx: dict[Any, Any], *args: Any, **kwargs: Any) -> Any:  # pragma: no cover
         pass
 
 
 class StartupShutdown(Protocol):
     __qualname__: str
 
-    async def __call__(self, ctx: Dict[Any, Any]) -> Any:  # pragma: no cover
+    async def __call__(self, ctx: dict[Any, Any]) -> Any:  # pragma: no cover
         pass
 
 
@@ -44,4 +45,4 @@ class WorkerSettingsBase(Protocol):
     # and many more...
 
 
-WorkerSettingsType = Union[Dict[str, Any], Type[WorkerSettingsBase]]
+WorkerSettingsType = Union[dict[str, Any], type[WorkerSettingsBase]]
diff --git a/arq/utils.py b/arq/utils.py
index 2cbde056..f64df89f 100644
--- a/arq/utils.py
+++ b/arq/utils.py
@@ -1,10 +1,11 @@
 import asyncio
 import logging
 import os
+from collections.abc import AsyncGenerator, Sequence
 from datetime import datetime, timedelta, timezone
 from functools import lru_cache
 from time import time
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional, Sequence, overload
+from typing import TYPE_CHECKING, Any, Optional, overload
 
 from .constants import timezone_env_vars
 
@@ -121,7 +122,7 @@ def truncate(s: str, length: int = DEFAULT_CURTAIL) -> str:
     return s
 
 
-def args_to_string(args: Sequence[Any], kwargs: Dict[str, Any]) -> str:
+def args_to_string(args: Sequence[Any], kwargs: dict[str, Any]) -> str:
     arguments = ''
     if args:
         arguments = ', '.join(map(repr, args))
diff --git a/arq/version.py b/arq/version.py
index 31ed92e3..e89b6bca 100644
--- a/arq/version.py
+++ b/arq/version.py
@@ -1,2 +1,2 @@
 # Version here is used for the package version via the `[tool.hatch.version]` section of `pyproject.toml`.
-VERSION = '0.26.3'
+VERSION = '0.26.4'
diff --git a/arq/worker.py b/arq/worker.py
index f1e613c9..c5585f81 100644
--- a/arq/worker.py
+++ b/arq/worker.py
@@ -3,12 +3,13 @@
 import inspect
 import logging
 import signal
+from collections.abc import Sequence
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
 from functools import partial
 from signal import Signals
 from time import time
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
 
 from redis.exceptions import ResponseError, WatchError
 
@@ -81,7 +82,7 @@ def func(
     if isinstance(coroutine, str):
         name = name or coroutine
-        coroutine_: 'WorkerCoroutine' = import_string(coroutine)
+        coroutine_: WorkerCoroutine = import_string(coroutine)
     else:
         coroutine_ = coroutine
 
@@ -118,7 +119,7 @@ def __eq__(self, other: Any) -> bool:
 
 
 class FailedJobs(RuntimeError):
-    def __init__(self, count: int, job_results: List[JobResult]):
+    def __init__(self, count: int, job_results: list[JobResult]):
         self.count = count
         self.job_results = job_results
 
@@ -208,7 +209,7 @@ def __init__(
         max_tries: int = 5,
         health_check_interval: 'SecondsTimedelta' = 3600,
         health_check_key: Optional[str] = None,
-        ctx: Optional[Dict[Any, Any]] = None,
+        ctx: Optional[dict[Any, Any]] = None,
         retry_jobs: bool = True,
         allow_abort_jobs: bool = False,
         max_burst_jobs: int = -1,
@@ -218,14 +219,14 @@ def __init__(
         timezone: Optional[timezone] = None,
         log_results: bool = True,
     ):
-        self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
+        self.functions: dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
         if queue_name is None:
             if redis_pool is not None:
                 queue_name = redis_pool.default_queue_name
             else:
                 raise ValueError('If queue_name is absent, redis_pool must be present.')
         self.queue_name = queue_name
-        self.cron_jobs: List[CronJob] = []
+        self.cron_jobs: list[CronJob] = []
         if cron_jobs is not None:
             if not all(isinstance(cj, CronJob) for cj in cron_jobs):
                 raise RuntimeError('cron_jobs, must be instances of CronJob')
@@ -262,9 +263,9 @@ def __init__(
         else:
             self.redis_settings = None
         # self.tasks holds references to run_job coroutines currently running
-        self.tasks: Dict[str, asyncio.Task[Any]] = {}
+        self.tasks: dict[str, asyncio.Task[Any]] = {}
         # self.job_tasks holds references the actual jobs running
-        self.job_tasks: Dict[str, asyncio.Task[Any]] = {}
+        self.job_tasks: dict[str, asyncio.Task[Any]] = {}
         self.main_task: Optional[asyncio.Task[None]] = None
         self.loop = asyncio.get_event_loop()
         self.ctx = ctx or {}
@@ -289,7 +290,7 @@ def __init__(
         self.retry_jobs = retry_jobs
         self.allow_abort_jobs = allow_abort_jobs
         self.allow_pick_jobs: bool = True
-        self.aborting_tasks: Set[str] = set()
+        self.aborting_tasks: set[str] = set()
         self.max_burst_jobs = max_burst_jobs
         self.job_serializer = job_serializer
         self.job_deserializer = job_deserializer
@@ -409,7 +410,7 @@ async def _cancel_aborted_jobs(self) -> None:
             pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
             abort_job_ids, _ = await pipe.execute()
 
-        aborted: Set[str] = set()
+        aborted: set[str] = set()
         for job_id_bytes in abort_job_ids:
             job_id = job_id_bytes.decode()
             try:
@@ -428,7 +429,7 @@ def _release_sem_dec_counter_on_complete(self) -> None:
         self.job_counter = self.job_counter - 1
         self.sem.release()
 
-    async def start_jobs(self, job_ids: List[bytes]) -> None:
+    async def start_jobs(self, job_ids: list[bytes]) -> None:
         """
        For each job id, get the job definition, check it's not running and start it in a task
         """
@@ -484,8 +485,8 @@ async def run_job(self, job_id: str, score: int) -> None:  # noqa: C901
 
        abort_job = False
         function_name, enqueue_time_ms = '', 0
-        args: Tuple[Any, ...] = ()
-        kwargs: Dict[Any, Any] = {}
+        args: tuple[Any, ...] = ()
+        kwargs: dict[Any, Any] = {}
 
         async def job_failed(exc: BaseException) -> None:
             self.jobs_failed += 1
@@ -869,7 +870,7 @@ async def close(self) -> None:
             await self.pool.delete(self.health_check_key)
         if self.on_shutdown:
             await self.on_shutdown(self.ctx)
-        await self.pool.close(close_connection_pool=True)
+        await self.pool.aclose()
         self._pool = None
 
     def __repr__(self) -> str:
@@ -879,7 +880,7 @@ def __repr__(self) -> str:
     )
 
 
-def get_kwargs(settings_cls: 'WorkerSettingsType') -> Dict[str, NameError]:
+def get_kwargs(settings_cls: 'WorkerSettingsType') -> dict[str, NameError]:
     worker_args = set(inspect.signature(Worker).parameters.keys())
     d = settings_cls if isinstance(settings_cls, dict) else settings_cls.__dict__
     return {k: v for k, v in d.items() if k in worker_args}
@@ -910,7 +911,7 @@ async def async_check_health(
     else:
         logger.info('Health check successful: %s', data)
         r = 0
-    await redis.close(close_connection_pool=True)
+    await redis.aclose()
     return r
 
diff --git a/pyproject.toml b/pyproject.toml
index cc7b10fc..0e87ef81 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,11 +24,11 @@ classifiers = [
     'Programming Language :: Python',
     'Programming Language :: Python :: 3',
     'Programming Language :: Python :: 3 :: Only',
-    'Programming Language :: Python :: 3.8',
     'Programming Language :: Python :: 3.9',
     'Programming Language :: Python :: 3.10',
     'Programming Language :: Python :: 3.11',
     'Programming Language :: Python :: 3.12',
+    'Programming Language :: Python :: 3.13',
     'Topic :: Internet',
     'Topic :: Software Development :: Libraries :: Python Modules',
     'Topic :: System :: Clustering',
@@ -36,12 +36,12 @@ classifiers = [
     'Topic :: System :: Monitoring',
     'Topic :: System :: Systems Administration',
 ]
-requires-python = '>=3.8'
+requires-python = '>=3.9'
 dependencies = [
-    'redis[hiredis]>=4.2.0,<6',
-    'click>=8.0',
+    'redis[hiredis]>=6.0,<7',
+    'click>=8.0,<8.2',
 ]
-optional-dependencies = {watch = ['watchfiles>=0.16'] }
+optional-dependencies = {watch = ['watchfiles>=1.0'] }
 dynamic = ['version']
 
 [project.scripts]
diff --git a/requirements/docs.in b/requirements/docs.in
index 6dc7d3fa..2806c164 100644
--- a/requirements/docs.in
+++ b/requirements/docs.in
@@ -1 +1 @@
-Sphinx>=5,<6
+Sphinx
diff --git a/requirements/docs.txt b/requirements/docs.txt
index f8b0de2b..dbb5656b 100644
--- a/requirements/docs.txt
+++ b/requirements/docs.txt
@@ -4,45 +4,47 @@
 #
 #    pip-compile --output-file=requirements/docs.txt --strip-extras requirements/docs.in
 #
-alabaster==0.7.16
+alabaster==1.0.0
     # via sphinx
-babel==2.14.0
+babel==2.17.0
     # via sphinx
-certifi==2024.7.4
+certifi==2025.4.26
     # via requests
-charset-normalizer==3.3.2
+charset-normalizer==3.4.2
     # via requests
-docutils==0.19
+docutils==0.21.2
     # via sphinx
-idna==3.7
+idna==3.10
     # via requests
 imagesize==1.4.1
     # via sphinx
-jinja2==3.1.4
+jinja2==3.1.6
     # via sphinx
-markupsafe==2.1.5
+markupsafe==3.0.2
     # via jinja2
-packaging==24.0
+packaging==25.0
     # via sphinx
-pygments==2.17.2
+pygments==2.19.1
     # via sphinx
 requests==2.32.3
     # via sphinx
-snowballstemmer==2.2.0
+roman-numerals-py==3.1.0
     # via sphinx
-sphinx==5.3.0
-    # via -r docs.in
-sphinxcontrib-applehelp==1.0.8
+snowballstemmer==3.0.1
     # via sphinx
-sphinxcontrib-devhelp==1.0.6
+sphinx==8.2.3
+    # via -r requirements/docs.in
+sphinxcontrib-applehelp==2.0.0
     # via sphinx
-sphinxcontrib-htmlhelp==2.0.5
+sphinxcontrib-devhelp==2.0.0
+    # via sphinx
+sphinxcontrib-htmlhelp==2.1.0
     # via sphinx
 sphinxcontrib-jsmath==1.0.1
     # via sphinx
-sphinxcontrib-qthelp==1.0.7
+sphinxcontrib-qthelp==2.0.0
     # via sphinx
-sphinxcontrib-serializinghtml==1.1.10
+sphinxcontrib-serializinghtml==2.0.0
     # via sphinx
-urllib3==2.2.2
+urllib3==2.4.0
     # via requests
diff --git a/requirements/linting.txt b/requirements/linting.txt
index 873ee0c1..4aeaf8a4 100644
--- a/requirements/linting.txt
+++ b/requirements/linting.txt
@@ -4,25 +4,32 @@
 #
 #    pip-compile --output-file=requirements/linting.txt --strip-extras requirements/linting.in
 #
-cffi==1.16.0
+cffi==1.17.1
     # via cryptography
-cryptography==42.0.5
+cryptography==44.0.3
     # via
     #   types-pyopenssl
     #   types-redis
-mypy==1.9.0
+mypy==1.15.0
     # via -r requirements/linting.in
-mypy-extensions==1.0.0
+mypy-extensions==1.1.0
     # via mypy
 pycparser==2.22
     # via cffi
-ruff==0.3.4
+ruff==0.11.9
     # via -r requirements/linting.in
-types-pyopenssl==24.0.0.20240311
+types-cffi==1.17.0.20250326
+    # via types-pyopenssl
+types-pyopenssl==24.1.0.20240722
     # via types-redis
-types-pytz==2024.1.0.20240203
+types-pytz==2025.2.0.20250326
     # via -r requirements/linting.in
-types-redis==4.6.0.20240311
+types-redis==4.6.0.20241004
     # via -r requirements/linting.in
-typing-extensions==4.10.0
+types-setuptools==80.4.0.20250511
+    # via types-cffi
+typing-extensions==4.13.2
     # via mypy
+
+# The following packages are considered to be unsafe in a requirements file:
+# setuptools
diff --git a/requirements/pyproject.txt b/requirements/pyproject.txt
index 041adfac..be14ab91 100644
--- a/requirements/pyproject.txt
+++ b/requirements/pyproject.txt
@@ -4,17 +4,19 @@
 #
 #    pip-compile --all-extras --output-file=requirements/pyproject.txt --strip-extras pyproject.toml
 #
-anyio==4.3.0
+anyio==4.9.0
     # via watchfiles
-click==8.1.7
+click==8.1.8
     # via arq (pyproject.toml)
-hiredis==2.3.2
+hiredis==3.1.1
     # via redis
-idna==3.7
+idna==3.10
     # via anyio
-redis==4.6.0
+redis==6.1.0
     # via arq (pyproject.toml)
 sniffio==1.3.1
     # via anyio
-watchfiles==0.21.0
+typing-extensions==4.13.2
+    # via anyio
+watchfiles==1.0.5
     # via arq (pyproject.toml)
diff --git a/requirements/testing.in b/requirements/testing.in
index 7f12cc70..bc9f2a2a 100644
--- a/requirements/testing.in
+++ b/requirements/testing.in
@@ -8,4 +8,4 @@ pytest-mock
 pytest-pretty
 pytest-timeout
 pytz
-testcontainers<4 # until we remove 3.8 support
+testcontainers
diff --git a/requirements/testing.txt b/requirements/testing.txt
index 87461fd6..ecdcff3e 100644
--- a/requirements/testing.txt
+++ b/requirements/testing.txt
@@ -1,83 +1,78 @@
 #
-# This file is autogenerated by pip-compile with Python 3.9
+# This file is autogenerated by pip-compile with Python 3.12
 # by the following command:
 #
 #    pip-compile --output-file=requirements/testing.txt --strip-extras requirements/testing.in
 #
-annotated-types==0.6.0
+annotated-types==0.7.0
     # via pydantic
-certifi==2024.7.4
+certifi==2025.4.26
     # via requests
-charset-normalizer==3.3.2
+charset-normalizer==3.4.2
     # via requests
-coverage==7.4.4
+coverage==7.8.0
     # via -r requirements/testing.in
-deprecation==2.1.0
-    # via testcontainers
-dirty-equals==0.7.1.post0
+dirty-equals==0.9.0
     # via -r requirements/testing.in
 docker==7.1.0
     # via testcontainers
-exceptiongroup==1.2.2
-    # via pytest
-idna==3.7
+idna==3.10
     # via requests
-iniconfig==2.0.0
+iniconfig==2.1.0
     # via pytest
 markdown-it-py==3.0.0
     # via rich
 mdurl==0.1.2
     # via markdown-it-py
-msgpack==1.0.8
+msgpack==1.1.0
     # via -r requirements/testing.in
-packaging==24.0
-    # via
-    #   deprecation
-    #   pytest
-pluggy==1.4.0
+packaging==25.0
     # via pytest
-pydantic==2.6.4
+pluggy==1.6.0
+    # via pytest
+pydantic==2.11.4
     # via -r requirements/testing.in
-pydantic-core==2.16.3
+pydantic-core==2.33.2
     # via pydantic
-pygments==2.17.2
+pygments==2.19.1
     # via rich
-pytest==8.1.1
+pytest==8.3.5
     # via
     #   -r requirements/testing.in
     #   pytest-asyncio
     #   pytest-mock
     #   pytest-pretty
     #   pytest-timeout
-pytest-asyncio==0.23.6
+pytest-asyncio==0.26.0
     # via -r requirements/testing.in
 pytest-mock==3.14.0
     # via -r requirements/testing.in
 pytest-pretty==1.2.0
     # via -r requirements/testing.in
-pytest-timeout==2.3.1
+pytest-timeout==2.4.0
+    # via -r requirements/testing.in
+python-dotenv==1.1.0
+    # via testcontainers
+pytz==2025.2
     # via -r requirements/testing.in
-pytz==2024.1
-    # via
-    #   -r requirements/testing.in
-    #   dirty-equals
 requests==2.32.3
     # via docker
-rich==13.7.1
+rich==14.0.0
     # via pytest-pretty
-testcontainers==3.7.1
+testcontainers==4.10.0
     # via -r requirements/testing.in
-tomli==2.0.1
-    # via
-    #   coverage
-    #   pytest
-typing-extensions==4.10.0
+typing-extensions==4.13.2
     # via
     #   pydantic
     #   pydantic-core
-urllib3==2.2.2
+    #   testcontainers
+    #   typing-inspection
+typing-inspection==0.4.0
+    # via pydantic
+urllib3==2.4.0
     # via
     #   docker
     #   requests
-wrapt==1.16.0
+    #   testcontainers
+wrapt==1.17.2
     # via testcontainers
diff --git a/tests/conftest.py b/tests/conftest.py
index 9b6b7f5b..c3f66759 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,7 +2,7 @@
 import functools
 import os
 import sys
-from typing import Generator
+from collections.abc import Generator
 
 import msgpack
 import pytest
@@ -38,7 +38,7 @@ def test_redis_host(redis_container: RedisContainer) -> str:
 
 @pytest.fixture(scope='session')
 def test_redis_port(redis_container: RedisContainer) -> int:
-    return redis_container.get_exposed_port(redis_container.port_to_expose)
+    return redis_container.get_exposed_port(6379)
 
 
 @pytest.fixture(scope='session')
@@ -58,7 +58,7 @@ async def arq_redis(test_redis_host: str, test_redis_port: int):
 
     yield redis_
 
-    await redis_.close(close_connection_pool=True)
+    await redis_.aclose()
 
 
 @pytest.fixture
@@ -72,7 +72,7 @@ async def arq_redis_msgpack(test_redis_host: str, test_redis_port: int):
     )
     await redis_.flushall()
     yield redis_
-    await redis_.close(close_connection_pool=True)
+    await redis_.aclose()
 
 
 @pytest.fixture
@@ -82,12 +82,11 @@ async def arq_redis_retry(test_redis_host: str, test_redis_port: int):
         port=test_redis_port,
         encoding='utf-8',
         retry=Retry(backoff=NoBackoff(), retries=3),
-        retry_on_timeout=True,
         retry_on_error=[redis.exceptions.ConnectionError],
     )
     await redis_.flushall()
     yield redis_
-    await redis_.close(close_connection_pool=True)
+    await redis_.aclose()
 
 
 @pytest.fixture
@@ -140,7 +139,7 @@ async def create_pool_(settings, *args, **kwargs):
 
     yield create_pool_
 
-    await asyncio.gather(*[p.close(close_connection_pool=True) for p in pools])
+    await asyncio.gather(*[p.aclose() for p in pools])
 
 
 @pytest.fixture(name='cancel_remaining_task')
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 7266a9b3..762cdf8e 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1,5 +1,6 @@
 import asyncio
 import pickle
+from datetime import timezone
 
 import pytest
 from dirty_equals import IsNow, IsStr
@@ -82,11 +83,11 @@ async def foobar(ctx, *args, **kwargs):
         function='foobar',
         args=(1, 2),
         kwargs={'c': 3},
-        enqueue_time=IsNow(tz='utc'),
+        enqueue_time=IsNow(tz=timezone.utc),
         success=True,
         result=42,
-        start_time=IsNow(tz='utc'),
-        finish_time=IsNow(tz='utc'),
+        start_time=IsNow(tz=timezone.utc),
+        finish_time=IsNow(tz=timezone.utc),
         score=None,
         queue_name=expected_queue_name,
         job_id=IsStr(),
@@ -98,11 +99,11 @@ async def foobar(ctx, *args, **kwargs):
         args=(1, 2),
         kwargs={'c': 3},
         job_try=1,
-        enqueue_time=IsNow(tz='utc'),
+        enqueue_time=IsNow(tz=timezone.utc),
         success=True,
         result=42,
-        start_time=IsNow(tz='utc'),
-        finish_time=IsNow(tz='utc'),
+        start_time=IsNow(tz=timezone.utc),
+        finish_time=IsNow(tz=timezone.utc),
         score=None,
         queue_name=expected_queue_name,
         job_id=j.job_id,
diff --git a/tests/test_main.py b/tests/test_main.py
index baf03ee8..1b08d4c4 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -134,7 +134,7 @@ async def test_job_info(arq_redis: ArqRedis):
     t_before = time()
     j = await arq_redis.enqueue_job('foobar', 123, a=456)
     info = await j.info()
-    assert info.enqueue_time == IsNow(tz='utc')
+    assert info.enqueue_time == IsNow(tz=timezone.utc)
     assert info.job_try is None
     assert info.function == 'foobar'
     assert info.args == (123,)
@@ -254,7 +254,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
             'args': (),
             'kwargs': {'a': 1, 'b': 2, 'c': 3},
             'job_try': None,
-            'enqueue_time': IsNow(tz='utc'),
+            'enqueue_time': IsNow(tz=timezone.utc),
             'score': IsInt(),
             'job_id': '1',
         },
@@ -263,7 +263,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
             'args': (4,),
             'kwargs': {'b': 5, 'c': 6},
             'job_try': None,
-            'enqueue_time': IsNow(tz='utc'),
+            'enqueue_time': IsNow(tz=timezone.utc),
             'score': IsInt(),
             'job_id': '2',
         },
@@ -272,7 +272,7 @@ async def test_get_jobs(arq_redis: ArqRedis):
             'args': (7,),
             'kwargs': {'b': 8},
             'job_try': None,
-            'enqueue_time': IsNow(tz='utc'),
+            'enqueue_time': IsNow(tz=timezone.utc),
             'score': IsInt(),
             'job_id': '3',
         },
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 96c9a25c..887b7402 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -23,7 +23,7 @@ def test_settings_changed():
         "ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, "
         'ssl_ca_data=None, ssl_check_hostname=False, conn_timeout=1, conn_retries=5, conn_retry_delay=1, '
         "max_connections=None, sentinel=False, sentinel_master='mymaster', "
-        'retry_on_timeout=False, retry_on_error=None, retry=None)'
+        'retry_on_error=None, retry=None)'
     ) == str(settings)
 
 
@@ -59,11 +59,11 @@ async def test_redis_success_log(test_redis_settings: RedisSettings, caplog, create_pool):
     caplog.set_level(logging.INFO)
     pool = await create_pool(test_redis_settings)
     assert 'redis connection successful' not in [r.message for r in caplog.records]
-    await pool.close(close_connection_pool=True)
+    await pool.aclose()
 
     pool = await create_pool(test_redis_settings, retry=1)
     assert 'redis connection successful' in [r.message for r in caplog.records]
-    await pool.close(close_connection_pool=True)
+    await pool.aclose()
 
 
 async def test_redis_log(test_redis_settings: RedisSettings, create_pool):
@@ -221,7 +221,6 @@ def test_settings_plain():
         'conn_retry_delay': 1,
         'sentinel': False,
         'sentinel_master': 'mymaster',
-        'retry_on_timeout': False,
         'retry_on_error': None,
         'retry': None,
         'max_connections': None,
@@ -250,7 +249,6 @@ def test_settings_from_socket_dsn():
         'conn_retry_delay': 1,
         'sentinel': False,
         'sentinel_master': 'mymaster',
-        'retry_on_timeout': False,
         'retry_on_error': None,
         'retry': None,
         'max_connections': None,
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 93fbc7f0..a8d1bf4f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -5,11 +5,10 @@
 import signal
 import sys
 from datetime import datetime, timedelta, timezone
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 import msgpack
 import pytest
-import redis.exceptions
 
 from arq.connections import ArqRedis, RedisSettings
 from arq.constants import abort_jobs_ss, default_queue_name, expires_extra_ms, health_check_key_suffix, job_key_prefix
@@ -421,7 +420,7 @@ async def test_job_old(arq_redis: ArqRedis, worker, caplog):
     assert worker.jobs_retried == 0
 
     log = re.sub(r'(\d+).\d\ds', r'\1.XXs', '\n'.join(r.message for r in caplog.records))
-    assert log.endswith('  0.XXs → testing:foobar() delayed=2.XXs\n' '  0.XXs ← testing:foobar ● 42')
+    assert log.endswith('  0.XXs → testing:foobar() delayed=2.XXs\n  0.XXs ← testing:foobar ● 42')
 
 
 async def test_retry_repr():
@@ -1060,79 +1059,3 @@ async def test_worker_timezone_defaults_to_system_timezone(worker):
     worker = worker(functions=[func(foobar)])
     assert worker.timezone is not None
     assert worker.timezone == datetime.now().astimezone().tzinfo
-
-
-@pytest.mark.parametrize(
-    'exception_thrown',
-    [
-        redis.exceptions.ConnectionError('Error while reading from host'),
-        redis.exceptions.TimeoutError('Timeout reading from host'),
-    ],
-)
-async def test_worker_retry(mocker, worker_retry, exception_thrown):
-    # Testing redis exceptions, with retry settings specified
-    worker = worker_retry(functions=[func(foobar)])
-
-    # patch db read_response to mimic connection exceptions
-    p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown)
-
-    # baseline
-    await worker.main()
-    await worker._poll_iteration()
-
-    # spy method handling call_with_retry failure
-    spy = mocker.spy(worker.pool, '_disconnect_raise')
-
-    try:
-        # start patch
-        p.start()
-
-        # assert exception thrown
-        with pytest.raises(type(exception_thrown)):
-            await worker._poll_iteration()
-
-        # assert retry counts and no exception thrown during '_disconnect_raise'
-        assert spy.call_count == 4  # retries setting + 1
-        assert spy.spy_exception is None
-
-    finally:
-        # stop patch to allow worker cleanup
-        p.stop()
-
-
-@pytest.mark.parametrize(
-    'exception_thrown',
-    [
-        redis.exceptions.ConnectionError('Error while reading from host'),
-        redis.exceptions.TimeoutError('Timeout reading from host'),
-    ],
-)
-async def test_worker_crash(mocker, worker, exception_thrown):
-    # Testing redis exceptions, no retry settings specified
-    worker = worker(functions=[func(foobar)])
-
-    # patch db read_response to mimic connection exceptions
-    p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown)
-
-    # baseline
-    await worker.main()
-    await worker._poll_iteration()
-
-    # spy method handling call_with_retry failure
-    spy = mocker.spy(worker.pool, '_disconnect_raise')
-
-    try:
-        # start patch
-        p.start()
-
-        # assert exception thrown
-        with pytest.raises(type(exception_thrown)):
-            await worker._poll_iteration()
-
-        # assert no retry counts and exception thrown during '_disconnect_raise'
-        assert spy.call_count == 1
-        assert spy.spy_exception == exception_thrown
-
-    finally:
-        # stop patch to allow worker cleanup
-        p.stop()
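
Migration note (reviewer-facing, not part of the patch): the sketch below illustrates the two user-visible API changes in this diff — `RedisSettings` no longer accepts `retry_on_timeout` (list the exception classes to retry via `retry_on_error` instead, as the `arq_redis_retry` fixture in `tests/conftest.py` now does), and pools are closed with `aclose()` rather than `close(close_connection_pool=True)`. This is a minimal sketch, assuming a local Redis; the host/port values and the `'foobar'` job name are placeholders.

```python
import asyncio

import redis.exceptions
from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff

from arq.connections import RedisSettings, create_pool


async def main() -> None:
    # retry_on_timeout is gone from RedisSettings; name the exception
    # classes to retry explicitly, mirroring tests/conftest.py.
    settings = RedisSettings(
        host='localhost',  # placeholder
        port=6379,  # placeholder
        retry=Retry(backoff=NoBackoff(), retries=3),
        retry_on_error=[redis.exceptions.ConnectionError, redis.exceptions.TimeoutError],
    )
    pool = await create_pool(settings)
    try:
        await pool.enqueue_job('foobar', 123, a=456)  # 'foobar' is a placeholder job name
    finally:
        # This PR replaces close(close_connection_pool=True) with aclose()
        # throughout; ArqRedis forwards aclose() to the underlying client.
        await pool.aclose()


asyncio.run(main())
```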