diff --git a/docs/examples/dynamics/dyn_scheduler.py b/docs/examples/dynamics/dyn_scheduler.py
index e760868c..6e62f971 100644
--- a/docs/examples/dynamics/dyn_scheduler.py
+++ b/docs/examples/dynamics/dyn_scheduler.py
@@ -23,7 +23,8 @@ async def main() -> None:
         schedule=[
             {
                 # Here we can also specify cron instead of time.
-                "time": datetime.datetime.utcnow() + datetime.timedelta(seconds=2),
+                "time": datetime.datetime.now(datetime.timezone.utc)
+                + datetime.timedelta(seconds=2),
                 "args": [22],
             },
         ],
diff --git a/docs/guide/scheduling-tasks.md b/docs/guide/scheduling-tasks.md
index 43793204..1d9b305a 100644
--- a/docs/guide/scheduling-tasks.md
+++ b/docs/guide/scheduling-tasks.md
@@ -111,7 +111,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
 await my_task.schedule_by_time(
     redis_source,
     # It's better to use UTC time, or add tzinfo to datetime.
-    datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
+    datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
     # You can pass args and kwargs here as usual
     11,
     arg2="arg2",
@@ -137,7 +137,7 @@ If you want to pass additional labels, you can call these methods on the `Kicker
     .with_labels(label1="value")
     .schedule_by_time(
         redis_source,
-        datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
+        datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=10),
         11,
         arg2="arg2",
     )
@@ -155,7 +155,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi
 ```python
 schedule = await my_task.schedule_by_time(
     redis_source,
-    datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
+    datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
     11,
     arg2="arg2",
 )
diff --git a/taskiq/__init__.py b/taskiq/__init__.py
index b2695e4b..2414754f 100644
--- a/taskiq/__init__.py
+++ b/taskiq/__init__.py
@@ -1,4 +1,5 @@
 """Distributed task manager."""
+
 from importlib.metadata import version
 
 from taskiq_dependencies import Depends as TaskiqDepends
diff --git a/taskiq/abc/__init__.py b/taskiq/abc/__init__.py
index c15fe965..76b5f07b 100644
--- a/taskiq/abc/__init__.py
+++ b/taskiq/abc/__init__.py
@@ -1,4 +1,5 @@
 """Abstract classes for taskiq."""
+
 from taskiq.abc.broker import AsyncBroker
 from taskiq.abc.result_backend import AsyncResultBackend
 
diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
index 09467dc0..7a7d9f53 100644
--- a/taskiq/cli/scheduler/run.py
+++ b/taskiq/cli/scheduler/run.py
@@ -1,7 +1,7 @@
 import asyncio
 import inspect
 import sys
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from logging import basicConfig, getLevelName, getLogger
 from typing import Any, Dict, List, Optional, Set, Tuple
 
@@ -273,10 +273,13 @@ async def run_scheduler(args: SchedulerArgs) -> None:
     await scheduler.startup()
     logger.info("Startup completed.")
     if args.skip_first_run:
-        next_minute = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
+        next_minute = datetime.now(timezone.utc).replace(
+            second=0,
+            microsecond=0,
+        ) + timedelta(
             minutes=1,
         )
-        delay = next_minute - datetime.utcnow()
+        delay = next_minute - datetime.now(timezone.utc)
         delay_secs = int(delay.total_seconds())
         logger.info(f"Skipping first run. Waiting {delay_secs} seconds.")
         await asyncio.sleep(delay.total_seconds())
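All of the hunks above are the same migration: `datetime.utcnow()` returns a naive datetime and is deprecated since Python 3.12, while `datetime.now(timezone.utc)` returns a timezone-aware one. Note that the docs use the `datetime.UTC` alias, which only exists on Python 3.11+; `datetime.timezone.utc` works on every supported version. A minimal stdlib-only sketch of the difference:

```python
from datetime import datetime, timezone

naive = datetime.utcnow()           # deprecated since Python 3.12, tzinfo is None
aware = datetime.now(timezone.utc)  # preferred replacement, tzinfo is set

assert naive.tzinfo is None
assert aware.tzinfo is timezone.utc

# Mixing the two styles is the failure mode this patch avoids:
try:
    aware - naive
except TypeError as exc:
    print(exc)  # can't subtract offset-naive and offset-aware datetimes
```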
diff --git a/taskiq/middlewares/__init__.py b/taskiq/middlewares/__init__.py
index f236a3eb..18a9c50f 100644
--- a/taskiq/middlewares/__init__.py
+++ b/taskiq/middlewares/__init__.py
@@ -1,6 +1,5 @@
 """Taskiq middlewares."""
 
-
 from .prometheus_middleware import PrometheusMiddleware
 from .simple_retry_middleware import SimpleRetryMiddleware
 from .smart_retry_middleware import SmartRetryMiddleware
diff --git a/taskiq/middlewares/smart_retry_middleware.py b/taskiq/middlewares/smart_retry_middleware.py
index d58c7611..ef0bcb63 100644
--- a/taskiq/middlewares/smart_retry_middleware.py
+++ b/taskiq/middlewares/smart_retry_middleware.py
@@ -109,7 +109,9 @@ async def on_send(
         if self.schedule_source is None:
             await kicker.with_labels(delay=delay).kiq(*message.args, **message.kwargs)
         else:
-            target_time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(
+            target_time = datetime.datetime.now(
+                datetime.timezone.utc,
+            ) + datetime.timedelta(
                 seconds=delay,
             )
             await kicker.schedule_by_time(
diff --git a/taskiq/result/__init__.py b/taskiq/result/__init__.py
index b17ed941..6c72ef22 100644
--- a/taskiq/result/__init__.py
+++ b/taskiq/result/__init__.py
@@ -1,10 +1,4 @@
-from taskiq.compat import IS_PYDANTIC2
-
-if IS_PYDANTIC2:
-    from .v2 import TaskiqResult
-else:
-    from .v1 import TaskiqResult  # type: ignore
-
+from .result import TaskiqResult
 
 __all__ = [
     "TaskiqResult",
diff --git a/taskiq/result/result.py b/taskiq/result/result.py
new file mode 100644
index 00000000..42120cfa
--- /dev/null
+++ b/taskiq/result/result.py
@@ -0,0 +1,133 @@
+import json
+import pickle
+from functools import partial
+from typing import Any, Callable, Dict, Generic, Optional, TypeVar
+
+from pydantic import Field
+from typing_extensions import Self
+
+from taskiq.compat import IS_PYDANTIC2
+from taskiq.serialization import exception_to_python, prepare_exception
+
+_ReturnType = TypeVar("_ReturnType")
+
+
+def _json_encoder(value: Any, default: Callable[[Any], Any]) -> Any:
+    if isinstance(value, BaseException):
+        return prepare_exception(value, json)
+
+    return default(value)
+
+
+def _json_dumps(value: Any, *, default: Callable[[Any], Any], **kwargs: Any) -> str:
+    return json.dumps(value, default=partial(_json_encoder, default=default), **kwargs)
+
+
+if IS_PYDANTIC2:
+    from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
+
+    class TaskiqResult(BaseModel, Generic[_ReturnType]):
+        """Result of a remote task invocation."""
+
+        is_err: bool
+        # Log is a deprecated field. It will be removed in a future
+        # release, unless we find a way to capture logs in an async
+        # environment.
+        log: Optional[str] = None
+        return_value: _ReturnType
+        execution_time: float
+        labels: Dict[str, Any] = Field(default_factory=dict)
+
+        error: Optional[BaseException] = None
+
+        model_config = ConfigDict(arbitrary_types_allowed=True)
+
+        @field_serializer("error")
+        def serialize_error(self, value: BaseException) -> Any:
+            """
+            Serialize error field.
+
+            :param value: exception to serialize.
+            :returns: Any
+            """
+            if value:
+                return prepare_exception(value, json)
+
+            return None
+
+        def raise_for_error(self) -> "Self":
+            """Raise exception if `error`.
+
+            :raises error: task execution exception
+            :returns: TaskiqResult
+            """
+            if self.error is not None:
+                raise self.error
+            return self
+
+        def __getstate__(self) -> Dict[Any, Any]:
+            dict = super().__getstate__()
+            vals: Dict[str, Any] = dict["__dict__"]
+
+            if "error" in vals and vals["error"] is not None:
+                vals["error"] = prepare_exception(
+                    vals["error"],
+                    pickle,
+                )
+
+            return dict
+
+        @field_validator("error", mode="before")
+        @classmethod
+        def _validate_error(cls, value: Any) -> Optional[BaseException]:
+            return exception_to_python(value)
+
+else:
+    from pydantic import validator
+    from pydantic.generics import GenericModel
+
+    class TaskiqResult(GenericModel, Generic[_ReturnType]):  # type: ignore[no-redef]
+        """Result of a remote task invocation."""
+
+        is_err: bool
+        # Log is a deprecated field. It will be removed in a future
+        # release, unless we find a way to capture logs in an async
+        # environment.
+        log: Optional[str] = None
+        return_value: _ReturnType
+        execution_time: float
+        labels: Dict[str, Any] = Field(default_factory=dict)
+
+        error: Optional[BaseException] = None
+
+        class Config:
+            arbitrary_types_allowed = True
+            json_dumps = _json_dumps  # type: ignore
+            json_loads = json.loads
+
+        def raise_for_error(self) -> "Self":
+            """Raise exception if `error`.
+
+            :raises error: task execution exception
+            :returns: TaskiqResult
+            """
+            if self.error is not None:
+                raise self.error
+            return self
+
+        def __getstate__(self) -> Dict[Any, Any]:
+            dict = super().__getstate__()
+            vals: Dict[str, Any] = dict["__dict__"]
+
+            if "error" in vals and vals["error"] is not None:
+                vals["error"] = prepare_exception(
+                    vals["error"],
+                    pickle,
+                )
+
+            return dict
+
+        @validator("error", pre=True)
+        @classmethod
+        def _validate_error(cls, value: Any) -> Optional[BaseException]:
+            return exception_to_python(value)
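With `v1.py` and `v2.py` merged into `result.py`, both pydantic majors share one import path and one set of pickle/JSON hooks. A usage sketch (the field values are illustrative):

```python
from taskiq.result import TaskiqResult

result: TaskiqResult[int] = TaskiqResult(
    is_err=True,
    return_value=0,
    execution_time=0.02,
    error=ValueError("task failed"),
)

try:
    # raise_for_error re-raises whatever exception the worker stored.
    result.raise_for_error()
except ValueError as exc:
    print(f"task raised: {exc}")
```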
diff --git a/taskiq/result/v1.py b/taskiq/result/v1.py
deleted file mode 100644
index 95297053..00000000
--- a/taskiq/result/v1.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import json
-import pickle
-from functools import partial
-from typing import Any, Callable, Dict, Generic, Optional, TypeVar
-
-from pydantic import Field, validator
-from pydantic.generics import GenericModel
-from typing_extensions import Self
-
-from taskiq.serialization import exception_to_python, prepare_exception
-
-_ReturnType = TypeVar("_ReturnType")
-
-
-def _json_encoder(value: Any, default: Callable[[Any], Any]) -> Any:
-    if isinstance(value, BaseException):
-        return prepare_exception(value, json)
-
-    return default(value)
-
-
-def _json_dumps(value: Any, *, default: Callable[[Any], Any], **kwargs: Any) -> str:
-    return json.dumps(value, default=partial(_json_encoder, default=default), **kwargs)
-
-
-class TaskiqResult(GenericModel, Generic[_ReturnType]):
-    """Result of a remote task invocation."""
-
-    is_err: bool
-    # Log is a deprecated field. It would be removed in future
-    # releases of not, if we find a way to capture logs in async
-    # environment.
-    log: Optional[str] = None
-    return_value: _ReturnType
-    execution_time: float
-    labels: Dict[str, Any] = Field(default_factory=dict)
-
-    error: Optional[BaseException] = None
-
-    class Config:
-        arbitrary_types_allowed = True
-        json_dumps = _json_dumps  # type: ignore
-        json_loads = json.loads
-
-    def raise_for_error(self) -> "Self":
-        """Raise exception if `error`.
-
-        :raises error: task execution exception
-        :returns: TaskiqResult
-        """
-        if self.error is not None:
-            raise self.error
-        return self
-
-    def __getstate__(self) -> Dict[Any, Any]:
-        dict = super().__getstate__()
-        vals: Dict[str, Any] = dict["__dict__"]
-
-        if "error" in vals and vals["error"] is not None:
-            vals["error"] = prepare_exception(
-                vals["error"],
-                pickle,
-            )
-
-        return dict
-
-    @validator("error", pre=True)
-    @classmethod
-    def _validate_error(cls, value: Any) -> Optional[BaseException]:
-        return exception_to_python(value)
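Worth noting why every variant of the model overrides `__getstate__`: arbitrary exceptions can carry unpicklable state, so `error` is converted via `prepare_exception` before pickling. A simplified illustration of the underlying problem (not taskiq code):

```python
import pickle


class FlakyError(Exception):
    def __init__(self) -> None:
        super().__init__("boom")
        # A file handle makes the exception unpicklable as-is.
        self.log_file = open(__file__)


try:
    pickle.dumps(FlakyError())
except TypeError as exc:
    print(exc)  # cannot pickle '_io.TextIOWrapper' object
```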
diff --git a/taskiq/result/v2.py b/taskiq/result/v2.py
deleted file mode 100644
index 6294a2e3..00000000
--- a/taskiq/result/v2.py
+++ /dev/null
@@ -1,67 +0,0 @@
-import json
-import pickle
-from typing import Any, Dict, Generic, Optional, TypeVar
-
-from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
-from typing_extensions import Self
-
-from taskiq.serialization import exception_to_python, prepare_exception
-
-_ReturnType = TypeVar("_ReturnType")
-
-
-class TaskiqResult(BaseModel, Generic[_ReturnType]):
-    """Result of a remote task invocation."""
-
-    is_err: bool
-    # Log is a deprecated field. It would be removed in future
-    # releases of not, if we find a way to capture logs in async
-    # environment.
-    log: Optional[str] = None
-    return_value: _ReturnType
-    execution_time: float
-    labels: Dict[str, Any] = Field(default_factory=dict)
-
-    error: Optional[BaseException] = None
-
-    model_config = ConfigDict(arbitrary_types_allowed=True)
-
-    @field_serializer("error")
-    def serialize_error(self, value: BaseException) -> Any:
-        """
-        Serialize error field.
-
-        :returns: Any
-        :param value: exception to serialize.
-        """
-        if value:
-            return prepare_exception(value, json)
-
-        return None
-
-    def raise_for_error(self) -> "Self":
-        """Raise exception if `error`.
-
-        :raises error: task execution exception
-        :returns: TaskiqResult
-        """
-        if self.error is not None:
-            raise self.error
-        return self
-
-    def __getstate__(self) -> Dict[Any, Any]:
-        dict = super().__getstate__()
-        vals: Dict[str, Any] = dict["__dict__"]
-
-        if "error" in vals and vals["error"] is not None:
-            vals["error"] = prepare_exception(
-                vals["error"],
-                pickle,
-            )
-
-        return dict
-
-    @field_validator("error", mode="before")
-    @classmethod
-    def _validate_error(cls, value: Any) -> Optional[BaseException]:
-        return exception_to_python(value)
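Comparing the two deleted files shows the v1-to-v2 API mapping the merged module now branches on: `GenericModel` plus `class Config` become `BaseModel` plus `ConfigDict`, `@validator(..., pre=True)` becomes `@field_validator(..., mode="before")`, and the `Config.json_dumps` hook becomes `@field_serializer`. A self-contained sketch of those v2 idioms (illustrative only, not taskiq's actual serialization helpers):

```python
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, field_serializer, field_validator


class Wrapper(BaseModel):
    """Minimal demo of the pydantic v2 idioms used above."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    error: Optional[BaseException] = None

    @field_validator("error", mode="before")
    @classmethod
    def _coerce(cls, value: Any) -> Optional[BaseException]:
        # "before" validators see raw input, like @validator(pre=True) in v1.
        if isinstance(value, str):
            return Exception(value)
        return value

    @field_serializer("error")
    def _dump(self, value: Optional[BaseException]) -> Optional[str]:
        # Replaces the v1 Config.json_dumps hook.
        return None if value is None else str(value)


print(Wrapper(error="boom").model_dump())  # {'error': 'boom'}
```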
diff --git a/taskiq/schedule_sources/__init__.py b/taskiq/schedule_sources/__init__.py
index 1ad5a6fd..7e2dfb16 100644
--- a/taskiq/schedule_sources/__init__.py
+++ b/taskiq/schedule_sources/__init__.py
@@ -1,4 +1,5 @@
 """Package for schedule sources."""
+
 from taskiq.schedule_sources.label_based import LabelScheduleSource
 
 __all__ = [
diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py
index 04f0887d..b2484243 100644
--- a/taskiq/scheduler/scheduler.py
+++ b/taskiq/scheduler/scheduler.py
@@ -46,11 +46,15 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
         except ScheduledTaskCancelledError:
             logger.info("Scheduled task %s has been cancelled.", task.task_name)
         else:
-            await AsyncKicker(task.task_name, self.broker, task.labels).with_labels(
-                schedule_id=task.schedule_id,
-            ).kiq(
-                *task.args,
-                **task.kwargs,
+            await (
+                AsyncKicker(task.task_name, self.broker, task.labels)
+                .with_labels(
+                    schedule_id=task.schedule_id,
+                )
+                .kiq(
+                    *task.args,
+                    **task.kwargs,
+                )
             )
             await maybe_awaitable(source.post_send(task))
 
diff --git a/taskiq/serializers/__init__.py b/taskiq/serializers/__init__.py
index 8f29165f..2874ffe6 100644
--- a/taskiq/serializers/__init__.py
+++ b/taskiq/serializers/__init__.py
@@ -1,4 +1,5 @@
 """Taskiq serializers."""
+
 from .cbor_serializer import CBORSerializer
 from .json_serializer import JSONSerializer
 from .msgpack_serializer import MSGPackSerializer
diff --git a/tests/api/test_scheduler.py b/tests/api/test_scheduler.py
index ce09bab6..96546d38 100644
--- a/tests/api/test_scheduler.py
+++ b/tests/api/test_scheduler.py
@@ -1,6 +1,6 @@
 import asyncio
 import contextlib
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
 import pytest
 
@@ -16,7 +16,7 @@ async def test_successful() -> None:
     scheduler = TaskiqScheduler(broker, sources=[LabelScheduleSource(broker)])
     scheduler_task = asyncio.create_task(run_scheduler_task(scheduler))
 
-    @broker.task(schedule=[{"time": datetime.utcnow() - timedelta(seconds=1)}])
+    @broker.task(schedule=[{"time": datetime.now(timezone.utc) - timedelta(seconds=1)}])
     def _() -> None: ...
 
 
@@ -31,7 +31,7 @@ async def test_cancelation() -> None:
     broker = AsyncQueueBroker()
     scheduler = TaskiqScheduler(broker, sources=[LabelScheduleSource(broker)])
 
-    @broker.task(schedule=[{"time": datetime.utcnow()}])
+    @broker.task(schedule=[{"time": datetime.now(timezone.utc)}])
     def _() -> None: ...
 
 
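The tests above drive the scheduler through the `schedule` label. For reference, each entry may carry either a `time` key or a `cron` key, plus optional `args`/`kwargs`; a sketch combining both styles, assuming an `InMemoryBroker` for brevity:

```python
from datetime import datetime, timedelta, timezone

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task(
    schedule=[
        # One-shot entry: fires once, shortly after scheduler startup.
        {"time": datetime.now(timezone.utc) + timedelta(seconds=2), "args": [22]},
        # Recurring entry: standard crontab syntax, every five minutes.
        {"cron": "*/5 * * * *"},
    ],
)
def my_task(value: int = 0) -> None:
    print(f"scheduled run with value={value}")
```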
diff --git a/tests/cli/scheduler/test_task_delays.py b/tests/cli/scheduler/test_task_delays.py
index 2e00fe37..1087fe52 100644
--- a/tests/cli/scheduler/test_task_delays.py
+++ b/tests/cli/scheduler/test_task_delays.py
@@ -40,7 +40,7 @@ def test_should_run_cron_str_offset() -> None:
 
 def test_should_run_cron_td_offset() -> None:
     offset = 2
-    hour = (datetime.datetime.utcnow().hour + offset) % 24
+    hour = (datetime.datetime.now(datetime.timezone.utc).hour + offset) % 24
     delay = get_task_delay(
         ScheduledTask(
             task_name="",
@@ -55,7 +55,7 @@ def test_should_run_cron_td_offset() -> None:
 
 
 def test_time_utc_without_zone() -> None:
-    time = datetime.datetime.utcnow()
+    time = datetime.datetime.now()
     delay = get_task_delay(
         ScheduledTask(
             task_name="",
diff --git a/tests/test_task.py b/tests/test_task.py
index b0b76a91..da57dbb2 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -8,7 +8,7 @@
 from taskiq.abc import AsyncResultBackend
 from taskiq.abc.serializer import TaskiqSerializer
 from taskiq.compat import model_dump, model_validate
-from taskiq.result.v1 import TaskiqResult
+from taskiq.result import TaskiqResult
 from taskiq.task import AsyncTaskiqTask
 
 _ReturnType = TypeVar("_ReturnType")
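With the version-pinned `v1` module gone, `tests/test_task.py` imports `TaskiqResult` from the public package path. A round-trip sketch using the compat helpers the test already imports (assuming `model_dump`/`model_validate` keep a `(model, data)` calling convention; check `taskiq/compat.py` for the exact signatures):

```python
from taskiq.compat import model_dump, model_validate
from taskiq.result import TaskiqResult

result = TaskiqResult(is_err=False, return_value=42, execution_time=0.01)
payload = model_dump(result)  # a plain dict on either pydantic major
restored = model_validate(TaskiqResult[int], payload)
assert restored.return_value == 42
```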