Skip to content

Commit c2da8c4

Browse files
committed
change to private fields
1 parent 3961b2d commit c2da8c4

File tree

4 files changed

+36
-34
lines changed

4 files changed

+36
-34
lines changed

internal/verifier/change_stream.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ type ChangeStreamReader struct {
6565
clusterInfo util.ClusterInfo
6666

6767
changeStreamRunning bool
68-
ChangeEventBatchChan chan []ParsedEvent
69-
WritesOffTsChan chan primitive.Timestamp
70-
ErrChan chan error
71-
DoneChan chan struct{}
68+
changeEventBatchChan chan []ParsedEvent
69+
writesOffTsChan chan primitive.Timestamp
70+
errChan chan error
71+
doneChan chan struct{}
7272

7373
startAtTs *primitive.Timestamp
7474
}
@@ -82,10 +82,10 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8282
watcherClient: verifier.srcClient,
8383
clusterInfo: *verifier.srcClusterInfo,
8484
changeStreamRunning: false,
85-
ChangeEventBatchChan: make(chan []ParsedEvent),
86-
WritesOffTsChan: make(chan primitive.Timestamp),
87-
ErrChan: make(chan error),
88-
DoneChan: make(chan struct{}),
85+
changeEventBatchChan: make(chan []ParsedEvent),
86+
writesOffTsChan: make(chan primitive.Timestamp),
87+
errChan: make(chan error),
88+
doneChan: make(chan struct{}),
8989
}
9090
verifier.dstChangeStreamReader = &ChangeStreamReader{
9191
readerType: dst,
@@ -95,10 +95,10 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
9595
watcherClient: verifier.dstClient,
9696
clusterInfo: *verifier.dstClusterInfo,
9797
changeStreamRunning: false,
98-
ChangeEventBatchChan: make(chan []ParsedEvent),
99-
WritesOffTsChan: make(chan primitive.Timestamp),
100-
ErrChan: make(chan error),
101-
DoneChan: make(chan struct{}),
98+
changeEventBatchChan: make(chan []ParsedEvent),
99+
writesOffTsChan: make(chan primitive.Timestamp),
100+
errChan: make(chan error),
101+
doneChan: make(chan struct{}),
102102
}
103103
}
104104

@@ -109,15 +109,15 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
109109
select {
110110
case <-ctx.Done():
111111
return ctx.Err()
112-
case batch, more := <-reader.ChangeEventBatchChan:
112+
case batch, more := <-reader.changeEventBatchChan:
113113
if !more {
114114
verifier.logger.Debug().Msgf("Change Event Batch Channel has been closed by %s, returning...", reader)
115115
return nil
116116
}
117117
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
118118
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
119119
if err != nil {
120-
reader.ErrChan <- err
120+
reader.errChan <- err
121121
return err
122122
}
123123
}
@@ -295,7 +295,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
295295
return nil
296296
}
297297

298-
csr.ChangeEventBatchChan <- changeEventBatch
298+
csr.changeEventBatchChan <- changeEventBatch
299299
return nil
300300
}
301301

@@ -333,7 +333,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
333333
// source writes are ended and the migration tool is finished / committed.
334334
// This means we should exit rather than continue reading the change stream
335335
// since there should be no more events.
336-
case writesOffTs := <-csr.WritesOffTsChan:
336+
case writesOffTs := <-csr.writesOffTsChan:
337337
csr.logger.Debug().
338338
Interface("writesOffTimestamp", writesOffTs).
339339
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
@@ -386,7 +386,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
386386
}
387387
// since we have started Recheck, we must signal that we have
388388
// finished the change stream changes so that Recheck can continue.
389-
csr.DoneChan <- struct{}{}
389+
csr.doneChan <- struct{}{}
390390
break
391391
}
392392
}
@@ -486,9 +486,9 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
486486
initialCreateResultChan := make(chan mo.Result[primitive.Timestamp])
487487

