Skip to content

Commit 0a38b3d

Browse files
committed
Added output plugin for cloudevents.
1 parent 5a79b6b commit 0a38b3d

File tree

10 files changed

+633
-188
lines changed

10 files changed

+633
-188
lines changed

eoapi_notifier/outputs/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
44
"""
55

6+
from .cloudevents import CloudEventsAdapter, CloudEventsConfig
67
from .mqtt import MQTTAdapter, MQTTConfig
78

89
__all__ = [
10+
"CloudEventsAdapter",
11+
"CloudEventsConfig",
912
"MQTTAdapter",
1013
"MQTTConfig",
1114
]
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
"""
2+
CloudEvents output adapter.
3+
4+
Sends notification events as CloudEvents via HTTP POST.
5+
Supports standard CloudEvents environment variables and KNative SinkBinding.
6+
"""
7+
8+
import json
9+
import os
10+
from typing import Any
11+
from uuid import uuid4
12+
13+
import httpx
14+
from cloudevents.conversion import to_binary
15+
from cloudevents.http import CloudEvent
16+
from pydantic import field_validator, model_validator
17+
18+
from ..core.event import NotificationEvent
19+
from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata
20+
21+
22+
class CloudEventsConfig(BasePluginConfig):
23+
"""Configuration for CloudEvents output adapter with environment variable
24+
support."""
25+
26+
endpoint: str | None = None
27+
source: str = "/eoapi/stac"
28+
event_type: str = "org.eoapi.stac"
29+
timeout: float = 30.0
30+
max_retries: int = 3
31+
retry_backoff: float = 1.0
32+
33+
@field_validator("endpoint")
34+
@classmethod
35+
def validate_endpoint(cls, v: str | None) -> str | None:
36+
if v and not v.startswith(("http://", "https://")):
37+
raise ValueError("endpoint must start with http:// or https://")
38+
return v
39+
40+
@model_validator(mode="after")
41+
def apply_knative_overrides(self) -> "CloudEventsConfig":
42+
"""Apply KNative SinkBinding environment variables as special case."""
43+
# K_SINK overrides endpoint (KNative SinkBinding)
44+
if k_sink := os.getenv("K_SINK"):
45+
self.endpoint = k_sink
46+
47+
# K_SOURCE overrides source
48+
if k_source := os.getenv("K_SOURCE"):
49+
self.source = k_source
50+
51+
# K_TYPE overrides event_type
52+
if k_type := os.getenv("K_TYPE"):
53+
self.event_type = k_type
54+
55+
return self
56+
57+
@classmethod
58+
def get_sample_config(cls) -> dict[str, Any]:
59+
return {
60+
"endpoint": None, # Uses K_SINK env var if not set
61+
"source": "/eoapi/stac",
62+
"event_type": "org.eoapi.stac",
63+
"timeout": 30.0,
64+
"max_retries": 3,
65+
"retry_backoff": 1.0,
66+
}
67+
68+
@classmethod
69+
def get_metadata(cls) -> PluginMetadata:
70+
return PluginMetadata(
71+
name="cloudevents",
72+
description="CloudEvents HTTP output adapter",
73+
category="http",
74+
tags=["cloudevents", "http", "webhook"],
75+
priority=10,
76+
)
77+
78+
def get_connection_info(self) -> str:
79+
url = self.endpoint or os.getenv("K_SINK", "K_SINK env var")
80+
return f"POST {url}"
81+
82+
def get_status_info(self) -> dict[str, Any]:
83+
return {
84+
"Endpoint": self.endpoint or "K_SINK env var",
85+
"Source": self.source,
86+
"Event Type": self.event_type,
87+
"Timeout": f"{self.timeout}s",
88+
"Max Retries": self.max_retries,
89+
}
90+
91+
92+
class CloudEventsAdapter(BaseOutput):
93+
"""CloudEvents HTTP output adapter."""
94+
95+
def __init__(self, config: CloudEventsConfig) -> None:
96+
super().__init__(config)
97+
self.config: CloudEventsConfig = config
98+
self._client: httpx.AsyncClient | None = None
99+
100+
async def start(self) -> None:
101+
"""Start the HTTP client."""
102+
self.logger.info(
103+
f"Starting CloudEvents adapter: {self.config.get_connection_info()}"
104+
)
105+
self.logger.debug(
106+
f"CloudEvents config: timeout={self.config.timeout}s, "
107+
f"max_retries={self.config.max_retries}"
108+
)
109+
110+
endpoint = self.config.endpoint
111+
if not endpoint:
112+
raise ValueError(
113+
"endpoint configuration required (can be set via config, K_SINK, "
114+
"or CLOUDEVENTS_ENDPOINT env vars)"
115+
)
116+
117+
self.logger.debug(f"Step 1: Resolved endpoint: {endpoint}")
118+
119+
# Create HTTP client
120+
self.logger.debug("Step 2: Creating HTTP client...")
121+
self._client = httpx.AsyncClient(
122+
timeout=httpx.Timeout(self.config.timeout),
123+
follow_redirects=True,
124+
)
125+
self.logger.debug("✓ HTTP client created")
126+
127+
# Call parent start method
128+
self.logger.debug("Step 3: Calling parent start method...")
129+
await super().start()
130+
self.logger.debug("✓ Parent start method completed")
131+
132+
self.logger.info(
133+
f"✅ CloudEvents adapter started successfully, "
134+
f"will send events to: {endpoint}"
135+
)
136+
137+
async def stop(self) -> None:
138+
"""Stop the HTTP client."""
139+
if self._client:
140+
await self._client.aclose()
141+
self._client = None
142+
await super().stop()
143+
self.logger.info("✓ CloudEvents adapter stopped")
144+
145+
async def send_event(self, event: NotificationEvent) -> bool:
146+
"""Send notification event as CloudEvent."""
147+
if not self._client:
148+
self.logger.warning(
149+
f"📤 HTTP client not available, cannot send event {event.id}"
150+
)
151+
return False
152+
153+
try:
154+
endpoint = self.config.endpoint
155+
156+
# Convert to CloudEvent
157+
self.logger.debug(f"Converting event {event.id} to CloudEvent format...")
158+
cloud_event = self._convert_to_cloudevent(event)
159+
self.logger.debug(
160+
f"CloudEvent created: id={cloud_event['id']}, "
161+
f"type={cloud_event['type']}"
162+
)
163+
164+
# Convert to binary format
165+
self.logger.debug("Converting CloudEvent to binary format...")
166+
headers, data = to_binary(cloud_event)
167+
self.logger.debug(
168+
f"Binary conversion complete, headers: {list(headers.keys())}"
169+
)
170+
171+
# Send HTTP POST
172+
self.logger.debug(
173+
f"Sending CloudEvent {cloud_event['id']} to {endpoint} "
174+
f"(timeout: {self.config.timeout}s)"
175+
)
176+
response = await self._client.post(endpoint, headers=headers, data=data)
177+
response.raise_for_status()
178+
179+
self.logger.debug(
180+
f"✓ Successfully sent CloudEvent {cloud_event['id']} to {endpoint} "
181+
f"(status: {response.status_code})"
182+
)
183+
return True
184+
185+
except httpx.TimeoutException as e:
186+
self.logger.error(
187+
f"✗ Timeout sending CloudEvent {event.id} to {endpoint} "
188+
f"(waited {self.config.timeout}s): {e}"
189+
)
190+
return False
191+
except httpx.HTTPStatusError as e:
192+
self.logger.error(
193+
f"✗ HTTP error sending CloudEvent {event.id} to {endpoint}: "
194+
f"{e.response.status_code} {e.response.reason_phrase}"
195+
)
196+
return False
197+
except Exception as e:
198+
self.logger.error(
199+
f"✗ Error sending CloudEvent {event.id} to {endpoint}: {e}",
200+
exc_info=True,
201+
)
202+
return False
203+
204+
def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
205+
"""Convert NotificationEvent to CloudEvent."""
206+
# Use config values which now include environment overrides
207+
source = self.config.source
208+
event_type_base = self.config.event_type
209+
210+
# Apply KNative CE overrides if present
211+
ce_overrides = {}
212+
if k_ce_overrides := os.getenv("K_CE_OVERRIDES"):
213+
try:
214+
ce_overrides = json.loads(k_ce_overrides)
215+
except json.JSONDecodeError:
216+
self.logger.warning(
217+
"Invalid K_CE_OVERRIDES JSON, ignoring: %s", k_ce_overrides
218+
)
219+
220+
# Map operation to event type suffix
221+
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
222+
operation = operation_map.get(event.operation.upper(), event.operation.lower())
223+
224+
attributes = {
225+
"id": str(uuid4()),
226+
"source": source,
227+
"type": f"{event_type_base}.{operation}",
228+
"time": event.timestamp.isoformat(),
229+
"datacontenttype": "application/json",
230+
}
231+
232+
# Add subject if item_id exists
233+
if event.item_id:
234+
attributes["subject"] = event.item_id
235+
236+
# Add collection attribute
237+
if event.collection:
238+
attributes["collection"] = event.collection
239+
240+
# Apply KNative CE overrides
241+
attributes.update(ce_overrides)
242+
243+
# Event data payload
244+
data = {
245+
"id": event.id,
246+
"source": event.source,
247+
"type": event.type,
248+
"operation": event.operation,
249+
"collection": event.collection,
250+
"item_id": event.item_id,
251+
"timestamp": event.timestamp.isoformat(),
252+
**event.data,
253+
}
254+
255+
return CloudEvent(attributes, data)
256+
257+
async def health_check(self) -> bool:
258+
"""Check if the adapter is healthy."""
259+
return self._running and self._client is not None

examples/config.yaml

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
# eoAPI Notifier Configuration Example
2-
# This file shows how to configure sources and outputs for the notifier
2+
#
3+
# Environment Variable Overrides:
4+
# Any field can be overridden using: PLUGINNAME_FIELDNAME
5+
# Examples: PGSTAC_HOST, MQTT_USE_TLS, CLOUDEVENTS_ENDPOINT
6+
37

48
# Sources: Define where notifications come from
59
sources:
610
# PostgreSQL/pgSTAC source for database changes
711
- type: pgstac
812
config:
9-
host: localhost
10-
port: 5432
11-
database: postgis
12-
username: username
13-
password: password
13+
host: localhost # Override: PGSTAC_HOST
14+
port: 5432 # Override: PGSTAC_PORT
15+
database: postgis # Override: PGSTAC_DATABASE
16+
user: postgres # Override: PGSTAC_USER
17+
password: changeme # Override: PGSTAC_PASSWORD
1418

1519
# Optional: specific table patterns to monitor
1620
# tables: ["items", "collections"]
@@ -20,26 +24,43 @@ sources:
2024

2125
# Operation Correlation Settings
2226
# Correlation transforms pgSTAC DELETE+INSERT pairs into semantic UPDATE events
23-
# enable_correlation: true # Enable DELETE+INSERT correlation (default: true)
24-
# correlation_window: 5.0 # Time window to correlate operations in seconds
25-
# cleanup_interval: 1.0 # How often to check for expired operations in seconds
27+
# enable_correlation: true # PGSTAC_ENABLE_CORRELATION
28+
# correlation_window: 5.0 # PGSTAC_CORRELATION_WINDOW
29+
# cleanup_interval: 1.0 # PGSTAC_CLEANUP_INTERVAL
2630

2731
# Outputs: Define where notifications are sent
2832
outputs:
2933
# MQTT output for publishing events
3034
- type: mqtt
3135
config:
32-
broker_host: localhost
33-
broker_port: 1883
36+
broker_host: localhost # MQTT_BROKER_HOST
37+
broker_port: 1883 # MQTT_BROKER_PORT
38+
3439
# Optional: authentication
35-
# username: mqtt_user
36-
# password: mqtt_password
40+
# username: mqtt_user # MQTT_USERNAME
41+
# password: mqtt_password # MQTT_PASSWORD
3742
# Optional: TLS settings
38-
# use_tls: true
39-
# ca_cert: /path/to/ca.pem
43+
# use_tls: true # MQTT_USE_TLS
44+
4045
# Optional: topic configuration
41-
# topic: "eoapi/"
42-
# qos: 1
46+
# topic: "eoapi/" # MQTT_TOPIC
47+
# qos: 1 # MQTT_QOS
48+
49+
# CloudEvents HTTP output for sending events as CloudEvents
50+
#
51+
# Besides the regular overwrite, this plugin also supports K_SINK
52+
# https://knative.dev/docs/eventing/custom-event-source/sinkbinding/
53+
- type: cloudevents
54+
config:
55+
endpoint: https://example.com/webhook # CLOUDEVENTS_ENDPOINT or K_SINK
56+
57+
# Optional: CloudEventattributes
58+
# source: "/eoapi/stac" # CLOUDEVENTS_SOURCE or K_SOURCE
59+
# event_type: "org.eoapi.stac" # CLOUDEVENTS_EVENT_TYPE or K_TYPE
60+
61+
# Optional: HTTP settings
62+
# timeout: 30.0 # CLOUDEVENTS_TIMEOUT
63+
# max_retries: 3 # CLOUDEVENTS_MAX_RETRIES
4364

4465
# Example with multiple sources and outputs
4566
# sources:

helm-chart/eoapi-notifier/templates/configmap.yaml

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,12 @@ metadata:
77
data:
88
config.yaml: |
99
# eoAPI Notifier Configuration
10+
# Environment variables automatically override config values (e.g., HOST env var overrides host config)
1011
sources:
1112
{{- range .Values.config.sources}}
1213
- type: {{.type}}
1314
config:
14-
{{- if eq .type "postgres"}}
15-
host: {{.config.host}}
16-
port: {{.config.port}}
17-
database: {{.config.database}}
18-
{{- if $.Values.secrets.postgresql.create}}
19-
user: __POSTGRES_USERNAME__
20-
password: __POSTGRES_PASSWORD__
21-
{{- else}}
22-
username: {{.config.username}}
23-
password: {{.config.password}}
24-
{{- end}}
25-
{{- range $key, $value := .config}}
26-
{{- if not (or (eq $key "host") (eq $key "port") (eq $key "database") (eq $key "user") (eq $key "username") (eq $key "password"))}}
27-
{{$key}}: {{$value}}
28-
{{- end}}
29-
{{- end}}
30-
{{- else}}
3115
{{- toYaml .config | nindent 10}}
32-
{{- end}}
3316
{{- end}}
3417
3518
outputs:

0 commit comments

Comments
 (0)