Skip to content

Commit 9403332

Browse files
committed
fix error handling
1 parent bfc46eb commit 9403332

File tree

7 files changed

+223
-131
lines changed

7 files changed

+223
-131
lines changed

internal/verifier/check.go

Lines changed: 94 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package verifier
33
import (
44
"context"
55
"fmt"
6-
"sync"
76
"time"
87

98
mapset "github.com/deckarep/golang-set/v2"
109
"github.com/pkg/errors"
1110
"go.mongodb.org/mongo-driver/bson"
1211
"go.mongodb.org/mongo-driver/mongo"
12+
"golang.org/x/sync/errgroup"
1313
)
1414

1515
type GenerationStatus string
@@ -57,19 +57,14 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
5757
return nil
5858
}
5959

60-
func (verifier *Verifier) CheckWorker(ctx context.Context) error {
61-
verifier.logger.Debug().Msgf("Starting %d verification workers", verifier.numWorkers)
62-
ctx, cancel := context.WithCancel(ctx)
63-
64-
wg := sync.WaitGroup{}
65-
for i := 0; i < verifier.numWorkers; i++ {
66-
wg.Add(1)
67-
go verifier.Work(ctx, i, &wg)
68-
time.Sleep(10 * time.Millisecond)
69-
}
70-
60+
func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
7161
generation := verifier.generation
7262

63+
verifier.logger.Debug().
64+
Int("generation", generation).
65+
Int("workersCount", verifier.numWorkers).
66+
Msgf("Starting verification worker threads.")
67+
7368
// Since we do a progress report right at the start we don’t need
7469
// this to go in non-debug output.
7570
startLabel := fmt.Sprintf("Starting check generation %d", generation)
@@ -81,44 +76,75 @@ func (verifier *Verifier) CheckWorker(ctx context.Context) error {
8176

8277
verifier.writeStringBuilder(genStartReport)
8378

84-
waitForTaskCreation := 0
79+
eg, ctx := errgroup.WithContext(ctxIn)
8580

86-
for {
81+
// If the change stream fails, everything should stop.
82+
eg.Go(func() error {
8783
select {
8884
case err := <-verifier.changeStreamErrChan:
89-
cancel()
9085
return errors.Wrap(err, "change stream failed")
9186
case <-ctx.Done():
92-
cancel()
9387
return nil
94-
default:
9588
}
89+
})
9690

97-
verificationStatus, err := verifier.GetVerificationStatus()
98-
if err != nil {
99-
verifier.logger.Error().Msgf("Failed getting verification status: %v", err)
100-
}
91+
// Start the worker threads.
92+
for i := 0; i < verifier.numWorkers; i++ {
93+
eg.Go(func() error {
94+
return errors.Wrapf(
95+
verifier.Work(ctx, i),
96+
"worker %d failed",
97+
i,
98+
)
99+
})
100+
time.Sleep(10 * time.Millisecond)
101+
}
101102

102-
if waitForTaskCreation%2 == 0 {
103-
if generation > 0 || verifier.gen0PendingCollectionTasks.Load() == 0 {
104-
verifier.PrintVerificationSummary(ctx, GenerationInProgress)
103+
waitForTaskCreation := 0
104+
105+
eg.Go(func() error {
106+
for {
107+
verificationStatus, err := verifier.GetVerificationStatus(ctx)
108+
if err != nil {
109+
return errors.Wrapf(
110+
err,
111+
"failed to retrieve status of generation %d's tasks",
112+
generation,
113+
)
105114
}
106-
}
107115

108-
//wait for task to be created, if none of the tasks found.
109-
if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 || verificationStatus.RecheckTasks > 0 {
110-
waitForTaskCreation++
111-
time.Sleep(verifier.verificationStatusCheckInterval)
112-
} else {
113-
verifier.PrintVerificationSummary(ctx, GenerationComplete)
114-
verifier.logger.Debug().Msg("Verification tasks complete")
115-
cancel()
116-
wg.Wait()
117-
break
116+
if waitForTaskCreation%2 == 0 {
117+
if generation > 0 || verifier.gen0PendingCollectionTasks.Load() == 0 {
118+
verifier.PrintVerificationSummary(ctx, GenerationInProgress)
119+
}
120+
}
121+
122+
// The generation continues as long as >=1 task for this generation is
123+
// “added” or “pending”.
124+
if verificationStatus.AddedTasks > 0 || verificationStatus.ProcessingTasks > 0 {
125+
waitForTaskCreation++
126+
time.Sleep(verifier.verificationStatusCheckInterval)
127+
continue
128+
} else {
129+
verifier.PrintVerificationSummary(ctx, GenerationComplete)
130+
return nil
131+
}
118132
}
133+
})
134+
135+
err := eg.Wait()
136+
137+
if err != nil {
138+
verifier.logger.Debug().
139+
Int("generation", generation).
140+
Msgf("Check finished.")
119141
}
120-
verifier.logger.Debug().Msgf("Check generation %d finished", generation)
121-
return nil
142+
143+
return errors.Wrapf(
144+
err,
145+
"check generation %d failed",
146+
generation,
147+
)
122148
}
123149

124150
func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any, testChan ...chan struct{}) error {
@@ -179,10 +205,14 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
179205
return errors.Wrap(err, "failed to start change stream on source")
180206
}
181207
}
182-
// Log out the verification status when initially booting up so it's easy to see the current state
183-
verificationStatus, err := verifier.GetVerificationStatus()
208+
209+
// Log the verification status when initially booting up so it's easy to see the current state
210+
verificationStatus, err := verifier.GetVerificationStatus(ctx)
184211
if err != nil {
185-
verifier.logger.Error().Msgf("Failed getting verification status: %v", err)
212+
return errors.Wrapf(
213+
err,
214+
"failed to retrieve verification status",
215+
)
186216
} else {
187217
verifier.logger.Debug().Msgf("Initial verification phase: %+v", verificationStatus)
188218
}
@@ -259,8 +289,9 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
259289

260290
err = verifier.ClearRecheckDocs(ctx)
261291
if err != nil {
262-
verifier.logger.Error().Msgf("Failed trying to clear out old recheck docs, continuing: %v",
263-
err)
292+
verifier.logger.Warn().
293+
Err(err).
294+
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
264295
}
265296
verifier.mux.Unlock()
266297
}
@@ -368,9 +399,8 @@ func FetchFailedAndIncompleteTasks(ctx context.Context, coll *mongo.Collection,
368399
return FailedTasks, IncompleteTasks, nil
369400
}
370401

371-
func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.WaitGroup) {
372-
defer wg.Done()
373-
402+
// Work is the logic for an individual worker thread.
403+
func (verifier *Verifier) Work(ctx context.Context, workerNum int) error {
374404
verifier.logger.Debug().
375405
Int("workerNum", workerNum).
376406
Msg("Worker started.")
@@ -382,9 +412,9 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait
382412
for {
383413
select {
384414
case <-ctx.Done():
385-
return
415+
return ctx.Err()
386416
default:
387-
task, err := verifier.FindNextVerifyTaskAndUpdate()
417+
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
388418
if errors.Is(err, mongo.ErrNoDocuments) {
389419
duration := verifier.workerSleepDelayMillis * time.Millisecond
390420

@@ -399,20 +429,32 @@ func (verifier *Verifier) Work(ctx context.Context, workerNum int, wg *sync.Wait
399429

400430
continue
401431
} else if err != nil {
402-
panic(err)
432+
return errors.Wrap(
433+
err,
434+
"failed to seek next task",
435+
)
403436
}
404437

405-
if task.Type == verificationTaskVerifyCollection {
406-
verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
438+
switch task.Type {
439+
case verificationTaskVerifyCollection:
440+
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
441+
if err != nil {
442+
return err
443+
}
407444
if task.Generation == 0 {
408445
newVal := verifier.gen0PendingCollectionTasks.Add(-1)
409446
if newVal == 0 {
410447
verifier.PrintVerificationSummary(ctx, Gen0MetadataAnalysisComplete)
411448
}
412449
}
413-
} else {
414-
verifier.ProcessVerifyTask(workerNum, task)
450+
case verificationTaskVerifyDocuments:
451+
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
452+
if err != nil {
453+
return err
454+
}
415455
}
416456
}
417457
}
458+
459+
return nil
418460
}

0 commit comments

Comments
 (0)