Skip to content

Commit bc19738

Browse files
committed
feat: implement http bindings
Signed-off-by: Tudor Plugaru <[email protected]>
1 parent 3b8a073 commit bc19738

File tree

8 files changed

+1443
-25
lines changed

8 files changed

+1443
-25
lines changed

src/cloudevents/core/base.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
from datetime import datetime
17-
from typing import Any, Optional, Protocol, Union
17+
from typing import Any, Dict, Optional, Protocol, Union
1818

1919

2020
class BaseCloudEvent(Protocol):
@@ -27,7 +27,9 @@ class BaseCloudEvent(Protocol):
2727
"""
2828

2929
def __init__(
30-
self, attributes: dict[str, Any], data: Optional[Union[dict, str, bytes]] = None
30+
self,
31+
attributes: Dict[str, Any],
32+
data: Optional[Union[Dict[str, Any], str, bytes]] = None,
3133
) -> None:
3234
"""
3335
Create a new CloudEvent instance.
@@ -113,7 +115,7 @@ def get_extension(self, extension_name: str) -> Any:
113115
"""
114116
...
115117

116-
def get_data(self) -> Optional[Union[dict, str, bytes]]:
118+
def get_data(self) -> Optional[Union[Dict[str, Any], str, bytes]]:
117119
"""
118120
Retrieve data of the event.
119121
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from cloudevents.core.bindings.http import (
16+
HTTPMessage,
17+
from_binary,
18+
from_http,
19+
from_structured,
20+
to_binary,
21+
to_structured,
22+
)
23+
24+
__all__ = [
25+
"HTTPMessage",
26+
"to_binary",
27+
"from_binary",
28+
"to_structured",
29+
"from_structured",
30+
"from_http",
31+
]
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
# Copyright 2018-Present The CloudEvents Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from dataclasses import dataclass
16+
from datetime import datetime
17+
from typing import Any, Callable, Dict, Optional, Union
18+
from urllib.parse import quote, unquote
19+
20+
from dateutil.parser import isoparse
21+
22+
from cloudevents.core.base import BaseCloudEvent
23+
from cloudevents.core.formats.base import Format
24+
25+
CE_PREFIX: str = "ce-"
26+
CONTENT_TYPE_HEADER: str = "content-type"
27+
28+
29+
@dataclass(frozen=True)
30+
class HTTPMessage:
31+
"""
32+
Represents an HTTP message (request or response) containing CloudEvent data.
33+
34+
This dataclass encapsulates HTTP headers and body for transmitting CloudEvents
35+
over HTTP. It is immutable to prevent accidental modifications and works with
36+
any HTTP framework or library.
37+
38+
Attributes:
39+
headers: HTTP headers as a dictionary with string keys and values
40+
body: HTTP body as bytes
41+
"""
42+
43+
headers: Dict[str, str]
44+
body: bytes
45+
46+
47+
def _normalize_headers(headers: Dict[str, str]) -> Dict[str, str]:
48+
"""
49+
Normalize HTTP headers by converting all keys to lowercase.
50+
51+
:param headers: Original headers dictionary
52+
:return: New dictionary with lowercase header names
53+
"""
54+
return {key.lower(): value for key, value in headers.items()}
55+
56+
57+
def _encode_header_value(value: Any) -> str:
58+
"""
59+
Encode a CloudEvent attribute value for use in an HTTP header.
60+
61+
Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
62+
and applies percent-encoding for non-ASCII and special characters per RFC 3986.
63+
64+
:param value: The attribute value to encode
65+
:return: Percent-encoded string suitable for HTTP headers
66+
"""
67+
if isinstance(value, datetime):
68+
str_value = value.isoformat()
69+
if str_value.endswith("+00:00"):
70+
str_value = str_value[:-6] + "Z"
71+
return quote(str_value, safe="")
72+
73+
return quote(str(value), safe="")
74+
75+
76+
def _decode_header_value(attr_name: str, value: str) -> Any:
77+
"""
78+
Decode a CloudEvent attribute value from an HTTP header.
79+
80+
Applies percent-decoding and special parsing for the 'time' attribute
81+
(converts to datetime object using RFC 3339 parsing).
82+
83+
:param attr_name: The name of the CloudEvent attribute
84+
:param value: The percent-encoded header value
85+
:return: Decoded value (datetime for 'time' attribute, string otherwise)
86+
"""
87+
decoded = unquote(value)
88+
89+
if attr_name == "time":
90+
return isoparse(decoded)
91+
92+
return decoded
93+
94+
95+
def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
96+
"""
97+
Convert a CloudEvent to HTTP binary content mode.
98+
99+
In binary mode, CloudEvent attributes are mapped to HTTP headers with the 'ce-' prefix,
100+
except for 'datacontenttype' which maps to the 'Content-Type' header. The event data
101+
is placed directly in the HTTP body.
102+
103+
Example:
104+
>>> from cloudevents.core.v1.event import CloudEvent
105+
>>> from cloudevents.core.formats.json import JSONFormat
106+
>>>
107+
>>> event = CloudEvent(
108+
... attributes={"type": "com.example.test", "source": "/test"},
109+
... data={"message": "Hello"}
110+
... )
111+
>>> message = to_binary(event, JSONFormat())
112+
>>> # message.headers = {"ce-type": "com.example.test", "ce-source": "/test", ...}
113+
>>> # message.body = b'{"message": "Hello"}'
114+
115+
:param event: The CloudEvent to convert
116+
:param event_format: Format implementation for data serialization
117+
:return: HTTPMessage with ce-prefixed headers and event data as body
118+
"""
119+
headers: Dict[str, str] = {}
120+
attributes = event.get_attributes()
121+
122+
for attr_name, attr_value in attributes.items():
123+
if attr_value is None:
124+
continue
125+
126+
if attr_name == "datacontenttype":
127+
headers[CONTENT_TYPE_HEADER] = str(attr_value)
128+
else:
129+
header_name = f"{CE_PREFIX}{attr_name}"
130+
headers[header_name] = _encode_header_value(attr_value)
131+
132+
data = event.get_data()
133+
datacontenttype = attributes.get("datacontenttype")
134+
body = event_format.write_data(data, datacontenttype)
135+
136+
return HTTPMessage(headers=headers, body=body)
137+
138+
139+
def from_binary(
140+
message: HTTPMessage,
141+
event_format: Format,
142+
event_factory: Callable[
143+
[Dict[str, Any], Optional[Union[Dict[str, Any], str, bytes]]], BaseCloudEvent
144+
],
145+
) -> BaseCloudEvent:
146+
"""
147+
Parse an HTTP binary content mode message to a CloudEvent.
148+
149+
Extracts CloudEvent attributes from ce-prefixed HTTP headers and treats the
150+
'Content-Type' header as the 'datacontenttype' attribute. The HTTP body is
151+
parsed as event data according to the content type.
152+
153+
Example:
154+
>>> from cloudevents.core.v1.event import CloudEvent
155+
>>> from cloudevents.core.formats.json import JSONFormat
156+
>>>
157+
>>> message = HTTPMessage(
158+
... headers={"ce-type": "com.example.test", "ce-source": "/test",
159+
... "ce-id": "123", "ce-specversion": "1.0"},
160+
... body=b'{"message": "Hello"}'
161+
... )
162+
>>> event = from_binary(message, JSONFormat(), CloudEvent)
163+
164+
:param message: HTTPMessage to parse
165+
:param event_format: Format implementation for data deserialization
166+
:param event_factory: Factory function to create CloudEvent instances
167+
:return: CloudEvent instance
168+
"""
169+
normalized_headers = _normalize_headers(message.headers)
170+
171+
attributes: Dict[str, Any] = {}
172+
173+
for header_name, header_value in normalized_headers.items():
174+
if header_name.startswith(CE_PREFIX):
175+
attr_name = header_name[len(CE_PREFIX) :]
176+
attributes[attr_name] = _decode_header_value(attr_name, header_value)
177+
178+
if CONTENT_TYPE_HEADER in normalized_headers:
179+
attributes["datacontenttype"] = normalized_headers[CONTENT_TYPE_HEADER]
180+
181+
datacontenttype = attributes.get("datacontenttype")
182+
data = event_format.read_data(message.body, datacontenttype)
183+
184+
return event_factory(attributes, data)
185+
186+
187+
def to_structured(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
188+
"""
189+
Convert a CloudEvent to HTTP structured content mode.
190+
191+
In structured mode, the entire CloudEvent (attributes and data) is serialized
192+
into the HTTP body using the specified format. The Content-Type header is set
193+
to the format's media type.
194+
195+
Example:
196+
>>> from cloudevents.core.v1.event import CloudEvent
197+
>>> from cloudevents.core.formats.json import JSONFormat
198+
>>>
199+
>>> event = CloudEvent(
200+
... attributes={"type": "com.example.test", "source": "/test"},
201+
... data={"message": "Hello"}
202+
... )
203+
>>> message = to_structured(event, JSONFormat())
204+
>>> # message.headers = {"content-type": "application/cloudevents+json"}
205+
>>> # message.body = b'{"type": "com.example.test", "source": "/test", ...}'
206+
207+
:param event: The CloudEvent to convert
208+
:param event_format: Format implementation for serialization
209+
:return: HTTPMessage with structured content in body
210+
"""
211+
content_type = event_format.get_content_type()
212+
213+
headers = {CONTENT_TYPE_HEADER: content_type}
214+
215+
body = event_format.write(event)
216+
217+
return HTTPMessage(headers=headers, body=body)
218+
219+
220+
def from_structured(
221+
message: HTTPMessage,
222+
event_format: Format,
223+
event_factory: Callable[
224+
[Dict[str, Any], Optional[Union[Dict[str, Any], str, bytes]]], BaseCloudEvent
225+
],
226+
) -> BaseCloudEvent:
227+
"""
228+
Parse an HTTP structured content mode message to a CloudEvent.
229+
230+
Deserializes the CloudEvent from the HTTP body using the specified format.
231+
Any ce-prefixed headers are ignored as the body contains all event metadata.
232+
233+
Example:
234+
>>> from cloudevents.core.v1.event import CloudEvent
235+
>>> from cloudevents.core.formats.json import JSONFormat
236+
>>>
237+
>>> message = HTTPMessage(
238+
... headers={"content-type": "application/cloudevents+json"},
239+
... body=b'{"type": "com.example.test", "source": "/test", ...}'
240+
... )
241+
>>> event = from_structured(message, JSONFormat(), CloudEvent)
242+
243+
:param message: HTTPMessage to parse
244+
:param event_format: Format implementation for deserialization
245+
:param event_factory: Factory function to create CloudEvent instances
246+
:return: CloudEvent instance
247+
"""
248+
return event_format.read(event_factory, message.body)
249+
250+
251+
def from_http(
252+
message: HTTPMessage,
253+
event_format: Format,
254+
event_factory: Callable[
255+
[Dict[str, Any], Optional[Union[Dict[str, Any], str, bytes]]], BaseCloudEvent
256+
],
257+
) -> BaseCloudEvent:
258+
"""
259+
Parse an HTTP message to a CloudEvent with automatic mode detection.
260+
261+
Automatically detects whether the message uses binary or structured content mode:
262+
- If any ce- prefixed headers are present → binary mode
263+
- Otherwise → structured mode
264+
265+
This function provides a convenient way to handle both content modes without
266+
requiring the caller to determine the mode beforehand.
267+
268+
Example:
269+
>>> from cloudevents.core.v1.event import CloudEvent
270+
>>> from cloudevents.core.formats.json import JSONFormat
271+
>>>
272+
>>> # Works with binary mode
273+
>>> binary_msg = HTTPMessage(
274+
... headers={"ce-type": "com.example.test", ...},
275+
... body=b'...'
276+
... )
277+
>>> event1 = from_http(binary_msg, JSONFormat(), CloudEvent)
278+
>>>
279+
>>> # Also works with structured mode
280+
>>> structured_msg = HTTPMessage(
281+
... headers={"content-type": "application/cloudevents+json"},
282+
... body=b'{"type": "com.example.test", ...}'
283+
... )
284+
>>> event2 = from_http(structured_msg, JSONFormat(), CloudEvent)
285+
286+
:param message: HTTPMessage to parse
287+
:param event_format: Format implementation for deserialization
288+
:param event_factory: Factory function to create CloudEvent instances
289+
:return: CloudEvent instance
290+
"""
291+
normalized_headers = _normalize_headers(message.headers)
292+
293+
for header_name in normalized_headers.keys():
294+
if header_name.startswith(CE_PREFIX):
295+
return from_binary(message, event_format, event_factory)
296+
297+
return from_structured(message, event_format, event_factory)

0 commit comments

Comments
 (0)