Skip to content

Commit 27021f7

Browse files
authored
feat: Event Retry API (#179)
* feat: Manual retry naive implementation * test: Update mock to match new interface * refactor: Deliverymq logic & error handling & idempotency * test: Verify delivery attempt status in tests * refactor: Deliverymq handler should only return error for unexpected error instead of edge case * test: Deliverymq manual test cases * chore: Rename func to isEligibleForManualRetry with comment
1 parent 4d02ac4 commit 27021f7

File tree

7 files changed

+654
-97
lines changed

7 files changed

+654
-97
lines changed

internal/deliverymq/messagehandler.go

Lines changed: 195 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package deliverymq
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
"github.com/google/uuid"
@@ -19,10 +20,51 @@ import (
1920
"go.uber.org/zap"
2021
)
2122

23+
func idempotencyKeyFromDeliveryEvent(deliveryEvent models.DeliveryEvent) string {
24+
return "idempotency:deliverymq:" + deliveryEvent.ID
25+
}
26+
2227
var (
2328
errDestinationDisabled = errors.New("destination disabled")
2429
)
2530

31+
// Error types to distinguish between different stages of delivery
32+
type PreDeliveryError struct {
33+
err error
34+
}
35+
36+
func (e *PreDeliveryError) Error() string {
37+
return fmt.Sprintf("pre-delivery error: %v", e.err)
38+
}
39+
40+
func (e *PreDeliveryError) Unwrap() error {
41+
return e.err
42+
}
43+
44+
type DeliveryError struct {
45+
err error
46+
}
47+
48+
func (e *DeliveryError) Error() string {
49+
return fmt.Sprintf("delivery error: %v", e.err)
50+
}
51+
52+
func (e *DeliveryError) Unwrap() error {
53+
return e.err
54+
}
55+
56+
type PostDeliveryError struct {
57+
err error
58+
}
59+
60+
func (e *PostDeliveryError) Error() string {
61+
return fmt.Sprintf("post-delivery error: %v", e.err)
62+
}
63+
64+
func (e *PostDeliveryError) Unwrap() error {
65+
return e.err
66+
}
67+
2668
type messageHandler struct {
2769
eventTracer DeliveryTracer
2870
logger *otelzap.Logger
@@ -46,6 +88,7 @@ type LogPublisher interface {
4688

4789
type RetryScheduler interface {
4890
Schedule(ctx context.Context, task string, delay time.Duration, opts ...scheduler.ScheduleOption) error
91+
Cancel(ctx context.Context, taskID string) error
4992
}
5093

5194
type DestinationGetter interface {
@@ -91,102 +134,183 @@ func NewMessageHandler(
91134

92135
func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
93136
deliveryEvent := models.DeliveryEvent{}
137+
138+
// Parse message
94139
if err := deliveryEvent.FromMessage(msg); err != nil {
95-
msg.Nack()
96-
return err
140+
return h.handleError(msg, &PreDeliveryError{err: err})
97141
}
142+
143+
h.logger.Ctx(ctx).Info("deliverymq handler", zap.String("delivery_event", deliveryEvent.ID))
144+
145+
// Ensure event data
98146
if err := h.ensureDeliveryEvent(ctx, &deliveryEvent); err != nil {
147+
return h.handleError(msg, &PreDeliveryError{err: err})
148+
}
149+
150+
// Get destination
151+
destination, err := h.ensurePublishableDestination(ctx, deliveryEvent)
152+
if err != nil {
153+
return h.handleError(msg, &PreDeliveryError{err: err})
154+
}
155+
156+
// Handle delivery
157+
err = h.idempotence.Exec(ctx, idempotencyKeyFromDeliveryEvent(deliveryEvent), func(ctx context.Context) error {
158+
return h.doHandle(ctx, deliveryEvent, destination)
159+
})
160+
return h.handleError(msg, err)
161+
}
162+
163+
func (h *messageHandler) handleError(msg *mqs.Message, err error) error {
164+
shouldNack := h.shouldNackError(err)
165+
if shouldNack {
99166
msg.Nack()
100-
return err
167+
} else {
168+
msg.Ack()
101169
}
102-
idempotenceHandler := func(ctx context.Context) error {
103-
return h.doHandle(ctx, deliveryEvent)
170+
171+
// Don't return error for expected cases
172+
var preErr *PreDeliveryError
173+
if errors.As(err, &preErr) {
174+
if errors.Is(preErr.err, models.ErrDestinationDeleted) || errors.Is(preErr.err, errDestinationDisabled) {
175+
return nil
176+
}
104177
}
105-
if err := h.idempotence.Exec(ctx, idempotencyKeyFromDeliveryEvent(deliveryEvent), idempotenceHandler); err != nil {
106-
shouldScheduleRetry, shouldNack := h.checkError(err, deliveryEvent)
107-
if shouldScheduleRetry {
178+
return err
179+
}
180+
181+
func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.DeliveryEvent, destination *models.Destination) error {
182+
_, span := h.eventTracer.Deliver(ctx, &deliveryEvent)
183+
defer span.End()
184+
185+
if err := h.publisher.PublishEvent(ctx, destination, &deliveryEvent.Event); err != nil {
186+
h.logger.Ctx(ctx).Error("failed to publish event", zap.Error(err))
187+
deliveryErr := &DeliveryError{err: err}
188+
189+
if h.shouldScheduleRetry(deliveryEvent, err) {
108190
if retryErr := h.scheduleRetry(ctx, deliveryEvent); retryErr != nil {
109-
finalErr := errors.Join(err, retryErr)
110-
msg.Nack()
111-
return finalErr
191+
return h.logDeliveryResult(ctx, deliveryEvent, errors.Join(err, retryErr))
112192
}
113193
}
114-
if shouldNack {
115-
msg.Nack()
116-
} else {
117-
msg.Ack()
194+
return h.logDeliveryResult(ctx, deliveryEvent, deliveryErr)
195+
}
196+
197+
// Handle successful delivery
198+
if deliveryEvent.Manual {
199+
if err := h.retryScheduler.Cancel(ctx, deliveryEvent.GetRetryID()); err != nil {
200+
h.logger.Ctx(ctx).Error("failed to cancel scheduled retry", zap.Error(err))
201+
return h.logDeliveryResult(ctx, deliveryEvent, err)
118202
}
119-
return err
120203
}
121-
msg.Ack()
122-
return nil
204+
return h.logDeliveryResult(ctx, deliveryEvent, nil)
123205
}
124206

125-
func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.DeliveryEvent) error {
126-
_, span := h.eventTracer.Deliver(ctx, &deliveryEvent)
127-
defer span.End()
128-
logger := h.logger.Ctx(ctx)
129-
logger.Info("deliverymq handler", zap.String("delivery_event", deliveryEvent.ID))
207+
func (h *messageHandler) hasDeliveryError(err error) bool {
208+
var delErr *DeliveryError
209+
return errors.As(err, &delErr)
210+
}
130211

131-
destination, err := h.ensurePublishableDestination(ctx, deliveryEvent)
132-
if err != nil {
133-
span.RecordError(err)
134-
return err
212+
func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent models.DeliveryEvent, err error) error {
213+
// Set up delivery record
214+
deliveryEvent.Delivery = &models.Delivery{
215+
ID: uuid.New().String(),
216+
DeliveryEventID: deliveryEvent.ID,
217+
EventID: deliveryEvent.Event.ID,
218+
DestinationID: deliveryEvent.DestinationID,
219+
Time: time.Now(),
135220
}
136221

137-
var finalErr error
138-
if err := h.publisher.PublishEvent(ctx, destination, &deliveryEvent.Event); err != nil {
139-
logger.Error("failed to publish event", zap.Error(err))
140-
finalErr = err
141-
deliveryEvent.Delivery = &models.Delivery{
142-
ID: uuid.New().String(),
143-
DeliveryEventID: deliveryEvent.ID,
144-
EventID: deliveryEvent.Event.ID,
145-
DestinationID: deliveryEvent.DestinationID,
146-
Status: models.DeliveryStatusFailed,
147-
Time: time.Now(),
148-
}
222+
// Check for delivery failures in the error chain
223+
if h.hasDeliveryError(err) {
224+
deliveryEvent.Delivery.Status = models.DeliveryStatusFailed
149225
} else {
150-
deliveryEvent.Delivery = &models.Delivery{
151-
ID: uuid.New().String(),
152-
DeliveryEventID: deliveryEvent.ID,
153-
EventID: deliveryEvent.Event.ID,
154-
DestinationID: deliveryEvent.DestinationID,
155-
Status: models.DeliveryStatusOK,
156-
Time: time.Now(),
157-
}
226+
deliveryEvent.Delivery.Status = models.DeliveryStatusOK
158227
}
159-
logErr := h.logMQ.Publish(ctx, deliveryEvent)
160-
if logErr != nil {
161-
logger.Error("failed to publish log event", zap.Error(logErr))
162-
if finalErr == nil {
163-
finalErr = logErr
164-
} else {
165-
finalErr = errors.Join(finalErr, logErr)
228+
229+
// Publish delivery log
230+
if logErr := h.logMQ.Publish(ctx, deliveryEvent); logErr != nil {
231+
h.logger.Ctx(ctx).Error("failed to publish delivery log", zap.Error(logErr))
232+
if err != nil {
233+
return &PostDeliveryError{err: errors.Join(err, logErr)}
166234
}
235+
return &PostDeliveryError{err: logErr}
236+
}
237+
238+
// If we have a DeliveryError, return it as is
239+
var delErr *DeliveryError
240+
if errors.As(err, &delErr) {
241+
return err
242+
}
243+
244+
// If we have a PreDeliveryError, return it as is
245+
var preErr *PreDeliveryError
246+
if errors.As(err, &preErr) {
247+
return err
248+
}
249+
250+
// For any other error, wrap it in PostDeliveryError
251+
if err != nil {
252+
return &PostDeliveryError{err: err}
167253
}
168-
if finalErr != nil {
169-
span.RecordError(finalErr)
254+
255+
return nil
256+
}
257+
258+
func (h *messageHandler) shouldScheduleRetry(deliveryEvent models.DeliveryEvent, err error) bool {
259+
if deliveryEvent.Manual {
260+
return false
170261
}
171-
return finalErr
262+
if !deliveryEvent.Event.EligibleForRetry {
263+
return false
264+
}
265+
if _, ok := err.(*destregistry.ErrDestinationPublishAttempt); !ok {
266+
return false
267+
}
268+
// Attempt starts at 0 for initial attempt, so we can compare directly
269+
return deliveryEvent.Attempt < h.retryMaxLimit
172270
}
173271

174-
// QUESTION: What if an internal error happens AFTER deliverying the message (doesn't matter whether it's successful or not),
175-
// say logmq.Publish fails. Should that count as an attempt? What about an error BEFORE deliverying the message?
176-
// Should we write code to differentiate between these two types of errors (predeliveryErr and postdeliveryErr, for example)?
177-
func (h *messageHandler) checkError(err error, deliveryEvent models.DeliveryEvent) (shouldScheduleRetry, shouldNack bool) {
178-
if errors.Is(err, models.ErrDestinationDeleted) || errors.Is(err, errDestinationDisabled) {
179-
return false, false // ack
272+
func (h *messageHandler) shouldNackError(err error) bool {
273+
if err == nil {
274+
return false // Success case, always ack
180275
}
181276

182-
if _, ok := err.(*destregistry.ErrDestinationPublishAttempt); ok {
183-
if deliveryEvent.Event.EligibleForRetry && deliveryEvent.Attempt < h.retryMaxLimit {
184-
return true, false // schedule retry and ack
277+
// Handle pre-delivery errors (system errors)
278+
var preErr *PreDeliveryError
279+
if errors.As(err, &preErr) {
280+
// Don't nack if it's a permanent error
281+
if errors.Is(preErr.err, models.ErrDestinationDeleted) || errors.Is(preErr.err, errDestinationDisabled) {
282+
return false
283+
}
284+
return true // Nack other pre-delivery errors
285+
}
286+
287+
// Handle delivery errors
288+
var delErr *DeliveryError
289+
if errors.As(err, &delErr) {
290+
return h.shouldNackDeliveryError(delErr.err)
291+
}
292+
293+
// Handle post-delivery errors
294+
var postErr *PostDeliveryError
295+
if errors.As(err, &postErr) {
296+
// Check if this wraps a delivery error
297+
var delErr *DeliveryError
298+
if errors.As(postErr.err, &delErr) {
299+
return h.shouldNackDeliveryError(delErr.err)
185300
}
186-
return false, false // ack and accept failure
301+
return true // Nack other post-delivery errors
187302
}
188303

189-
return false, true // nack for system retry
304+
// For any other error type, nack for safety
305+
return true
306+
}
307+
308+
func (h *messageHandler) shouldNackDeliveryError(err error) bool {
309+
// Don't nack if it's a delivery attempt error (handled by retry scheduling)
310+
if _, ok := err.(*destregistry.ErrDestinationPublishAttempt); ok {
311+
return false
312+
}
313+
return true // Nack other delivery errors
190314
}
191315

192316
func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models.DeliveryEvent) error {
@@ -218,26 +342,21 @@ func (h *messageHandler) ensureDeliveryEvent(ctx context.Context, deliveryEvent
218342
return nil
219343
}
220344

221-
func idempotencyKeyFromDeliveryEvent(deliveryEvent models.DeliveryEvent) string {
222-
return "idempotency:deliverymq:" + deliveryEvent.ID
223-
}
224-
225345
// ensurePublishableDestination ensures that the destination exists and is in a publishable state.
226346
// Returns an error if the destination is not found, deleted, disabled, or any other state that
227347
// would prevent publishing.
228348
func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliveryEvent models.DeliveryEvent) (*models.Destination, error) {
229-
logger := h.logger.Ctx(ctx)
230349
destination, err := h.entityStore.RetrieveDestination(ctx, deliveryEvent.Event.TenantID, deliveryEvent.DestinationID)
231350
if err != nil {
232-
logger.Error("failed to retrieve destination", zap.Error(err))
351+
h.logger.Ctx(ctx).Error("failed to retrieve destination", zap.Error(err))
233352
return nil, err
234353
}
235354
if destination == nil {
236-
logger.Error("destination not found")
355+
h.logger.Ctx(ctx).Error("destination not found")
237356
return nil, models.ErrDestinationNotFound
238357
}
239358
if destination.DisabledAt != nil {
240-
logger.Info("destination is disabled", zap.String("destination_id", destination.ID))
359+
h.logger.Ctx(ctx).Info("destination is disabled", zap.String("destination_id", destination.ID))
241360
return nil, errDestinationDisabled
242361
}
243362
return destination, nil

0 commit comments

Comments
 (0)