Skip to content

Commit 0dedaf0

Browse files
committed
bar
1 parent e204a00 commit 0dedaf0

File tree

6 files changed

+210
-12
lines changed

6 files changed

+210
-12
lines changed

eoapi_notifier/outputs/cloudevents.py

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,48 @@
1212
import httpx
1313
from cloudevents.conversion import to_binary
1414
from cloudevents.http import CloudEvent
15-
from pydantic import field_validator, model_validator
15+
from pydantic import BaseModel, field_validator, model_validator
1616

1717
from ..core.event import NotificationEvent
1818
from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata
1919

2020

21+
class KNativeRef(BaseModel):
22+
"""KNative object reference for SinkBinding."""
23+
24+
apiVersion: str
25+
kind: str
26+
name: str
27+
namespace: str | None = None
28+
29+
30+
class Destination(BaseModel):
31+
"""Destination configuration for CloudEvents - either URL or KNative ref."""
32+
33+
url: str | None = None
34+
ref: KNativeRef | None = None
35+
36+
@model_validator(mode="after")
37+
def validate_destination(self) -> "Destination":
38+
"""Ensure exactly one of url or ref is provided."""
39+
if self.url and self.ref:
40+
raise ValueError(
41+
"destination cannot have both 'url' and 'ref' - "
42+
"they are mutually exclusive"
43+
)
44+
if not self.url and not self.ref:
45+
raise ValueError("destination must have either 'url' or 'ref'")
46+
if self.url and not self.url.startswith(("http://", "https://")):
47+
raise ValueError("destination.url must start with http:// or https://")
48+
return self
49+
50+
2151
class CloudEventsConfig(BasePluginConfig):
2252
"""Configuration for CloudEvents output adapter with environment variable
2353
support."""
2454

2555
endpoint: str | None = None
56+
destination: Destination | None = None
2657
source: str = "/eoapi/stac"
2758
event_type: str = "org.eoapi.stac"
2859
timeout: float = 30.0
@@ -37,8 +68,13 @@ def validate_endpoint(cls, v: str | None) -> str | None:
3768
return v
3869

3970
@model_validator(mode="after")
40-
def apply_knative_overrides(self) -> "CloudEventsConfig":
41-
"""Apply KNative SinkBinding environment variables as special case."""
71+
def validate_and_apply_overrides(self) -> "CloudEventsConfig":
72+
"""Validate configuration and apply KNative SinkBinding env variables."""
73+
# Ensure endpoint and destination are mutually exclusive
74+
if self.endpoint and self.destination:
75+
raise ValueError("endpoint and destination are mutually exclusive")
76+
77+
# Apply KNative SinkBinding environment variables as special case
4278
if k_sink := os.getenv("K_SINK"):
4379
self.endpoint = k_sink
4480

@@ -48,6 +84,14 @@ def apply_knative_overrides(self) -> "CloudEventsConfig":
4884
def get_sample_config(cls) -> dict[str, Any]:
4985
return {
5086
"endpoint": None, # Uses K_SINK env var if not set
87+
"destination": {
88+
"ref": {
89+
"apiVersion": "messaging.knative.dev/v1",
90+
"kind": "Broker",
91+
"name": "my-broker",
92+
"namespace": "default",
93+
}
94+
},
5195
"source": "/eoapi/stac",
5296
"event_type": "org.eoapi.stac",
5397
"timeout": 30.0,
@@ -66,12 +110,36 @@ def get_metadata(cls) -> PluginMetadata:
66110
)
67111

68112
def get_connection_info(self) -> str:
69-
url = self.endpoint or os.getenv("K_SINK", "K_SINK env var")
113+
if self.endpoint:
114+
url = self.endpoint
115+
elif self.destination:
116+
if self.destination.url:
117+
url = self.destination.url
118+
elif self.destination.ref:
119+
ref = self.destination.ref
120+
url = f"KNative {ref.kind}/{ref.name}"
121+
else:
122+
url = "Invalid destination"
123+
else:
124+
url = os.getenv("K_SINK", "K_SINK env var")
70125
return f"POST {url}"
71126

