Skip to content

Commit 116327e

Browse files
authored
Added K_CE_OVERRIDE to cloudevents plugin. (#3)
1 parent 6e3b560 commit 116327e

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

eoapi_notifier/outputs/cloudevents.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Supports standard CloudEvents environment variables and KNative SinkBinding.
66
"""
77

8+
import json
89
import os
910
from typing import Any
1011
from uuid import uuid4
@@ -29,6 +30,7 @@ class CloudEventsConfig(BasePluginConfig):
2930
max_retries: int = 3
3031
retry_backoff: float = 1.0
3132
max_header_length: int = 4096
33+
overrides: dict[str, str] = {}
3234

3335
@field_validator("endpoint")
3436
@classmethod
@@ -43,6 +45,10 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
4345
if k_sink := os.getenv("K_SINK"):
4446
self.endpoint = k_sink
4547

48+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
49+
overrides_data = json.loads(k_ce_overrides)
50+
self.overrides = overrides_data.get("extensions", {})
51+
4652
return self
4753

4854
@classmethod
@@ -88,6 +94,26 @@ def __init__(self, config: CloudEventsConfig) -> None:
8894
super().__init__(config)
8995
self.config: CloudEventsConfig = config
9096
self._client: httpx.AsyncClient | None = None
97+
# Parse K_CE_OVERRIDES once during initialization
98+
self._ce_extensions = self._parse_k_ce_overrides()
99+
100+
def _parse_k_ce_overrides(self) -> dict[str, str]:
101+
"""Parse K_CE_OVERRIDES environment variable once during initialization."""
102+
k_ce_overrides = os.getenv("K_CE_OVERRIDES")
103+
if not k_ce_overrides:
104+
return {}
105+
106+
try:
107+
overrides_data = json.loads(k_ce_overrides)
108+
extensions = overrides_data.get("extensions", {})
109+
if isinstance(extensions, dict):
110+
return {str(k): str(v) for k, v in extensions.items()}
111+
return {}
112+
except json.JSONDecodeError:
113+
self.logger.warning(
114+
"Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides
115+
)
116+
return {}
91117

92118
async def start(self) -> None:
93119
"""Start the HTTP client."""
@@ -209,6 +235,9 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
209235
source = self.config.source
210236
event_type_base = self.config.event_type
211237

238+
# Use pre-parsed KNative CE overrides
239+
ce_extensions = self._ce_extensions
240+
212241
# Map operation to event type suffix
213242
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
214243
operation = operation_map.get(event.operation.upper(), event.operation.lower())
@@ -233,6 +262,10 @@ def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
233262
if truncated_collection:
234263
attributes["collection"] = truncated_collection
235264

265+
# Apply KNative CE extension overrides
266+
for key, value in ce_extensions.items():
267+
attributes[key] = str(value)
268+
236269
# Event data payload
237270
data = {
238271
"id": event.id,

helm-chart/eoapi-notifier/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ secrets:
8282
# KNative Support:
8383
# The cloudevents plugin supports K_SINK variables for KNative SinkBinding:
8484
# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding)
85+
# - K_CE_OVERRIDE: A JSON object that specifies overrides to the outbound event.
8586
env: {}
8687
# Examples - Standard environment variables:
8788
# PGSTAC_HOST: postgresql-service

tests/test_cloudevents_output.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ def test_connection_info(self) -> None:
5454
config = CloudEventsConfig(endpoint="https://example.com/webhook")
5555
assert "POST https://example.com/webhook" in config.get_connection_info()
5656

57+
@patch.dict(
58+
os.environ,
59+
{"K_CE_OVERRIDES": '{"extensions": {"extra": "test", "num": 42}}'},
60+
)
61+
def test_k_ce_overrides_valid(self) -> None:
62+
"""Test K_CE_OVERRIDES with valid extensions."""
63+
config = CloudEventsConfig()
64+
assert config.overrides == {"extra": "test", "num": 42}
65+
66+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
67+
def test_k_ce_overrides_no_extensions(self) -> None:
68+
"""Test K_CE_OVERRIDES without extensions field."""
69+
config = CloudEventsConfig()
70+
assert config.overrides == {}
71+
72+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
73+
def test_k_ce_overrides_invalid_json(self) -> None:
74+
"""Test K_CE_OVERRIDES with invalid JSON."""
75+
from pydantic import ValidationError
76+
77+
with pytest.raises(ValidationError):
78+
CloudEventsConfig()
79+
5780

5881
class TestCloudEventsAdapter:
5982
"""Test CloudEvents output adapter."""
@@ -286,3 +309,60 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None:
286309
# Running with client
287310
adapter._client = MagicMock()
288311
assert await adapter.health_check() is True
312+
313+
@patch.dict(
314+
os.environ,
315+
{
316+
"K_CE_OVERRIDES": (
317+
'{"extensions": {"extra": "test-value", "priority": "high"}}'
318+
)
319+
},
320+
)
321+
def test_convert_to_cloudevent_with_overrides(
322+
self, config: CloudEventsConfig, sample_event: NotificationEvent
323+
) -> None:
324+
"""Test CloudEvent conversion with K_CE_OVERRIDES."""
325+
# Create adapter after environment variable is set
326+
adapter = CloudEventsAdapter(config)
327+
cloud_event = adapter._convert_to_cloudevent(sample_event)
328+
329+
assert isinstance(cloud_event, CloudEvent)
330+
assert cloud_event["extra"] == "test-value"
331+
assert cloud_event["priority"] == "high"
332+
333+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'})
334+
def test_convert_to_cloudevent_with_number_override(
335+
self, config: CloudEventsConfig, sample_event: NotificationEvent
336+
) -> None:
337+
"""Test CloudEvent conversion with number in K_CE_OVERRIDES."""
338+
# Create adapter after environment variable is set
339+
adapter = CloudEventsAdapter(config)
340+
cloud_event = adapter._convert_to_cloudevent(sample_event)
341+
342+
assert cloud_event["number"] == "123" # Should be converted to string
343+
344+
@patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"})
345+
def test_convert_to_cloudevent_invalid_overrides(
346+
self, config: CloudEventsConfig, sample_event: NotificationEvent
347+
) -> None:
348+
"""Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON."""
349+
# Create adapter after environment variable is set
350+
adapter = CloudEventsAdapter(config)
351+
cloud_event = adapter._convert_to_cloudevent(sample_event)
352+
353+
# Should work normally without overrides
354+
assert isinstance(cloud_event, CloudEvent)
355+
assert cloud_event["source"] == "/eoapi/stac"
356+
357+
@patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'})
358+
def test_convert_to_cloudevent_no_extensions(
359+
self, config: CloudEventsConfig, sample_event: NotificationEvent
360+
) -> None:
361+
"""Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field."""
362+
# Create adapter after environment variable is set
363+
adapter = CloudEventsAdapter(config)
364+
cloud_event = adapter._convert_to_cloudevent(sample_event)
365+
366+
# Should work normally without extensions
367+
assert isinstance(cloud_event, CloudEvent)
368+
assert cloud_event["source"] == "/eoapi/stac"

0 commit comments

Comments
 (0)