Skip to content

Commit a9c09a5

Browse files
committed
Revert "preliminary"
This reverts commit e7d8a4b.
1 parent e7d8a4b commit a9c09a5

File tree

12 files changed

+50
-702
lines changed

12 files changed

+50
-702
lines changed

internal/util/value.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

internal/verifier/change_stream.go

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -160,21 +160,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
160160
eventsRead++
161161
}
162162

163-
if cs.Err() != nil {
164-
return false, errors.Wrap(cs.Err(), "change stream iteration failed")
165-
}
166-
167-
if eventsRead == 0 {
168-
return false, nil
169-
}
170-
171-
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
172-
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
173-
if err != nil {
174-
return false, errors.Wrap(err, "failed to handle change events")
163+
if eventsRead > 0 {
164+
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")
165+
err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
166+
if err != nil {
167+
return false, errors.Wrap(err, "failed to handle change events")
168+
}
175169
}
176170

177-
return true, nil
171+
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
178172
}
179173

180174
for {
@@ -190,31 +184,15 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
190184
// If the changeStreamEnderChan has a message, the user has indicated that
191185
// source writes are ended. This means we should exit rather than continue
192186
// reading the change stream since there should be no more events.
193-
case finalTs := <-verifier.changeStreamFinalTsChan:
187+
case <-verifier.changeStreamEnderChan:
194188
verifier.logger.Debug().
195-
Interface("finalTimestamp", finalTs).
196-
Msg("Change stream thread received final timestamp.")
189+
Msg("Change stream thread received shutdown request.")
197190

198191
changeStreamEnded = true
199192

200193
// Read all change events until the source reports no events.
201194
// (i.e., the `getMore` call returns empty)
202195
for {
203-
curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
204-
if err != nil {
205-
err = errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
206-
break
207-
}
208-
209-
if curTs.Compare(finalTs) >= 0 {
210-
verifier.logger.Debug().
211-
Interface("currentTimestamp", curTs).
212-
Interface("finalTimestamp", finalTs).
213-
Msg("Change stream has reached the final timestamp. Shutting down.")
214-
215-
break
216-
}
217-
218196
var gotEvent bool
219197
gotEvent, err = readAndHandleOneChangeEventBatch()
220198

internal/verifier/check.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
mapset "github.com/deckarep/golang-set/v2"
1010
"github.com/pkg/errors"
1111
"go.mongodb.org/mongo-driver/bson"
12-
"go.mongodb.org/mongo-driver/bson/primitive"
1312
"go.mongodb.org/mongo-driver/mongo"
1413
)
1514

@@ -43,13 +42,13 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
4342
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
4443
}
4544

