Skip to content

Commit aadc3fa

Browse files
dittopsclaude
andcommitted
fix(budpipeline): address review feedback from Gemini and Codex
- Fix Decimal serialization: add _DecimalEncoder for json.dumps in publisher (Gemini medium: Decimal objects not JSON-serializable for step events) - Propagate subscriber_ids/payload_type to workflow-level events via update_execution_status and _publish_execution_event (Codex P1: workflow completed/failed/progress notifications missing dual-publish) - Remove redundant DB query in update_step_status: pass subscriber_ids/payload_type as params from caller instead of re-fetching execution per step (Gemini medium: performance issue with N+1 queries) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fbb6fd7 commit aadc3fa

File tree

3 files changed

+47
-6
lines changed

3 files changed

+47
-6
lines changed

services/budpipeline/budpipeline/pipeline/persistence_service.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ async def update_execution_status(
241241
final_outputs: dict[str, Any] | None = None,
242242
error_info: dict[str, Any] | None = None,
243243
correlation_id: str | None = None,
244+
subscriber_ids: str | None = None,
245+
payload_type: str | None = None,
244246
) -> tuple[bool, int]:
245247
"""Update execution status with optimistic locking.
246248
@@ -256,6 +258,8 @@ async def update_execution_status(
256258
final_outputs: Final outputs (will be sanitized).
257259
error_info: Error details.
258260
correlation_id: Optional correlation ID for event tracing.
261+
subscriber_ids: Optional user ID(s) for Novu notification delivery.
262+
payload_type: Optional custom payload.type for event routing.
259263
260264
Returns:
261265
Tuple of (success, new_version).
@@ -299,6 +303,8 @@ async def update_execution_status(
299303
final_outputs=final_outputs,
300304
error_info=error_info,
301305
correlation_id=correlation_id,
306+
subscriber_ids=subscriber_ids,
307+
payload_type=payload_type,
302308
)
303309

304310
return True, execution.version
@@ -330,6 +336,8 @@ async def _publish_execution_event(
330336
final_outputs: dict[str, Any] | None = None,
331337
error_info: dict[str, Any] | None = None,
332338
correlation_id: str | None = None,
339+
subscriber_ids: str | None = None,
340+
payload_type: str | None = None,
333341
) -> None:
334342
"""Publish execution status events to callback topics.
335343
@@ -342,6 +350,8 @@ async def _publish_execution_event(
342350
success=True,
343351
final_outputs=final_outputs,
344352
correlation_id=correlation_id,
353+
subscriber_ids=subscriber_ids,
354+
payload_type=payload_type,
345355
)
346356
elif status == ExecutionStatus.FAILED:
347357
message = error_info.get("message") if error_info else None
@@ -350,12 +360,16 @@ async def _publish_execution_event(
350360
success=False,
351361
final_message=message,
352362
correlation_id=correlation_id,
363+
subscriber_ids=subscriber_ids,
364+
payload_type=payload_type,
353365
)
354366
elif status == ExecutionStatus.RUNNING and progress_percentage is not None:
355367
await event_publisher.publish_workflow_progress(
356368
execution_id=execution_id,
357369
progress_percentage=progress_percentage,
358370
correlation_id=correlation_id,
371+
subscriber_ids=subscriber_ids,
372+
payload_type=payload_type,
359373
)
360374
except Exception as e:
361375
# Non-blocking - log and continue (FR-014)
@@ -382,6 +396,8 @@ async def update_step_status(
382396
step_name: str | None = None,
383397
sequence_number: int | None = None,
384398
correlation_id: str | None = None,
399+
subscriber_ids: str | None = None,
400+
payload_type: str | None = None,
385401
) -> tuple[bool, int]:
386402
"""Update step execution status with optimistic locking.
387403
@@ -402,6 +418,8 @@ async def update_step_status(
402418
step_name: Step name for event publishing.
403419
sequence_number: Step sequence number for event publishing.
404420
correlation_id: Optional correlation ID for event tracing.
421+
subscriber_ids: Optional user ID(s) for Novu notification delivery.
422+
payload_type: Optional custom payload.type for event routing.
405423
406424
Returns:
407425
Tuple of (success, new_version).
@@ -433,9 +451,6 @@ async def update_step_status(
433451

434452
# Publish step events to callback topics (T048)
435453
if execution_id and step_id and step_name:
436-
# Look up execution notification config
437-
exec_crud = PipelineExecutionCRUD(session)
438-
execution = await exec_crud.get_by_id(execution_id)
439454
await self._publish_step_event(
440455
execution_id=execution_id,
441456
step_id=step_id,
@@ -445,8 +460,8 @@ async def update_step_status(
445460
sequence_number=sequence_number or 0,
446461
error_message=error_message,
447462
correlation_id=correlation_id,
448-
subscriber_ids=execution.subscriber_ids if execution else None,
449-
payload_type=execution.payload_type if execution else None,
463+
subscriber_ids=subscriber_ids,
464+
payload_type=payload_type,
450465
)
451466

452467
return True, step.version

services/budpipeline/budpipeline/pipeline/service.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,8 @@ async def _execute_pipeline_impl(
779779
expected_version=db_version,
780780
status=DBExecutionStatus.RUNNING,
781781
start_time_value=started_at,
782+
subscriber_ids=subscriber_ids,
783+
payload_type=payload_type,
782784
)
783785
if success:
784786
logger.info(f"Updated execution {execution_id} to RUNNING with start_time")
@@ -867,6 +869,8 @@ async def _execute_pipeline_impl(
867869
step_id=step.id,
868870
step_name=step.name,
869871
sequence_number=seq_num,
872+
subscriber_ids=subscriber_ids,
873+
payload_type=payload_type,
870874
)
871875
if success:
872876
step_db_info[step.id] = (db_uuid, new_version, seq_num)
@@ -902,6 +906,8 @@ async def _execute_pipeline_impl(
902906
step_id=step.id,
903907
step_name=step.name,
904908
sequence_number=seq_num,
909+
subscriber_ids=subscriber_ids,
910+
payload_type=payload_type,
905911
)
906912
except Exception as e:
907913
logger.warning(f"Failed to persist step SKIPPED status: {e}")
@@ -1066,6 +1072,8 @@ async def _execute_pipeline_impl(
10661072
step_id=step.id,
10671073
step_name=step.name,
10681074
sequence_number=seq_num,
1075+
subscriber_ids=subscriber_ids,
1076+
payload_type=payload_type,
10691077
)
10701078
except Exception as e:
10711079
logger.warning(f"Failed to persist step COMPLETED status: {e}")
@@ -1129,6 +1137,8 @@ async def _execute_pipeline_impl(
11291137
step_id=step.id,
11301138
step_name=step.name,
11311139
sequence_number=seq_num,
1140+
subscriber_ids=subscriber_ids,
1141+
payload_type=payload_type,
11321142
)
11331143
except Exception as e:
11341144
logger.warning(f"Failed to persist step FAILED status: {e}")
@@ -1196,6 +1206,8 @@ async def _execute_pipeline_impl(
11961206
progress_percentage=Decimal("100.00"),
11971207
end_time_value=datetime.now(timezone.utc),
11981208
final_outputs=execution_data.get("outputs"),
1209+
subscriber_ids=subscriber_ids,
1210+
payload_type=payload_type,
11991211
)
12001212
if success:
12011213
db_version = new_version
@@ -1236,6 +1248,8 @@ async def _execute_pipeline_impl(
12361248
step_id=step_id,
12371249
step_name=step_state.get("name", step_id),
12381250
sequence_number=seq_num,
1251+
subscriber_ids=subscriber_ids,
1252+
payload_type=payload_type,
12391253
)
12401254
except Exception as persist_step_err:
12411255
logger.warning(
@@ -1255,6 +1269,8 @@ async def _execute_pipeline_impl(
12551269
status=DBExecutionStatus.FAILED,
12561270
end_time_value=datetime.now(timezone.utc),
12571271
error_info={"error": str(e)},
1272+
subscriber_ids=subscriber_ids,
1273+
payload_type=payload_type,
12581274
)
12591275
logger.info(f"Persisted failure to database: {execution_id}")
12601276
except Exception as persist_err:

services/budpipeline/budpipeline/progress/publisher.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@
2323

2424
logger = get_logger(__name__)
2525

26+
27+
class _DecimalEncoder(json.JSONEncoder):
28+
"""JSON encoder that handles Decimal objects."""
29+
30+
def default(self, o: Any) -> Any:
31+
if isinstance(o, Decimal):
32+
return float(o)
33+
return super().default(o)
34+
35+
2636
# Default payload type when none is specified
2737
_DEFAULT_PAYLOAD_TYPE = "pipeline_execution"
2838

@@ -353,7 +363,7 @@ async def _publish_single_topic(
353363
await client.publish_event(
354364
pubsub_name=self.pubsub_name,
355365
topic_name=topic,
356-
data=json.dumps(payload),
366+
data=json.dumps(payload, cls=_DecimalEncoder),
357367
data_content_type="application/json",
358368
)
359369

0 commit comments

Comments
 (0)