72127
def get_status_info(self) -> dict[str, Any]:
128+
if self.endpoint:
129+
endpoint_info = self.endpoint
130+
elif self.destination:
131+
if self.destination.url:
132+
endpoint_info = self.destination.url
133+
elif self.destination.ref:
134+
ref = self.destination.ref
135+
endpoint_info = f"{ref.kind}/{ref.name} ({ref.apiVersion})"
136+
else:
137+
endpoint_info = "Invalid destination"
138+
else:
139+
endpoint_info = "K_SINK env var"
140+
73141
return {
74-
"Endpoint": self.endpoint or "K_SINK env var",
142+
"Endpoint": endpoint_info,
75143
"Source": self.source,
76144
"Event Type": self.event_type,
77145
"Timeout": f"{self.timeout}s",
@@ -99,10 +167,13 @@ async def start(self) -> None:
99167

100168
endpoint = self.config.endpoint
101169
if not endpoint:
102-
raise ValueError(
103-
"endpoint configuration required (can be set via config, K_SINK, "
104-
"or CLOUDEVENTS_ENDPOINT env vars)"
105-
)
170+
if self.config.destination and self.config.destination.url:
171+
endpoint = self.config.destination.url
172+
else:
173+
raise ValueError(
174+
"endpoint configuration required (can be set via "
175+
"config.endpoint, config.destination.url, or K_SINK env var)"
176+
)
106177

107178
self.logger.debug(f"Step 1: Resolved endpoint: {endpoint}")
108179

helm-chart/eoapi-notifier/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,32 @@ config:
2727
database: postgis
2828
username: postgres
2929
password: password
30+
3031
outputs:
3132
- type: mqtt
3233
config:
3334
broker_host: mqtt-broker
3435
broker_port: 1883
3536

37+
# CloudEvents with direct HTTP endpoint
38+
- type: cloudevents
39+
config:
40+
endpoint: https://example.com/webhook
41+
source: /eoapi/stac
42+
event_type: org.eoapi.stac
43+
44+
# CloudEvents with KNative object reference (creates SinkBinding)
45+
- type: cloudevents
46+
config:
47+
source: /eoapi/pgstac
48+
event_type: org.eoapi.stac.item
49+
destination:
50+
ref:
51+
apiVersion: messaging.knative.dev/v1
52+
kind: Broker
53+
name: my-channel-1
54+
namespace: serverless
55+
3656
secrets:
3757
postgresql:
3858
create: true
@@ -47,3 +67,43 @@ resources:
4767
cpu: 100m
4868
memory: 128Mi
4969
```
70+
71+
## KNative SinkBinding Support
72+
73+
The chart automatically creates KNative SinkBinding resources when CloudEvents outputs use `destination.ref` configuration. This allows you to reference KNative objects (Brokers, Channels, Services) instead of direct URLs.
74+
75+
### Configuration Options
76+
77+
CloudEvents outputs support two mutually exclusive destination methods:
78+
79+
1. **Direct HTTP endpoint:**
80+
```yaml
81+
- type: cloudevents
82+
config:
83+
endpoint: https://webhook.example.com
84+
source: /eoapi/stac
85+
event_type: org.eoapi.stac
86+
```
87+
88+
2. **KNative object reference:**
89+
```yaml
90+
- type: cloudevents
91+
config:
92+
source: /eoapi/pgstac
93+
event_type: org.eoapi.stac.item
94+
destination:
95+
ref:
96+
apiVersion: messaging.knative.dev/v1
97+
kind: Broker
98+
name: my-broker
99+
namespace: default # optional, defaults to chart namespace
100+
```
101+
102+
When using `destination.ref`, the chart creates a SinkBinding that resolves the reference to a URL and injects it via the `K_SINK` environment variable.
103+
104+
### Supported KNative Resources
105+
106+
- **Brokers:** `messaging.knative.dev/v1` - `kind: Broker`
107+
- **Channels:** `messaging.knative.dev/v1` - `kind: Channel`
108+
- **Services:** `serving.knative.dev/v1` - `kind: Service`
109+
- **Custom resources:** Any resource that implements the Addressable interface

helm-chart/eoapi-notifier/templates/_helpers.tpl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,22 @@ Return the proper image pull policy
7373
*/}}
7474
{{- define "eoapi-notifier.imagePullPolicy" -}}
7575
{{- .Values.image.pullPolicy | default "IfNotPresent" }}
76+
{{- end }}
77+
78+
{{/*
79+
Validate cloudevents configuration
80+
*/}}
81+
{{- define "eoapi-notifier.validateCloudEventsConfig" -}}
82+
{{- range $index, $output := .Values.config.outputs }}
83+
{{- if eq $output.type "cloudevents" }}
84+
{{- $hasEndpoint := $output.config.endpoint }}
85+
{{- $hasDestinationRef := and $output.config.destination $output.config.destination.ref }}
86+
{{- if and $hasEndpoint $hasDestinationRef }}
87+
{{- fail (printf "CloudEvents output %d: 'endpoint' and 'destination.ref' are mutually exclusive" $index) }}
88+
{{- end }}
89+
{{- if and (not $hasEndpoint) (not $hasDestinationRef) }}
90+
{{- fail (printf "CloudEvents output %d: either 'endpoint' or 'destination.ref' must be configured" $index) }}
91+
{{- end }}
92+
{{- end }}
93+
{{- end }}
7694
{{- end }}

helm-chart/eoapi-notifier/templates/configmap.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{{- include "eoapi-notifier.validateCloudEventsConfig" . }}
12
apiVersion: v1
23
kind: ConfigMap
34
metadata:
@@ -19,5 +20,10 @@ data:
1920
{{- range .Values.config.outputs}}
2021
- type: {{.type}}
2122
config:
23+
{{- if and (eq .type "cloudevents") .config.destination .config.destination.ref}}
24+
{{- $config := omit .config "destination"}}
25+
{{- toYaml $config | nindent 10}}
26+
{{- else}}
2227
{{- toYaml .config | nindent 10}}
28+
{{- end}}
2329
{{- end}}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{{- range $index, $output := .Values.config.outputs }}
2+
{{- if and (eq $output.type "cloudevents") $output.config.destination $output.config.destination.ref }}
3+
---
4+
apiVersion: sources.knative.dev/v1
5+
kind: SinkBinding
6+
metadata:
7+
name: {{ include "eoapi-notifier.fullname" $ }}-cloudevents-{{ $index }}
8+
labels:
9+
{{- include "eoapi-notifier.labels" $ | nindent 4 }}
10+
app.kubernetes.io/component: sinkbinding
11+
spec:
12+
subject:
13+
apiVersion: apps/v1
14+
kind: Deployment
15+
name: {{ include "eoapi-notifier.fullname" $ }}
16+
sink:
17+
ref:
18+
apiVersion: {{ $output.config.destination.ref.apiVersion }}
19+
kind: {{ $output.config.destination.ref.kind }}
20+
name: {{ $output.config.destination.ref.name }}
21+
{{- if $output.config.destination.ref.namespace }}
22+
namespace: {{ $output.config.destination.ref.namespace }}
23+
{{- end }}
24+
{{- end }}
25+
{{- end }}

helm-chart/eoapi-notifier/values.yaml

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,27 @@ config:
5757
broker_host: mqtt-broker
5858
broker_port: 1883
5959

60+
# CloudEvents output - Choose ONE of the two approaches below:
61+
62+
# Approach 1: Direct HTTP endpoint
6063
- type: cloudevents
6164
config:
62-
# Will be overridden by K_SINK from KNative SinkBinding
6365
endpoint: https://example.com/webhook
6466
source: /eoapi/stac
6567
event_type: org.eoapi.stac
68+
69+
# Approach 2: KNative object reference (creates SinkBinding automatically)
70+
# Uncomment this section and comment out the above to use KNative references:
71+
# - type: cloudevents
72+
# config:
73+
# source: /eoapi/pgstac
74+
# event_type: org.eoapi.stac.item
75+
# destination:
76+
# ref:
77+
# apiVersion: messaging.knative.dev/v1
78+
# kind: Broker
79+
# name: my-channel-1
80+
# namespace: serverless
6681

6782
# Secrets
6883
secrets:
@@ -91,5 +106,8 @@ env: {}
91106
# CLOUDEVENTS_SOURCE: /eoapi/stac/production
92107
# CLOUDEVENTS_EVENT_TYPE: org.eoapi.stac.item
93108
#
94-
# KNative examples (typically set by SinkBinding):
95-
# K_SINK: https://my-knative-service.default.svc.cluster.local
109+
# KNative examples:
110+
# When using destination.ref in cloudevents config, K_SINK is automatically set by SinkBinding
111+
# You can also manually override: K_SINK: https://my-knative-service.default.svc.cluster.local
112+
#
113+
# Note: endpoint and destination.ref are mutually exclusive in cloudevents config

0 commit comments

Comments
 (0)