Skip to content

Commit a25029e

Browse files
committed
feat(v2): Support both _ and : when reading AMQP messages
Signed-off-by: Tudor Plugaru <[email protected]>
1 parent b3ae91d commit a25029e

File tree

2 files changed

+233
-9
lines changed

2 files changed

+233
-9
lines changed

src/cloudevents/core/bindings/amqp.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
from cloudevents.core.base import BaseCloudEvent
2222
from cloudevents.core.formats.base import Format
2323

24-
CE_PREFIX: Final[str] = "cloudEvents_"
24+
# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes
25+
# The underscore variant is preferred for JMS 2.0 compatibility
26+
CE_PREFIX_UNDERSCORE: Final[str] = "cloudEvents_"
27+
CE_PREFIX_COLON: Final[str] = "cloudEvents:"
2528
CONTENT_TYPE_PROPERTY: Final[str] = "content-type"
2629

2730

@@ -99,6 +102,9 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
99102
(milliseconds since Unix epoch), while boolean and integer values are preserved
100103
as native types.
101104
105+
Note: Per AMQP CloudEvents spec, attributes may use 'cloudEvents_' or 'cloudEvents:'
106+
prefix. This implementation uses 'cloudEvents_' for JMS 2.0 compatibility.
107+
102108
Example:
103109
>>> from cloudevents.core.v1.event import CloudEvent
104110
>>> from cloudevents.core.formats.json import JSONFormat
@@ -124,7 +130,7 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> AMQPMessage:
124130
if attr_name == "datacontenttype":
125131
properties[CONTENT_TYPE_PROPERTY] = str(attr_value)
126132
else:
127-
property_name = f"{CE_PREFIX}{attr_name}"
133+
property_name = f"{CE_PREFIX_UNDERSCORE}{attr_name}"
128134
# Encode datetime to AMQP timestamp (milliseconds since epoch)
129135
# Other types (bool, int, str, bytes) use native AMQP types
130136
application_properties[property_name] = _encode_amqp_value(attr_value)
@@ -150,11 +156,12 @@ def from_binary(
150156
"""
151157
Parse an AMQP binary content mode message to a CloudEvent.
152158
153-
Extracts CloudEvent attributes from cloudEvents_-prefixed AMQP application
154-
properties and treats the AMQP 'content-type' property as the 'datacontenttype'
155-
attribute. The application-data section is parsed as event data according to
156-
the content type. The 'time' attribute accepts both AMQP timestamp (int milliseconds)
157-
and ISO 8601 string, while other native AMQP types (boolean, integer) are preserved.
159+
Extracts CloudEvent attributes from AMQP application properties with either
160+
'cloudEvents_' or 'cloudEvents:' prefix (per AMQP CloudEvents spec), and treats
161+
the AMQP 'content-type' property as the 'datacontenttype' attribute. The
162+
application-data section is parsed as event data according to the content type.
163+
The 'time' attribute accepts both AMQP timestamp (int milliseconds) and ISO 8601
164+
string, while other native AMQP types (boolean, integer) are preserved.
158165
159166
Example:
160167
>>> from cloudevents.core.v1.event import CloudEvent
@@ -180,8 +187,15 @@ def from_binary(
180187
attributes: dict[str, Any] = {}
181188

182189
for prop_name, prop_value in message.application_properties.items():
183-
if prop_name.startswith(CE_PREFIX):
184-
attr_name = prop_name[len(CE_PREFIX) :]
190+
# Check for both cloudEvents_ and cloudEvents: prefixes
191+
attr_name = None
192+
193+
if prop_name.startswith(CE_PREFIX_UNDERSCORE):
194+
attr_name = prop_name[len(CE_PREFIX_UNDERSCORE) :]
195+
elif prop_name.startswith(CE_PREFIX_COLON):
196+
attr_name = prop_name[len(CE_PREFIX_COLON) :]
197+
198+
if attr_name:
185199
# Decode timestamp (int or ISO 8601 string) to datetime, preserve other native types
186200
attributes[attr_name] = _decode_amqp_value(attr_name, prop_value)
187201

tests/test_core/test_bindings/test_amqp.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,213 @@ def test_from_binary_ignores_non_cloudevents_properties() -> None:
664664
# get_extension returns None for missing extensions
665665
assert event.get_extension("custom_property") is None
666666
assert event.get_extension("another_prop") is None
667+
668+
669+
def test_from_binary_with_colon_prefix() -> None:
670+
"""Test from_binary accepts cloudEvents: prefix per AMQP spec"""
671+
message = AMQPMessage(
672+
properties={"content-type": "application/json"},
673+
application_properties={
674+
"cloudEvents:type": "com.example.test",
675+
"cloudEvents:source": "/test",
676+
"cloudEvents:id": "test-123",
677+
"cloudEvents:specversion": "1.0",
678+
},
679+
application_data=b'{"message": "Hello"}',
680+
)
681+
event = from_binary(message, JSONFormat(), CloudEvent)
682+
683+
assert event.get_type() == "com.example.test"
684+
assert event.get_source() == "/test"
685+
assert event.get_id() == "test-123"
686+
assert event.get_specversion() == "1.0"
687+
assert event.get_data() == {"message": "Hello"}
688+
689+
690+
def test_from_binary_colon_prefix_with_extensions() -> None:
691+
"""Test from_binary with cloudEvents: prefix handles extensions"""
692+
message = AMQPMessage(
693+
properties={},
694+
application_properties={
695+
"cloudEvents:type": "test",
696+
"cloudEvents:source": "/test",
697+
"cloudEvents:id": "123",
698+
"cloudEvents:specversion": "1.0",
699+
"cloudEvents:customext": "custom-value",
700+
"cloudEvents:boolext": True,
701+
"cloudEvents:intext": 42,
702+
},
703+
application_data=b"{}",
704+
)
705+
event = from_binary(message, JSONFormat(), CloudEvent)
706+
707+
assert event.get_extension("customext") == "custom-value"
708+
assert event.get_extension("boolext") is True
709+
assert event.get_extension("intext") == 42
710+
711+
712+
def test_from_binary_colon_prefix_with_datetime() -> None:
713+
"""Test from_binary with cloudEvents: prefix handles datetime"""
714+
dt = datetime(2023, 1, 15, 10, 30, 45, tzinfo=timezone.utc)
715+
timestamp_ms = int(dt.timestamp() * 1000)
716+
717+
message = AMQPMessage(
718+
properties={},
719+
application_properties={
720+
"cloudEvents:type": "test",
721+
"cloudEvents:source": "/test",
722+
"cloudEvents:id": "123",
723+
"cloudEvents:specversion": "1.0",
724+
"cloudEvents:time": timestamp_ms, # AMQP timestamp
725+
},
726+
application_data=b"{}",
727+
)
728+
event = from_binary(message, JSONFormat(), CloudEvent)
729+
730+
assert event.get_time() == dt
731+
732+
733+
def test_from_binary_colon_prefix_round_trip() -> None:
734+
"""Test round-trip with cloudEvents: prefix (manual construction)"""
735+
# Create event with underscore prefix
736+
original_event = create_event(
737+
extra_attrs={"customext": "value", "datacontenttype": "application/json"},
738+
data={"message": "test"},
739+
)
740+
message_underscore = to_binary(original_event, JSONFormat())
741+
742+
# Manually construct message with colon prefix (simulate receiving from another system)
743+
message_colon = AMQPMessage(
744+
properties=message_underscore.properties,
745+
application_properties={
746+
# Convert underscore to colon prefix
747+
key.replace("cloudEvents_", "cloudEvents:"): value
748+
for key, value in message_underscore.application_properties.items()
749+
},
750+
application_data=message_underscore.application_data,
751+
)
752+
753+
# Should parse correctly
754+
recovered = from_binary(message_colon, JSONFormat(), CloudEvent)
755+
756+
assert recovered.get_type() == original_event.get_type()
757+
assert recovered.get_source() == original_event.get_source()
758+
assert recovered.get_extension("customext") == "value"
759+
assert recovered.get_data() == {"message": "test"}
760+
761+
762+
def test_from_binary_mixed_prefixes_accepted() -> None:
763+
"""Test from_binary accepts mixed cloudEvents_ and cloudEvents: prefixes"""
764+
message = AMQPMessage(
765+
properties={},
766+
application_properties={
767+
"cloudEvents_type": "test", # Underscore
768+
"cloudEvents:source": "/test", # Colon - mixed is OK
769+
"cloudEvents_id": "123",
770+
"cloudEvents_specversion": "1.0",
771+
},
772+
application_data=b"{}",
773+
)
774+
775+
event = from_binary(message, JSONFormat(), CloudEvent)
776+
777+
# Should extract all attributes regardless of prefix
778+
assert event.get_type() == "test"
779+
assert event.get_source() == "/test"
780+
assert event.get_id() == "123"
781+
assert event.get_specversion() == "1.0"
782+
783+
784+
def test_from_amqp_with_colon_prefix_binary_mode() -> None:
785+
"""Test from_amqp detects binary mode with cloudEvents: prefix"""
786+
message = AMQPMessage(
787+
properties={"content-type": "application/json"},
788+
application_properties={
789+
"cloudEvents:type": "test",
790+
"cloudEvents:source": "/test",
791+
"cloudEvents:id": "123",
792+
"cloudEvents:specversion": "1.0",
793+
},
794+
application_data=b'{"data": "value"}',
795+
)
796+
797+
event = from_amqp(message, JSONFormat(), CloudEvent)
798+
799+
assert event.get_type() == "test"
800+
assert event.get_source() == "/test"
801+
assert event.get_data() == {"data": "value"}
802+
803+
804+
def test_from_amqp_mixed_prefixes_accepted() -> None:
805+
"""Test from_amqp accepts mixed prefixes"""
806+
message = AMQPMessage(
807+
properties={"content-type": "application/json"},
808+
application_properties={
809+
"cloudEvents_type": "test",
810+
"cloudEvents:source": "/test", # Mixed is OK
811+
"cloudEvents_id": "123",
812+
"cloudEvents_specversion": "1.0",
813+
},
814+
application_data=b"{}",
815+
)
816+
817+
event = from_amqp(message, JSONFormat(), CloudEvent)
818+
819+
assert event.get_type() == "test"
820+
assert event.get_source() == "/test"
821+
822+
823+
def test_from_binary_all_underscore_prefix_valid() -> None:
824+
"""Test from_binary accepts all cloudEvents_ prefix (baseline)"""
825+
message = AMQPMessage(
826+
properties={},
827+
application_properties={
828+
"cloudEvents_type": "test",
829+
"cloudEvents_source": "/test",
830+
"cloudEvents_id": "123",
831+
"cloudEvents_specversion": "1.0",
832+
},
833+
application_data=b"{}",
834+
)
835+
836+
event = from_binary(message, JSONFormat(), CloudEvent)
837+
assert event.get_type() == "test"
838+
839+
840+
def test_from_binary_all_colon_prefix_valid() -> None:
841+
"""Test from_binary accepts all cloudEvents: prefix"""
842+
message = AMQPMessage(
843+
properties={},
844+
application_properties={
845+
"cloudEvents:type": "test",
846+
"cloudEvents:source": "/test",
847+
"cloudEvents:id": "123",
848+
"cloudEvents:specversion": "1.0",
849+
},
850+
application_data=b"{}",
851+
)
852+
853+
event = from_binary(message, JSONFormat(), CloudEvent)
854+
assert event.get_type() == "test"
855+
856+
857+
def test_from_binary_colon_prefix_ignores_non_ce_properties() -> None:
858+
"""Test from_binary with colon prefix ignores non-CloudEvents properties"""
859+
message = AMQPMessage(
860+
properties={},
861+
application_properties={
862+
"cloudEvents:type": "test",
863+
"cloudEvents:source": "/test",
864+
"cloudEvents:id": "123",
865+
"cloudEvents:specversion": "1.0",
866+
"customProperty": "ignored", # No prefix
867+
"anotherProp": 123,
868+
},
869+
application_data=b"{}",
870+
)
871+
872+
event = from_binary(message, JSONFormat(), CloudEvent)
873+
874+
assert event.get_type() == "test"
875+
assert event.get_extension("customProperty") is None
876+
assert event.get_extension("anotherProp") is None

0 commit comments

Comments
 (0)