Skip to content

Commit 780aa90

Browse files
authored
Fix/test warnings (#473)
* fix: warnings * change UTC to timezone to compat
1 parent 26485ff commit 780aa90

File tree

17 files changed

+167
-164
lines changed

17 files changed

+167
-164
lines changed

docs/examples/dynamics/dyn_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ async def main() -> None:
2323
schedule=[
2424
{
2525
# Here we also can specify cron instead of time.
26-
"time": datetime.datetime.utcnow() + datetime.timedelta(seconds=2),
26+
"time": datetime.datetime.now(datetime.timezone.utc)
27+
+ datetime.timedelta(seconds=2),
2728
"args": [22],
2829
},
2930
],

docs/guide/scheduling-tasks.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
111111
await my_task.schedule_by_time(
112112
redis_source,
113113
# It's better to use UTC time, or add tzinfo to datetime.
114-
datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
114+
datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
115115
# You can pass args and kwargs here as usual
116116
11,
117117
arg2="arg2",
@@ -137,7 +137,7 @@ If you want to pass additional labels, you can call these methods on the `Kicker
137137
.with_labels(label1="value")
138138
.schedule_by_time(
139139
redis_source,
140-
datetime.datetime.utcnow() + datetime.timedelta(seconds=10),
140+
datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=10),
141141
11,
142142
arg2="arg2",
143143
)
@@ -155,7 +155,7 @@ Each of these methods return you an instance of the `CreatedSchedule` class. Thi
155155
```python
156156
schedule = await my_task.schedule_by_time(
157157
redis_source,
158-
datetime.datetime.utcnow() + datetime.timedelta(minutes=1, seconds=5),
158+
datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
159159
11,
160160
arg2="arg2",
161161
)

taskiq/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Distributed task manager."""
2+
23
from importlib.metadata import version
34

45
from taskiq_dependencies import Depends as TaskiqDepends

taskiq/abc/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Abstract classes for taskiq."""
2+
23
from taskiq.abc.broker import AsyncBroker
34
from taskiq.abc.result_backend import AsyncResultBackend
45

taskiq/cli/scheduler/run.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import inspect
33
import sys
4-
from datetime import datetime, timedelta
4+
from datetime import datetime, timedelta, timezone
55
from logging import basicConfig, getLevelName, getLogger
66
from typing import Any, Dict, List, Optional, Set, Tuple
77

@@ -273,10 +273,13 @@ async def run_scheduler(args: SchedulerArgs) -> None:
273273
await scheduler.startup()
274274
logger.info("Startup completed.")
275275
if args.skip_first_run:
276-
next_minute = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
276+
next_minute = datetime.now(timezone.utc).replace(
277+
second=0,
278+
microsecond=0,
279+
) + timedelta(
277280
minutes=1,
278281
)
279-
delay = next_minute - datetime.utcnow()
282+
delay = next_minute - datetime.now(timezone.utc)
280283
delay_secs = int(delay.total_seconds())
281284
logger.info(f"Skipping first run. Waiting {delay_secs} seconds.")
282285
await asyncio.sleep(delay.total_seconds())

taskiq/middlewares/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Taskiq middlewares."""
22

3-
43
from .prometheus_middleware import PrometheusMiddleware
54
from .simple_retry_middleware import SimpleRetryMiddleware
65
from .smart_retry_middleware import SmartRetryMiddleware

taskiq/middlewares/smart_retry_middleware.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ async def on_send(
109109
if self.schedule_source is None:
110110
await kicker.with_labels(delay=delay).kiq(*message.args, **message.kwargs)
111111
else:
112-
target_time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(
112+
target_time = datetime.datetime.now(
113+
datetime.timezone.utc,
114+
) + datetime.timedelta(
113115
seconds=delay,
114116
)
115117
await kicker.schedule_by_time(

taskiq/result/__init__.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
from taskiq.compat import IS_PYDANTIC2
2-
3-
if IS_PYDANTIC2:
4-
from .v2 import TaskiqResult
5-
else:
6-
from .v1 import TaskiqResult # type: ignore
7-
1+
from .result import TaskiqResult
82

93
__all__ = [
104
"TaskiqResult",

taskiq/result/result.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import json
2+
import pickle
3+
from functools import partial
4+
from typing import Any, Callable, Dict, Generic, Optional, TypeVar
5+
6+
from pydantic import Field
7+
from typing_extensions import Self
8+
9+
from taskiq.compat import IS_PYDANTIC2
10+
from taskiq.serialization import exception_to_python, prepare_exception
11+
12+
_ReturnType = TypeVar("_ReturnType")
13+
14+
15+
def _json_encoder(value: Any, default: Callable[[Any], Any]) -> Any:
16+
if isinstance(value, BaseException):
17+
return prepare_exception(value, json)
18+
19+
return default(value)
20+
21+
22+
def _json_dumps(value: Any, *, default: Callable[[Any], Any], **kwargs: Any) -> str:
23+
return json.dumps(value, default=partial(_json_encoder, default=default), **kwargs)
24+
25+
26+
if IS_PYDANTIC2:
27+
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
28+
29+
class TaskiqResult(BaseModel, Generic[_ReturnType]):
30+
"""Result of a remote task invocation."""
31+
32+
is_err: bool
33+
# Log is a deprecated field. It would be removed in future
34+
# releases of not, if we find a way to capture logs in async
35+
# environment.
36+
log: Optional[str] = None
37+
return_value: _ReturnType
38+
execution_time: float
39+
labels: Dict[str, Any] = Field(default_factory=dict)
40+
41+
error: Optional[BaseException] = None
42+
43+
model_config = ConfigDict(arbitrary_types_allowed=True)
44+
45+
@field_serializer("error")
46+
def serialize_error(self, value: BaseException) -> Any:
47+
"""
48+
Serialize error field.
49+
50+
:returns: Any
51+
:param value: exception to serialize.
52+
"""
53+
if value:
54+
return prepare_exception(value, json)
55+
56+
return None
57+
58+
def raise_for_error(self) -> "Self":
59+
"""Raise exception if `error`.
60+
61+
:raises error: task execution exception
62+
:returns: TaskiqResult
63+
"""
64+
if self.error is not None:
65+
raise self.error
66+
return self
67+
68+
def __getstate__(self) -> Dict[Any, Any]:
69+
dict = super().__getstate__()
70+
vals: Dict[str, Any] = dict["__dict__"]
71+
72+
if "error" in vals and vals["error"] is not None:
73+
vals["error"] = prepare_exception(
74+
vals["error"],
75+
pickle,
76+
)
77+
78+
return dict
79+
80+
@field_validator("error", mode="before")
81+
@classmethod
82+
def _validate_error(cls, value: Any) -> Optional[BaseException]:
83+
return exception_to_python(value)
84+
85+
else:
86+
from pydantic import validator
87+
from pydantic.generics import GenericModel
88+
89+
class TaskiqResult(GenericModel, Generic[_ReturnType]): # type: ignore[no-redef]
90+
"""Result of a remote task invocation."""
91+
92+
is_err: bool
93+
# Log is a deprecated field. It would be removed in future
94+
# releases of not, if we find a way to capture logs in async
95+
# environment.
96+
log: Optional[str] = None
97+
return_value: _ReturnType
98+
execution_time: float
99+
labels: Dict[str, Any] = Field(default_factory=dict)
100+
101+
error: Optional[BaseException] = None
102+
103+
class Config:
104+
arbitrary_types_allowed = True
105+
json_dumps = _json_dumps # type: ignore
106+
json_loads = json.loads
107+
108+
def raise_for_error(self) -> "Self":
109+
"""Raise exception if `error`.
110+
111+
:raises error: task execution exception
112+
:returns: TaskiqResult
113+
"""
114+
if self.error is not None:
115+
raise self.error
116+
return self
117+
118+
def __getstate__(self) -> Dict[Any, Any]:
119+
dict = super().__getstate__()
120+
vals: Dict[str, Any] = dict["__dict__"]
121+
122+
if "error" in vals and vals["error"] is not None:
123+
vals["error"] = prepare_exception(
124+
vals["error"],
125+
pickle,
126+
)
127+
128+
return dict
129+
130+
@validator("error", pre=True)
131+
@classmethod
132+
def _validate_error(cls, value: Any) -> Optional[BaseException]:
133+
return exception_to_python(value)

taskiq/result/v1.py

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)