Skip to content

Commit 04f2be1

Browse files
committed
feat: improve publish endpoint response (#563)
* test: event with destination_id should not match disabled destination * fix: check disabled * feat: enhane publish handler with more result info * refactor: match event logic with destination_id field * test: remove parallel from flaky tests * test: extend timeout to avoid flaky test * refactor: api response * test: use fast timeout to avoid flaky tests * test: retry poll timeout * refactor: simplify API response to use duplicate boolean * refactor: topics.matchtopic util * test: duplicate publish * chore: simplify publish response * docs: openapi.yaml & api.mdx
1 parent 3a97f1f commit 04f2be1

File tree

12 files changed

+469
-149
lines changed

12 files changed

+469
-149
lines changed

docs/apis/openapi.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,11 +1492,16 @@ components:
14921492
type: object
14931493
required:
14941494
- id
1495+
- duplicate
14951496
properties:
14961497
id:
14971498
type: string
14981499
description: The ID of the event that was accepted for publishing. This will be the ID provided in the request's `id` field if present, otherwise it's a server-generated UUID.
14991500
example: "evt_abc123xyz789"
1501+
duplicate:
1502+
type: boolean
1503+
description: Whether this event was already processed (idempotency hit). If true, the event was not queued again.
1504+
example: false
15001505
Event:
15011506
type: object
15021507
properties:

docs/pages/references/api.mdx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,15 @@ Publish an event.
373373

374374
#### Response
375375

376-
Empty body
376+
```json
377+
{
378+
"id": "evt_abc123xyz789",
379+
"duplicate": false
380+
}
381+
```
382+
383+
- `id`: The ID of the event (provided in request or server-generated)
384+
- `duplicate`: Whether this event was already processed (idempotency hit)
377385

378386
#### Example
379387

internal/apirouter/publish_handlers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ func (h *PublishHandlers) Ingest(c *gin.Context) {
3535
return
3636
}
3737
event := publishedEvent.toEvent()
38-
if err := h.eventHandler.Handle(c.Request.Context(), &event); err != nil {
38+
result, err := h.eventHandler.Handle(c.Request.Context(), &event)
39+
if err != nil {
3940
if errors.Is(err, idempotence.ErrConflict) {
4041
c.Status(http.StatusConflict)
4142
} else if errors.Is(err, publishmq.ErrRequiredTopic) {
@@ -61,7 +62,7 @@ func (h *PublishHandlers) Ingest(c *gin.Context) {
6162
}
6263
return
6364
}
64-
c.JSON(http.StatusAccepted, gin.H{"id": event.ID})
65+
c.JSON(http.StatusAccepted, result)
6566
}
6667

6768
type PublishedEvent struct {

internal/deliverymq/messagehandler_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) {
2626
// - Should be nacked (let system retry)
2727
// - Should NOT use retry scheduler
2828
// - Should NOT call alert monitor (no destination)
29-
t.Parallel()
3029

3130
// Setup test data
3231
tenant := models.Tenant{ID: idgen.String()}
@@ -89,7 +88,6 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) {
8988
// - Should return error
9089
// - Message should be nacked (no retry)
9190
// - No retry should be scheduled
92-
t.Parallel()
9391

9492
// Setup test data
9593
tenant := models.Tenant{ID: idgen.String()}
@@ -150,7 +148,6 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) {
150148
// - Destination lookup returns ErrDestinationDeleted
151149
// - Should return error but ack message (no retry needed)
152150
// - No retry should be scheduled
153-
t.Parallel()
154151

155152
// Setup test data
156153
tenant := models.Tenant{ID: idgen.String()}
@@ -211,7 +208,6 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
211208
// - Publish fails with a publish error
212209
// - Event is eligible for retry and under max attempts
213210
// - Should schedule retry and ack
214-
t.Parallel()
215211

216212
// Setup test data
217213
tenant := models.Tenant{ID: idgen.String()}
@@ -285,7 +281,6 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
285281
// - Publish returns ErrDestinationPublishAttempt
286282
// - Event is NOT eligible for retry
287283
// - Should ack (no retry, no nack)
288-
t.Parallel()
289284

290285
// Setup test data
291286
tenant := models.Tenant{ID: idgen.String()}
@@ -358,7 +353,6 @@ func TestMessageHandler_EventGetterError(t *testing.T) {
358353
// - Event getter fails to retrieve event during retry
359354
// - Should be treated as system error
360355
// - Should nack for retry
361-
t.Parallel()
362356

363357
// Setup test data
364358
tenant := models.Tenant{ID: idgen.String()}
@@ -425,7 +419,6 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
425419
// - Message is a retry attempt (Attempt > 1)
426420
// - Event getter successfully retrieves full event data
427421
// - Message is processed normally
428-
t.Parallel()
429422

430423
// Setup test data
431424
tenant := models.Tenant{ID: idgen.String()}
@@ -493,7 +486,6 @@ func TestMessageHandler_Idempotency(t *testing.T) {
493486
// - Message with same ID is processed twice
494487
// - Second attempt should be idempotent
495488
// - Should ack without publishing
496-
t.Parallel()
497489

498490
// Setup test data
499491
tenant := models.Tenant{ID: idgen.String()}
@@ -560,7 +552,6 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) {
560552
// - First attempt fails with system error (event getter error)
561553
// - Second attempt with same message ID succeeds after error is cleared
562554
// - Should demonstrate that system errors don't affect idempotency
563-
t.Parallel()
564555

565556
// Setup test data
566557
tenant := models.Tenant{ID: idgen.String()}
@@ -636,7 +627,6 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) {
636627
// - Destination is disabled
637628
// - Should be treated as a destination error (not system error)
638629
// - Should ack without retry or publish attempt
639-
t.Parallel()
640630

641631
// Setup test data
642632
tenant := models.Tenant{ID: idgen.String()}
@@ -702,7 +692,6 @@ func TestMessageHandler_LogPublisherError(t *testing.T) {
702692
// - Publish succeeds but log publisher fails
703693
// - Should be treated as system error
704694
// - Should nack for retry
705-
t.Parallel()
706695

707696
// Setup test data
708697
tenant := models.Tenant{ID: idgen.String()}
@@ -764,7 +753,6 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) {
764753
// - Should join both errors
765754
// - Should be treated as system error
766755
// - Should nack for retry
767-
t.Parallel()
768756

769757
// Setup test data
770758
tenant := models.Tenant{ID: idgen.String()}
@@ -826,7 +814,6 @@ func TestManualDelivery_Success(t *testing.T) {
826814
// - Manual delivery succeeds
827815
// - Should cancel any pending retries
828816
// - Should be acked
829-
t.Parallel()
830817

831818
// Setup test data
832819
tenant := models.Tenant{ID: idgen.String()}
@@ -892,7 +879,6 @@ func TestManualDelivery_PublishError(t *testing.T) {
892879
// - Manual delivery fails with publish error
893880
// - Should not schedule retry (manual delivery never retries)
894881
// - Should be acked
895-
t.Parallel()
896882

897883
// Setup test data
898884
tenant := models.Tenant{ID: idgen.String()}
@@ -965,7 +951,6 @@ func TestManualDelivery_CancelError(t *testing.T) {
965951
// - Manual delivery succeeds but retry cancellation fails
966952
// - Should be treated as post-delivery error
967953
// - Should nack for retry
968-
t.Parallel()
969954

970955
// Setup test data
971956
tenant := models.Tenant{ID: idgen.String()}
@@ -1033,7 +1018,6 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) {
10331018
// - Manual delivery to disabled destination
10341019
// - Should be treated as pre-delivery error
10351020
// - Should ack without attempting publish or retry cancellation
1036-
t.Parallel()
10371021

10381022
// Setup test data
10391023
tenant := models.Tenant{ID: idgen.String()}
@@ -1098,7 +1082,6 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
10981082
// Test scenario:
10991083
// - Publish succeeds
11001084
// - Should call alert monitor with successful attempt
1101-
t.Parallel()
11021085

11031086
// Setup test data
11041087
tenant := models.Tenant{ID: idgen.String()}
@@ -1167,7 +1150,6 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
11671150
// - Publish succeeds
11681151
// - Alert monitor fails
11691152
// - Should still succeed overall (alert errors don't affect main flow)
1170-
t.Parallel()
11711153

11721154
// Setup test data
11731155
tenant := models.Tenant{ID: idgen.String()}

0 commit comments

Comments
 (0)