Skip to content

Commit 6d7bd6b

Browse files
committed
writesoffts is now rereadable
1 parent 6bc2e10 commit 6d7bd6b

File tree

7 files changed

+118
-14
lines changed

7 files changed

+118
-14
lines changed

internal/replay/replay.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package replay
2+
3+
import "context"
4+
5+
func CreateChannel[T any](
6+
ctx context.Context,
7+
) (<-chan T, func(T)) {
8+
retChan := make(chan T)
9+
10+
writer := make(chan T)
11+
12+
go func() {
13+
defer close(retChan)
14+
15+
var val T
16+
select {
17+
case <-ctx.Done():
18+
return
19+
case val = <-writer:
20+
}
21+
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return
26+
case retChan <- val:
27+
}
28+
}
29+
}()
30+
31+
writerCalled := false
32+
33+
return retChan, func(val T) {
34+
if writerCalled {
35+
panic("replay-channel writer called twice")
36+
}
37+
writerCalled = true
38+
39+
writer <- val
40+
close(writer)
41+
}
42+
}

internal/replay/replay_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package replay
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/10gen/migration-verifier/internal/logger"
9+
"github.com/stretchr/testify/suite"
10+
)
11+
12+
type UnitTestSuite struct {
13+
suite.Suite
14+
logger *logger.Logger
15+
}
16+
17+
func TestUnitTestSuite(t *testing.T) {
18+
ts := new(UnitTestSuite)
19+
suite.Run(t, ts)
20+
}
21+
22+
func (s *UnitTestSuite) TestReplay() {
23+
ctx, cancel := context.WithCancel(context.Background())
24+
defer cancel()
25+
26+
theChan, theWriter := CreateChannel[int](ctx)
27+
28+
select {
29+
case <-theChan:
30+
s.Require().Fail("should not be readable")
31+
case <-time.NewTimer(time.Second).C:
32+
}
33+
34+
theWriter(123)
35+
36+
s.Assert().Panics(func() { theWriter(234) })
37+
38+
for range 5 {
39+
select {
40+
case val := <-theChan:
41+
s.Require().Equal(val, 123)
42+
case <-time.NewTimer(time.Second).C:
43+
s.Require().Fail("should be readable")
44+
}
45+
}
46+
}

internal/verifier/change_stream.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/10gen/migration-verifier/internal/keystring"
99
"github.com/10gen/migration-verifier/internal/logger"
10+
"github.com/10gen/migration-verifier/internal/replay"
1011
"github.com/10gen/migration-verifier/internal/retry"
1112
"github.com/10gen/migration-verifier/internal/util"
1213
"github.com/10gen/migration-verifier/msync"
@@ -68,7 +69,8 @@ type ChangeStreamReader struct {
6869

6970
changeStreamRunning bool
7071
changeEventBatchChan chan []ParsedEvent
71-
writesOffTsChan chan primitive.Timestamp
72+
writesOffTsChan <-chan primitive.Timestamp
73+
writesOffTsWriteFunc func(primitive.Timestamp)
7274
errChan chan error
7375
doneChan chan struct{}
7476

@@ -77,7 +79,9 @@ type ChangeStreamReader struct {
7779
lag *msync.TypedAtomic[option.Option[time.Duration]]
7880
}
7981

80-
func (verifier *Verifier) initializeChangeStreamReaders() {
82+
func (verifier *Verifier) initializeChangeStreamReaders(ctx context.Context) {
83+
srcWritesOffReaderChan, srcWritesOffWriter := replay.CreateChannel[primitive.Timestamp](ctx)
84+
8185
verifier.srcChangeStreamReader = &ChangeStreamReader{
8286
readerType: src,
8387
logger: verifier.logger,
@@ -87,11 +91,15 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
8791
clusterInfo: *verifier.srcClusterInfo,
8892
changeStreamRunning: false,
8993
changeEventBatchChan: make(chan []ParsedEvent),
90-
writesOffTsChan: make(chan primitive.Timestamp),
94+
writesOffTsChan: srcWritesOffReaderChan,
95+
writesOffTsWriteFunc: srcWritesOffWriter,
9196
errChan: make(chan error),
9297
doneChan: make(chan struct{}),
9398
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
9499
}
100+
101+
dstWritesOffReaderChan, dstWritesOffWriter := replay.CreateChannel[primitive.Timestamp](ctx)
102+
95103
verifier.dstChangeStreamReader = &ChangeStreamReader{
96104
readerType: dst,
97105
logger: verifier.logger,
@@ -101,7 +109,8 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
101109
clusterInfo: *verifier.dstClusterInfo,
102110
changeStreamRunning: false,
103111
changeEventBatchChan: make(chan []ParsedEvent),
104-
writesOffTsChan: make(chan primitive.Timestamp),
112+
writesOffTsChan: dstWritesOffReaderChan,
113+
writesOffTsWriteFunc: dstWritesOffWriter,
105114
errChan: make(chan error),
106115
doneChan: make(chan struct{}),
107116
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
@@ -123,7 +132,6 @@ func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *C
123132
verifier.logger.Trace().Msgf("Verifier is handling a change event batch from %s: %v", reader, batch)
124133
err := verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType)
125134
if err != nil {
126-
reader.errChan <- err
127135
return err
128136
}
129137
}
@@ -355,7 +363,10 @@ func (csr *ChangeStreamReader) iterateChangeStream(
355363
// source writes are ended and the migration tool is finished / committed.
356364
// This means we should exit rather than continue reading the change stream
357365
// since there should be no more events.
358-
case writesOffTs := <-csr.writesOffTsChan:
366+
case writesOffTs, more := <-csr.writesOffTsChan:
367+
if !more {
368+
panic(csr.String() + ": attempted read from already-closed writesOffTsChan")
369+
}
359370
csr.logger.Debug().
360371
Interface("writesOffTimestamp", writesOffTs).
361372
Msgf("%s thread received writesOff timestamp. Finalizing change stream.", csr)
@@ -408,7 +419,8 @@ func (csr *ChangeStreamReader) iterateChangeStream(
408419
}
409420
// since we have started Recheck, we must signal that we have
410421
// finished the change stream changes so that Recheck can continue.
411-
csr.doneChan <- struct{}{}
422+
close(csr.doneChan)
423+
412424
break
413425
}
414426
}

internal/verifier/change_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
318318
suite.Require().NotNil(origStartTs)
319319
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
320320
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
321-
verifier.srcChangeStreamReader.writesOffTsChan <- *origStartTs
321+
verifier.srcChangeStreamReader.writesOffTsWriteFunc(*origStartTs)
322322
<-verifier.srcChangeStreamReader.doneChan
323323
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
324324
}
@@ -364,7 +364,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
364364
"session time after events should exceed the original",
365365
)
366366

