Skip to content

Commit fec0128

Browse files
committed
expose task status & reset outside the transaction
1 parent bf92401 commit fec0128

File tree

2 files changed

+51
-4
lines changed

2 files changed

+51
-4
lines changed

internal/verifier/check.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
151151
return err
152152
}
153153

154+
err = verifier.resetProcessingTasks(ctx)
155+
if err != nil {
156+
return errors.Wrapf(err, "failed to reset %#q tasks", string(verificationTaskProcessing))
157+
}
158+
154159
verifier.logger.Debug().Msg("Starting Check")
155160

156161
verifier.phase = Check
@@ -404,7 +409,10 @@ func (verifier *Verifier) workInTransaction(
404409
metaCtx mongo.SessionContext,
405410
workerNum int,
406411
) error {
407-
task, err := verifier.FindNextVerifyTaskAndUpdate(metaCtx)
412+
// This happens outside the transaction in order to make the task status
413+
// globally visible. This is also why we have to reset status=processing
414+
// tasks when/if the verifier restarts.
415+
task, err := verifier.FindNextVerifyTaskAndUpdate(ctx)
408416
if errors.Is(err, mongo.ErrNoDocuments) {
409417
verifier.logger.Debug().Msgf("[Worker %d] No tasks found, sleeping...", workerNum)
410418
time.Sleep(verifier.workerSleepDelayMillis * time.Millisecond)

internal/verifier/verification_task.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ package verifier
77
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
88

99
import (
10+
"context"
1011
"fmt"
1112
"strings"
1213
"time"
1314

1415
"github.com/10gen/migration-verifier/internal/partitions"
1516
"github.com/10gen/migration-verifier/internal/types"
17+
"github.com/pkg/errors"
1618
"go.mongodb.org/mongo-driver/bson"
1719
"go.mongodb.org/mongo-driver/bson/primitive"
1820
"go.mongodb.org/mongo-driver/mongo"
@@ -44,8 +46,9 @@ const (
4446
// The “workhorse” task type: verify a partition of documents.
4547
verificationTaskVerifyDocuments verificationTaskType = "verify"
4648

47-
// A verifyCollection task verifies collection metadata
48-
// and inserts verify-documents tasks to verify data ranges.
49+
// A verifyCollection task verifies one collection's metadata
50+
// and inserts verify-documents tasks to verify its data ranges.
51+
// This includes sampling & partitioning the collection.
4952
verificationTaskVerifyCollection verificationTaskType = "verifyCollection"
5053

5154
// The primary task creates a verifyCollection task for each
@@ -186,7 +189,43 @@ func (verifier *Verifier) InsertFailedIdsVerificationTask(ctx mongo.SessionConte
186189
return err
187190
}
188191

189-
func (verifier *Verifier) FindNextVerifyTaskAndUpdate(ctx mongo.SessionContext) (*VerificationTask, error) {
192+
func (v *Verifier) resetProcessingTasks(ctx context.Context) error {
193+
coll := v.verificationTaskCollection()
194+
195+
result, err := coll.UpdateMany(
196+
ctx,
197+
bson.M{
198+
"generation": v.generation,
199+
"status": verificationTaskProcessing,
200+
},
201+
bson.M{
202+
"$set": bson.M{
203+
"status": verificationTaskAdded,
204+
},
205+
"$unset": bson.M{
206+
"begin_time": 1,
207+
},
208+
},
209+
)
210+
211+
if err != nil {
212+
return errors.Wrap(err, "failed to reset in-progress tasks")
213+
}
214+
215+
if result.ModifiedCount > 0 {
216+
v.logger.Info().
217+
Int64("count", result.ModifiedCount).
218+
Msg("Formerly in-progress task(s) found and reset.")
219+
}
220+
221+
return nil
222+
}
223+
224+
func (verifier *Verifier) FindNextVerifyTaskAndUpdate(ctx context.Context) (*VerificationTask, error) {
225+
if mongo.SessionFromContext(ctx) != nil {
226+
panic("should be called from outside transaction")
227+
}
228+
190229
var verificationTask = VerificationTask{}
191230
filter := bson.M{
192231
"$and": bson.A{

0 commit comments

Comments
 (0)