Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions minioevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
import sys
from typing import TYPE_CHECKING, Any, NoReturn

from cloudevents.http import CloudEvent
from cloudevents.kafka import KafkaMessage, to_structured
from cloudevents.core.bindings.kafka import KafkaMessage, to_structured_event
from cloudevents.core.v1.event import CloudEvent
from configargparse import ArgumentParser # type: ignore[import-untyped]
from dateutil import parser as dtparser # type: ignore[import-untyped]
from kafka import KafkaConsumer, KafkaProducer # type: ignore[import-untyped]

if TYPE_CHECKING: # pragma: no cover
from collections.abc import Generator

from cloudevents.core.base import BaseCloudEvent
from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped]

logger = logging.getLogger(__name__)
Expand All @@ -24,34 +26,34 @@
def from_consumer_record(msg: ConsumerRecord) -> Generator[CloudEvent, None, None]:
"""Convert msg to an array of CloudEvents using a naive implementation of https://github.com/cloudevents/spec/blob/main/cloudevents/adapters/aws-s3.md."""
for rec in json.loads(msg.value).get("Records", []):
yield CloudEvent(
{
"id": ".".join(
[
rec.get("responseElements", {}).get("x-amz-request-id"),
rec.get("responseElements", {}).get("x-amz-id-2"),
],
),
"source": ".".join(
[
rec.get("eventSource"),
rec.get("awsRegion"),
rec.get("s3", {}).get("bucket", {}).get("name"),
],
),
"specversion": "1.0",
"type": ".".join(
[
"com.amazonaws.s3",
rec.get("eventName"),
],
),
"datacontenttype": "application/json",
"subject": rec.get("s3", {}).get("object", {}).get("key"),
"time": rec.get("eventTime"),
},
rec,
)
event_time_str = rec.get("eventTime")
attributes: dict[str, Any] = {
"id": ".".join(
[
rec.get("responseElements", {}).get("x-amz-request-id"),
rec.get("responseElements", {}).get("x-amz-id-2"),
],
),
"source": ".".join(
[
rec.get("eventSource"),
rec.get("awsRegion"),
rec.get("s3", {}).get("bucket", {}).get("name"),
],
),
"specversion": "1.0",
"type": ".".join(
[
"com.amazonaws.s3",
rec.get("eventName"),
],
),
"datacontenttype": "application/json",
"subject": rec.get("s3", {}).get("object", {}).get("key"),
}
if event_time_str:
attributes["time"] = dtparser.parse(event_time_str)
yield CloudEvent(attributes=attributes, data=rec)


def app( # noqa: PLR0913
Expand Down Expand Up @@ -98,18 +100,18 @@ def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover
def on_send_error(ex: Exception) -> None: # pragma: no cover
logger.error("Failed to send CloudEvent", exc_info=ex)

def _key_mapper(ce: CloudEvent) -> str:
def _key_mapper(ce: BaseCloudEvent) -> str | bytes | None:
return ".".join(
[
ce.get("type"), # type: ignore[list-item]
ce.get("source"), # type: ignore[list-item]
ce.get("subject"), # type: ignore[list-item]
ce.get_type(),
ce.get_source(),
ce.get_subject() or "",
],
)

for msg in consumer:
for ce in from_consumer_record(msg):
km: KafkaMessage = to_structured(ce, key_mapper=_key_mapper)
km: KafkaMessage = to_structured_event(ce, key_mapper=_key_mapper)
headers: list[tuple[str, bytes]] | None
if km.headers:
headers = list(km.headers.items())
Expand Down
41 changes: 33 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ minioevents = 'minioevents:main'

[tool.poetry.dependencies]
python = "^3.11"
cloudevents = "^1.12.1"
cloudevents = "^2.0.0"
ConfigArgParse = "^1.7"
kafka-python-ng = "^2.2.3"

Expand Down
64 changes: 33 additions & 31 deletions tests/test_minioevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from unittest.mock import patch

import pytest
from cloudevents.http import CloudEvent
from dateutil import parser as dtparser # type: ignore[import-untyped]
from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped]

from minioevents import app, from_consumer_record

_EVENT_TIME = "2023-01-15T12:34:56.000Z"
_EVENT_TIME_SERIALIZED = "2023-01-15T12:34:56Z"
_CONSUMER_RECORD = ConsumerRecord(
topic="test",
partition=0,
Expand All @@ -34,7 +36,7 @@
}
},
"eventName": "eventname",
"eventTime": "eventtime"
"eventTime": "2023-01-15T12:34:56.000Z"
}
]
}
Expand All @@ -48,42 +50,41 @@


@pytest.mark.parametrize(
("minioevent", "expected"),
("minioevent", "expected_attrs", "expected_data"),
[
(
_CONSUMER_RECORD,
CloudEvent(
{
"id": "x-amz-request-id.x-amz-id-2",
"source": "eventsource..bucketname",
"specversion": "1.0",
"type": "com.amazonaws.s3.eventname",
"datacontenttype": "application/json",
"subject": "objectkey",
"time": "eventtime",
{
"id": "x-amz-request-id.x-amz-id-2",
"source": "eventsource..bucketname",
"specversion": "1.0",
"type": "com.amazonaws.s3.eventname",
"datacontenttype": "application/json",
"subject": "objectkey",
"time": dtparser.parse(_EVENT_TIME),
},
{
"responseElements": {
"x-amz-request-id": "x-amz-request-id",
"x-amz-id-2": "x-amz-id-2",
},
{
"responseElements": {
"x-amz-request-id": "x-amz-request-id",
"x-amz-id-2": "x-amz-id-2",
},
"eventSource": "eventsource",
"awsRegion": "",
"s3": {
"bucket": {"name": "bucketname"},
"object": {"key": "objectkey"},
},
"eventName": "eventname",
"eventTime": "eventtime",
"eventSource": "eventsource",
"awsRegion": "",
"s3": {
"bucket": {"name": "bucketname"},
"object": {"key": "objectkey"},
},
),
"eventName": "eventname",
"eventTime": _EVENT_TIME,
},
),
],
)
def test_from_consumer_record(minioevent, expected):
def test_from_consumer_record(minioevent, expected_attrs, expected_data):
called = False
for ce in from_consumer_record(minioevent):
assert ce == expected
assert ce.get_attributes() == expected_attrs
assert ce.get_data() == expected_data
called = True
assert called

Expand Down Expand Up @@ -116,8 +117,9 @@ def test_app(mock_consumer, mock_producer):
"source": "eventsource..bucketname",
"specversion": "1.0",
"type": "com.amazonaws.s3.eventname",
"datacontenttype": "application/json",
"subject": "objectkey",
"time": "eventtime",
"time": _EVENT_TIME_SERIALIZED,
"data": {
"responseElements": {
"x-amz-request-id": "x-amz-request-id",
Expand All @@ -130,11 +132,11 @@ def test_app(mock_consumer, mock_producer):
"object": {"key": "objectkey"},
},
"eventName": "eventname",
"eventTime": "eventtime",
"eventTime": _EVENT_TIME,
},
},
),
"utf-8",
),
headers=[("content-type", b"application/json")],
headers=[("content-type", b"application/cloudevents+json")],
)
Loading