Skip to content

Commit 9d14100

Browse files
authored
refactor: improve river test retries (#1843)
* refactor: improve river test retries * chore: remove extra helper util
1 parent f66ca42 commit 9d14100

File tree

5 files changed

+54
-103
lines changed

5 files changed

+54
-103
lines changed

internal/db/db.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,6 @@ func (d *Database) migrateFromSource(ctx context.Context, fs embed.FS, sqlPath s
119119
logger := logger.MigrationLogger{Logger: zapctx.Logger(ctx)}
120120
m.Log = logger
121121

122-
if err := d.handleDeprecatedMigrations(ctx, m); err != nil {
123-
return fmt.Errorf("failed to handle deprecated migrations: %w", err)
124-
}
125-
126122
v, dirty, err := m.Version()
127123
if err != nil {
128124
if !stderr.Is(err, migrate.ErrNilVersion) {
@@ -149,30 +145,6 @@ func (d *Database) migrateFromSource(ctx context.Context, fs embed.FS, sqlPath s
149145
return nil
150146
}
151147

152-
// This method is used for handling deployments that are live when we made the
153-
// switch from a home-grown migration library to golang-migrate. To avoid running
154-
// migrations twice, we check if the old "versions" table exists and make golang-migrate
155-
// aware of which migrations have been run using the Force() method.
156-
func (d *Database) handleDeprecatedMigrations(ctx context.Context, m *migrate.Migrate) error {
157-
var version int
158-
err := d.DB.Raw("SELECT minor FROM versions;").Row().Scan(&version)
159-
if err != nil {
160-
// The versions table may already be deleted. Other errors are ignored intentionally.
161-
zapctx.Debug(ctx, "no minor version from deprecated migrations table", zap.Error(err))
162-
//nolint:nilerr
163-
return nil
164-
}
165-
if version == 0 {
166-
return nil
167-
}
168-
zapctx.Debug(ctx, "forcing db version", zap.Int("version", version))
169-
err = m.Force(version)
170-
if err != nil {
171-
return err
172-
}
173-
return nil
174-
}
175-
176148
// ready checks that the database is ready to accept requests. An error is
177149
// returned if the database is not yet initialised.
178150
func (d *Database) ready() error {

internal/river/migrationworker_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ func TestMigrationWorker(t *testing.T) {
2424

2525
upgradeManager := NewMockUpgradeManager(ctrl)
2626

27-
db := setupTestDB(c)
28-
sqlDb, err := db.SqlDB()
29-
c.Assert(err, qt.IsNil)
27+
db, sqlDb := setupTestDB(c)
3028

3129
openfgaClient := &openfga.OFGAClient{}
3230
w, err := newMigrationWorker(openfgaClient, db, upgradeManager)
@@ -45,6 +43,7 @@ func TestMigrationWorker(t *testing.T) {
4543

4644
tx, err := sqlDb.Begin()
4745
c.Assert(err, qt.IsNil)
46+
defer func() { _ = tx.Rollback() }()
4847

4948
result, err := testWorker.Work(
5049
c.Context(),
@@ -71,9 +70,7 @@ func TestMigrationWorker_Error(t *testing.T) {
7170

7271
upgradeManager := NewMockUpgradeManager(ctrl)
7372

74-
db := setupTestDB(c)
75-
sqlDb, err := db.SqlDB()
76-
c.Assert(err, qt.IsNil)
73+
db, sqlDb := setupTestDB(c)
7774

7875
openfgaClient := &openfga.OFGAClient{}
7976
w, err := newMigrationWorker(openfgaClient, db, upgradeManager)
@@ -92,8 +89,7 @@ func TestMigrationWorker_Error(t *testing.T) {
9289

9390
tx, err := sqlDb.Begin()
9491
c.Assert(err, qt.IsNil)
95-
96-
c.Assert(err, qt.IsNil)
92+
defer func() { _ = tx.Rollback() }()
9793
result, err := testWorker.Work(
9894
c.Context(),
9995
c.TB,

internal/river/upgradetoworker_test.go

Lines changed: 29 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ func TestUpgradeToWorker_Success(t *testing.T) {
2929
ctrl := gomock.NewController(c)
3030
defer ctrl.Finish()
3131

32-
database := setupTestDB(c)
33-
sqlDB, err := database.SqlDB()
34-
c.Assert(err, qt.IsNil)
32+
database, sqlDb := setupTestDB(c)
3533

3634
upgradeManager := NewMockUpgradeManager(ctrl)
3735

@@ -41,7 +39,7 @@ func TestUpgradeToWorker_Success(t *testing.T) {
4139
setupWorkerParams{
4240
database: database,
4341
upgradeManager: upgradeManager,
44-
sqlDB: sqlDB,
42+
sqlDB: sqlDb,
4543
migrateRetryCount: 1,
4644
upgradeRetryCount: 1,
4745
awaitFunc: waitForJobToFinalise,
@@ -66,7 +64,7 @@ func TestUpgradeToWorker_Success(t *testing.T) {
6664
}, nil)
6765
c.Assert(err, qt.IsNil)
6866

69-
row := waitForFinalisedJob(c, ctx, sub, insRes)
67+
row := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
7068
c.Assert(row.State, qt.Equals, rivertype.JobStateCompleted)
7169
c.Assert(row.Errors, qt.HasLen, 0)
7270
}
@@ -78,9 +76,7 @@ func TestUpgradeToWorker_SuccessCanBeUpgradedToAgain(t *testing.T) {
7876
ctrl := gomock.NewController(c)
7977
defer ctrl.Finish()
8078

81-
database := setupTestDB(c)
82-
sqlDB, err := database.SqlDB()
83-
c.Assert(err, qt.IsNil)
79+
database, sqlDb := setupTestDB(c)
8480

8581
upgradeManager := NewMockUpgradeManager(ctrl)
8682

@@ -90,7 +86,7 @@ func TestUpgradeToWorker_SuccessCanBeUpgradedToAgain(t *testing.T) {
9086
setupWorkerParams{
9187
database: database,
9288
upgradeManager: upgradeManager,
93-
sqlDB: sqlDB,
89+
sqlDB: sqlDb,
9490
migrateRetryCount: 1,
9591
upgradeRetryCount: 1,
9692
awaitFunc: waitForJobToFinalise,
@@ -115,7 +111,7 @@ func TestUpgradeToWorker_SuccessCanBeUpgradedToAgain(t *testing.T) {
115111
}, nil)
116112
c.Assert(err, qt.IsNil)
117113

118-
row := waitForFinalisedJob(c, ctx, sub, insRes)
114+
row := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
119115
c.Assert(row.ID, qt.Equals, int64(1))
120116
c.Assert(row.State, qt.Equals, rivertype.JobStateCompleted)
121117
c.Assert(row.Errors, qt.HasLen, 0)
@@ -136,7 +132,7 @@ func TestUpgradeToWorker_SuccessCanBeUpgradedToAgain(t *testing.T) {
136132
}, nil)
137133
c.Assert(err, qt.IsNil)
138134

139-
row = waitForFinalisedJob(c, ctx, sub, insRes)
135+
row = waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
140136
c.Assert(row.ID, qt.Equals, int64(4))
141137
c.Assert(row.State, qt.Equals, rivertype.JobStateCompleted)
142138
c.Assert(row.Errors, qt.HasLen, 0)
@@ -164,9 +160,7 @@ func TestUpgradeToWorker_MigrationFails(t *testing.T) {
164160
ctrl := gomock.NewController(c)
165161
defer ctrl.Finish()
166162

167-
database := setupTestDB(c)
168-
sqlDB, err := database.SqlDB()
169-
c.Assert(err, qt.IsNil)
163+
database, sqlDb := setupTestDB(c)
170164

171165
upgradeManager := NewMockUpgradeManager(ctrl)
172166

@@ -177,7 +171,7 @@ func TestUpgradeToWorker_MigrationFails(t *testing.T) {
177171
setupWorkerParams{
178172
database: database,
179173
upgradeManager: upgradeManager,
180-
sqlDB: sqlDB,
174+
sqlDB: sqlDb,
181175
migrateRetryCount: 3,
182176
upgradeRetryCount: 1,
183177
awaitFunc: waitForJobToFinalise,
@@ -204,7 +198,7 @@ func TestUpgradeToWorker_MigrationFails(t *testing.T) {
204198
}, &river.InsertOpts{MaxAttempts: 1})
205199
c.Assert(err, qt.IsNil)
206200

207-
row := waitForFinalisedJob(c, ctx, sub, insRes)
201+
row := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
208202
c.Assert(row.State, qt.Equals, rivertype.JobStateDiscarded)
209203
// Ensure we capture the last error only from the migrate job, and that it is surfaced to the upgrade to job.
210204
upgradeToJobFinalError := row.Errors[len(row.Errors)-1].Error
@@ -218,9 +212,7 @@ func TestUpgradeToWorker_UpgradeFails(t *testing.T) {
218212
ctrl := gomock.NewController(c)
219213
defer ctrl.Finish()
220214

221-
database := setupTestDB(c)
222-
sqlDB, err := database.SqlDB()
223-
c.Assert(err, qt.IsNil)
215+
database, sqlDb := setupTestDB(c)
224216

225217
upgradeManager := NewMockUpgradeManager(ctrl)
226218

@@ -231,7 +223,7 @@ func TestUpgradeToWorker_UpgradeFails(t *testing.T) {
231223
setupWorkerParams{
232224
database: database,
233225
upgradeManager: upgradeManager,
234-
sqlDB: sqlDB,
226+
sqlDB: sqlDb,
235227
migrateRetryCount: 1,
236228
upgradeRetryCount: 3,
237229
awaitFunc: waitForJobToFinalise,
@@ -262,7 +254,7 @@ func TestUpgradeToWorker_UpgradeFails(t *testing.T) {
262254
}, &river.InsertOpts{MaxAttempts: 1})
263255
c.Assert(err, qt.IsNil)
264256

265-
row := waitForFinalisedJob(c, ctx, sub, insRes)
257+
row := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
266258
c.Assert(row.State, qt.Equals, rivertype.JobStateDiscarded)
267259
// Ensure we capture the last error only from the migrate job, and that it is surfaced to the upgrade to job.
268260
upgradeToJobFinalError := row.Errors[len(row.Errors)-1].Error
@@ -277,9 +269,7 @@ func TestUpgradeToWorker_SuccessAfterTransientFailures(t *testing.T) {
277269
ctrl := gomock.NewController(c)
278270
defer ctrl.Finish()
279271

280-
database := setupTestDB(c)
281-
sqlDB, err := database.SqlDB()
282-
c.Assert(err, qt.IsNil)
272+
database, sqlDb := setupTestDB(c)
283273

284274
upgradeManager := NewMockUpgradeManager(ctrl)
285275

@@ -290,7 +280,7 @@ func TestUpgradeToWorker_SuccessAfterTransientFailures(t *testing.T) {
290280
setupWorkerParams{
291281
database: database,
292282
upgradeManager: upgradeManager,
293-
sqlDB: sqlDB,
283+
sqlDB: sqlDb,
294284
migrateRetryCount: 2,
295285
upgradeRetryCount: 2,
296286
awaitFunc: waitForJobToFinalise,
@@ -333,7 +323,7 @@ func TestUpgradeToWorker_SuccessAfterTransientFailures(t *testing.T) {
333323
}, nil)
334324
c.Assert(err, qt.IsNil)
335325

336-
row := waitForFinalisedJob(c, ctx, sub, insRes)
326+
row := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
337327
c.Assert(row.State, qt.Equals, rivertype.JobStateCompleted)
338328
c.Assert(row.Errors, qt.HasLen, 0)
339329
}
@@ -345,9 +335,7 @@ func TestUpgradeToWorker_EnsureCancellingSupervisorCancelsSpawnedMigrateJob(t *t
345335
ctrl := gomock.NewController(c)
346336
defer ctrl.Finish()
347337

348-
database := setupTestDB(c)
349-
sqlDB, err := database.SqlDB()
350-
c.Assert(err, qt.IsNil)
338+
database, sqlDb := setupTestDB(c)
351339

352340
upgradeManager := NewMockUpgradeManager(ctrl)
353341

@@ -359,7 +347,7 @@ func TestUpgradeToWorker_EnsureCancellingSupervisorCancelsSpawnedMigrateJob(t *t
359347
setupWorkerParams{
360348
database: database,
361349
upgradeManager: upgradeManager,
362-
sqlDB: sqlDB,
350+
sqlDB: sqlDb,
363351
migrateRetryCount: 3,
364352
upgradeRetryCount: 1,
365353
awaitFunc: waitForJobToFinalise,
@@ -388,15 +376,15 @@ func TestUpgradeToWorker_EnsureCancellingSupervisorCancelsSpawnedMigrateJob(t *t
388376
sub, cancel := riverClient.Subscribe(river.EventKindJobFailed, river.EventKindJobCompleted)
389377
c.Cleanup(cancel)
390378

391-
_, err = riverClient.Insert(ctx, UpgradeToArgs{
379+
_, err := riverClient.Insert(ctx, UpgradeToArgs{
392380
ModelUUID: "model-uuid",
393381
TargetVersion: version.MustParse("2.0.0"),
394382
Username: username,
395383
TargetControllerName: "target-controller",
396384
}, nil)
397385
c.Assert(err, qt.IsNil)
398386

399-
supervisingJobFailureUpdate := waitForSupervisingJob(c, ctx, sub, supervisingJobId)
387+
supervisingJobFailureUpdate := waitForFinalisedJob(c, ctx, sub, supervisingJobId)
400388

401389
// At this point, our job is cancelled and we'll see it as "completed".
402390
c.Assert(supervisingJobFailureUpdate.State, qt.Equals, rivertype.JobStateCompleted)
@@ -419,9 +407,7 @@ func TestUpgradeToWorker_SupervisorHandlesCrashMidway(t *testing.T) {
419407
ctrl := gomock.NewController(c)
420408
defer ctrl.Finish()
421409

422-
database := setupTestDB(c)
423-
sqlDB, err := database.SqlDB()
424-
c.Assert(err, qt.IsNil)
410+
database, sqlDb := setupTestDB(c)
425411

426412
upgradeManager := NewMockUpgradeManager(ctrl)
427413

@@ -438,7 +424,7 @@ func TestUpgradeToWorker_SupervisorHandlesCrashMidway(t *testing.T) {
438424
setupWorkerParams{
439425
database: database,
440426
upgradeManager: upgradeManager,
441-
sqlDB: sqlDB,
427+
sqlDB: sqlDb,
442428
migrateRetryCount: 1,
443429
upgradeRetryCount: 1,
444430
awaitFunc: func(ctx context.Context, result *rivertype.JobInsertResult, eventCh <-chan *river.Event) error {
@@ -489,7 +475,7 @@ func TestUpgradeToWorker_SupervisorHandlesCrashMidway(t *testing.T) {
489475
// 5. Waits for the migrate to finalise, but this time, unblocks the migrate job just before waiting.
490476
// 6. 2nd try of supervisor finally completes.
491477
// And we expect to see the supervisor attempted twice, but migrate once.
492-
supervisorRow := waitForFinalisedJob(c, ctx, sub, insRes)
478+
supervisorRow := waitForFinalisedJob(c, ctx, sub, insRes.Job.ID)
493479
c.Assert(supervisorRow.State, qt.Equals, rivertype.JobStateCompleted)
494480
c.Assert(supervisorRow.Attempt, qt.Equals, 2)
495481

@@ -537,7 +523,8 @@ func setupWorkers(
537523
Queues: map[string]river.QueueConfig{
538524
river.QueueDefault: {MaxWorkers: 5},
539525
},
540-
Workers: workers,
526+
Workers: workers,
527+
RetryPolicy: &testRetryPolicy{},
541528
})
542529
c.Assert(err, qt.IsNil)
543530

@@ -550,33 +537,16 @@ func setupWorkers(
550537
return riverClient, u.Name
551538
}
552539

553-
func waitForFinalisedJob(c *qt.C, ctx context.Context, sub <-chan *river.Event, insRes *rivertype.JobInsertResult) *rivertype.JobRow {
554-
loop:
555-
for {
556-
select {
557-
case event := <-sub:
558-
c.Logf("received job failed event for job ID %d", event.Job.ID)
559-
if event.Job.ID != insRes.Job.ID {
560-
continue loop
561-
}
562-
if event.Job.FinalizedAt != nil {
563-
return event.Job
564-
}
565-
case <-ctx.Done():
566-
c.Fatal("timed out waiting for job failed event")
567-
}
568-
}
569-
}
570-
571-
func waitForSupervisingJob(c *qt.C, ctx context.Context, sub <-chan *river.Event, supervisingJobId int64) *rivertype.JobRow {
540+
func waitForFinalisedJob(c *qt.C, ctx context.Context, sub <-chan *river.Event, jobID int64) *rivertype.JobRow {
572541
for {
573542
select {
574543
case event := <-sub:
575-
if event.Job.ID == supervisingJobId {
544+
c.Logf("received job event for job ID %d", event.Job.ID)
545+
if event.Job.ID == jobID && event.Job.FinalizedAt != nil {
576546
return event.Job
577547
}
578548
case <-ctx.Done():
579-
c.Fatal("timed out waiting for job failed event")
549+
c.Fatal("timed out waiting for job event")
580550
}
581551
}
582552
}

internal/river/upgradeworker_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ func TestUpgradeWorker(t *testing.T) {
2121
ctrl := gomock.NewController(c)
2222
defer ctrl.Finish()
2323

24-
db := setupTestDB(c)
25-
sqlDb, err := db.SqlDB()
26-
c.Assert(err, qt.IsNil)
24+
_, sqlDb := setupTestDB(c)
2725

2826
upgradeManager := NewMockUpgradeManager(ctrl)
2927
w, err := newUpgradeWorker(upgradeManager)
@@ -37,6 +35,7 @@ func TestUpgradeWorker(t *testing.T) {
3735

3836
tx, err := sqlDb.Begin()
3937
c.Assert(err, qt.IsNil)
38+
defer func() { _ = tx.Rollback() }()
4039

4140
result, err := testWorker.Work(c.Context(), c.TB, tx, upgradeWorkerArgs{
4241
ModelUUID: "test-string", TargetVersion: version.MustParse("2.0.0")}, nil)
@@ -51,9 +50,7 @@ func TestUpgradeWorker_Error(t *testing.T) {
5150
ctrl := gomock.NewController(c)
5251
defer ctrl.Finish()
5352

54-
db := setupTestDB(c)
55-
sqlDb, err := db.SqlDB()
56-
c.Assert(err, qt.IsNil)
53+
_, sqlDb := setupTestDB(c)
5754

5855
upgradeManager := NewMockUpgradeManager(ctrl)
5956
w, err := newUpgradeWorker(upgradeManager)
@@ -67,6 +64,7 @@ func TestUpgradeWorker_Error(t *testing.T) {
6764

6865
tx, err := sqlDb.Begin()
6966
c.Assert(err, qt.IsNil)
67+
defer func() { _ = tx.Rollback() }()
7068

7169
result, err := testWorker.Work(c.Context(), c.TB, tx, upgradeWorkerArgs{
7270
ModelUUID: "test-string", TargetVersion: version.MustParse("2.0.0")}, nil)

0 commit comments

Comments
 (0)