Skip to content

Commit 8ddb4bb

Browse files
committed
feat(v2): full AMQP 1.0 implementation
Signed-off-by: Tudor Plugaru <[email protected]>
1 parent db71318 commit 8ddb4bb

File tree

2 files changed

+984
-0
lines changed

2 files changed

+984
-0
lines changed
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from dataclasses import dataclass
16+
from datetime import datetime, timezone
17+
from typing import Any, Callable, Final
18+
19+
from dateutil.parser import isoparse
20+
21+
from cloudevents.core.base import BaseCloudEvent
22+
from cloudevents.core.formats.base import Format
23+
24+
CE_PREFIX: Final[str] = "cloudEvents_"
25+
CONTENT_TYPE_PROPERTY: Final[str] = "content-type"
26+
27+
28+
@dataclass(frozen=True)
29+
class AMQPMessage:
30+
"""
31+
Represents an AMQP 1.0 message containing CloudEvent data.
32+
33+
This dataclass encapsulates AMQP message properties, application properties,
34+
and application data for transmitting CloudEvents over AMQP. It is immutable
35+
to prevent accidental modifications and works with any AMQP 1.0 library
36+
(e.g., Pika, aio-pika, qpid-proton, azure-servicebus).
37+
38+
Attributes:
39+
properties: AMQP message properties as a dictionary
40+
application_properties: AMQP application properties as a dictionary
41+
application_data: AMQP application data section as bytes
42+
"""
43+
44+
properties: dict[str, Any]
45+
application_properties: dict[str, Any]
46+
application_data: bytes
47+
48+
49+
def _encode_amqp_value(value: Any) -> Any:
50+
"""
51+
Encode a CloudEvent attribute value for AMQP application properties.
52+
53+
Handles special encoding for datetime objects to AMQP timestamp type
54+
(milliseconds since Unix epoch as int). Per AMQP 1.0 CloudEvents spec,
55+
senders SHOULD use native AMQP types when efficient.
56+
57+
:param value: The attribute value to encode
58+
:return: Encoded value (int for datetime timestamp, original type otherwise)
59+
"""
60+
if isinstance(value, datetime):
61+
# AMQP 1.0 timestamp: milliseconds since Unix epoch (UTC)
62+
timestamp_ms = int(value.timestamp() * 1000)
63+
return timestamp_ms
64+
65+
return value
66+
67+
68+
def _decode_amqp_value(attr_name: str, value: Any) -> Any:
69+
"""
70+
Decode a CloudEvent attribute value from AMQP application properties.
71+
72+
Handles special parsing for the 'time' attribute. Per AMQP 1.0 CloudEvents spec,
73+
receivers MUST accept both native AMQP timestamp (int milliseconds since epoch)
74+
and canonical string form (ISO 8601).
75+
76+
:param attr_name: The name of the CloudEvent attribute
77+
:param value: The AMQP property value
78+
:return: Decoded value (datetime for 'time' attribute, original type otherwise)
79+
"""
80+
if attr_name == "time":
81+
if isinstance(value, int):
82+
# AMQP timestamp: milliseconds since Unix epoch
83+
return datetime.fromtimestamp(value / 1000.0, tz=timezone.utc)
84+
if isinstance(value, str):
85+
# ISO 8601 string (canonical form, also accepted per spec)
86+
return isoparse(value)
87+
88+
return value
89+
90+
91+
def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
92+
"""
93+
Convert a CloudEvent to AMQP binary content mode.
94+
95+
In binary mode, CloudEvent attributes are mapped to AMQP application properties
96+
with the 'cloudEvents_' prefix, except for 'datacontenttype' which maps to the
97+
AMQP 'content-type' property. The event data is placed directly in the AMQP
98+
application-data section. Datetime values are encoded as AMQP timestamp type
99+
(milliseconds since Unix epoch), while boolean and integer values are preserved
100+
as native types.
101+
102+
Example:
103+
>>> from cloudevents.core.v1.event import CloudEvent
104+
>>> from cloudevents.core.formats.json import JSONFormat
105+
>>>
106+
>>> event = CloudEvent(
107+
... attributes={"type": "com.example.test", "source": "/test"},
108+
... data={"message": "Hello"}
109+
... )
110+
>>> message = to_binary(event, JSONFormat())
111+
>>> # message.application_properties = {"cloudEvents_type": "com.example.test", ...}
112+
>>> # message.properties = {"content-type": "application/json"}
113+
>>> # message.application_data = b'{"message": "Hello"}'
114+
115+
:param event: The CloudEvent to convert
116+
:param event_format: Format implementation for data serialization
117+
:return: AMQPMessage with CloudEvent attributes as application properties
118+
"""
119+
properties: dict[str, Any] = {}
120+
application_properties: dict[str, Any] = {}
121+
attributes = event.get_attributes()
122+
123+
for attr_name, attr_value in attributes.items():
124+
if attr_name == "datacontenttype":
125+
properties[CONTENT_TYPE_PROPERTY] = str(attr_value)
126+
else:
127+
property_name = f"{CE_PREFIX}{attr_name}"
128+
# Encode datetime to AMQP timestamp (milliseconds since epoch)
129+
# Other types (bool, int, str, bytes) use native AMQP types
130+
application_properties[property_name] = _encode_amqp_value(attr_value)
131+
132+
data = event.get_data()
133+
datacontenttype = attributes.get("datacontenttype")
134+
application_data = event_format.write_data(data, datacontenttype)
135+
136+
return AMQPMessage(
137+
properties=properties,
138+
application_properties=application_properties,
139+
application_data=application_data,
140+
)
141+
142+
143+
def from_binary(
144+
message: AMQPMessage,
145+
event_format: Format,
146+
event_factory: Callable[
147+
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
148+
],
149+
) -> BaseCloudEvent:
150+
"""
151+
Parse an AMQP binary content mode message to a CloudEvent.
152+
153+
Extracts CloudEvent attributes from cloudEvents_-prefixed AMQP application
154+
properties and treats the AMQP 'content-type' property as the 'datacontenttype'
155+
attribute. The application-data section is parsed as event data according to
156+
the content type. The 'time' attribute accepts both AMQP timestamp (int milliseconds)
157+
and ISO 8601 string, while other native AMQP types (boolean, integer) are preserved.
158+
159+
Example:
160+
>>> from cloudevents.core.v1.event import CloudEvent
161+
>>> from cloudevents.core.formats.json import JSONFormat
162+
>>>
163+
>>> message = AMQPMessage(
164+
... properties={"content-type": "application/json"},
165+
... application_properties={
166+
... "cloudEvents_type": "com.example.test",
167+
... "cloudEvents_source": "/test",
168+
... "cloudEvents_id": "123",
169+
... "cloudEvents_specversion": "1.0"
170+
... },
171+
... application_data=b'{"message": "Hello"}'
172+
... )
173+
>>> event = from_binary(message, JSONFormat(), CloudEvent)
174+
175+
:param message: AMQPMessage to parse
176+
:param event_format: Format implementation for data deserialization
177+
:param event_factory: Factory function to create CloudEvent instances
178+
:return: CloudEvent instance
179+
"""
180+
attributes: dict[str, Any] = {}
181+
182+
for prop_name, prop_value in message.application_properties.items():
183+
if prop_name.startswith(CE_PREFIX):
184+
attr_name = prop_name[len(CE_PREFIX) :]
185+
# Decode timestamp (int or ISO 8601 string) to datetime, preserve other native types
186+
attributes[attr_name] = _decode_amqp_value(attr_name, prop_value)
187+
188+
if CONTENT_TYPE_PROPERTY in message.properties:
189+
attributes["datacontenttype"] = message.properties[CONTENT_TYPE_PROPERTY]
190+
191+
datacontenttype = attributes.get("datacontenttype")
192+
data = event_format.read_data(message.application_data, datacontenttype)
193+
194+
return event_factory(attributes, data)
195+
196+
197+
def to_structured(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
198+
"""
199+
Convert a CloudEvent to AMQP structured content mode.
200+
201+
In structured mode, the entire CloudEvent (attributes and data) is serialized
202+
into the AMQP application-data section using the specified format. The
203+
content-type property is set to the format's media type (e.g.,
204+
"application/cloudevents+json").
205+
206+
Example:
207+
>>> from cloudevents.core.v1.event import CloudEvent
208+
>>> from cloudevents.core.formats.json import JSONFormat
209+
>>>
210+
>>> event = CloudEvent(
211+
... attributes={"type": "com.example.test", "source": "/test"},
212+
... data={"message": "Hello"}
213+
... )
214+
>>> message = to_structured(event, JSONFormat())
215+
>>> # message.properties = {"content-type": "application/cloudevents+json"}
216+
>>> # message.application_data = b'{"type": "com.example.test", ...}'
217+
218+
:param event: The CloudEvent to convert
219+
:param event_format: Format implementation for serialization
220+
:return: AMQPMessage with structured content in application-data
221+
"""
222+
content_type = event_format.get_content_type()
223+
224+
properties = {CONTENT_TYPE_PROPERTY: content_type}
225+
application_properties: dict[str, Any] = {}
226+
227+
application_data = event_format.write(event)
228+
229+
return AMQPMessage(
230+
properties=properties,
231+
application_properties=application_properties,
232+
application_data=application_data,
233+
)
234+
235+
236+
def from_structured(
237+
message: AMQPMessage,
238+
event_format: Format,
239+
event_factory: Callable[
240+
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
241+
],
242+
) -> BaseCloudEvent:
243+
"""
244+
Parse an AMQP structured content mode message to a CloudEvent.
245+
246+
Deserializes the CloudEvent from the AMQP application-data section using the
247+
specified format. Any cloudEvents_-prefixed application properties are ignored
248+
as the application-data contains all event metadata.
249+
250+
Example:
251+
>>> from cloudevents.core.v1.event import CloudEvent
252+
>>> from cloudevents.core.formats.json import JSONFormat
253+
>>>
254+
>>> message = AMQPMessage(
255+
... properties={"content-type": "application/cloudevents+json"},
256+
... application_properties={},
257+
... application_data=b'{"type": "com.example.test", "source": "/test", ...}'
258+
... )
259+
>>> event = from_structured(message, JSONFormat(), CloudEvent)
260+
261+
:param message: AMQPMessage to parse
262+
:param event_format: Format implementation for deserialization
263+
:param event_factory: Factory function to create CloudEvent instances
264+
:return: CloudEvent instance
265+
"""
266+
return event_format.read(event_factory, message.application_data)
267+
268+
269+
def from_amqp(
270+
message: AMQPMessage,
271+
event_format: Format,
272+
event_factory: Callable[
273+
[dict[str, Any], dict[str, Any] | str | bytes | None], BaseCloudEvent
274+
],
275+
) -> BaseCloudEvent:
276+
"""
277+
Parse an AMQP message to a CloudEvent with automatic mode detection.
278+
279+
Automatically detects whether the message uses binary or structured content mode:
280+
- If content-type starts with "application/cloudevents" → structured mode
281+
- Otherwise → binary mode
282+
283+
This function provides a convenient way to handle both content modes without
284+
requiring the caller to determine the mode beforehand.
285+
286+
Example:
287+
>>> from cloudevents.core.v1.event import CloudEvent
288+
>>> from cloudevents.core.formats.json import JSONFormat
289+
>>>
290+
>>> # Works with binary mode
291+
>>> binary_msg = AMQPMessage(
292+
... properties={"content-type": "application/json"},
293+
... application_properties={"cloudEvents_type": "com.example.test", ...},
294+
... application_data=b'...'
295+
... )
296+
>>> event1 = from_amqp(binary_msg, JSONFormat(), CloudEvent)
297+
>>>
298+
>>> # Also works with structured mode
299+
>>> structured_msg = AMQPMessage(
300+
... properties={"content-type": "application/cloudevents+json"},
301+
... application_properties={},
302+
... application_data=b'{"type": "com.example.test", ...}'
303+
... )
304+
>>> event2 = from_amqp(structured_msg, JSONFormat(), CloudEvent)
305+
306+
:param message: AMQPMessage to parse
307+
:param event_format: Format implementation for deserialization
308+
:param event_factory: Factory function to create CloudEvent instances
309+
:return: CloudEvent instance
310+
"""
311+
content_type = message.properties.get(CONTENT_TYPE_PROPERTY, "")
312+
313+
if isinstance(content_type, str) and content_type.lower().startswith(
314+
"application/cloudevents"
315+
):
316+
return from_structured(message, event_format, event_factory)
317+
318+
return from_binary(message, event_format, event_factory)

0 commit comments

Comments
 (0)