Skip to content

Commit 572de12

Browse files
committed
Added K_CE_OVERRIDE to cloudevents plugin.
1 parent e204a00 commit 572de12

File tree

3 files changed

+94
-0
lines changed

3 files changed

+94
-0
lines changed

eoapi_notifier/outputs/cloudevents.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Supports standard CloudEvents environment variables and KNative SinkBinding.
66
"""
77

8+
import json
89
import os
910
from typing import Any
1011
from uuid import uuid4
@@ -28,6 +29,7 @@ class CloudEventsConfig(BasePluginConfig):
2829
timeout: float = 30.0
2930
max_retries: int = 3
3031
retry_backoff: float = 1.0
32+
overrides: dict[str, str] = {}
3133

3234
@field_validator("endpoint")
3335
@classmethod
@@ -42,6 +44,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
4244
if k_sink := os.getenv("K_SINK"):
4345
self.endpoint = k_sink
4446

47+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
48+
overrides_data = json.loads(k_ce_overrides)
49+
self.overrides = overrides_data.get("extensions", {})
50+
4551
return self
4652

4753
@classmethod
@@ -197,6 +203,17 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
197203
source = self.config.source
198204
event_type_base = self.config.event_type
199205

206+
# Apply KNative CE overrides if present
207+
ce_extensions = {}
208+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
209+
try:
210+
overrides_data = json.loads(k_ce_overrides)
211+
ce_extensions = overrides_data.get("extensions", {})
212+
except json.JSONDecodeError:
213+
self.logger.warning(
214+
"Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides
215+
)
216+
200217
# Map operation to event type suffix
201218
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
202219
operation = operation_map.get(event.operation.upper(), event.operation.lower())
@@ -217,6 +234,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
217234
if event.collection:
218235
attributes["collection"] = event.collection
219236

237+
# Apply KNative CE extension overrides
238+
for key, value in ce_extensions.items():
239+
attributes[key] = str(value)
240+
220241
# Event data payload
221242
data = {
222243
"id": event.id,

helm-chart/eoapi-notifier/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ secrets:
7878
# KNative Support:
7979
# The cloudevents plugin supports K_SINK variables for KNative SinkBinding:
8080
# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding)
81+
# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event.
8182
env: {}
8283
# Examples - Standard environment variables:
8384
# PGSTAC_HOST: postgresql-service

tests/test_cloudevents_output.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,29 @@ def test_connection_info(self) -> None:
5353
config = CloudEventsConfig(endpoint="https://example.com/webhook")
5454
assert "POST https://example.com/webhook" in config.get_connection_info()
5555

56+
@patch.dict(
57+
os.environ,
58+
{"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'},
59+
)
60+
def test_k_ce_overrides_valid(self) -> None:
61+
"""Test K_CE_OVERRIDES with valid extensions."""
62+
config = CloudEventsConfig()
63+
assert config.overrides == {"extra": "test", "num": 42}
64+
65+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
66+
def test_k_ce_overrides_no_extensions(self) -> None:
67+
"""Test K_CE_OVERRIDES without extensions field."""
68+
config = CloudEventsConfig()
69+
assert config.overrides == {}
70+
71+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
72+
def test_k_ce_overrides_invalid_json(self) -> None:
73+
"""Test K_CE_OVERRIDES with invalid JSON."""
74+
from pydantic import ValidationError
75+
76+
with pytest.raises(ValidationError):
77+
CloudEventsConfig()
78+
5679

5780
class TestCloudEventsAdapter:
5881
"""Test CloudEvents output adapter."""
@@ -229,3 +252,52 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None:
229252
# Running with client
230253
adapter._client = MagicMock()
231254
assert await adapter.health_check() is True
255+
256+
@patch.dict(
257+
os.environ,
258+
{
259+
"K_CE_OVERRIDES": (
260+
'{"extensions": {"extra": "test-value", "priority": "high"}}'
261+
)
262+
},
263+
)
264+
def test_convert_to_cloudevent_with_overrides(
265+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
266+
) -> None:
267+
"""Test CloudEvent conversion with K_CE_OVERRIDES."""
268+
cloud_event = adapter._convert_to_cloudevent(sample_event)
269+
270+
assert isinstance(cloud_event, CloudEvent)
271+
assert cloud_event["extra"] == "test-value"
272+
assert cloud_event["priority"] == "high"
273+
274+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'})
275+
def test_convert_to_cloudevent_with_number_override(
276+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
277+
) -> None:
278+
"""Test CloudEvent conversion with number in K_CE_OVERRIDES."""
279+
cloud_event = adapter._convert_to_cloudevent(sample_event)
280+
281+
assert cloud_event["number"] == "123" # Should be converted to string
282+
283+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
284+
def test_convert_to_cloudevent_invalid_overrides(
285+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
286+
) -> None:
287+
"""Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON."""
288+
cloud_event = adapter._convert_to_cloudevent(sample_event)
289+
290+
# Should work normally without overrides
291+
assert isinstance(cloud_event, CloudEvent)
292+
assert cloud_event["source"] == "/eoapi/stac"
293+
294+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
295+
def test_convert_to_cloudevent_no_extensions(
296+
self, adapter: CloudEventsAdapter, sample_event: NotificationEvent
297+
) -> None:
298+
"""Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field."""
299+
cloud_event = adapter._convert_to_cloudevent(sample_event)
300+
301+
# Should work normally without extensions
302+
assert isinstance(cloud_event, CloudEvent)
303+
assert cloud_event["source"] == "/eoapi/stac"

0 commit comments

Comments
 (0)