367-
verifier.srcChangeStreamReader.writesOffTsChan <- *postEventsSessionTime
367+
verifier.srcChangeStreamReader.writesOffTsWriteFunc(*postEventsSessionTime)
368368
<-verifier.srcChangeStreamReader.doneChan
369369

370370
suite.Assert().Equal(

internal/verifier/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
179179
}
180180
verifier.running = true
181181
verifier.globalFilter = filter
182-
verifier.initializeChangeStreamReaders()
182+
verifier.initializeChangeStreamReaders(ctx)
183183
verifier.mux.Unlock()
184184
defer func() {
185185
verifier.mux.Lock()

internal/verifier/integration_test_suite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
176176
"should set metadata connection string",
177177
)
178178
verifier.SetMetaDBName(metaDBName)
179-
verifier.initializeChangeStreamReaders()
179+
verifier.initializeChangeStreamReaders(ctx)
180180

181181
suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
182182
suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx))

internal/verifier/migration_verifier.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
248248
verifier.mux.Unlock()
249249
return errors.New("writesOff already set")
250250
}
251-
verifier.writesOff = true
252251

253252
verifier.logger.Debug().Msg("Signalling that writes are done.")
254253

@@ -273,21 +272,26 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
273272
verifier.mux.Unlock()
274273
return errors.Wrapf(err, "failed to fetch destination's cluster time")
275274
}
275+
276+
verifier.writesOff = true
277+
276278
verifier.mux.Unlock()
277279

278280
// This has to happen outside the lock because the change streams
279281
// might be inserting docs into the recheck queue, which happens
280282
// under the lock.
281283
select {
282-
case verifier.srcChangeStreamReader.writesOffTsChan <- srcFinalTs:
283284
case err := <-verifier.srcChangeStreamReader.errChan:
284285
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
286+
default:
287+
verifier.srcChangeStreamReader.writesOffTsWriteFunc(srcFinalTs)
285288
}
286289

287290
select {
288-
case verifier.dstChangeStreamReader.writesOffTsChan <- dstFinalTs:
289291
case err := <-verifier.dstChangeStreamReader.errChan:
290292
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
293+
default:
294+
verifier.dstChangeStreamReader.writesOffTsWriteFunc(dstFinalTs)
291295
}
292296

293297
return nil

0 commit comments

Comments
 (0)