Skip to content

Commit 2044c5d

Browse files
author
Sergiu Ghitea
committed
Add a validAfter for retrying notifications
1 parent 7c77ce0 commit 2044c5d

File tree

5 files changed

+25
-2
lines changed

5 files changed

+25
-2
lines changed

driver/sql/postgres/projector_aggregate_bench_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ func setup(
182182
},
183183
goengine.NopLogger,
184184
driverSQL.NopMetrics,
185+
0,
185186
)
186187
require.NoError(b, err, "failed to create aggregate projector")
187188

driver/sql/projection.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sql
33
import (
44
"context"
55
"database/sql"
6+
"time"
67

78
"github.com/hellofresh/goengine"
89
"github.com/mailru/easyjson/jlexer"
@@ -12,8 +13,9 @@ import (
1213
type (
1314
// ProjectionNotification is a representation of the data provided by database notify
1415
ProjectionNotification struct {
15-
No int64 `json:"no"`
16-
AggregateID string `json:"aggregate_id"`
16+
No int64 `json:"no"`
17+
AggregateID string `json:"aggregate_id"`
18+
ValidAfter time.Time `json:"valid_after"`
1719
}
1820

1921
// ProjectionTrigger triggers the notification for processing

driver/sql/projector_aggregate.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"sync"
7+
"time"
78

89
"github.com/hellofresh/goengine/aggregate"
910
"github.com/hellofresh/goengine/metadata"
@@ -24,6 +25,8 @@ type AggregateProjector struct {
2425
db *sql.DB
2526

2627
logger goengine.Logger
28+
29+
retryDelay time.Duration
2730
}
2831

2932
// NewAggregateProjector creates a new projector for a projection
@@ -36,6 +39,7 @@ func NewAggregateProjector(
3639
projectionErrorHandler ProjectionErrorCallback,
3740
logger goengine.Logger,
3841
metrics Metrics,
42+
retryDelay time.Duration,
3943
) (*AggregateProjector, error) {
4044
switch {
4145
case db == nil:
@@ -76,6 +80,13 @@ func NewAggregateProjector(
7680
return nil, err
7781
}
7882

83+
if retryDelay == 0 {
84+
retryDelay, err = time.ParseDuration("50ms")
85+
if err != nil {
86+
return nil, err
87+
}
88+
}
89+
7990
return &AggregateProjector{
8091
backgroundProcessor: processor,
8192
executor: executor,
@@ -85,6 +96,8 @@ func NewAggregateProjector(
8596
db: db,
8697

8798
logger: logger,
99+
100+
retryDelay: retryDelay,
88101
}, nil
89102
}
90103

@@ -153,6 +166,7 @@ func (a *AggregateProjector) processNotification(
153166
return nil
154167
case errorRetry:
155168
a.logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification", logFields)
169+
notification.ValidAfter = time.Now().Add(a.retryDelay)
156170
return queue(ctx, notification)
157171
}
158172

strategy/json/sql/postgres/manager.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package postgres
22

33
import (
44
"database/sql"
5+
"time"
56

67
"github.com/hellofresh/goengine"
78
driverSQL "github.com/hellofresh/goengine/driver/sql"
@@ -120,6 +121,7 @@ func (m *SingleStreamManager) NewAggregateProjector(
120121
projection goengine.Projection,
121122
projectionErrorHandler driverSQL.ProjectionErrorCallback,
122123
useLockedField bool,
124+
retryDelay time.Duration,
123125
) (*driverSQL.AggregateProjector, error) {
124126
eventStore, err := m.NewEventStore()
125127
if err != nil {
@@ -151,5 +153,6 @@ func (m *SingleStreamManager) NewAggregateProjector(
151153
projectionErrorHandler,
152154
m.logger,
153155
m.metrics,
156+
retryDelay,
154157
)
155158
}

test/projector_aggregate_integration_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (s *aggregateProjectorTestSuite) TestRunAndListen() {
111111
},
112112
s.GetLogger(),
113113
s.Metrics,
114+
0,
114115
)
115116
s.Require().NoError(err, "failed to create projector")
116117

@@ -209,6 +210,7 @@ func (s *aggregateProjectorTestSuite) TestRunAndListen() {
209210
},
210211
s.GetLogger(),
211212
s.Metrics,
213+
0,
212214
)
213215
s.Require().NoError(err, "failed to create projector")
214216

@@ -264,6 +266,7 @@ func (s *aggregateProjectorTestSuite) TestRun() {
264266
},
265267
s.GetLogger(),
266268
s.Metrics,
269+
0,
267270
)
268271
s.Require().NoError(err, "failed to create projector")
269272

0 commit comments

Comments
 (0)