Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/examples/dynamics/dyn_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ async def main() -> None:
schedule=[
{
# Here we also can specify cron instead of time.
"time": datetime.datetime.utcnow() + datetime.timedelta(seconds=2),
"time": datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(seconds=2),
"args": [22],
},
],
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
await my_task.schedule_by_time(
redis_source,
# It's better to use UTC time, or add tzinfo to datetime.
datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
# You can pass args and kwargs here as usual
11,
arg2="arg2",
Expand All @@ -137,7 +137,7 @@ If you want to pass additional labels, you can call these methods on the `Kicker
.with_labels(label1="value")
.schedule_by_time(
redis_source,
datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=10),
11,
arg2="arg2",
)
Expand All @@ -155,7 +155,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi
```python
schedule = await my_task.schedule_by_time(
redis_source,
datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
11,
arg2="arg2",
)
Expand Down
1 change: 1 addition & 0 deletions taskiq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Distributed task manager."""

from importlib.metadata import version

from taskiq_dependencies import Depends as TaskiqDepends
Expand Down
1 change: 1 addition & 0 deletions taskiq/abc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Abstract classes for taskiq."""

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend

Expand Down
9 changes: 6 additions & 3 deletions taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import inspect
import sys
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from logging import basicConfig, getLevelName, getLogger
from typing import Any, Dict, List, Optional, Set, Tuple

Expand Down Expand Up @@ -273,10 +273,13 @@ async def run_scheduler(args: SchedulerArgs) -> None:
await scheduler.startup()
logger.info("Startup completed.")
if args.skip_first_run:
next_minute = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
next_minute = datetime.now(timezone.utc).replace(
second=0,
microsecond=0,
) + timedelta(
minutes=1,
)
delay = next_minute - datetime.utcnow()
delay = next_minute - datetime.now(timezone.utc)
delay_secs = int(delay.total_seconds())
logger.info(f"Skipping first run. Waiting {delay_secs} seconds.")
await asyncio.sleep(delay.total_seconds())
Expand Down
1 change: 0 additions & 1 deletion taskiq/middlewares/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Taskiq middlewares."""


from .prometheus_middleware import PrometheusMiddleware
from .simple_retry_middleware import SimpleRetryMiddleware
from .smart_retry_middleware import SmartRetryMiddleware
Expand Down
4 changes: 3 additions & 1 deletion taskiq/middlewares/smart_retry_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ async def on_send(
if self.schedule_source is None:
await kicker.with_labels(delay=delay).kiq(*message.args, **message.kwargs)
else:
target_time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(
target_time = datetime.datetime.now(
datetime.timezone.utc,
) + datetime.timedelta(
seconds=delay,
)
await kicker.schedule_by_time(
Expand Down
8 changes: 1 addition & 7 deletions taskiq/result/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
from taskiq.compat import IS_PYDANTIC2

if IS_PYDANTIC2:
from .v2 import TaskiqResult
else:
from .v1 import TaskiqResult # type: ignore

from .result import TaskiqResult

__all__ = [
"TaskiqResult",
Expand Down
133 changes: 133 additions & 0 deletions taskiq/result/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import json
import pickle
from functools import partial
from typing import Any, Callable, Dict, Generic, Optional, TypeVar

from pydantic import Field
from typing_extensions import Self

from taskiq.compat import IS_PYDANTIC2
from taskiq.serialization import exception_to_python, prepare_exception

_ReturnType = TypeVar("_ReturnType")


def _json_encoder(value: Any, default: Callable[[Any], Any]) -> Any:
if isinstance(value, BaseException):
return prepare_exception(value, json)

return default(value)


def _json_dumps(value: Any, *, default: Callable[[Any], Any], **kwargs: Any) -> str:
return json.dumps(value, default=partial(_json_encoder, default=default), **kwargs)


if IS_PYDANTIC2:
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator

class TaskiqResult(BaseModel, Generic[_ReturnType]):
"""Result of a remote task invocation."""

is_err: bool
# Log is a deprecated field. It would be removed in future
# releases of not, if we find a way to capture logs in async
# environment.
log: Optional[str] = None
return_value: _ReturnType
execution_time: float
labels: Dict[str, Any] = Field(default_factory=dict)

error: Optional[BaseException] = None

model_config = ConfigDict(arbitrary_types_allowed=True)

@field_serializer("error")
def serialize_error(self, value: BaseException) -> Any:
"""
Serialize error field.

:returns: Any
:param value: exception to serialize.
"""
if value:
return prepare_exception(value, json)

return None

def raise_for_error(self) -> "Self":
"""Raise exception if `error`.

:raises error: task execution exception
:returns: TaskiqResult
"""
if self.error is not None:
raise self.error
return self

def __getstate__(self) -> Dict[Any, Any]:
dict = super().__getstate__()
vals: Dict[str, Any] = dict["__dict__"]

if "error" in vals and vals["error"] is not None:
vals["error"] = prepare_exception(
vals["error"],
pickle,
)

return dict

@field_validator("error", mode="before")
@classmethod
def _validate_error(cls, value: Any) -> Optional[BaseException]:
return exception_to_python(value)

else:
from pydantic import validator
from pydantic.generics import GenericModel

class TaskiqResult(GenericModel, Generic[_ReturnType]): # type: ignore[no-redef]
"""Result of a remote task invocation."""

is_err: bool
# Log is a deprecated field. It would be removed in future
# releases of not, if we find a way to capture logs in async
# environment.
log: Optional[str] = None
return_value: _ReturnType
execution_time: float
labels: Dict[str, Any] = Field(default_factory=dict)

error: Optional[BaseException] = None

class Config:
arbitrary_types_allowed = True
json_dumps = _json_dumps # type: ignore
json_loads = json.loads

def raise_for_error(self) -> "Self":
"""Raise exception if `error`.

:raises error: task execution exception
:returns: TaskiqResult
"""
if self.error is not None:
raise self.error
return self

def __getstate__(self) -> Dict[Any, Any]:
dict = super().__getstate__()
vals: Dict[str, Any] = dict["__dict__"]

if "error" in vals and vals["error"] is not None:
vals["error"] = prepare_exception(
vals["error"],
pickle,
)

return dict

@validator("error", pre=True)
@classmethod
def _validate_error(cls, value: Any) -> Optional[BaseException]:
return exception_to_python(value)
70 changes: 0 additions & 70 deletions taskiq/result/v1.py

This file was deleted.

67 changes: 0 additions & 67 deletions taskiq/result/v2.py

This file was deleted.

1 change: 1 addition & 0 deletions taskiq/schedule_sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Package for schedule sources."""

from taskiq.schedule_sources.label_based import LabelScheduleSource

__all__ = [
Expand Down
Loading