Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions infra/helm/bud/templates/microservices/budpipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ spec:
value: "{{ .Values.microservices.budnotify.daprid }}"
- name: BUDMETRICS_APP_ID
value: "{{ .Values.microservices.budmetrics.daprid }}"
- name: NOTIFY_SERVICE_TOPIC
value: "{{ .Values.microservices.budnotify.pubsubTopic }}"
# PostgreSQL configuration (002-pipeline-event-persistence)
- name: PSQL_HOST
value: {{ include "bud.externalServices.postgresql.host" . }}
Expand Down
1 change: 1 addition & 0 deletions services/budpipeline/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ STATE_STORE_NAME=statestore

# Pub/Sub
PUBSUB_NAME=pubsub
NOTIFY_SERVICE_TOPIC=notificationMessages

# Service Discovery (Dapr App IDs)
BUDAPP_APP_ID=budapp
Expand Down
32 changes: 27 additions & 5 deletions services/budpipeline/budpipeline/progress/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID
from uuid import UUID, uuid4

from budpipeline.commons.config import settings
from budpipeline.commons.observability import get_logger, record_event_published
Expand Down Expand Up @@ -242,7 +242,7 @@ def _build_event_payload(

Produces the format understood by budnotify (Novu) and budadmin's CommonStatus.tsx:
{
"notification_type": "EVENT",
"notification_type": "event",
"name": "bud-notification",
"subscriber_ids": "user-uuid-or-null",
"payload": {
Expand Down Expand Up @@ -323,7 +323,7 @@ def _build_event_payload(
content["result"] = result_data

payload: dict[str, Any] = {
"notification_type": "EVENT",
"notification_type": "event",
"name": "bud-notification",
"subscriber_ids": subscriber_ids,
"payload": {
Expand All @@ -350,6 +350,10 @@ async def _publish_single_topic(
) -> bool:
"""Publish event to a single topic.

Follows budmicroframe's DaprService.publish_to_topic() pattern:
enriches data with source/source_topic, sets CloudEvent metadata
via publish_metadata, and uses application/cloudevents+json content type.

Args:
topic: Target topic.
execution_id: Execution UUID.
Expand All @@ -363,12 +367,30 @@ async def _publish_single_topic(
# Import Dapr client here to avoid circular imports
from dapr.aio.clients import DaprClient

# Copy payload to avoid mutating the shared dict across topics
data = payload.copy()

# Enrich data matching budmicroframe's DaprService.publish_to_topic() pattern
data["source"] = settings.name
data["source_topic"] = topic
if data.get("type") is None:
data["type"] = event_type

# CloudEvent metadata for Dapr (budmicroframe pattern)
event_id = str(uuid4())
publish_metadata = {
"cloudevent.id": event_id,
"cloudevent.source": settings.name,
"cloudevent.type": event_type,
}

async with DaprClient() as client:
await client.publish_event(
pubsub_name=self.pubsub_name,
topic_name=topic,
data=json.dumps(payload, cls=_DecimalEncoder),
data_content_type="application/json",
data=json.dumps(data, cls=_DecimalEncoder),
data_content_type="application/cloudevents+json",
publish_metadata=publish_metadata,
)
Comment on lines +370 to 394
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation appears to mix two different Dapr publishing patterns. When using data_content_type="application/cloudevents+json", the data parameter should be a complete, valid CloudEvent JSON object. The publish_metadata parameter is intended for when Dapr wraps a non-CloudEvent payload (e.g., with data_content_type="application/json"), so it's redundant and potentially confusing here.

To correctly address the issue of Dapr injecting fields into the payload, we should manually construct the full CloudEvent envelope and place our application payload inside its data field. This ensures we send a valid CloudEvent and have full control over its structure, making the solution more robust and aligned with best practices.

            # To send a compliant CloudEvent and prevent Dapr from modifying the payload,
            # we should construct the full CloudEvent envelope and place our application
            # payload within its 'data' field.
            event_payload = payload.copy()
            event_payload["source_topic"] = topic

            cloud_event = {
                "specversion": "1.0",
                "id": str(uuid4()),
                "source": settings.name,
                "type": event_type,
                "datacontenttype": "application/json",
                "data": event_payload,
            }

            async with DaprClient() as client:
                await client.publish_event(
                    pubsub_name=self.pubsub_name,
                    topic_name=topic,
                    data=json.dumps(cloud_event, cls=_DecimalEncoder),
                    data_content_type="application/cloudevents+json",
                )


record_event_published(event_type)
Expand Down
4 changes: 2 additions & 2 deletions services/budpipeline/tests/test_event_publishing.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def capture_payload(topic, execution_id, event_type, payload):
assert len(published_payloads) == 1
payload = published_payloads[0]
assert captured_event_types[0] == EventType.WORKFLOW_PROGRESS
assert payload["notification_type"] == "EVENT"
assert payload["notification_type"] == "event"
assert payload["payload"]["type"] == "pipeline_execution"
assert payload["payload"]["event"] == "progress"
assert payload["payload"]["content"]["status"] == "RUNNING"
Expand Down Expand Up @@ -548,7 +548,7 @@ def test_build_event_payload_required_fields(self, event_publisher, execution_id
data={"progress_percentage": 50.0},
)

assert payload["notification_type"] == "EVENT"
assert payload["notification_type"] == "event"
assert payload["name"] == "bud-notification"
assert "payload" in payload
assert payload["payload"]["category"] == "internal"
Expand Down