Skip to content

Commit e978f6c

Browse files
author
Sergiu Ghitea
committed
Improve notification queuing and re-queuing in notification processor
1 parent 870f247 commit e978f6c

File tree

1 file changed

+18
-4
lines changed

1 file changed

+18
-4
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,13 @@ func (b *projectionNotificationProcessor) Queue(ctx context.Context, notificatio
125125
return nil
126126
}
127127

128+
// ReQueue puts the notification again on the queue to be processed with a ValidAfter set
129+
func (b *projectionNotificationProcessor) ReQueue(ctx context.Context, notification *ProjectionNotification) error {
130+
notification.ValidAfter = time.Now().Add(b.retryDelay)
131+
132+
return b.Queue(ctx, notification)
133+
}
134+
128135
func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, handler ProcessHandler) {
129136
ProcessorLoop:
130137
for {
@@ -134,14 +141,21 @@ ProcessorLoop:
134141
case <-ctx.Done():
135142
return
136143
case notification := <-b.queue:
137-
if notification != nil && notification.ValidAfter.After(time.Now()) {
138-
b.queue <- notification
139-
continue ProcessorLoop
144+
var queueFunc ProjectionTrigger
145+
if notification == nil {
146+
queueFunc = b.Queue
147+
} else {
148+
queueFunc = b.ReQueue
149+
150+
if notification.ValidAfter.After(time.Now()) {
151+
b.queue <- notification
152+
continue ProcessorLoop
153+
}
140154
}
141155

142156
// Execute the notification
143157
b.metrics.StartNotificationProcessing(notification)
144-
if err := handler(ctx, notification, b.Queue); err != nil {
158+
if err := handler(ctx, notification, queueFunc); err != nil {
145159
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
146160
e.Error(err)
147161
e.Any("notification", notification)

0 commit comments

Comments
 (0)