Skip to content

Commit 0415e41

Browse files
committed
Added K_CE_OVERRIDE to cloudevents plugin.
1 parent 6e3b560 commit 0415e41

File tree

3 files changed

+94
-0
lines changed

3 files changed

+94
-0
lines changed

eoapi_notifier/outputs/cloudevents.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Supports standard CloudEvents environment variables and KNative SinkBinding.
66
"""
77

8+
import json
89
import os
910
from typing import Any
1011
from uuid import uuid4
@@ -29,6 +30,7 @@ class CloudEventsConfig(BasePluginConfig):
2930
max_retries: int = 3
3031
retry_backoff: float = 1.0
3132
max_header_length: int = 4096
33+
overrides: dict[str, str] = {}
3234

3335
@field_validator("endpoint")
3436
@classmethod
@@ -43,6 +45,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
4345
if k_sink := os.getenv("K_SINK"):
4446
self.endpoint = k_sink
4547

48+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
49+
overrides_data = json.loads(k_ce_overrides)
50+
self.overrides = overrides_data.get("extensions", {})
51+
4652
return self
4753

4854
@classmethod
@@ -209,6 +215,17 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
209215
source = self.config.source
210216
event_type_base = self.config.event_type
211217

218+
# Apply KNative CE overrides if present
219+
ce_extensions = {}
220+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
221+
try:
222+
overrides_data = json.loads(k_ce_overrides)
223+
ce_extensions = overrides_data.get("extensions", {})
224+
except json.JSONDecodeError:
225+
self.logger.warning(
226+
"Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides
227+
)
228+
212229
# Map operation to event type suffix
213230
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
214231
operation = operation_map.get(event.operation.upper(), event.operation.lower())
@@ -233,6 +250,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
233250
if truncated_collection:
234251
attributes["collection"] = truncated_collection
235252

253+
# Apply KNative CE extension overrides
254+
for key, value in ce_extensions.items():
255+
attributes[key] = str(value)
256+
236257
# Event data payload
237258
data = {
238259
"id": event.id,

helm-chart/eoapi-notifier/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ secrets:
8282
# KNative Support:
8383
# The cloudevents plugin supports K_SINK variables for KNative SinkBinding:
8484
# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding)
85+
# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event.
8586
env: {}
8687
# Examples - Standard environment variables:
8788
# PGSTAC_HOST: postgresql-service

tests/test_cloudevents_output.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ def test_connection_info(self) -> None:
5454
config = CloudEventsConfig(endpoint="https://example.com/webhook")
5555
assert "POST https://example.com/webhook" in config.get_connection_info()
5656

57+
@patch.dict(
58+
os.environ,
59+
{"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'},
60+
)
61+
def test_k_ce_overrides_valid(self) -> None:
62+
"""Test K_CE_OVERRIDES with valid extensions."""
63+
config = CloudEventsConfig()
64+
assert config.overrides == {"extra": "test", "num": 42}
65+
66+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
67+
def test_k_ce_overrides_no_extensions(self) -> None:
68+
"""Test K_CE_OVERRIDES without extensions field."""
69+
config = CloudEventsConfig()
70+
assert config.overrides == {}
71+
72+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
73+
def test_k_ce_overrides_invalid_json(self) -> None:
74+
"""Test K_CE_OVERRIDES with invalid JSON."""
75+
from pydantic import ValidationError
76+
77+
with pytest.raises(ValidationError):
78+
CloudEventsConfig()
79+
5780

5881
class TestCloudEventsAdapter:
5982
"""Test CloudEvents output adapter."""
@@ -286,3 +309,52 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None:
286309
# Running with client
287310
adapter._client = MagicMock()
288311
assert await adapter.health_check() is True
312+
313+
@patch.dict(
314+
os.environ,
315+
{
316+
"K_CE_OVERRIDES": (
317+
'{"extensions": {"extra": "test-value", "priority": "high"}}'
318+
)
319+
},
320+
)
321+
def test_convert_to_cloudevent_with_overrides(
322+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
323+
) -> None:
324+
"""Test CloudEvent conversion with K_CE_OVERRIDES."""
325+
cloud_event = adapter._convert_to_cloudevent(sample_event)
326+
327+
assert isinstance(cloud_event, CloudEvent)
328+
assert cloud_event["extra"] == "test-value"
329+
assert cloud_event["priority"] == "high"
330+
331+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'})
332+
def test_convert_to_cloudevent_with_number_override(
333+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
334+
) -> None:
335+
"""Test CloudEvent conversion with number in K_CE_OVERRIDES."""
336+
cloud_event = adapter._convert_to_cloudevent(sample_event)
337+
338+
assert cloud_event["number"] == "123" # Should be converted to string
339+
340+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
341+
def test_convert_to_cloudevent_invalid_overrides(
342+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
343+
) -> None:
344+
"""Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON."""
345+
cloud_event = adapter._convert_to_cloudevent(sample_event)
346+
347+
# Should work normally without overrides
348+
assert isinstance(cloud_event, CloudEvent)
349+
assert cloud_event["source"] == "/eoapi/stac"
350+
351+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
352+
def test_convert_to_cloudevent_no_extensions(
353+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
354+
) -> None:
355+
"""Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field."""
356+
cloud_event = adapter._convert_to_cloudevent(sample_event)
357+
358+
# Should work normally without extensions
359+
assert isinstance(cloud_event, CloudEvent)
360+
assert cloud_event["source"] == "/eoapi/stac"

0 commit comments

Comments
 (0)