diff --git a/README.md b/README.md index 4c5b77c..0f51e5d 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ See [Helm Chart README](helm-chart/eoapi-notifier/README.md) for configuration o #### Outputs - `mqtt`: Publish events to MQTT broker +- `cloudevents`: Send events as CloudEvents ## Development diff --git a/eoapi_notifier/outputs/cloudevents.py b/eoapi_notifier/outputs/cloudevents.py index a64ecdc..559ace7 100644 --- a/eoapi_notifier/outputs/cloudevents.py +++ b/eoapi_notifier/outputs/cloudevents.py @@ -13,36 +13,66 @@ import httpx from cloudevents.conversion import to_binary from cloudevents.http import CloudEvent -from pydantic import field_validator, model_validator +from pydantic import BaseModel, field_validator, model_validator from ..core.event import NotificationEvent from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata +class RefConfig(BaseModel): + """Kubernetes resource reference configuration.""" + + apiVersion: str + kind: str + name: str + namespace: str | None = None + + +class DestinationConfig(BaseModel): + """Destination configuration - either ref or url.""" + + ref: RefConfig | None = None + url: str | None = None + + @model_validator(mode="after") + def validate_mutually_exclusive(self) -> "DestinationConfig": + if self.ref and self.url: + raise ValueError( + "destination.ref and destination.url are mutually exclusive" + ) + if not self.ref and not self.url: + raise ValueError( + "Either destination.ref or destination.url must be specified" + ) + return self + + @field_validator("url") + @classmethod + def validate_url(cls, v: str | None) -> str | None: + if v and not v.startswith(("http://", "https://")): + raise ValueError("destination.url must start with http:// or https://") + return v + + class CloudEventsConfig(BasePluginConfig): """Configuration for CloudEvents output adapter with environment variable support.""" - endpoint: str | None = None + destination: DestinationConfig source: str = "/eoapi/stac" event_type: str = "org.eoapi.stac" timeout: float = 30.0 max_retries: int = 3 retry_backoff: float = 1.0 - @field_validator("endpoint") - @classmethod - def validate_endpoint(cls, v: str | None) -> str | None: - if v and not v.startswith(("http://", "https://")): - raise ValueError("endpoint must start with http:// or https://") - return v - @model_validator(mode="after") def apply_knative_overrides(self) -> "CloudEventsConfig": """Apply KNative SinkBinding environment variables as special case.""" - # K_SINK overrides endpoint (KNative SinkBinding) + # K_SINK overrides destination for ref-based configs (KNative SinkBinding) if k_sink := os.getenv("K_SINK"): - self.endpoint = k_sink + if self.destination.ref: + # For ref-based destinations, K_SINK provides the resolved URL + self.destination = DestinationConfig(url=k_sink) # K_SOURCE overrides source if k_source := os.getenv("K_SOURCE"): @@ -57,7 +87,15 @@ def apply_knative_overrides(self) -> "CloudEventsConfig": @classmethod def get_sample_config(cls) -> dict[str, Any]: return { - "endpoint": None, # Uses K_SINK env var if not set + "destination": { + "ref": { + "apiVersion": "messaging.knative.dev/v1", + "kind": "Broker", + "name": "eoapi-broker", + "namespace": "serverless", + } + # "url": "https://example.com/webhook" # mutually exclusive with ref + }, "source": "/eoapi/stac", "event_type": "org.eoapi.stac", "timeout": 30.0, @@ -76,12 +114,27 @@ def get_metadata(cls) -> PluginMetadata: ) def get_connection_info(self) -> str: - url = self.endpoint or os.getenv("K_SINK", "K_SINK env var") + if self.destination.url: + url = self.destination.url + elif self.destination.ref: + ref_name = f"{self.destination.ref.kind}/{self.destination.ref.name}" + url = os.getenv("K_SINK", f"K_SINK env var -> {ref_name}") + else: + url = "unresolved" return f"POST {url}" def get_status_info(self) -> dict[str, Any]: + if self.destination.url: + endpoint_info = self.destination.url + elif self.destination.ref: + endpoint_info = ( + f"{self.destination.ref.kind}/{self.destination.ref.name} (via K_SINK)" + ) + else: + endpoint_info = "unresolved" + return { - "Endpoint": self.endpoint or "K_SINK env var", + "Destination": endpoint_info, "Source": self.source, "Event Type": self.event_type, "Timeout": f"{self.timeout}s", @@ -107,12 +160,19 @@ async def start(self) -> None: f"max_retries={self.config.max_retries}" ) - endpoint = self.config.endpoint - if not endpoint: - raise ValueError( - "endpoint configuration required (can be set via config, K_SINK, " - "or CLOUDEVENTS_ENDPOINT env vars)" - ) + # Get endpoint URL + if self.config.destination.url: + endpoint = self.config.destination.url + elif self.config.destination.ref: + k_sink = os.getenv("K_SINK") + if not k_sink: + raise ValueError( + f"K_SINK environment variable required for ref destination " + f"{self.config.destination.ref.kind}/{self.config.destination.ref.name}" + ) + endpoint = k_sink + else: + raise ValueError("destination.ref or destination.url must be configured") self.logger.debug(f"Step 1: Resolved endpoint: {endpoint}") @@ -151,7 +211,15 @@ async def send_event(self, event: NotificationEvent) -> bool: return False try: - endpoint = self.config.endpoint + # Get endpoint URL + if self.config.destination.url: + endpoint = self.config.destination.url + else: + k_sink = os.getenv("K_SINK") + if not k_sink: + self.logger.error("K_SINK not available for ref destination") + return False + endpoint = k_sink # Convert to CloudEvent self.logger.debug(f"Converting event {event.id} to CloudEvent format...") diff --git a/examples/config.yaml b/examples/config.yaml index 716db6c..12468c5 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -46,17 +46,22 @@ outputs: # topic: "eoapi/" # MQTT_TOPIC # qos: 1 # MQTT_QOS - # CloudEvents HTTP output for sending events as CloudEvents - # - # Besides the regular overwrite, this plugin also supports K_SINK - # https://knative.dev/docs/eventing/custom-event-source/sinkbinding/ + # CloudEvents output for sending events as CloudEvents - type: cloudevents config: - endpoint: https://example.com/webhook # CLOUDEVENTS_ENDPOINT or K_SINK + source: "/eoapi/pgstac" # CLOUDEVENTS_SOURCE or K_SOURCE + event_type: "org.eoapi.stac.item" # CLOUDEVENTS_EVENT_TYPE or K_TYPE - # Optional: CloudEventattributes - # source: "/eoapi/stac" # CLOUDEVENTS_SOURCE or K_SOURCE - # event_type: "org.eoapi.stac" # CLOUDEVENTS_EVENT_TYPE or K_TYPE + destination: + # Option 1: Kubernetes resource reference (uses SinkBinding) + ref: + apiVersion: messaging.knative.dev/v1 + kind: Broker + name: eoapi-broker + namespace: serverless + + # Option 2: Direct HTTP endpoint (alternative to ref) + # url: https://example.com/webhook # Optional: HTTP settings # timeout: 30.0 # CLOUDEVENTS_TIMEOUT diff --git a/helm-chart/eoapi-notifier/templates/sinkbinding.yaml b/helm-chart/eoapi-notifier/templates/sinkbinding.yaml new file mode 100644 index 0000000..3b5da8b --- /dev/null +++ b/helm-chart/eoapi-notifier/templates/sinkbinding.yaml @@ -0,0 +1,24 @@ +{{- range .Values.config.outputs }} +{{- if and (eq .type "cloudevents") .config.destination.ref }} +apiVersion: sources.knative.dev/v1beta1 +kind: SinkBinding +metadata: + name: {{ include "eoapi-notifier.fullname" $ }}-binding + labels: + {{- include "eoapi-notifier.labels" $ | nindent 4 }} +spec: + subject: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "eoapi-notifier.fullname" $ }} + sink: + ref: + apiVersion: {{ .config.destination.ref.apiVersion }} + kind: {{ .config.destination.ref.kind }} + name: {{ .config.destination.ref.name }} + {{- if .config.destination.ref.namespace }} + namespace: {{ .config.destination.ref.namespace }} + {{- end }} +{{- break }} +{{- end }} +{{- end }} diff --git a/helm-chart/eoapi-notifier/values.yaml b/helm-chart/eoapi-notifier/values.yaml index 2a67a63..3e3e681 100644 --- a/helm-chart/eoapi-notifier/values.yaml +++ b/helm-chart/eoapi-notifier/values.yaml @@ -59,10 +59,17 @@ config: - type: cloudevents config: - # Will be overridden by K_SINK from KNative SinkBinding - endpoint: https://example.com/webhook - source: /eoapi/stac - event_type: org.eoapi.stac + source: /eoapi/pgstac + event_type: org.eoapi.stac.item + destination: + # Use ref for Knative resources + ref: + apiVersion: messaging.knative.dev/v1 + kind: Broker + name: eoapi-broker + namespace: serverless + # Alternatively, use url for direct HTTP endpoints + # url: https://example.com/webhook # Secrets secrets: @@ -75,27 +82,9 @@ secrets: # These will be injected as environment variables and automatically override config values # Use plugin-prefixed variables: PGSTAC_PASSWORD, MQTT_USERNAME, CLOUDEVENTS_ENDPOINT, etc # -# KNative Support: -# The cloudevents plugin supports K_SINK variables for KNative SinkBinding: -# - K_SINK: Overrides CLOUDEVENTS_ENDPOINT (automatically set by SinkBinding) -# - K_SOURCE: Overrides CLOUDEVENTS_SOURCE -# - K_TYPE: Overrides CLOUDEVENTS_EVENT_TYPE -# -# For KNative integration, use SinkBinding to automatically inject K_SINK: -# apiVersion: sources.knative.dev/v1beta1 -# kind: SinkBinding -# metadata: -# name: eoapi-notifier-binding -# spec: -# subject: -# apiVersion: apps/v1 -# kind: Deployment -# name: eoapi-notifier -# sink: -# ref: -# apiVersion: serving.knative.dev/v1 -# kind: Service -# name: my-knative-service +# CloudEvents destination options: +# - destination.ref: Kubernetes resource reference (uses SinkBinding) +# - destination.url: Direct HTTP endpoint env: {} # Examples - Standard environment variables: # PGSTAC_HOST: postgresql-service @@ -105,11 +94,6 @@ env: {} # MQTT_USE_TLS: "true" # # CloudEvents examples: - # CLOUDEVENTS_ENDPOINT: https://my-webhook-url # CLOUDEVENTS_SOURCE: /eoapi/stac/production # CLOUDEVENTS_EVENT_TYPE: org.eoapi.stac.item - # - # KNative examples (typically set by SinkBinding): - # K_SINK: https://my-knative-service.default.svc.cluster.local - # K_SOURCE: /eoapi/stac/pgstac - # K_TYPE: org.eoapi.stac + # K_SINK: https://my-service.default.svc.cluster.local (set by SinkBinding) diff --git a/tests/test_cloudevents_output.py b/tests/test_cloudevents_output.py index 25e8ee0..ab4a585 100644 --- a/tests/test_cloudevents_output.py +++ b/tests/test_cloudevents_output.py @@ -10,7 +10,12 @@ from eoapi_notifier.core.event import NotificationEvent from eoapi_notifier.core.plugin import PluginMetadata -from eoapi_notifier.outputs.cloudevents import CloudEventsAdapter, CloudEventsConfig +from eoapi_notifier.outputs.cloudevents import ( + CloudEventsAdapter, + CloudEventsConfig, + DestinationConfig, + RefConfig, +) class TestCloudEventsConfig: @@ -18,7 +23,9 @@ class TestCloudEventsConfig: def test_config_implements_protocol(self) -> None: """Test that config implements required protocol methods.""" - config = CloudEventsConfig() + config = CloudEventsConfig( + destination=DestinationConfig(url="https://example.com/webhook") + ) assert isinstance(config.get_sample_config(), dict) assert isinstance(config.get_metadata(), PluginMetadata) @@ -27,18 +34,20 @@ def test_config_implements_protocol(self) -> None: def test_default_configuration(self) -> None: """Test default configuration values.""" - config = CloudEventsConfig() + config = CloudEventsConfig( + destination=DestinationConfig(url="https://example.com/webhook") + ) - assert config.endpoint is None + assert config.destination.url == "https://example.com/webhook" assert config.source == "/eoapi/stac" assert config.event_type == "org.eoapi.stac" assert config.timeout == 30.0 assert config.max_retries == 3 - def test_endpoint_validation_error(self) -> None: - """Test endpoint validation.""" + def test_url_validation_error(self) -> None: + """Test URL validation.""" with pytest.raises(ValueError, match="must start with http"): - CloudEventsConfig(endpoint="invalid-url") + CloudEventsConfig(destination=DestinationConfig(url="invalid-url")) def test_get_metadata(self) -> None: """Test metadata retrieval.""" @@ -50,9 +59,97 @@ def test_get_metadata(self) -> None: def test_connection_info(self) -> None: """Test connection info string.""" - config = CloudEventsConfig(endpoint="https://example.com/webhook") + config = CloudEventsConfig( + destination=DestinationConfig(url="https://example.com/webhook") + ) assert "POST https://example.com/webhook" in config.get_connection_info() + def test_destination_mutually_exclusive(self) -> None: + """Test that ref and url are mutually exclusive.""" + with pytest.raises(ValueError, match="mutually exclusive"): + DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + ), + url="https://example.com/webhook", + ) + + def test_destination_required(self) -> None: + """Test that either ref or url is required.""" + with pytest.raises( + ValueError, + match="Either destination.ref or destination.url must be specified", + ): + DestinationConfig() + + def test_ref_destination_config(self) -> None: + """Test ref-based destination configuration.""" + config = CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + namespace="default", + ) + ) + ) + + assert config.destination.ref is not None + assert config.destination.ref.apiVersion == "messaging.knative.dev/v1" + assert config.destination.ref.kind == "Broker" + assert config.destination.ref.name == "test-broker" + assert config.destination.ref.namespace == "default" + assert config.destination.url is None + + def test_ref_destination_no_namespace(self) -> None: + """Test ref-based destination without namespace.""" + config = CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + ) + ) + ) + + assert config.destination.ref is not None + assert config.destination.ref.namespace is None + + @patch.dict(os.environ, {"K_SINK": "https://resolved.example.com"}) + def test_ref_destination_connection_info(self) -> None: + """Test connection info for ref destination.""" + config = CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + ) + ) + ) + + connection_info = config.get_connection_info() + assert "POST https://resolved.example.com" in connection_info + + def test_ref_destination_connection_info_no_k_sink(self) -> None: + """Test connection info for ref destination without K_SINK.""" + config = CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + ) + ) + ) + + connection_info = config.get_connection_info() + assert "Broker/test-broker" in connection_info + class TestCloudEventsAdapter: """Test CloudEvents output adapter.""" @@ -60,7 +157,9 @@ class TestCloudEventsAdapter: @pytest.fixture def config(self) -> CloudEventsConfig: """Create test configuration.""" - return CloudEventsConfig(endpoint="https://example.com/webhook") + return CloudEventsConfig( + destination=DestinationConfig(url="https://example.com/webhook") + ) @pytest.fixture def adapter(self, config: CloudEventsConfig) -> CloudEventsAdapter: @@ -87,18 +186,26 @@ async def test_start_success(self, adapter: CloudEventsAdapter) -> None: assert adapter._client is not None mock_client.assert_called_once() - async def test_start_no_endpoint(self) -> None: - """Test start failure without endpoint.""" - config = CloudEventsConfig() - adapter = CloudEventsAdapter(config) - - with pytest.raises(ValueError, match="endpoint configuration required"): - await adapter.start() + async def test_start_no_destination(self) -> None: + """Test start failure without destination.""" + with pytest.raises( + ValueError, + match="Either destination.ref or destination.url must be specified", + ): + DestinationConfig() @patch.dict(os.environ, {"K_SINK": "https://k8s.example.com"}) async def test_start_with_k_sink_env(self) -> None: """Test start with K_SINK environment variable.""" - config = CloudEventsConfig() + config = CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + ) + ) + ) adapter = CloudEventsAdapter(config) with patch("httpx.AsyncClient"): @@ -208,7 +315,9 @@ def test_convert_to_cloudevent( def test_convert_with_env_vars(self, sample_event: NotificationEvent) -> None: """Test CloudEvent conversion with environment variables.""" # Create a new config and adapter after setting environment variables - config = CloudEventsConfig() + config = CloudEventsConfig( + destination=DestinationConfig(url="https://example.com/webhook") + ) adapter = CloudEventsAdapter(config) cloud_event = adapter._convert_to_cloudevent(sample_event) @@ -246,3 +355,75 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None: # Running with client adapter._client = MagicMock() assert await adapter.health_check() is True + + @pytest.fixture + def ref_config(self) -> CloudEventsConfig: + """Create test configuration with ref destination.""" + return CloudEventsConfig( + destination=DestinationConfig( + ref=RefConfig( + apiVersion="messaging.knative.dev/v1", + kind="Broker", + name="test-broker", + namespace="default", + ) + ) + ) + + @pytest.fixture + def ref_adapter(self, ref_config: CloudEventsConfig) -> CloudEventsAdapter: + """Create test adapter with ref destination.""" + return CloudEventsAdapter(ref_config) + + async def test_start_ref_destination_no_k_sink( + self, ref_adapter: CloudEventsAdapter + ) -> None: + """Test start failure with ref destination but no K_SINK.""" + with pytest.raises(ValueError, match="K_SINK environment variable required"): + await ref_adapter.start() + + @patch.dict(os.environ, {"K_SINK": "https://resolved.broker.example.com"}) + async def test_start_ref_destination_with_k_sink( + self, ref_adapter: CloudEventsAdapter + ) -> None: + """Test successful start with ref destination and K_SINK.""" + with patch("httpx.AsyncClient") as mock_client: + await ref_adapter.start() + + assert ref_adapter.is_running + assert ref_adapter._client is not None + mock_client.assert_called_once() + + @patch.dict(os.environ, {"K_SINK": "https://resolved.broker.example.com"}) + async def test_send_event_ref_destination_success( + self, ref_adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test successful event sending with ref destination.""" + mock_client = AsyncMock() + mock_response = MagicMock() + mock_client.post.return_value = mock_response + ref_adapter._client = mock_client + ref_adapter._running = True + + with patch("eoapi_notifier.outputs.cloudevents.to_binary") as mock_to_binary: + mock_to_binary.return_value = ({"ce-id": "test"}, b"data") + + result = await ref_adapter.send_event(sample_event) + + assert result is True + mock_client.post.assert_called_once_with( + "https://resolved.broker.example.com", + headers={"ce-id": "test"}, + data=b"data", + ) + mock_response.raise_for_status.assert_called_once() + + async def test_send_event_ref_destination_no_k_sink( + self, ref_adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test sending event with ref destination but no K_SINK.""" + ref_adapter._client = AsyncMock() + ref_adapter._running = True + + result = await ref_adapter.send_event(sample_event) + assert result is False