fix(budpipeline): align pub/sub publishing with budmicroframe CloudEvent pattern#1311
Conversation
…ent pattern

budnotify was returning 422 on pipeline notifications because Dapr was wrapping the payload in a CloudEvent envelope (adding `id` and `datacontenttype` fields), which CloudEventBase rejects via `extra="forbid"`.

Changes:

- Fix notification_type from "EVENT" to "event" (enum case mismatch)
- Switch data_content_type to "application/cloudevents+json"
- Add publish_metadata with cloudevent.id/source/type (budmicroframe pattern)
- Enrich payload with source/source_topic/type fields
- Add NOTIFY_SERVICE_TOPIC to Helm template and .env.sample

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
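For context, here is a minimal sketch of why the injected fields surface as a 422. The model below is illustrative only and stands in for budnotify's CloudEventBase (its real fields may differ); the point is that `extra="forbid"` makes Pydantic reject the `id` and `datacontenttype` keys Dapr adds when it wraps a plain JSON payload.

```python
# Illustrative only: a stand-in for budnotify's CloudEventBase.
from pydantic import BaseModel, ConfigDict, ValidationError


class CloudEventLike(BaseModel):
    model_config = ConfigDict(extra="forbid")  # unknown keys are rejected

    source: str
    type: str
    data: dict


wrapped_by_dapr = {
    "source": "budpipeline",
    "type": "pipeline.notification",
    "data": {"status": "completed"},
    # Fields Dapr injects when it wraps a non-CloudEvent payload:
    "id": "5f9c0c6e-0000-0000-0000-000000000000",
    "datacontenttype": "application/json",
}

try:
    CloudEventLike(**wrapped_by_dapr)
except ValidationError as exc:
    print(exc)  # "Extra inputs are not permitted" -> returned as HTTP 422
```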
Code Review
This pull request effectively addresses the 422 errors from budnotify by aligning event publishing with the CloudEvent standard. The changes to use application/cloudevents+json and fix the notification_type enum are correct. I've added one suggestion to improve the implementation of _publish_single_topic to ensure it constructs a fully compliant CloudEvent, which will make the solution more robust and aligned with Dapr best practices.
```diff
+        # Copy payload to avoid mutating the shared dict across topics
+        data = payload.copy()
+
+        # Enrich data matching budmicroframe's DaprService.publish_to_topic() pattern
+        data["source"] = settings.name
+        data["source_topic"] = topic
+        if data.get("type") is None:
+            data["type"] = event_type
+
+        # CloudEvent metadata for Dapr (budmicroframe pattern)
+        event_id = str(uuid4())
+        publish_metadata = {
+            "cloudevent.id": event_id,
+            "cloudevent.source": settings.name,
+            "cloudevent.type": event_type,
+        }
+
         async with DaprClient() as client:
             await client.publish_event(
                 pubsub_name=self.pubsub_name,
                 topic_name=topic,
-                data=json.dumps(payload, cls=_DecimalEncoder),
-                data_content_type="application/json",
+                data=json.dumps(data, cls=_DecimalEncoder),
+                data_content_type="application/cloudevents+json",
+                publish_metadata=publish_metadata,
             )
```
The current implementation appears to mix two different Dapr publishing patterns. When using data_content_type="application/cloudevents+json", the data parameter should be a complete, valid CloudEvent JSON object. The publish_metadata parameter is intended for when Dapr wraps a non-CloudEvent payload (e.g., with data_content_type="application/json"), so it's redundant and potentially confusing here.
To correctly address the issue of Dapr injecting fields into the payload, we should manually construct the full CloudEvent envelope and place our application payload inside its data field. This ensures we send a valid CloudEvent and have full control over its structure, making the solution more robust and aligned with best practices.
```python
# To send a compliant CloudEvent and prevent Dapr from modifying the payload,
# we should construct the full CloudEvent envelope and place our application
# payload within its 'data' field.
event_payload = payload.copy()
event_payload["source_topic"] = topic

cloud_event = {
    "specversion": "1.0",
    "id": str(uuid4()),
    "source": settings.name,
    "type": event_type,
    "datacontenttype": "application/json",
    "data": event_payload,
}

async with DaprClient() as client:
    await client.publish_event(
        pubsub_name=self.pubsub_name,
        topic_name=topic,
        data=json.dumps(cloud_event, cls=_DecimalEncoder),
        data_content_type="application/cloudevents+json",
    )
```

content.get("result", {}) returns None when the key exists with a None value, causing AttributeError on .get(). Use `or {}` fallback instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
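A quick sketch of the failure mode this commit describes and the `or {}` fix (the dict shape is illustrative):

```python
# Illustrative only: .get("result", {}) does NOT fall back to {} when the key
# exists with an explicit None value, so the chained .get() raises.
content = {"result": None}

# Buggy: AttributeError: 'NoneType' object has no attribute 'get'
# status = content.get("result", {}).get("status")

# Fixed: `or {}` also covers the explicit-None case
status = (content.get("result") or {}).get("status")
print(status)  # None, and no exception
```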
…ing_event

The persistence_service.mark_step_awaiting_event() was creating its own DB session, but the RUNNING status update in _continue_pipeline_execution was only flushed (not committed) in the outer session. The new session could not see the uncommitted version, causing an optimistic lock conflict that silently prevented external_workflow_id from being persisted. This left event-driven steps (like model_add) stuck in RUNNING state with no external_workflow_id, making them unable to receive completion events.

Fix: Use step_crud (same session) directly instead of persistence_service, ensuring the version chain stays consistent within a single transaction.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
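A hedged sketch of the session-visibility issue, assuming SQLAlchemy-style sessions and an illustrative step object with an optimistic-lock version column (the project's actual CRUD and session APIs may differ):

```python
# Illustrative only: flush() makes changes visible within the current
# transaction, but a *separate* session still reads the last committed row,
# so its version check conflicts and the write is silently dropped.
from sqlalchemy.orm import Session


def continue_pipeline_execution(outer_session: Session, step) -> None:
    step.status = "RUNNING"   # bumps the optimistic-lock version in memory
    outer_session.flush()     # visible only inside outer_session's transaction

    # Old behaviour: a persistence service opened a new session, read the stale
    # committed version, and the external_workflow_id update hit a lock conflict.
    # Fixed behaviour: keep using the CRUD bound to outer_session, so the
    # version chain stays consistent within one transaction.
    step.external_workflow_id = "wf-external-123"
    outer_session.flush()
```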
The process_timeout function fetches the execution to get subscriber_ids for event publishing. Without mocking PipelineExecutionCRUD, the mock session's async chain returns a coroutine instead of an execution object, causing "'coroutine' object has no attribute 'subscriber_ids'".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
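Roughly, the test fix looks like the sketch below; the import path, method name, and test name are assumptions rather than the repository's actual API:

```python
# Illustrative only: patch PipelineExecutionCRUD so the execution lookup
# returns a concrete object with subscriber_ids instead of the mocked
# session's un-awaited coroutine/mock chain.
from unittest.mock import AsyncMock, MagicMock, patch

import pytest


@pytest.mark.asyncio
async def test_process_timeout_publishes_notification():
    execution = MagicMock()
    execution.subscriber_ids = ["subscriber-1"]

    # Hypothetical dotted path; patch wherever process_timeout resolves the CRUD.
    with patch("budpipeline.crud.PipelineExecutionCRUD") as crud_cls:
        crud_cls.return_value.fetch_one = AsyncMock(return_value=execution)
        # ... call process_timeout(...) here and assert the timeout event is
        # published for execution.subscriber_ids.
```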
Force-pushed 8af8102 to fa1373b
Summary
- Align `_publish_single_topic()` with budmicroframe's `DaprService.publish_to_topic()` pattern: set `data_content_type="application/cloudevents+json"`, add `publish_metadata` with CloudEvent fields, enrich payload with `source`/`source_topic`/`type`
- Fix `notification_type` enum case from `"EVENT"` to `"event"`
- Add `NOTIFY_SERVICE_TOPIC` env var to Helm template (matching budcluster/budeval) and `.env.sample`

Root Cause
budpipeline was publishing with `data_content_type="application/json"` and no `publish_metadata`. This caused Dapr to wrap the payload in a CloudEvent envelope, injecting `id` and `datacontenttype` fields that budnotify's `CloudEventBase` (`extra="forbid"`) rejected with 422.

Test plan
- … with `subscriber_ids` and verify notification reaches budnotify without 422
- `pytest tests/test_event_publishing.py`

🤖 Generated with Claude Code