46-
func (verifier *Verifier) finalizeChangeStream(finalTs primitive.Timestamp) error {
45+
func (verifier *Verifier) waitForChangeStream() error {
4746
verifier.mux.RLock()
4847
csRunning := verifier.changeStreamRunning
4948
verifier.mux.RUnlock()
5049
if csRunning {
5150
verifier.logger.Debug().Msg("Changestream still running, signalling that writes are done and waiting for change stream to exit")
52-
verifier.changeStreamFinalTsChan <- finalTs
51+
verifier.changeStreamEnderChan <- struct{}{}
5352
select {
5453
case err := <-verifier.changeStreamErrChan:
5554
verifier.logger.Warn().Err(err).
@@ -239,11 +238,11 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
239238
// will result in an extra iteration. The odds of this are lower and the user should be
240239
// paying attention. Also, this should not matter too much because any failures will be
241240
// caught again on the next iteration.
242-
if ts, writesOff := verifier.writesOffTs.Get(); writesOff {
241+
if verifier.writesOff {
243242
// It's necessary to wait for the change stream to finish before incrementing the
244243
// generation number, or the last changes will not be checked.
245244
verifier.mux.Unlock()
246-
err := verifier.finalizeChangeStream(ts)
245+
err := verifier.waitForChangeStream()
247246
if err != nil {
248247
return err
249248
}

internal/verifier/clustertime.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,23 @@ func GetClusterTime(
5454
// OPTIMIZATION: We now append an oplog entry--this time for real!--to
5555
// cause any lagging shards to output their events.
5656
//
57-
// Since this is just an optimization, failures here are nonfatal,
58-
// and we might as well forgo the retryer.
59-
_, err = runAppendOplogNote(ctx, client, false)
57+
// Since this is just an optimization, failures here are nonfatal.
58+
err = retryer.RunForTransientErrorsOnly(
59+
ctx,
60+
logger,
61+
func(_ *retry.Info) error {
62+
var err error
63+
optime, err = runAppendOplogNote(ctx, client, false)
64+
if err != nil {
65+
err = fmt.Errorf("%w: %w", retry.CustomTransientErr, err)
66+
}
67+
return err
68+
},
69+
)
6070
if err != nil {
6171
// This isn't serious enough even to warn on, so leave it at info-level.
6272
logger.Info().Err(err).
63-
Msg("Failed to update oplog; change stream may need extra time to finish.")
73+
Msg("Failed to append oplog note; change stream may need extra time to finish.")
6474
}
6575

6676
return optime, nil

internal/verifier/migration_verifier.go

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import (
2121
"github.com/10gen/migration-verifier/internal/reportutils"
2222
"github.com/10gen/migration-verifier/internal/retry"
2323
"github.com/10gen/migration-verifier/internal/types"
24-
"github.com/10gen/migration-verifier/internal/util"
2524
"github.com/10gen/migration-verifier/internal/uuidutil"
26-
"github.com/10gen/migration-verifier/option"
2725
"github.com/olekukonko/tablewriter"
2826
"github.com/pkg/errors"
2927
"github.com/rs/zerolog"
@@ -80,7 +78,7 @@ var timeFormat = time.RFC3339
8078

8179
// Verifier is the main state for the migration verifier
8280
type Verifier struct {
83-
writesOffTs option.Option[primitive.Timestamp]
81+
writesOff bool
8482
lastGeneration bool
8583
running bool
8684
generation int
@@ -125,12 +123,12 @@ type Verifier struct {
125123
metaDBName string
126124
srcStartAtTs *primitive.Timestamp
127125

128-
mux sync.RWMutex
129-
changeStreamRunning bool
130-
changeStreamFinalTsChan chan primitive.Timestamp
131-
changeStreamErrChan chan error
132-
changeStreamDoneChan chan struct{}
133-
lastChangeEventTime *primitive.Timestamp
126+
mux sync.RWMutex
127+
changeStreamRunning bool
128+
changeStreamEnderChan chan struct{}
129+
changeStreamErrChan chan error
130+
changeStreamDoneChan chan struct{}
131+
lastChangeEventTime *primitive.Timestamp
134132

135133
readConcernSetting ReadConcernSetting
136134

@@ -190,15 +188,15 @@ func NewVerifier(settings VerifierSettings) *Verifier {
190188
}
191189

192190
return &Verifier{
193-
phase: Idle,
194-
numWorkers: NumWorkers,
195-
readPreference: readpref.Primary(),
196-
partitionSizeInBytes: 400 * 1024 * 1024,
197-
failureDisplaySize: DefaultFailureDisplaySize,
198-
changeStreamFinalTsChan: make(chan primitive.Timestamp),
199-
changeStreamErrChan: make(chan error),
200-
changeStreamDoneChan: make(chan struct{}),
201-
readConcernSetting: readConcern,
191+
phase: Idle,
192+
numWorkers: NumWorkers,
193+
readPreference: readpref.Primary(),
194+
partitionSizeInBytes: 400 * 1024 * 1024,
195+
failureDisplaySize: DefaultFailureDisplaySize,
196+
changeStreamEnderChan: make(chan struct{}),
197+
changeStreamErrChan: make(chan error),
198+
changeStreamDoneChan: make(chan struct{}),
199+
readConcernSetting: readConcern,
202200

203201
// This will get recreated once gen0 starts, but we want it
204202
// here in case the change streams gets an event before then.
@@ -230,30 +228,18 @@ func (verifier *Verifier) SetFailureDisplaySize(size int64) {
230228
verifier.failureDisplaySize = size
231229
}
232230

233-
func (verifier *Verifier) WritesOff(ctx context.Context) error {
231+
func (verifier *Verifier) WritesOff(ctx context.Context) {
234232
verifier.logger.Debug().
235233
Msg("WritesOff called.")
236234

237235
verifier.mux.Lock()
238-
defer verifier.mux.Unlock()
239-
writesOffTs, err := GetClusterTime(
240-
ctx,
241-
verifier.logger,
242-
verifier.srcClient,
243-
)
244-
245-
if err != nil {
246-
return errors.Wrapf(err, "failed to fetch source's cluster time")
247-
}
248-
249-
verifier.writesOffTs = option.Some(writesOffTs)
250-
251-
return nil
236+
verifier.writesOff = true
237+
verifier.mux.Unlock()
252238
}
253239

254240
func (verifier *Verifier) WritesOn(ctx context.Context) {
255241
verifier.mux.Lock()
256-
util.ResetToZero(&verifier.writesOffTs)
242+
verifier.writesOff = false
257243
verifier.mux.Unlock()
258244
}
259245

internal/verifier/web_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const RequestInProgressErrorDescription = "Another request is currently in progr
2222
// MigrationVerifierAPI represents the interaction webserver with mongosync
2323
type MigrationVerifierAPI interface {
2424
Check(ctx context.Context, filter map[string]any)
25-
WritesOff(ctx context.Context) error
25+
WritesOff(ctx context.Context)
2626
WritesOn(ctx context.Context)
2727
GetProgress(ctx context.Context) (Progress, error)
2828
}

internal/verifier/web_server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ func NewMockVerifier() *MockVerifier {
3030
func (verifier *MockVerifier) Check(ctx context.Context, filter map[string]any) {
3131
verifier.filter = filter
3232
}
33-
func (verifier *MockVerifier) WritesOff(ctx context.Context) error { return nil }
34-
func (verifier *MockVerifier) WritesOn(ctx context.Context) {}
33+
func (verifier *MockVerifier) WritesOff(ctx context.Context) {}
34+
func (verifier *MockVerifier) WritesOn(ctx context.Context) {}
3535
func (verifier *MockVerifier) GetProgress(ctx context.Context) (Progress, error) {
3636
return Progress{}, nil
3737
}

option/bson.go

Lines changed: 0 additions & 49 deletions
This file was deleted.

option/json.go

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)