Skip to content

Commit 870f247

Browse files
author
Sergiu Ghitea
committed
Move retryDelay to projection notification processor
1 parent 7db03c5 commit 870f247

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type (
2323

2424
logger goengine.Logger
2525
metrics Metrics
26+
27+
retryDelay time.Duration
2628
}
2729

2830
// ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
@@ -31,7 +33,13 @@ type (
3133
)
3234

3335
// newBackgroundProcessor create a new projectionNotificationProcessor
34-
func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger, metrics Metrics) (*projectionNotificationProcessor, error) {
36+
func newBackgroundProcessor(
37+
queueProcessors,
38+
queueBuffer int,
39+
logger goengine.Logger,
40+
metrics Metrics,
41+
retryDelay time.Duration,
42+
) (*projectionNotificationProcessor, error) {
3543
if queueProcessors <= 0 {
3644
return nil, errors.New("queueProcessors must be greater then zero")
3745
}
@@ -50,6 +58,7 @@ func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Lo
5058
queueBuffer: queueBuffer,
5159
logger: logger,
5260
metrics: metrics,
61+
retryDelay: retryDelay,
5362
}, nil
5463
}
5564

driver/sql/projector_aggregate.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ type AggregateProjector struct {
2525
db *sql.DB
2626

2727
logger goengine.Logger
28-
29-
retryDelay time.Duration
3028
}
3129

3230
// NewAggregateProjector creates a new projector for a projection
@@ -63,7 +61,7 @@ func NewAggregateProjector(
6361
e.String("projection", projection.Name())
6462
})
6563

66-
processor, err := newBackgroundProcessor(10, 32, logger, metrics)
64+
processor, err := newBackgroundProcessor(10, 32, logger, metrics, retryDelay)
6765
if err != nil {
6866
return nil, err
6967
}
@@ -96,8 +94,6 @@ func NewAggregateProjector(
9694
db: db,
9795

9896
logger: logger,
99-
100-
retryDelay: retryDelay,
10197
}, nil
10298
}
10399

@@ -166,7 +162,6 @@ func (a *AggregateProjector) processNotification(
166162
return nil
167163
case errorRetry:
168164
a.logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification", logFields)
169-
notification.ValidAfter = time.Now().Add(a.retryDelay)
170165
return queue(ctx, notification)
171166
}
172167

0 commit comments

Comments
 (0)