diff --git a/minioevents.py b/minioevents.py index e0252b0..12ae1dd 100644 --- a/minioevents.py +++ b/minioevents.py @@ -8,14 +8,16 @@ import sys from typing import TYPE_CHECKING, Any, NoReturn -from cloudevents.http import CloudEvent -from cloudevents.kafka import KafkaMessage, to_structured +from cloudevents.core.bindings.kafka import KafkaMessage, to_structured_event +from cloudevents.core.v1.event import CloudEvent from configargparse import ArgumentParser # type: ignore[import-untyped] +from dateutil import parser as dtparser # type: ignore[import-untyped] from kafka import KafkaConsumer, KafkaProducer # type: ignore[import-untyped] if TYPE_CHECKING: # pragma: no cover from collections.abc import Generator + from cloudevents.core.base import BaseCloudEvent from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped] logger = logging.getLogger(__name__) @@ -24,34 +26,34 @@ def from_consumer_record(msg: ConsumerRecord) -> Generator[CloudEvent, None, None]: """Convert msg to an array of CloudEvents using a naive implementation of https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md.""" for rec in json.loads(msg.value).get("Records", []): - yield CloudEvent( - { - "id": ".".join( - [ - rec.get("responseElements", {}).get("x-amz-request-id"), - rec.get("responseElements", {}).get("x-amz-id-2"), - ], - ), - "source": ".".join( - [ - rec.get("eventSource"), - rec.get("awsRegion"), - rec.get("s3", {}).get("bucket", {}).get("name"), - ], - ), - "specversion": "1.0", - "type": ".".join( - [ - "com.amazonaws.s3", - rec.get("eventName"), - ], - ), - "datacontenttype": "application/json", - "subject": rec.get("s3", {}).get("object", {}).get("key"), - "time": rec.get("eventTime"), - }, - rec, - ) + event_time_str = rec.get("eventTime") + attributes: dict[str, Any] = { + "id": ".".join( + [ + rec.get("responseElements", {}).get("x-amz-request-id"), + rec.get("responseElements", {}).get("x-amz-id-2"), + ], + ), + "source": ".".join( + [ + rec.get("eventSource"), + rec.get("awsRegion"), + rec.get("s3", {}).get("bucket", {}).get("name"), + ], + ), + "specversion": "1.0", + "type": ".".join( + [ + "com.amazonaws.s3", + rec.get("eventName"), + ], + ), + "datacontenttype": "application/json", + "subject": rec.get("s3", {}).get("object", {}).get("key"), + } + if event_time_str: + attributes["time"] = dtparser.parse(event_time_str) + yield CloudEvent(attributes=attributes, data=rec) def app( # noqa: PLR0913 @@ -98,18 +100,18 @@ def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover def on_send_error(ex: Exception) -> None: # pragma: no cover logger.error("Failed to send CloudEvent", exc_info=ex) - def _key_mapper(ce: CloudEvent) -> str: + def _key_mapper(ce: BaseCloudEvent) -> str | bytes | None: return ".".join( [ - ce.get("type"), # type: ignore[list-item] - ce.get("source"), # type: ignore[list-item] - ce.get("subject"), # type: ignore[list-item] + ce.get_type(), + ce.get_source(), + ce.get_subject() or "", ], ) for msg in consumer: for ce in from_consumer_record(msg): - km: KafkaMessage = to_structured(ce, key_mapper=_key_mapper) + km: KafkaMessage = to_structured_event(ce, key_mapper=_key_mapper) headers: list[tuple[str, bytes]] | None if km.headers: headers = list(km.headers.items()) diff --git a/poetry.lock b/poetry.lock index a3d073b..c5e77c7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -68,21 +68,19 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""} [[package]] name = "cloudevents" -version = "1.12.1" +version = "2.0.0" description = "CloudEvents Python SDK" optional = false -python-versions = "*" +python-versions = ">=3.10" groups = ["main"] files = [ - {file = "cloudevents-1.12.1-py3-none-any.whl", hash = "sha256:5f1574bf49ff334381319bdddcab02175a69ca877a26d0900b59c85505b675b3"}, - {file = "cloudevents-1.12.1.tar.gz", hash = "sha256:1eb52051309c3228934f86f41d50bd02c0e3d9561e5876e73ad8ad9f98f0d3ef"}, + {file = "cloudevents-2.0.0-py3-none-any.whl", hash = "sha256:babb257989a933b18312897c3bf3a7cc65e270831227822f2db10b50d1278546"}, + {file = "cloudevents-2.0.0.tar.gz", hash = "sha256:3224851b2ac902b868ed4bc1aa10772ada8cdf85f180322b12968d90a1dceef2"}, ] [package.dependencies] deprecation = ">=2.0,<3.0" - -[package.extras] -pydantic = ["pydantic (>=1.0.0,<3.0)"] +python-dateutil = ">=2.8.2" [[package]] name = "colorama" @@ -600,6 +598,21 @@ files = [ pytest = ">=5" ruff = ">=0.0.242" +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +files = [ + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, +] + +[package.dependencies] +six = ">=1.5" + [[package]] name = "pytokens" version = "0.4.1" @@ -683,6 +696,18 @@ files = [ {file = "ruff-0.15.7.tar.gz", hash = "sha256:04f1ae61fc20fe0b148617c324d9d009b5f63412c0b16474f3d5f1a1a665f7ac"}, ] +[[package]] +name = "six" +version = "1.17.0" +description = "Python 2 and 3 compatibility utilities" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +files = [ + {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, + {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, +] + [[package]] name = "typing-extensions" version = "4.15.0" @@ -698,4 +723,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "a3887568f6f35991c1324772bea2bcbcfef00a1489c0e0984b3e8924ea3aa079" +content-hash = "2972c15dfded4932b2ca818b117c16e6917c8690b3d421fb3c269bb68575d2c8" diff --git a/pyproject.toml b/pyproject.toml index 4e7103a..a83c9ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ minioevents = 'minioevents:main' [tool.poetry.dependencies] python = "^3.11" -cloudevents = "^1.12.1" +cloudevents = "^2.0.0" ConfigArgParse = "^1.7" kafka-python-ng = "^2.2.3" diff --git a/tests/test_minioevents.py b/tests/test_minioevents.py index 1c94812..91cfa03 100644 --- a/tests/test_minioevents.py +++ b/tests/test_minioevents.py @@ -3,11 +3,13 @@ from unittest.mock import patch import pytest -from cloudevents.http import CloudEvent +from dateutil import parser as dtparser # type: ignore[import-untyped] from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped] from minioevents import app, from_consumer_record +_EVENT_TIME = "2023-01-15T12:34:56.000Z" +_EVENT_TIME_SERIALIZED = "2023-01-15T12:34:56Z" _CONSUMER_RECORD = ConsumerRecord( topic="test", partition=0, @@ -34,7 +36,7 @@ } }, "eventName": "eventname", - "eventTime": "eventtime" + "eventTime": "2023-01-15T12:34:56.000Z" } ] } @@ -48,42 +50,41 @@ @pytest.mark.parametrize( - ("minioevent", "expected"), + ("minioevent", "expected_attrs", "expected_data"), [ ( _CONSUMER_RECORD, - CloudEvent( - { - "id": "x-amz-request-id.x-amz-id-2", - "source": "eventsource..bucketname", - "specversion": "1.0", - "type": "com.amazonaws.s3.eventname", - "datacontenttype": "application/json", - "subject": "objectkey", - "time": "eventtime", + { + "id": "x-amz-request-id.x-amz-id-2", + "source": "eventsource..bucketname", + "specversion": "1.0", + "type": "com.amazonaws.s3.eventname", + "datacontenttype": "application/json", + "subject": "objectkey", + "time": dtparser.parse(_EVENT_TIME), + }, + { + "responseElements": { + "x-amz-request-id": "x-amz-request-id", + "x-amz-id-2": "x-amz-id-2", }, - { - "responseElements": { - "x-amz-request-id": "x-amz-request-id", - "x-amz-id-2": "x-amz-id-2", - }, - "eventSource": "eventsource", - "awsRegion": "", - "s3": { - "bucket": {"name": "bucketname"}, - "object": {"key": "objectkey"}, - }, - "eventName": "eventname", - "eventTime": "eventtime", + "eventSource": "eventsource", + "awsRegion": "", + "s3": { + "bucket": {"name": "bucketname"}, + "object": {"key": "objectkey"}, }, - ), + "eventName": "eventname", + "eventTime": _EVENT_TIME, + }, ), ], ) -def test_from_consumer_record(minioevent, expected): +def test_from_consumer_record(minioevent, expected_attrs, expected_data): called = False for ce in from_consumer_record(minioevent): - assert ce == expected + assert ce.get_attributes() == expected_attrs + assert ce.get_data() == expected_data called = True assert called @@ -116,8 +117,9 @@ def test_app(mock_consumer, mock_producer): "source": "eventsource..bucketname", "specversion": "1.0", "type": "com.amazonaws.s3.eventname", + "datacontenttype": "application/json", "subject": "objectkey", - "time": "eventtime", + "time": _EVENT_TIME_SERIALIZED, "data": { "responseElements": { "x-amz-request-id": "x-amz-request-id", @@ -130,11 +132,11 @@ def test_app(mock_consumer, mock_producer): "object": {"key": "objectkey"}, }, "eventName": "eventname", - "eventTime": "eventtime", + "eventTime": _EVENT_TIME, }, }, ), "utf-8", ), - headers=[("content-type", b"application/json")], + headers=[("content-type", b"application/cloudevents+json")], )