Skip to content

Commit f17f5ba

Browse files
authored
fix: delivery status sync (#3604)
1 parent bbd7abd commit f17f5ba

File tree

2 files changed

+18
-43
lines changed

2 files changed

+18
-43
lines changed

openmeter/notification/eventhandler/webhook.go

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -355,49 +355,30 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
355355
break
356356
}
357357

358+
// Get the message delivery status for the current channel.
358359
msgStatusByChannel := getDeliveryStatusByChannelID(lo.FromPtr(msg.DeliveryStatuses), status.ChannelID)
359360

361+
// If message delivery status not found, skip reconciling the delivery status as
362+
// the provider might not populate the delivery status yet. The delivery status will be eventually
363+
// reconciled either by the information provided by the provider or by setting the delivery status
364+
// to 'FAILED' after the sending timeout is reached.
360365
if msgStatusByChannel == nil {
361-
h.logger.ErrorContext(ctx, "no channel found for webhook message delivery status",
362-
"namespace", event.Namespace,
363-
"notification.event.id", event.ID,
364-
"notification.delivery_status.id", status.ID)
365-
366-
input = &notification.UpdateEventDeliveryStatusInput{
367-
NamespacedID: status.NamespacedID,
368-
State: notification.EventDeliveryStatusStateFailed,
369-
Reason: ErrSystemUnrecoverableError.Error(),
370-
Annotations: status.Annotations,
371-
NextAttempt: nil,
372-
Attempts: status.Attempts,
373-
}
374-
375366
break
376367
}
377368

378-
if next := lo.FromPtr(status.NextAttempt); !next.IsZero() {
379-
// Check if attempts are synchronized between the webhook provider and OpenMeter.
380-
var inSync bool
381-
382-
for _, msgAttempt := range msgStatusByChannel.Attempts {
383-
if msgAttempt.Timestamp.Compare(next) != -1 {
384-
inSync = true
385-
break
386-
}
387-
}
388-
389-
if !inSync {
390-
span.AddEvent("delivery state is out of sync", trace.WithAttributes(spanAttrs...))
391-
h.logger.DebugContext(ctx, "delivery state is out of sync")
392-
393-
input = &notification.UpdateEventDeliveryStatusInput{
394-
NamespacedID: status.NamespacedID,
395-
State: notification.EventDeliveryStatusStateSending,
396-
Annotations: status.Annotations.Merge(msg.Annotations),
397-
NextAttempt: status.NextAttempt,
398-
Attempts: msgStatusByChannel.Attempts,
399-
}
400-
369+
// Check if attempts are synchronized between the webhook provider and OpenMeter.
370+
// This check is required to avoid transitioning the delivery status to its final state
371+
// ('SUCCESS' or 'FAILED') before all the attempts are collected from the provider.
372+
// It is possible that the provider reports that the message delivery status is 'SUCCESS' or 'FAILED'
373+
// while the corresponding delivery attempt is not yet available. Therefore, we need to keep reconciling
374+
// the delivery status until all the attempts are collected.
375+
if msgStatusByChannel.State == notification.EventDeliveryStatusStateSuccess ||
376+
msgStatusByChannel.State == notification.EventDeliveryStatusStateFailed {
377+
if len(status.Attempts) >= len(msgStatusByChannel.Attempts) {
378+
// The list of delivery attempts from the message (returned by the provider) must have at least one extra item
379+
// compared to the list of delivery status has when the message delivery status is in one of the final states.
380+
// Wait until the next reconciliation attempt to reconcile the delivery status to ensure we collected all the
381+
// delivery attempts from the provider.
401382
break
402383
}
403384
}

openmeter/notification/webhook/svix/message.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7-
"strings"
87
"time"
98

109
"github.com/samber/lo"
@@ -332,11 +331,6 @@ func (h svixHandler) ResendMessage(ctx context.Context, params webhook.ResendMes
332331
IdempotencyKey: &idempotencyKey,
333332
})
334333
if err = internal.WrapSvixError(err); err != nil {
335-
// NOTE(chrisgacsal): this is a workaround for a bug in svix-webhooks. Remove this after upstream released the fix.
336-
if strings.Contains(err.Error(), "unexpected end of JSON input") {
337-
return nil
338-
}
339-
340334
return fmt.Errorf("failed to resend message [svix.app=%s svix.message.id=%s svix.endpoint.uid=%s]: %w",
341335
params.Namespace, params.EventID, params.ChannelID, err)
342336
}

0 commit comments

Comments
 (0)