Skip to content

Commit 06834b3

Browse files
authored
feat(v2): Full AMQP spec implementation (#254)
* feat(v2): full AMQP 1.0 implementation Signed-off-by: Tudor Plugaru <[email protected]> * feat(v2): Support both `_` and `:` when reading AMQP messages Signed-off-by: Tudor Plugaru <[email protected]> * chore: define type alias for EventFactory and use it everywhere Signed-off-by: Tudor Plugaru <[email protected]> --------- Signed-off-by: Tudor Plugaru <[email protected]>
1 parent 675c44b commit 06834b3

File tree

7 files changed

+1232
-34
lines changed

7 files changed

+1232
-34
lines changed

src/cloudevents/core/base.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,21 @@
1313
# under the License.
1414

1515
from datetime import datetime
16-
from typing import Any, Protocol
16+
from typing import Any, Callable, Protocol
17+
18+
EventFactory = Callable[
19+
[dict[str, Any], dict[str, Any] | str | bytes | None], "BaseCloudEvent"
20+
]
21+
"""
22+
Type alias for a callable that creates a BaseCloudEvent from attributes and data.
23+
24+
Args:
25+
attributes: The CloudEvent attributes (required fields like id, source, type, etc.)
26+
data: The CloudEvent data payload (optional)
27+
28+
Returns:
29+
A BaseCloudEvent instance
30+
"""
1731

1832

1933
class BaseCloudEvent(Protocol):
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from dataclasses import dataclass
16+
from datetime import datetime, timezone
17+
from typing import Any, Final
18+
19+
from dateutil.parser import isoparse
20+
21+
from cloudevents.core.base import BaseCloudEvent, EventFactory
22+
from cloudevents.core.formats.base import Format
23+
24+
# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes
25+
# The underscore variant is preferred for JMS 2.0 compatibility
26+
CE_PREFIX_UNDERSCORE: Final[str] = "cloudEvents_"
27+
CE_PREFIX_COLON: Final[str] = "cloudEvents:"
28+
CONTENT_TYPE_PROPERTY: Final[str] = "content-type"
29+
30+
31+
@dataclass(frozen=True)
32+
class AMQPMessage:
33+
"""
34+
Represents an AMQP 1.0 message containing CloudEvent data.
35+
36+
This dataclass encapsulates AMQP message properties, application properties,
37+
and application data for transmitting CloudEvents over AMQP. It is immutable
38+
to prevent accidental modifications and works with any AMQP 1.0 library
39+
(e.g., Pika, aio-pika, qpid-proton, azure-servicebus).
40+
41+
Attributes:
42+
properties: AMQP message properties as a dictionary
43+
application_properties: AMQP application properties as a dictionary
44+
application_data: AMQP application data section as bytes
45+
"""
46+
47+
properties: dict[str, Any]
48+
application_properties: dict[str, Any]
49+
application_data: bytes
50+
51+
52+
def _encode_amqp_value(value: Any) -> Any:
53+
"""
54+
Encode a CloudEvent attribute value for AMQP application properties.
55+
56+
Handles special encoding for datetime objects to AMQP timestamp type
57+
(milliseconds since Unix epoch as int). Per AMQP 1.0 CloudEvents spec,
58+
senders SHOULD use native AMQP types when efficient.
59+
60+
:param value: The attribute value to encode
61+
:return: Encoded value (int for datetime timestamp, original type otherwise)
62+
"""
63+
if isinstance(value, datetime):
64+
# AMQP 1.0 timestamp: milliseconds since Unix epoch (UTC)
65+
timestamp_ms = int(value.timestamp() * 1000)
66+
return timestamp_ms
67+
68+
return value
69+
70+
71+
def _decode_amqp_value(attr_name: str, value: Any) -> Any:
72+
"""
73+
Decode a CloudEvent attribute value from AMQP application properties.
74+
75+
Handles special parsing for the 'time' attribute. Per AMQP 1.0 CloudEvents spec,
76+
receivers MUST accept both native AMQP timestamp (int milliseconds since epoch)
77+
and canonical string form (ISO 8601).
78+
79+
:param attr_name: The name of the CloudEvent attribute
80+
:param value: The AMQP property value
81+
:return: Decoded value (datetime for 'time' attribute, original type otherwise)
82+
"""
83+
if attr_name == "time":
84+
if isinstance(value, int):
85+
# AMQP timestamp: milliseconds since Unix epoch
86+
return datetime.fromtimestamp(value / 1000.0, tz=timezone.utc)
87+
if isinstance(value, str):
88+
# ISO 8601 string (canonical form, also accepted per spec)
89+
return isoparse(value)
90+
91+
return value
92+
93+
94+
def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
95+
"""
96+
Convert a CloudEvent to AMQP binary content mode.
97+
98+
In binary mode, CloudEvent attributes are mapped to AMQP application properties
99+
with the 'cloudEvents_' prefix, except for 'datacontenttype' which maps to the
100+
AMQP 'content-type' property. The event data is placed directly in the AMQP
101+
application-data section. Datetime values are encoded as AMQP timestamp type
102+
(milliseconds since Unix epoch), while boolean and integer values are preserved
103+
as native types.
104+
105+
Note: Per AMQP CloudEvents spec, attributes may use 'cloudEvents_' or 'cloudEvents:'
106+
prefix. This implementation uses 'cloudEvents_' for JMS 2.0 compatibility.
107+
108+
Example:
109+
>>> from cloudevents.core.v1.event import CloudEvent
110+
>>> from cloudevents.core.formats.json import JSONFormat
111+
>>>
112+
>>> event = CloudEvent(
113+
... attributes={"type": "com.example.test", "source": "/test"},
114+
... data={"message": "Hello"}
115+
... )
116+
>>> message = to_binary(event, JSONFormat())
117+
>>> # message.application_properties = {"cloudEvents_type": "com.example.test", ...}
118+
>>> # message.properties = {"content-type": "application/json"}
119+
>>> # message.application_data = b'{"message": "Hello"}'
120+
121+
:param event: The CloudEvent to convert
122+
:param event_format: Format implementation for data serialization
123+
:return: AMQPMessage with CloudEvent attributes as application properties
124+
"""
125+
properties: dict[str, Any] = {}
126+
application_properties: dict[str, Any] = {}
127+
attributes = event.get_attributes()
128+
129+
for attr_name, attr_value in attributes.items():
130+
if attr_name == "datacontenttype":
131+
properties[CONTENT_TYPE_PROPERTY] = str(attr_value)
132+
else:
133+
property_name = f"{CE_PREFIX_UNDERSCORE}{attr_name}"
134+
# Encode datetime to AMQP timestamp (milliseconds since epoch)
135+
# Other types (bool, int, str, bytes) use native AMQP types
136+
application_properties[property_name] = _encode_amqp_value(attr_value)
137+
138+
data = event.get_data()
139+
datacontenttype = attributes.get("datacontenttype")
140+
application_data = event_format.write_data(data, datacontenttype)
141+
142+
return AMQPMessage(
143+
properties=properties,
144+
application_properties=application_properties,
145+
application_data=application_data,
146+
)
147+
148+
149+
def from_binary(
150+
message: AMQPMessage,
151+
event_format: Format,
152+
event_factory: EventFactory,
153+
) -> BaseCloudEvent:
154+
"""
155+
Parse an AMQP binary content mode message to a CloudEvent.
156+
157+
Extracts CloudEvent attributes from AMQP application properties with either
158+
'cloudEvents_' or 'cloudEvents:' prefix (per AMQP CloudEvents spec), and treats
159+
the AMQP 'content-type' property as the 'datacontenttype' attribute. The
160+
application-data section is parsed as event data according to the content type.
161+
The 'time' attribute accepts both AMQP timestamp (int milliseconds) and ISO 8601
162+
string, while other native AMQP types (boolean, integer) are preserved.
163+
164+
Example:
165+
>>> from cloudevents.core.v1.event import CloudEvent
166+
>>> from cloudevents.core.formats.json import JSONFormat
167+
>>>
168+
>>> message = AMQPMessage(
169+
... properties={"content-type": "application/json"},
170+
... application_properties={
171+
... "cloudEvents_type": "com.example.test",
172+
... "cloudEvents_source": "/test",
173+
... "cloudEvents_id": "123",
174+
... "cloudEvents_specversion": "1.0"
175+
... },
176+
... application_data=b'{"message": "Hello"}'
177+
... )
178+
>>> event = from_binary(message, JSONFormat(), CloudEvent)
179+
180+
:param message: AMQPMessage to parse
181+
:param event_format: Format implementation for data deserialization
182+
:param event_factory: Factory function to create CloudEvent instances
183+
:return: CloudEvent instance
184+
"""
185+
attributes: dict[str, Any] = {}
186+
187+
for prop_name, prop_value in message.application_properties.items():
188+
# Check for both cloudEvents_ and cloudEvents: prefixes
189+
attr_name = None
190+
191+
if prop_name.startswith(CE_PREFIX_UNDERSCORE):
192+
attr_name = prop_name[len(CE_PREFIX_UNDERSCORE) :]
193+
elif prop_name.startswith(CE_PREFIX_COLON):
194+
attr_name = prop_name[len(CE_PREFIX_COLON) :]
195+
196+
if attr_name:
197+
# Decode timestamp (int or ISO 8601 string) to datetime, preserve other native types
198+
attributes[attr_name] = _decode_amqp_value(attr_name, prop_value)
199+
200+
if CONTENT_TYPE_PROPERTY in message.properties:
201+
attributes["datacontenttype"] = message.properties[CONTENT_TYPE_PROPERTY]
202+
203+
datacontenttype = attributes.get("datacontenttype")
204+
data = event_format.read_data(message.application_data, datacontenttype)
205+
206+
return event_factory(attributes, data)
207+
208+
209+
def to_structured(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
210+
"""
211+
Convert a CloudEvent to AMQP structured content mode.
212+
213+
In structured mode, the entire CloudEvent (attributes and data) is serialized
214+
into the AMQP application-data section using the specified format. The
215+
content-type property is set to the format's media type (e.g.,
216+
"application/cloudevents+json").
217+
218+
Example:
219+
>>> from cloudevents.core.v1.event import CloudEvent
220+
>>> from cloudevents.core.formats.json import JSONFormat
221+
>>>
222+
>>> event = CloudEvent(
223+
... attributes={"type": "com.example.test", "source": "/test"},
224+
... data={"message": "Hello"}
225+
... )
226+
>>> message = to_structured(event, JSONFormat())
227+
>>> # message.properties = {"content-type": "application/cloudevents+json"}
228+
>>> # message.application_data = b'{"type": "com.example.test", ...}'
229+
230+
:param event: The CloudEvent to convert
231+
:param event_format: Format implementation for serialization
232+
:return: AMQPMessage with structured content in application-data
233+
"""
234+
content_type = event_format.get_content_type()
235+
236+
properties = {CONTENT_TYPE_PROPERTY: content_type}
237+
application_properties: dict[str, Any] = {}
238+
239+
application_data = event_format.write(event)
240+
241+
return AMQPMessage(
242+
properties=properties,
243+
application_properties=application_properties,
244+
application_data=application_data,
245+
)
246+
247+
248+
def from_structured(
249+
message: AMQPMessage,
250+
event_format: Format,
251+
event_factory: EventFactory,
252+
) -> BaseCloudEvent:
253+
"""
254+
Parse an AMQP structured content mode message to a CloudEvent.
255+
256+
Deserializes the CloudEvent from the AMQP application-data section using the
257+
specified format. Any cloudEvents_-prefixed application properties are ignored
258+
as the application-data contains all event metadata.
259+
260+
Example:
261+
>>> from cloudevents.core.v1.event import CloudEvent
262+
>>> from cloudevents.core.formats.json import JSONFormat
263+
>>>
264+
>>> message = AMQPMessage(
265+
... properties={"content-type": "application/cloudevents+json"},
266+
... application_properties={},
267+
... application_data=b'{"type": "com.example.test", "source": "/test", ...}'
268+
... )
269+
>>> event = from_structured(message, JSONFormat(), CloudEvent)
270+
271+
:param message: AMQPMessage to parse
272+
:param event_format: Format implementation for deserialization
273+
:param event_factory: Factory function to create CloudEvent instances
274+
:return: CloudEvent instance
275+
"""
276+
return event_format.read(event_factory, message.application_data)
277+
278+
279+
def from_amqp(
280+
message: AMQPMessage,
281+
event_format: Format,
282+
event_factory: EventFactory,
283+
) -> BaseCloudEvent:
284+
"""
285+
Parse an AMQP message to a CloudEvent with automatic mode detection.
286+
287+
Automatically detects whether the message uses binary or structured content mode:
288+
- If content-type starts with "application/cloudevents" → structured mode
289+
- Otherwise → binary mode
290+
291+
This function provides a convenient way to handle both content modes without
292+
requiring the caller to determine the mode beforehand.
293+
294+
Example:
295+
>>> from cloudevents.core.v1.event import CloudEvent
296+
>>> from cloudevents.core.formats.json import JSONFormat
297+
>>>
298+
>>> # Works with binary mode
299+
>>> binary_msg = AMQPMessage(
300+
... properties={"content-type": "application/json"},
301+
... application_properties={"cloudEvents_type": "com.example.test", ...},
302+
... application_data=b'...'
303+
... )
304+
>>> event1 = from_amqp(binary_msg, JSONFormat(), CloudEvent)
305+
>>>
306+
>>> # Also works with structured mode
307+
>>> structured_msg = AMQPMessage(
308+
... properties={"content-type": "application/cloudevents+json"},
309+
... application_properties={},
310+
... application_data=b'{"type": "com.example.test", ...}'
311+
... )
312+
>>> event2 = from_amqp(structured_msg, JSONFormat(), CloudEvent)
313+
314+
:param message: AMQPMessage to parse
315+
:param event_format: Format implementation for deserialization
316+
:param event_factory: Factory function to create CloudEvent instances
317+
:return: CloudEvent instance
318+
"""
319+
content_type = message.properties.get(CONTENT_TYPE_PROPERTY, "")
320+
321+
if isinstance(content_type, str) and content_type.lower().startswith(
322+
"application/cloudevents"
323+
):
324+
return from_structured(message, event_format, event_factory)
325+
326+
return from_binary(message, event_format, event_factory)

src/cloudevents/core/bindings/http.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# under the License.
1414

1515
from dataclasses import dataclass
16-
from typing import Any, Callable, Final
16+
from typing import Any, Final
1717

18-
from cloudevents.core.base import BaseCloudEvent
18+
from cloudevents.core.base import BaseCloudEvent, EventFactory
1919
from cloudevents.core.bindings.common import (
2020
CONTENT_TYPE_HEADER,
2121
DATACONTENTTYPE_ATTR,
@@ -92,9 +92,7 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
9292
def from_binary(
9393
message: HTTPMessage,
9494
event_format: Format,
95-
event_factory: Callable[
96-
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
97-
],
95+
event_factory: EventFactory,
9896
) -> BaseCloudEvent:
9997
"""
10098
Parse an HTTP binary content mode message to a CloudEvent.
@@ -172,9 +170,7 @@ def to_structured(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
172170
def from_structured(
173171
message: HTTPMessage,
174172
event_format: Format,
175-
event_factory: Callable[
176-
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
177-
],
173+
event_factory: EventFactory,
178174
) -> BaseCloudEvent:
179175
"""
180176
Parse an HTTP structured content mode message to a CloudEvent.
@@ -203,9 +199,7 @@ def from_structured(
203199
def from_http(
204200
message: HTTPMessage,
205201
event_format: Format,
206-
event_factory: Callable[
207-
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
208-
],
202+
event_factory: EventFactory,
209203
) -> BaseCloudEvent:
210204
"""
211205
Parse an HTTP message to a CloudEvent with automatic mode detection.

0 commit comments

Comments
 (0)