
Commit 4356029

Merge pull request #1321 from BudEcosystem/fix/budpipeline-notify-publish-pattern
fix(budpipeline): use flat payload for pub/sub and fix stale DB constraint
2 parents: cacc66b + f765c83

3 files changed: +62 -22 lines changed
services/budpipeline/budpipeline/alembic/versions/20260116_2040_003_fix_delivery_status_constraint.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -20,7 +20,11 @@
 
 def upgrade() -> None:
     """Update check constraint to use uppercase enum names."""
-    # Drop the old constraint using raw SQL
+    # Drop both possible constraint names: the SQLAlchemy-generated prefixed
+    # name and the short name used in migration 001.
+    op.execute(
+        'ALTER TABLE execution_subscription DROP CONSTRAINT IF EXISTS "ck_execution_subscription_chk_execution_subscription_status"'
+    )
     op.execute(
         'ALTER TABLE execution_subscription DROP CONSTRAINT IF EXISTS "chk_execution_subscription_status"'
     )
```
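
Because both statements use `DROP CONSTRAINT IF EXISTS`, the migration is safe to run whichever of the two names a given database ended up with. A hypothetical verification snippet, not part of this commit (the DSN and database name are placeholders; assumes PostgreSQL and SQLAlchemy), could list the check constraints that remain afterwards:

```python
from sqlalchemy import create_engine, text

# Placeholder DSN -- adjust to your environment.
engine = create_engine("postgresql://localhost/budpipeline")

with engine.connect() as conn:
    rows = conn.execute(
        text(
            """
            SELECT conname, pg_get_constraintdef(oid) AS definition
            FROM pg_constraint
            WHERE conrelid = 'execution_subscription'::regclass
              AND contype = 'c'
            """
        )
    )
    # After the fix, only the uppercase constraint should remain.
    for conname, definition in rows:
        print(conname, definition)
```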
Lines changed: 41 additions & 0 deletions (new file)

```python
"""Drop stale lowercase delivery_status check constraint.

Revision ID: 009_fix_status_constraint
Revises: 008_add_notification_workflow_id
Create Date: 2026-02-16 12:00:00.000000

Migration 003 intended to replace the lowercase delivery_status constraint
with an uppercase one, but used the wrong constraint name. The SQLAlchemy
naming convention prefixed the original constraint as
'ck_execution_subscription_chk_execution_subscription_status' (lowercase),
while migration 003 only dropped 'chk_execution_subscription_status'.

This left two constraints: the old lowercase one (blocking inserts of 'ACTIVE')
and the new uppercase one. This migration drops the stale lowercase constraint.
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "009_fix_status_constraint"
down_revision = "008_add_notification_workflow_id"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Drop the stale lowercase constraint that blocks ACTIVE inserts."""
    op.execute(
        'ALTER TABLE execution_subscription DROP CONSTRAINT IF EXISTS "ck_execution_subscription_chk_execution_subscription_status"'
    )


def downgrade() -> None:
    """Re-add the lowercase constraint (not recommended)."""
    op.execute(
        """
        ALTER TABLE execution_subscription
        ADD CONSTRAINT ck_execution_subscription_chk_execution_subscription_status
        CHECK (delivery_status IN ('active', 'expired', 'failed'))
        """
    )
```
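
For context on where the doubled name comes from: with a SQLAlchemy naming convention such as `ck_%(table_name)s_%(constraint_name)s` (assumed here; the project's actual convention is not shown in this diff), an explicitly named `CheckConstraint` still gets the table-scoped prefix applied. A minimal sketch, with the metadata setup assumed rather than taken from this repo:

```python
from sqlalchemy import CheckConstraint, Column, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

# Assumed convention: SQLAlchemy prefixes even explicitly named
# check constraints when %(constraint_name)s appears in the token.
metadata = MetaData(
    naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
)

Table(
    "execution_subscription",
    metadata,
    Column("delivery_status", String(32)),
    CheckConstraint(
        "delivery_status IN ('active', 'expired', 'failed')",
        name="chk_execution_subscription_status",
    ),
)

# The emitted DDL names the constraint
# "ck_execution_subscription_chk_execution_subscription_status".
print(
    CreateTable(metadata.tables["execution_subscription"]).compile(
        dialect=postgresql.dialect()
    )
)
```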

services/budpipeline/budpipeline/progress/publisher.py

Lines changed: 16 additions & 21 deletions
```diff
@@ -350,10 +350,10 @@ async def _publish_single_topic(
     ) -> bool:
         """Publish event to a single topic.
 
-        Constructs a full CloudEvent envelope and publishes with
-        application/cloudevents+json content type. This prevents Dapr from
-        injecting its own id/datacontenttype fields, which budnotify's
-        CloudEventBase(extra="forbid") would reject with 422.
+        Sends flat payload data with CloudEvent attributes passed via
+        publish_metadata, matching the budmicroframe publish_to_topic
+        pattern. This avoids injecting extra fields (id, datacontenttype)
+        into the JSON body that downstream subscribers would reject.
 
         Args:
             topic: Target topic.
@@ -370,30 +370,25 @@ async def _publish_single_topic(
 
         # Copy payload to avoid mutating the shared dict across topics
         event_payload = payload.copy()
-        correlation_id = event_payload.pop("correlation_id", None)
-
-        # Construct a full CloudEvent envelope so Dapr does not inject
-        # its own id/datacontenttype fields (which budnotify rejects).
-        # Extension attributes (source_topic, correlation_id) go at the
-        # envelope level, not inside data.
-        cloud_event: dict[str, Any] = {
-            "specversion": "1.0",
-            "id": str(uuid4()),
-            "source": settings.name,
-            "type": event_type,
-            "datacontenttype": "application/json",
-            "source_topic": topic,
-            "data": event_payload,
+        event_payload["source"] = settings.name
+        event_payload["source_topic"] = topic
+
+        # Pass CloudEvent attributes via publish_metadata so Dapr
+        # does not inject them into the JSON body.
+        event_id = str(uuid4())
+        publish_metadata = {
+            "cloudevent.id": event_id,
+            "cloudevent.source": settings.name,
+            "cloudevent.type": event_type,
         }
-        if correlation_id:
-            cloud_event["correlation_id"] = correlation_id
 
         async with DaprClient() as client:
             await client.publish_event(
                 pubsub_name=self.pubsub_name,
                 topic_name=topic,
-                data=json.dumps(cloud_event, cls=_DecimalEncoder),
+                data=json.dumps(event_payload, cls=_DecimalEncoder),
                 data_content_type="application/cloudevents+json",
+                publish_metadata=publish_metadata,
             )
 
         record_event_published(event_type)
```
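
For reference, the new publish pattern in isolation: a minimal, self-contained sketch using the Dapr Python SDK's synchronous client. The component name ("pubsub"), topic ("notifications"), event type, and payload fields are illustrative placeholders, not values from this diff.

```python
import json
from uuid import uuid4

from dapr.clients import DaprClient

# Flat payload: attributes live directly in the body, no CloudEvent envelope.
payload = {"workflow_id": "wf-123", "status": "RUNNING"}  # placeholder fields
payload["source"] = "budpipeline"
payload["source_topic"] = "notifications"

# CloudEvent attributes travel as publish metadata, so Dapr does not
# rewrite the JSON body with its own id/datacontenttype fields.
publish_metadata = {
    "cloudevent.id": str(uuid4()),
    "cloudevent.source": "budpipeline",
    "cloudevent.type": "pipeline.progress",
}

with DaprClient() as client:
    client.publish_event(
        pubsub_name="pubsub",
        topic_name="notifications",
        data=json.dumps(payload),
        data_content_type="application/cloudevents+json",
        publish_metadata=publish_metadata,
    )
```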
