Skip to content

Commit 43af7a9

Browse files
authored
Added new serializers. (#217)
1 parent 6e822b5 commit 43af7a9

File tree

11 files changed

+526
-14
lines changed

11 files changed

+526
-14
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
python-version: "3.11"
2222
cache: "poetry"
2323
- name: Install deps
24-
run: poetry install
24+
run: poetry install --all-extras
2525
- name: Run lint check
2626
run: poetry run pre-commit run -a ${{ matrix.cmd }}
2727
pytest:
@@ -44,7 +44,7 @@ jobs:
4444
python-version: "${{ matrix.py_version }}"
4545
cache: "poetry"
4646
- name: Install deps
47-
run: poetry install
47+
run: poetry install --all-extras
4848
- name: Run pytest check
4949
run: poetry run pytest -vv -n auto --cov="taskiq" .
5050
- name: Generate report

docs/guide/message-format.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
order: 11
3+
---
4+
5+
# Taskiq message format
6+
7+
Taskiq doesn't force you to use any specific message format. We define default message format,
8+
but you can use any format you want.
9+
10+
The default message format is:
11+
12+
13+
::: tabs
14+
15+
@tab example
16+
17+
```json
18+
{
19+
"task_name": "my_project.module1.task",
20+
"args": [1, 2, 3],
21+
"kwargs": {"a": 1, "b": 2, "c": 3},
22+
"labels": {
23+
"label1": "value1",
24+
"label2": "value2"
25+
}
26+
}
27+
```
28+
29+
@tab json schema
30+
31+
```json
32+
{
33+
"properties": {
34+
"task_id": {
35+
"title": "Task Id",
36+
"type": "string"
37+
},
38+
"task_name": {
39+
"title": "Name of the task",
40+
"type": "string"
41+
},
42+
"labels": {
43+
"title": "Additional labels",
44+
"type": "object"
45+
},
46+
"args": {
47+
"items": {},
48+
"title": "Arguments",
49+
"type": "array"
50+
},
51+
"kwargs": {
52+
"title": "Keyword arguments",
53+
"type": "object"
54+
}
55+
},
56+
"required": [
57+
"task_id",
58+
"task_name",
59+
"labels",
60+
"args",
61+
"kwargs"
62+
],
63+
"type": "object"
64+
}
65+
```
66+
67+
:::
68+
69+
But this can be easily changed by creating your own implementation of the TaskiqFormatter class or TaskiqSerializer class.
70+
71+
72+
### Serializers
73+
74+
Serializers define the format of the message but not the structure. For example, if you want to use msgpack or ORJson to serialize your message, you should update the serializer of your broker.
75+
76+
Be default, Taskiq uses JSON serializer. But we also have some implementations of other serializers:
77+
78+
* ORJSONSerializer - faster [JSON implementation](https://pypi.org/project/orjson/). Also, it supports datetime and UUID serialization.
79+
* MSGPackSerializer - [MsgPack](https://pypi.org/project/msgpack/) format serializer. It might be useful to send less data over the network.
80+
* CBORSerializer - [CBOR](https://pypi.org/project/cbor2/) format serializer. It is also has a smaller size than JSON.
81+
82+
To define your own serializer, you have to subclass the TaskiqSerializer class and implement `dumpb` and `loadb` methods. You can take a look at the existing implementations from the `taskiq.serializers` module.
83+
84+
To install taskiq with libraries for non-JSON serializers, you should install taskiq with extras.
85+
86+
::: tabs
87+
88+
@tab orjson
89+
90+
```bash
91+
pip install "taskiq[orjson]"
92+
```
93+
94+
@tab msgpack
95+
96+
```bash
97+
pip install "taskiq[msgpack]"
98+
```
99+
100+
@tab cbor
101+
102+
```bash
103+
pip install "taskiq[cbor]"
104+
```
105+
106+
:::
107+
108+
### Formatters
109+
110+
Formatters define the format of the message. It might be useful if you'd like to send a task to a celery worker for a different project. You can do it in seriazier as well, but formatters give you correct type hints.
111+
112+
By default we use a formatter that dumps the message to dict and serializes it using serializer. But you can define your own formatter to send a message in any format you want. To define a new formatter, you have to subclass the TaskiqFormatter class and implement `dumps` and `loads` methods.
113+
As an example, you can take a look at the `JSONFormatter` from `taskiq.formatters` implementation.

poetry.lock

Lines changed: 177 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@ prometheus_client = { version = "^0", optional = true }
3939
# For ZMQBroker
4040
pyzmq = { version = "^23.2.0", optional = true }
4141
# For speed
42-
uvloop = { version = ">=0.16.0,<1", optional = true }
42+
uvloop = { version = ">=0.16.0,<1", optional = true, markers = "sys_platform != 'win32'" }
4343
# For hot-reload.
4444
watchdog = { version = "^2.1.9", optional = true }
4545
gitignore-parser = { version = "^0", optional = true }
4646
pytz = "*"
47+
orjson = { version = "^3.9.9", optional = true }
48+
msgpack = { version = "^1.0.7", optional = true }
49+
cbor2 = { version = "^5.4.6", optional = true }
4750

4851
[tool.poetry.dev-dependencies]
4952
pytest = "^7.1.2"
@@ -68,6 +71,9 @@ zmq = ["pyzmq"]
6871
uv = ["uvloop"]
6972
metrics = ["prometheus_client"]
7073
reload = ["watchdog", "gitignore-parser"]
74+
orjson = ["orjson"]
75+
msgpack = ["msgpack"]
76+
cbor = ["cbor2"]
7177

7278
[tool.poetry.scripts]
7379
taskiq = "taskiq.__main__:main"
@@ -160,12 +166,12 @@ line-length = 88
160166

161167
[tool.ruff.per-file-ignores]
162168
"tests/*" = [
163-
"S101", # Use of assert detected
164-
"S301", # Use of pickle detected
165-
"D103", # Missing docstring in public function
169+
"S101", # Use of assert detected
170+
"S301", # Use of pickle detected
171+
"D103", # Missing docstring in public function
166172
"SLF001", # Private member accessed
167-
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
168-
"D101", # Missing docstring in public class
173+
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
174+
"D101", # Missing docstring in public class
169175
]
170176

171177
[tool.ruff.pydocstyle]
@@ -176,7 +182,4 @@ ignore-decorators = ["typing.overload"]
176182
allow-magic-value-types = ["int", "str", "float", "tuple"]
177183

178184
[tool.ruff.flake8-bugbear]
179-
extend-immutable-calls = [
180-
"taskiq_dependencies.Depends",
181-
"taskiq.TaskiqDepends",
182-
]
185+
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]

taskiq/abc/broker.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,16 @@ def with_serializer(
495495
self.serializer = serializer
496496
return self
497497

498+
def with_formatter(self, formatter: "TaskiqFormatter") -> "Self":
499+
"""
500+
Set new formatter and return an updated broker.
501+
502+
:param formatter: new formatter.
503+
:return: self
504+
"""
505+
self.formatter = formatter
506+
return self
507+
498508
def _register_task(
499509
self,
500510
task_name: str,

taskiq/serializers/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,12 @@
11
"""Taskiq serializers."""
2+
from .cbor_serializer import CBORSerializer
3+
from .json_serializer import JSONSerializer
4+
from .msgpack_serializer import MSGPackSerializer
5+
from .orjson_serializer import ORJSONSerializer
6+
7+
__all__ = [
8+
"JSONSerializer",
9+
"ORJSONSerializer",
10+
"MSGPackSerializer",
11+
"CBORSerializer",
12+
]

taskiq/serializers/cbor_serializer.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import datetime
2+
from typing import Any, Callable, Optional
3+
4+
from taskiq.abc.serializer import TaskiqSerializer
5+
6+
try:
7+
import cbor2
8+
except ImportError:
9+
cbor2 = None # type: ignore
10+
11+
12+
class CBORSerializer(TaskiqSerializer):
13+
"""
14+
Taskiq serializer using cbor2 library.
15+
16+
See https://cbor2.readthedocs.io/en/stable/ for more information.
17+
"""
18+
19+
def __init__(
20+
self,
21+
datetime_as_timestamp: bool = True,
22+
timezone: Optional[datetime.tzinfo] = None,
23+
value_sharing: bool = False,
24+
default: Optional[Callable[[Any, Any], Any]] = None,
25+
canonical: bool = False,
26+
date_as_datetime: bool = True,
27+
string_referencing: bool = True,
28+
# Decoder options
29+
tag_hook: "Optional[Callable[[cbor2.CborDecoder, Any], Any]]" = None,
30+
object_hook: Optional[Callable[[Any], Any]] = None,
31+
) -> None:
32+
if cbor2 is None:
33+
raise ImportError("cbor2 is not installed")
34+
self.datetime_as_timestamp = datetime_as_timestamp
35+
self.timezone = timezone
36+
self.value_sharing = value_sharing
37+
self.default = default
38+
self.canonical = canonical
39+
self.date_as_datetime = date_as_datetime
40+
self.string_referencing = string_referencing
41+
self.tag_hook = tag_hook
42+
self.object_hook = object_hook
43+
44+
def dumpb(self, value: Any) -> bytes:
45+
"""Dump value to bytes."""
46+
return cbor2.dumps(
47+
value,
48+
datetime_as_timestamp=self.datetime_as_timestamp,
49+
timezone=self.timezone,
50+
value_sharing=self.value_sharing,
51+
default=self.default,
52+
canonical=self.canonical,
53+
date_as_datetime=self.date_as_datetime,
54+
string_referencing=self.string_referencing,
55+
)
56+
57+
def loadb(self, value: bytes) -> Any:
58+
"""Load value from bytes."""
59+
return cbor2.loads(
60+
value,
61+
tag_hook=self.tag_hook,
62+
object_hook=self.object_hook,
63+
)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from typing import Any, Callable, Optional
2+
3+
from taskiq.abc.serializer import TaskiqSerializer
4+
5+
try:
6+
import msgpack
7+
except ImportError:
8+
msgpack = None # type: ignore
9+
10+
11+
class MSGPackSerializer(TaskiqSerializer):
12+
"""Taskiq serializer using msgpack library."""
13+
14+
def __init__(
15+
self,
16+
default: Optional[Callable[[Any], Any]] = None,
17+
use_single_float: bool = False,
18+
use_bin_type: bool = True,
19+
datetime: bool = True,
20+
) -> None:
21+
if msgpack is None:
22+
raise ImportError("msgpack is not installed")
23+
self.default = default
24+
self.use_single_float = use_single_float
25+
self.use_bin_type = use_bin_type
26+
self.datetime = datetime
27+
28+
def dumpb(self, value: Any) -> bytes:
29+
"""Dump value to bytes."""
30+
return msgpack.packb(
31+
value,
32+
default=self.default,
33+
use_single_float=self.use_single_float,
34+
use_bin_type=self.use_bin_type,
35+
datetime=self.datetime,
36+
)
37+
38+
def loadb(self, value: bytes) -> Any:
39+
"""Load value from bytes."""
40+
return msgpack.unpackb(value, timestamp=3)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Any, Callable, Optional
2+
3+
from taskiq.abc.serializer import TaskiqSerializer
4+
5+
try:
6+
import orjson
7+
except ImportError:
8+
orjson = None # type: ignore
9+
10+
11+
class ORJSONSerializer(TaskiqSerializer):
12+
"""Taskiq serializer using orjson library."""
13+
14+
def __init__(
15+
self,
16+
default: Optional[Callable[[Any], Any]] = None,
17+
option: Optional[int] = None,
18+
) -> None:
19+
if orjson is None:
20+
raise ImportError("orjson is not installed")
21+
self.default = default
22+
self.option = option
23+
24+
def dumpb(self, value: Any) -> bytes:
25+
"""Dump value to bytes."""
26+
return orjson.dumps(
27+
value,
28+
default=self.default,
29+
option=self.option,
30+
)
31+
32+
def loadb(self, value: bytes) -> Any:
33+
"""Load value from bytes."""
34+
return orjson.loads(value)

tests/cli/scheduler/test_task_delays.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def test_should_run_cron_str_offset() -> None:
4545

4646
def test_should_run_cron_td_offset() -> None:
4747
offset = 2
48-
hour = datetime.datetime.utcnow().hour + offset
48+
hour = (datetime.datetime.utcnow().hour + offset) % 24
4949
delay = get_task_delay(
5050
ScheduledTask(
5151
task_name="",

0 commit comments

Comments
 (0)