Skip to content

Commit bbd7abd

Browse files
authored
refactor: improve event reconcile (#3602)
1 parent 5ad0701 commit bbd7abd

File tree

2 files changed

+36
-146
lines changed

2 files changed

+36
-146
lines changed

openmeter/notification/eventhandler/reconcile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (h *Handler) Reconcile(ctx context.Context) error {
114114
// TODO: run reconciliation in parallel (goroutines)
115115
if err = h.reconcileEvent(ctx, &event); err != nil {
116116
errs = append(errs,
117-
fmt.Errorf("failed to reconcile notification event [namespace=%s event.id=%s]: %w",
117+
fmt.Errorf("failed to reconcile notification event [namespace=%s notification.event.id=%s]: %w",
118118
event.Namespace, event.ID, err),
119119
)
120120
}

openmeter/notification/eventhandler/webhook.go

Lines changed: 35 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -121,146 +121,14 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
121121
break
122122
}
123123

124-
switch {
125-
case msg != nil:
126-
// Event fetched from the provider successfully, however, the event delivery states might be missing in case
127-
// the provider has not populated the delivery statuses mostly because the event has not been processed yet.
128-
129-
span.AddEvent("webhook message fetched from provider",
130-
trace.WithAttributes(spanAttrs...),
131-
trace.WithAttributes(deliveryStatusAttrs...),
132-
)
133-
134-
attempts := status.Attempts
135-
state := notification.EventDeliveryStatusStateSending
136-
nextAttempt := (*time.Time)(nil)
137-
138-
msgStatusByChannel := getDeliveryStatusByChannelID(lo.FromPtr(msg.DeliveryStatuses), status.ChannelID)
139-
140-
if msgStatusByChannel != nil {
141-
attempts = msgStatusByChannel.Attempts
142-
state = msgStatusByChannel.State
143-
nextAttempt = msgStatusByChannel.NextAttempt
144-
}
145-
146-
input = &notification.UpdateEventDeliveryStatusInput{
147-
NamespacedID: status.NamespacedID,
148-
State: state,
149-
Annotations: status.Annotations,
150-
NextAttempt: nextAttempt,
151-
Attempts: attempts,
152-
}
153-
case webhook.IsNotFoundError(err):
154-
// The provider returned a not found error, which means that either the event has not been dispatched to the provider
155-
// or the provider has not processed the event yet. The latter case can be verified by resending the message and checking
156-
// if the message already exists error is returned indicating that the message has been dispatched.
157-
124+
// The provider returned a not found error, which means that either the event has not been dispatched to the provider
125+
// or the provider has not processed the event yet. The latter case can be verified by resending the message and checking
126+
// if the message already exists error is returned indicating that the message has been dispatched.
127+
if webhook.IsNotFoundError(err) {
158128
msg, err = h.sendWebhookMessage(ctx, event)
129+
}
159130

160-
switch {
161-
case webhook.IsMessageAlreadyExistsError(err):
162-
// Event is sent to the provider but has not been processed yet. Keep it in pending state and update the next attempt.
163-
164-
span.AddEvent("webhook message is already sent to provider but it has not been processed",
165-
trace.WithAttributes(spanAttrs...),
166-
trace.WithAttributes(deliveryStatusAttrs...),
167-
)
168-
169-
input = &notification.UpdateEventDeliveryStatusInput{
170-
NamespacedID: status.NamespacedID,
171-
State: notification.EventDeliveryStatusStatePending,
172-
Annotations: status.Annotations,
173-
NextAttempt: nil,
174-
Attempts: status.Attempts,
175-
}
176-
case webhook.IsUnrecoverableError(err):
177-
// Unrecoverable error happened, no retry is possible.
178-
179-
span.AddEvent("fetching webhook message from provider returned unrecoverable error",
180-
trace.WithAttributes(spanAttrs...),
181-
trace.WithAttributes(deliveryStatusAttrs...),
182-
)
183-
184-
span.RecordError(err, trace.WithAttributes(spanAttrs...), trace.WithAttributes(deliveryStatusAttrs...))
185-
186-
h.logger.ErrorContext(ctx, "fetching webhook message from provider returned unrecoverable error",
187-
"error", err.Error(), "delivery_status.state", status.State, "namespace", event.Namespace, "event_id", event.ID)
188-
189-
input = &notification.UpdateEventDeliveryStatusInput{
190-
NamespacedID: status.NamespacedID,
191-
State: notification.EventDeliveryStatusStateFailed,
192-
Reason: ErrSystemUnrecoverableError.Error(),
193-
Annotations: status.Annotations,
194-
NextAttempt: nil,
195-
Attempts: status.Attempts,
196-
}
197-
case err != nil:
198-
// Transient error happened, retry after a short delay.
199-
200-
span.AddEvent("fetching webhook message from provider returned transient error",
201-
trace.WithAttributes(spanAttrs...),
202-
trace.WithAttributes(deliveryStatusAttrs...),
203-
)
204-
205-
span.RecordError(err, trace.WithAttributes(spanAttrs...), trace.WithAttributes(deliveryStatusAttrs...))
206-
207-
h.logger.WarnContext(ctx, "fetching webhook message from provider returned transient error",
208-
"error", err.Error(),
209-
"notification.delivery_status.state", status.State,
210-
"namespace", event.Namespace,
211-
"notification.event.id", event.ID,
212-
)
213-
214-
retryAfter := h.reconcileInterval
215-
216-
rErr, ok := lo.ErrorsAs[webhook.RetryableError](err)
217-
if ok {
218-
retryAfter = rErr.RetryAfter()
219-
}
220-
221-
input = &notification.UpdateEventDeliveryStatusInput{
222-
NamespacedID: status.NamespacedID,
223-
State: notification.EventDeliveryStatusStatePending,
224-
Reason: ErrSystemRecoverableError.Error(),
225-
Annotations: status.Annotations,
226-
NextAttempt: lo.ToPtr(now.Add(retryAfter)),
227-
Attempts: status.Attempts,
228-
}
229-
case msg != nil:
230-
// This happens when an event is ment to be dispatched to multiple channels therefore,
231-
// it has multiple corresponding delivery statuses, and one of them already triggered
232-
// the dispatch of the event via *sendWebhookMessage* API.
233-
234-
span.AddEvent("webhook message is sent to provider",
235-
trace.WithAttributes(spanAttrs...),
236-
trace.WithAttributes(deliveryStatusAttrs...),
237-
)
238-
239-
attempts := status.Attempts
240-
state := notification.EventDeliveryStatusStateSending
241-
nextAttempt := (*time.Time)(nil)
242-
243-
msgStatusByChannel := getDeliveryStatusByChannelID(lo.FromPtr(msg.DeliveryStatuses), status.ChannelID)
244-
245-
if msgStatusByChannel != nil {
246-
attempts = msgStatusByChannel.Attempts
247-
state = msgStatusByChannel.State
248-
nextAttempt = msgStatusByChannel.NextAttempt
249-
}
250-
251-
input = &notification.UpdateEventDeliveryStatusInput{
252-
NamespacedID: status.NamespacedID,
253-
State: state,
254-
Annotations: status.Annotations,
255-
NextAttempt: nextAttempt,
256-
Attempts: attempts,
257-
}
258-
default:
259-
errs = append(errs,
260-
fmt.Errorf("unhandled reconciling state after dispatching event [namespace=%s notification.delivery_status.id=%s notification.delivery_status.state=%s]",
261-
status.Namespace, status.ID, status.State.String()),
262-
)
263-
}
131+
switch {
264132
case webhook.IsMessageAlreadyExistsError(err):
265133
// Event is sent to the provider but has not been processed yet. Keep it in pending state and update the next attempt.
266134

@@ -329,6 +197,34 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
329197
NextAttempt: lo.ToPtr(now.Add(retryAfter)),
330198
Attempts: status.Attempts,
331199
}
200+
case msg != nil:
201+
// Event fetched from the provider successfully, however, the event delivery states might be missing in case
202+
// the provider has not populated the delivery statuses mostly because the event has not been processed yet.
203+
204+
span.AddEvent("webhook message fetched from provider",
205+
trace.WithAttributes(spanAttrs...),
206+
trace.WithAttributes(deliveryStatusAttrs...),
207+
)
208+
209+
attempts := status.Attempts
210+
state := notification.EventDeliveryStatusStateSending
211+
nextAttempt := (*time.Time)(nil)
212+
213+
msgStatusByChannel := getDeliveryStatusByChannelID(lo.FromPtr(msg.DeliveryStatuses), status.ChannelID)
214+
215+
if msgStatusByChannel != nil {
216+
attempts = msgStatusByChannel.Attempts
217+
state = msgStatusByChannel.State
218+
nextAttempt = msgStatusByChannel.NextAttempt
219+
}
220+
221+
input = &notification.UpdateEventDeliveryStatusInput{
222+
NamespacedID: status.NamespacedID,
223+
State: state,
224+
Annotations: status.Annotations,
225+
NextAttempt: nextAttempt,
226+
Attempts: attempts,
227+
}
332228
default:
333229
errs = append(errs,
334230
fmt.Errorf("unhandled reconciling state [namespace=%s notification.delivery_status.id=%s notification.delivery_status.state=%s]",
@@ -514,7 +410,6 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
514410
Attempts: msgStatusByChannel.Attempts,
515411
}
516412
default:
517-
span.RecordError(err, trace.WithAttributes(spanAttrs...))
518413
errs = append(errs, fmt.Errorf("unhandled reconciling state: %s", status.State.String()))
519414
}
520415
case notification.EventDeliveryStatusStateSuccess, notification.EventDeliveryStatusStateFailed:
@@ -539,12 +434,7 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
539434
}
540435
}
541436

542-
err = errors.Join(errs...)
543-
if err != nil {
544-
h.logger.ErrorContext(ctx, "reconciling webhook event has errors", "errors", errs)
545-
}
546-
547-
return err
437+
return errors.Join(errs...)
548438
}
549439

550440
return tracex.StartWithNoValue(ctx, h.tracer, "event_handler.reconcile_event.webhook").Wrap(fn)

0 commit comments

Comments
 (0)