@@ -32,11 +32,6 @@ type Coordinator struct {
32
32
// Atomic counter for number of active workers (not in workerQueue)
33
33
busyWorkers atomic.Int64
34
34
35
- // Mutex protecting currentCoordinates
36
- currentCoordinatesMutex sync.Mutex
37
- // The binlog coordinates of the low water mark transaction.
38
- currentCoordinates mysql.BinlogCoordinates
39
-
40
35
// Mutex to protect the fields below
41
36
mu sync.Mutex
42
37
@@ -50,7 +45,7 @@ type Coordinator struct {
50
45
// This is a map of completed jobs by their sequence numbers.
51
46
// This is used when updating the low water mark.
52
47
// It records the binlog coordinates of the completed transaction.
53
- completedJobs map [int64 ]* mysql. BinlogCoordinates
48
+ completedJobs map [int64 ]struct {}
54
49
55
50
// These are the jobs that are waiting for a previous job to complete.
56
51
// They are indexed by the sequence number of the job they are waiting for.
@@ -256,21 +251,18 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, t
256
251
257
252
throttler : throttler ,
258
253
259
- currentCoordinates : mysql.BinlogCoordinates {},
260
-
261
254
binlogReader : binlog .NewGoMySQLReader (migrationContext ),
262
255
263
256
lowWaterMark : 0 ,
264
- completedJobs : make (map [int64 ]* mysql. BinlogCoordinates ),
257
+ completedJobs : make (map [int64 ]struct {} ),
265
258
waitingJobs : make (map [int64 ][]chan struct {}),
266
259
267
260
events : make (chan * replication.BinlogEvent , 1000 ),
268
261
}
269
262
}
270
263
271
- func (c * Coordinator ) StartStreaming (ctx context.Context , canStopStreaming func () bool ) error {
272
- coords := c .GetCurrentBinlogCoordinates ()
273
- err := c .binlogReader .ConnectBinlogStreamer (* coords )
264
+ func (c * Coordinator ) StartStreaming (ctx context.Context , coords mysql.BinlogCoordinates , canStopStreaming func () bool ) error {
265
+ err := c .binlogReader .ConnectBinlogStreamer (coords )
274
266
if err != nil {
275
267
return err
276
268
}
@@ -297,10 +289,11 @@ func (c *Coordinator) StartStreaming(ctx context.Context, canStopStreaming func(
297
289
}
298
290
c .migrationContext .Log .Infof ("Reconnecting... Will resume at %+v" , coords )
299
291
300
- // We reconnect at the position of the last low water mark.
301
- // Some jobs after the low water mark may have already applied, but
302
- // it's OK to reapply them since the DML operations are idempotent.
303
- coords := c .GetCurrentBinlogCoordinates ()
292
+ // We reconnect from the event that was last emitted to the stream.
293
+ // This ensures we don't miss any events, and we don't process any events twice.
294
+ // Processing events twice messes up the transaction tracking and
295
+ // will cause data corruption.
296
+ coords := c .binlogReader .GetCurrentBinlogCoordinates ()
304
297
if err := c .binlogReader .ConnectBinlogStreamer (* coords ); err != nil {
305
298
return err
306
299
}
@@ -385,10 +378,7 @@ func (c *Coordinator) ProcessEventsUntilDrained() error {
385
378
}
386
379
c .mu .Unlock ()
387
380
case * replication.RotateEvent :
388
- c .currentCoordinatesMutex .Lock ()
389
- c .currentCoordinates .LogFile = string (binlogEvent .NextLogName )
390
- c .currentCoordinatesMutex .Unlock ()
391
- c .migrationContext .Log .Infof ("rotate to next log from %s:%d to %s" , c .currentCoordinates .LogFile , int64 (ev .Header .LogPos ), binlogEvent .NextLogName )
381
+ c .migrationContext .Log .Infof ("rotate to next log in %s" , binlogEvent .NextLogName )
392
382
continue
393
383
default : // ignore all other events
394
384
continue
@@ -495,19 +485,17 @@ func (c *Coordinator) HandleChangeLogEvent(event *binlog.BinlogDMLEvent) {
495
485
496
486
func (c * Coordinator ) MarkTransactionCompleted (sequenceNumber , logPos , eventSize int64 ) {
497
487
var channelsToNotify []chan struct {}
498
- var lastCoords * mysql.BinlogCoordinates
499
488
500
489
func () {
501
490
c .mu .Lock ()
502
491
defer c .mu .Unlock ()
503
492
504
493
// Mark the job as completed
505
- c .completedJobs [sequenceNumber ] = & mysql. BinlogCoordinates { LogPos : logPos , EventSize : eventSize }
494
+ c .completedJobs [sequenceNumber ] = struct {}{ }
506
495
507
496
// Then, update the low water mark if possible
508
497
for {
509
- if coords , ok := c .completedJobs [c .lowWaterMark + 1 ]; ok {
510
- lastCoords = coords
498
+ if _ , ok := c .completedJobs [c .lowWaterMark + 1 ]; ok {
511
499
c .lowWaterMark ++
512
500
delete (c .completedJobs , c .lowWaterMark )
513
501
} else {
@@ -525,29 +513,11 @@ func (c *Coordinator) MarkTransactionCompleted(sequenceNumber, logPos, eventSize
525
513
}
526
514
}()
527
515
528
- // update the binlog coords of the low water mark
529
- if lastCoords != nil {
530
- func () {
531
- // c.migrationContext.Log.Infof("Updating binlog coordinates to %s:%d\n", c.currentCoordinates.LogFile, c.currentCoordinates.LogPos)
532
- c .currentCoordinatesMutex .Lock ()
533
- defer c .currentCoordinatesMutex .Unlock ()
534
- c .currentCoordinates .LogPos = lastCoords .LogPos
535
- c .currentCoordinates .EventSize = lastCoords .EventSize
536
- }()
537
- }
538
-
539
516
for _ , waitChannel := range channelsToNotify {
540
517
waitChannel <- struct {}{}
541
518
}
542
519
}
543
520
544
- func (c * Coordinator ) GetCurrentBinlogCoordinates () * mysql.BinlogCoordinates {
545
- c .currentCoordinatesMutex .Lock ()
546
- defer c .currentCoordinatesMutex .Unlock ()
547
- returnCoordinates := c .currentCoordinates
548
- return & returnCoordinates
549
- }
550
-
551
521
func (c * Coordinator ) Teardown () {
552
522
c .finishedMigrating .Store (true )
553
523
}
0 commit comments