Skip to content

Commit 71ff6f9

Browse files
craig[bot]HonoreDB
andcommitted
106438: changefeedccl: surface errors when job fails in alter changefeed tests r=[miretskiy] a=HonoreDB Alter changefeed tests use SucceedsSoon around checking various things to do with a job's progress, which leads to an unhelpful hardcoded error message if the job has actually failed entirely. This commit extracts the logic into a test helper and makes it surface job errors. Fixes: cockroachdb#102760 Co-authored-by: Aaron Zinger <[email protected]>
2 parents 3a52371 + 70c51c0 commit 71ff6f9

File tree

2 files changed

+24
-28
lines changed

2 files changed

+24
-28
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,17 +1156,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
11561156
_ = g.Wait()
11571157
}()
11581158

1159-
// Helper to read job progress
1160-
loadProgress := func() jobspb.Progress {
1161-
jobID := jobFeed.JobID()
1162-
job, err := jobRegistry.LoadJob(context.Background(), jobID)
1163-
require.NoError(t, err)
1164-
return job.Progress()
1165-
}
1166-
11671159
// Ensure initial backfill completes
11681160
testutils.SucceedsSoon(t, func() error {
1169-
prog := loadProgress()
1161+
prog := loadProgress(t, jobFeed, jobRegistry)
11701162
if p := prog.GetHighWater(); p != nil && !p.IsEmpty() {
11711163
return nil
11721164
}
@@ -1210,7 +1202,7 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
12101202
}
12111203

12121204
// Check if we've set a checkpoint yet
1213-
progress := loadProgress()
1205+
progress := loadProgress(t, jobFeed, jobRegistry)
12141206
if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 {
12151207
initialCheckpoint.Add(p.Checkpoint.Spans...)
12161208
atomic.StoreInt32(&foundCheckpoint, 1)
@@ -1238,7 +1230,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
12381230
if atomic.LoadInt32(&foundCheckpoint) != 0 {
12391231
return nil
12401232
}
1241-
return errors.New("waiting for checkpoint")
1233+
if err := jobFeed.FetchTerminalJobErr(); err != nil {
1234+
return err
1235+
}
1236+
return errors.Newf("waiting for checkpoint")
12421237
})
12431238

12441239
require.NoError(t, jobFeed.Pause())
@@ -1339,16 +1334,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
13391334
}()
13401335

13411336
jobFeed := testFeed.(cdctest.EnterpriseTestFeed)
1342-
loadProgress := func() jobspb.Progress {
1343-
jobID := jobFeed.JobID()
1344-
job, err := registry.LoadJob(context.Background(), jobID)
1345-
require.NoError(t, err)
1346-
return job.Progress()
1347-
}
13481337

13491338
// Wait for non-nil checkpoint.
13501339
testutils.SucceedsSoon(t, func() error {
1351-
progress := loadProgress()
1340+
progress := loadProgress(t, jobFeed, registry)
13521341
if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 {
13531342
return nil
13541343
}
@@ -1357,7 +1346,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
13571346

13581347
// Pause the job and read and verify the latest checkpoint information.
13591348
require.NoError(t, jobFeed.Pause())
1360-
progress := loadProgress()
1349+
progress := loadProgress(t, jobFeed, registry)
13611350
require.NotNil(t, progress.GetChangefeed())
13621351
h := progress.GetHighWater()
13631352
noHighWater := h == nil || h.IsEmpty()
@@ -1385,15 +1374,15 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
13851374

13861375
// Wait for the high water mark to be non-zero.
13871376
testutils.SucceedsSoon(t, func() error {
1388-
prog := loadProgress()
1377+
prog := loadProgress(t, jobFeed, registry)
13891378
if p := prog.GetHighWater(); p != nil && !p.IsEmpty() {
13901379
return nil
13911380
}
13921381
return errors.New("waiting for highwater")
13931382
})
13941383

13951384
// At this point, highwater mark should be set, and previous checkpoint should be gone.
1396-
progress = loadProgress()
1385+
progress = loadProgress(t, jobFeed, registry)
13971386
require.NotNil(t, progress.GetChangefeed())
13981387
require.Equal(t, 0, len(progress.GetChangefeed().Checkpoint.Spans))
13991388

@@ -1569,14 +1558,8 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) {
15691558
castedFeed, ok := testFeed.(cdctest.EnterpriseTestFeed)
15701559
require.True(t, ok)
15711560

1572-
loadProgress := func() jobspb.Progress {
1573-
job, err := registry.LoadJob(context.Background(), castedFeed.JobID())
1574-
require.NoError(t, err)
1575-
return job.Progress()
1576-
}
1577-
15781561
testutils.SucceedsSoon(t, func() error {
1579-
progress := loadProgress()
1562+
progress := loadProgress(t, castedFeed, registry)
15801563
if hw := progress.GetHighWater(); hw != nil && cursor.LessEq(*hw) {
15811564
return nil
15821565
}

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,19 @@ func expectNotice(t *testing.T, s serverutils.TestTenantInterface, sql string, e
688688
require.Equal(t, expected, actual)
689689
}
690690

691+
func loadProgress(
692+
t *testing.T, jobFeed cdctest.EnterpriseTestFeed, jobRegistry *jobs.Registry,
693+
) jobspb.Progress {
694+
t.Helper()
695+
jobID := jobFeed.JobID()
696+
job, err := jobRegistry.LoadJob(context.Background(), jobID)
697+
require.NoError(t, err)
698+
if job.Status().Terminal() {
699+
t.Errorf("tried to load progress for job %v but it has reached terminal status %s with error %s", job, job.Status(), jobFeed.FetchTerminalJobErr())
700+
}
701+
return job.Progress()
702+
}
703+
691704
func feed(
692705
t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{},
693706
) cdctest.TestFeed {

0 commit comments

Comments
 (0)