488488
go func() {
489-
// Closing ChangeEventBatchChan at the end of change stream goroutine
489+
// Closing changeEventBatchChan at the end of change stream goroutine
490490
// notifies the verifier's change event handler to exit.
491-
defer close(csr.ChangeEventBatchChan)
491+
defer close(csr.changeEventBatchChan)
492492

493493
retryer := retry.New(retry.DefaultDurationLimit)
494494
retryer = retryer.WithErrorCodes(util.CursorKilled)
@@ -524,8 +524,8 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
524524
if err != nil {
525525
// NB: This failure always happens after the initial change stream
526526
// creation.
527-
csr.ErrChan <- err
528-
close(csr.ErrChan)
527+
csr.errChan <- err
528+
close(csr.errChan)
529529
}
530530
}()
531531

internal/verifier/change_stream_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
165165
suite.Require().NotNil(origStartTs)
166166
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
167167
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
168-
verifier.srcChangeStreamReader.WritesOffTsChan <- *origStartTs
169-
<-verifier.srcChangeStreamReader.DoneChan
168+
verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs
169+
<-verifier.srcChangeStreamReader.doneChan
170170
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
171171
}
172172

@@ -211,8 +211,8 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
211211
"session time after events should exceed the original",
212212
)
213213

214-
verifier.srcChangeStreamReader.WritesOffTsChan <- *postEventsSessionTime
215-
<-verifier.srcChangeStreamReader.DoneChan
214+
verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime
215+
<-verifier.srcChangeStreamReader.doneChan
216216

217217
suite.Assert().Equal(
218218
*postEventsSessionTime,

internal/verifier/check.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt
4444
select {
4545
case <-ctx.Done():
4646
return ctx.Err()
47-
case err := <-csr.ErrChan:
47+
case err := <-csr.errChan:
4848
verifier.logger.Warn().Err(err).
4949
Msgf("Received error from %s.", csr)
5050
return err
51-
case <-csr.DoneChan:
51+
case <-csr.doneChan:
5252
verifier.logger.Debug().
5353
Msgf("Received completion signal from %s.", csr)
5454
break
@@ -82,9 +82,9 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
8282
// If the change stream fails, everything should stop.
8383
eg.Go(func() error {
8484
select {
85-
case err := <-verifier.srcChangeStreamReader.ErrChan:
85+
case err := <-verifier.srcChangeStreamReader.errChan:
8686
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
87-
case err := <-verifier.dstChangeStreamReader.ErrChan:
87+
case err := <-verifier.dstChangeStreamReader.errChan:
8888
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
8989
case <-ctx.Done():
9090
return nil

internal/verifier/migration_verifier.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
255255
return errors.New("writesOff already set")
256256
}
257257
verifier.writesOff = true
258-
verifier.mux.Unlock()
259258

260259
verifier.logger.Debug().Msg("Signalling that writes are done.")
261260

@@ -266,6 +265,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
266265
)
267266

268267
if err != nil {
268+
verifier.mux.Unlock()
269269
return errors.Wrapf(err, "failed to fetch source's cluster time")
270270
}
271271

@@ -276,21 +276,23 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
276276
)
277277

278278
if err != nil {
279+
verifier.mux.Unlock()
279280
return errors.Wrapf(err, "failed to fetch destination's cluster time")
280281
}
282+
verifier.mux.Unlock()
281283

282284
// This has to happen outside the lock because the change streams
283285
// might be inserting docs into the recheck queue, which happens
284286
// under the lock.
285287
select {
286-
case verifier.srcChangeStreamReader.WritesOffTsChan <- srcFinalTs:
287-
case err := <-verifier.srcChangeStreamReader.ErrChan:
288+
case verifier.srcChangeStreamReader.writesOffTsChan <- srcFinalTs:
289+
case err := <-verifier.srcChangeStreamReader.errChan:
288290
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
289291
}
290292

291293
select {
292-
case verifier.dstChangeStreamReader.WritesOffTsChan <- dstFinalTs:
293-
case err := <-verifier.dstChangeStreamReader.ErrChan:
294+
case verifier.dstChangeStreamReader.writesOffTsChan <- dstFinalTs:
295+
case err := <-verifier.dstChangeStreamReader.errChan:
294296
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
295297
}
296298

0 commit comments

Comments
 (0)