fix(budpipeline): use flat payload for pub/sub and fix stale DB constraint#1321

Merged
dittops merged 1 commit into master from fix/budpipeline-notify-publish-pattern on Feb 16, 2026

Conversation


@dittops dittops commented Feb 16, 2026

Summary

  • Fix 422 errors: Rewrites EventPublisher._publish_single_topic() to send flat payload data with CloudEvent attributes via publish_metadata, matching the budmicroframe publish_to_topic pattern. Previously, the method manually constructed a CloudEvent envelope with id and datacontenttype inside the JSON body, which budnotify's CloudEventBase(extra="forbid") rejected with 422.
  • Fix stale DB constraint: Migration 003 used the wrong constraint name (chk_execution_subscription_status instead of ck_execution_subscription_chk_execution_subscription_status), leaving a stale lowercase check constraint that rejected every execution_subscription insert with delivery_status 'ACTIVE'. Adds migration 009 to drop the stale constraint and fixes migration 003 to drop both constraint name variants.

Root Cause

Two separate issues were blocking budpipeline notifications:

  1. Publisher pattern mismatch: _publish_single_topic wrapped data in a CloudEvent envelope (specversion, id, source, type, datacontenttype, data). Dapr passed the entire envelope as the message body. budnotify's Pydantic model with extra="forbid" rejected id and datacontenttype as unexpected fields.

  2. Constraint name mismatch: SQLAlchemy generates constraint names with a table-prefix convention (ck_execution_subscription_chk_execution_subscription_status), but migration 003 only dropped the short name (chk_execution_subscription_status). Both constraints ended up co-existing — the old one (lowercase) blocked all inserts, so callback topics were never registered and the publisher was never invoked.
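The prefixing behavior behind issue 2 can be reproduced with SQLAlchemy's naming-convention template. The sketch below is illustrative: the `ck_%(table_name)s_%(constraint_name)s` convention string is an assumption mirroring a common setup (budpipeline's actual metadata configuration is not shown in this PR); the table and constraint names come from the description above.

```python
# Illustrative sketch: how a SQLAlchemy "ck" naming convention turns a
# declared CHECK constraint name into the table-prefixed name that
# migration 003 failed to drop. The convention template is an assumed
# common default, not confirmed budpipeline code.
naming_convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}


def rendered_constraint_name(table_name: str, declared_name: str) -> str:
    """Apply the ck_ template the way SQLAlchemy renders constraint names."""
    return naming_convention["ck"] % {
        "table_name": table_name,
        "constraint_name": declared_name,
    }


# The short name migration 003 tried to drop...
declared = "chk_execution_subscription_status"
# ...versus the name actually present in the database:
actual = rendered_constraint_name("execution_subscription", declared)
print(actual)  # ck_execution_subscription_chk_execution_subscription_status
```

Dropping only `declared` leaves `actual` in place, which is exactly the co-existing-constraints situation described above.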

Changes

| File | Change |
| --- | --- |
| `publisher.py` | Send flat data + `publish_metadata` instead of a CloudEvent envelope |
| `003_fix_delivery_status_constraint.py` | Also drop the prefixed constraint name |
| `009_drop_stale_delivery_status_constraint.py` | New migration to fix DBs that already ran the old 003 |
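The publisher change can be sketched as follows. This is a minimal sketch of the flat-payload pattern described in this PR, not the actual `publisher.py` code: `build_publish_args` and its parameters are illustrative, and the Dapr call it feeds is shown only in a comment.

```python
# Sketch (assumptions: Dapr pub/sub; function and parameter names are
# illustrative, not the real EventPublisher implementation).
import json
from typing import Any
from uuid import uuid4


def build_publish_args(
    topic: str, event_type: str, source: str, payload: dict[str, Any]
) -> tuple[str, dict[str, str]]:
    """Build a flat JSON body plus CloudEvent attributes for publish_metadata.

    The body carries only application data; CloudEvent attributes travel in
    publish_metadata so Dapr builds the envelope itself, and a strict
    consumer model (extra="forbid") never sees id/datacontenttype inside
    the JSON body.
    """
    body = dict(payload)
    body["source"] = source
    body["source_topic"] = topic
    publish_metadata = {
        "cloudevent.id": str(uuid4()),
        "cloudevent.source": source,
        "cloudevent.type": event_type,
    }
    return json.dumps(body), publish_metadata


# With the Dapr SDK this would then be passed roughly as:
#   client.publish_event(pubsub_name=..., topic_name=topic, data=data,
#                        data_content_type="application/json",
#                        publish_metadata=publish_metadata)
```

The key property is that `id` and `datacontenttype` never appear inside the serialized body, which is what triggered the 422s before this fix.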

Test plan

  • Verified in bud-dev: 422 errors with extra_forbidden on datacontenttype/id stopped after fix
  • Verified budnotify returning 200 OK + "Triggered notification successfully" for all messages
  • Dropped stale constraint manually on bud-dev DB; next execution should register subscriptions
  • Trigger a pipeline execution post-deploy to confirm end-to-end notification delivery
  • Run alembic upgrade head on a fresh DB to verify migration chain

🤖 Generated with Claude Code


@gemini-code-assist
Contributor

Summary of Changes

Hello @dittops, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses critical issues preventing proper notification delivery within the budpipeline service. It rectifies a mismatch in CloudEvent publishing patterns that led to downstream services rejecting messages and resolves a database constraint problem that blocked subscription registrations. The changes ensure reliable event propagation and correct database state for execution subscriptions.

Highlights

  • Fix 422 errors in event publishing: The _publish_single_topic() method was rewritten to send flat payload data with CloudEvent attributes via publish_metadata. This resolves 422 errors caused by budnotify rejecting id and datacontenttype fields when they were manually constructed inside the JSON body.
  • Fix stale DB constraint: Migration 003 was updated to correctly drop both the SQLAlchemy-generated prefixed constraint name and the short name. A new migration (009) was added to drop the stale lowercase check constraint for databases that had already run the old 003, which was blocking execution_subscription inserts with ACTIVE status.


Changelog
  • services/budpipeline/budpipeline/alembic/versions/20260116_2040_003_fix_delivery_status_constraint.py
    • Modified to drop both the SQLAlchemy-generated prefixed constraint name and the short constraint name for execution_subscription.
  • services/budpipeline/budpipeline/alembic/versions/20260216_1200_009_drop_stale_delivery_status_constraint.py
    • Added a new migration file to explicitly drop the stale lowercase delivery_status check constraint, ensuring database consistency for existing deployments.
  • services/budpipeline/budpipeline/progress/publisher.py
    • Refactored the _publish_single_topic function to send flat payload data and pass CloudEvent attributes via publish_metadata.
    • Updated the docstring to reflect the new publishing mechanism, which avoids injecting extra fields into the JSON body.

@gemini-code-assist bot left a comment
Code Review

This pull request addresses two separate issues: it corrects the payload format for Pub/Sub messages to resolve 422 errors and fixes a stale database constraint that was preventing inserts. The database migration changes are well-structured and correctly handle both new and existing database schemas. The refactoring of the event publisher to use publish_metadata for CloudEvent creation is a good improvement. I've provided a couple of suggestions on the publisher implementation to further align it with CloudEvents best practices, specifically regarding the placement of extension attributes and the data content type.

Comment on lines 372 to 383
```diff
 event_payload = payload.copy()
 correlation_id = event_payload.pop("correlation_id", None)

-# Construct a full CloudEvent envelope so Dapr does not inject
-# its own id/datacontenttype fields (which budnotify rejects).
-# Extension attributes (source_topic, correlation_id) go at the
-# envelope level, not inside data.
-cloud_event: dict[str, Any] = {
-    "specversion": "1.0",
-    "id": str(uuid4()),
-    "source": settings.name,
-    "type": event_type,
-    "datacontenttype": "application/json",
-    "source_topic": topic,
-    "data": event_payload,
-}
+event_payload["source"] = settings.name
+event_payload["source_topic"] = topic
+
+# Pass CloudEvent attributes via publish_metadata so Dapr
+# does not inject them into the JSON body.
+event_id = str(uuid4())
+publish_metadata = {
+    "cloudevent.id": event_id,
+    "cloudevent.source": settings.name,
+    "cloudevent.type": event_type,
+}
```
Severity: medium

This refactoring correctly switches to using publish_metadata to let Dapr build the CloudEvent envelope. However, it appears to have unintentionally changed where some attributes are placed. The previous implementation's comments indicated that correlation_id and source_topic should be top-level CloudEvent extension attributes. In this new version, they are added to the event_payload and will end up inside the data field of the CloudEvent.

While this may work with the current consumer, it's generally better practice to use CloudEvent extension attributes for metadata like this. It would also be more consistent with the previous design's intent. Consider moving these attributes to publish_metadata to make them proper CloudEvent extensions.

Suggested change

```diff
 event_payload = payload.copy()
 correlation_id = event_payload.pop("correlation_id", None)

-event_payload["source"] = settings.name
-event_payload["source_topic"] = topic
-
 # Pass CloudEvent attributes via publish_metadata so Dapr
 # does not inject them into the JSON body.
 event_id = str(uuid4())
-publish_metadata = {
+publish_metadata: dict[str, Any] = {
     "cloudevent.id": event_id,
     "cloudevent.source": settings.name,
     "cloudevent.type": event_type,
+    "source_topic": topic,
 }
+if correlation_id:
+    publish_metadata["correlation_id"] = correlation_id
```

```diff
 topic_name=topic,
-data=json.dumps(cloud_event, cls=_DecimalEncoder),
+data=json.dumps(event_payload, cls=_DecimalEncoder),
 data_content_type="application/cloudevents+json",
```
Severity: medium

The data_content_type is set to application/cloudevents+json. When letting Dapr create the CloudEvent envelope by providing publish_metadata, this content type should typically be application/json to describe the data payload itself. Dapr will then wrap this payload in a CloudEvent, and the datacontenttype attribute of the resulting CloudEvent will be correctly set to application/json.

Suggested change

```diff
-data_content_type="application/cloudevents+json",
+data_content_type="application/json",
```
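The distinction the reviewer is drawing can be illustrated by what each content type claims about the body. The envelope shape below follows the CloudEvents 1.0 JSON format; the id/source/type values are illustrative, and this is a sketch of the semantics, not Dapr's actual wire output.

```python
import json

# With data_content_type="application/json", the body is just the flat
# application payload; the broker-side CloudEvent wrapper then sets
# datacontenttype to describe the data field.
flat_body = {"execution_id": "abc", "status": "completed"}

# Sketch of the envelope a subscriber would then see (attribute names per
# CloudEvents 1.0; id/source/type values are illustrative):
envelope = {
    "specversion": "1.0",
    "id": "illustrative-id",
    "source": "budpipeline",
    "type": "execution.completed",
    "datacontenttype": "application/json",
    "data": flat_body,
}

# By contrast, "application/cloudevents+json" asserts the body is ALREADY
# a full envelope, which is wrong when publishing a flat payload.
assert json.loads(json.dumps(envelope))["data"] == flat_body
```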

…DB constraint

The EventPublisher._publish_single_topic() was manually constructing a
CloudEvent envelope with id/datacontenttype in the JSON body, causing
budnotify's CloudEventBase(extra="forbid") to reject messages with 422.
Switch to the budmicroframe pattern: send flat data with CloudEvent
attributes via publish_metadata.

Also fix migration 003 which used the wrong constraint name, leaving a
stale lowercase check constraint on execution_subscription.delivery_status
that blocked all subscription inserts with 'ACTIVE'.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@dittops force-pushed the fix/budpipeline-notify-publish-pattern branch from 718cd75 to f765c83 on February 16, 2026 at 16:43
@dittops dittops merged commit 4356029 into master Feb 16, 2026
11 checks passed