@@ -48,6 +48,7 @@ type Migrator struct {
48
48
voluntaryLockAcquired chan bool
49
49
panicAbort chan error
50
50
51
+ allEventsUpToLockProcessedFlag int64
51
52
// copyRowsQueue should not be buffered; if buffered some non-damaging but
52
53
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
53
54
copyRowsQueue chan tableWriteFunc
@@ -65,6 +66,8 @@ func NewMigrator() *Migrator {
65
66
voluntaryLockAcquired : make (chan bool , 1 ),
66
67
panicAbort : make (chan error ),
67
68
69
+ allEventsUpToLockProcessedFlag : 0 ,
70
+
68
71
copyRowsQueue : make (chan tableWriteFunc ),
69
72
applyEventsQueue : make (chan tableWriteFunc , applyEventsQueueBuffer ),
70
73
handledChangelogStates : make (map [string ]bool ),
@@ -106,7 +109,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
106
109
if time .Duration (lag ) > time .Duration (this .migrationContext .MaxLagMillisecondsThrottleThreshold )* time .Millisecond {
107
110
return true , fmt .Sprintf ("lag=%fs" , time .Duration (lag ).Seconds ())
108
111
}
109
- if this .migrationContext .TestOnReplica {
112
+ if this .migrationContext .TestOnReplica && ( atomic . LoadInt64 ( & this . allEventsUpToLockProcessedFlag ) == 0 ) {
110
113
replicationLag , err := mysql .GetMaxReplicationLag (this .migrationContext .InspectorConnectionConfig , this .migrationContext .ThrottleControlReplicaKeys , this .migrationContext .ReplictionLagQuery )
111
114
if err != nil {
112
115
return true , err .Error ()
@@ -198,6 +201,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
198
201
return nil
199
202
}
200
203
204
+ // consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
205
+ // consumers and drops any further incoming events that may be left hanging.
206
+ func (this * Migrator ) consumeRowCopyComplete () {
207
+ <- this .rowCopyComplete
208
+ go func () {
209
+ for <- this .rowCopyComplete {
210
+ }
211
+ }()
212
+ }
213
+
201
214
func (this * Migrator ) canStopStreaming () bool {
202
215
return false
203
216
}
@@ -215,33 +228,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
215
228
}
216
229
case AllEventsUpToLockProcessed :
217
230
{
218
- this .allEventsUpToLockProcessed <- true
219
- }
220
- default :
221
- {
222
- return fmt .Errorf ("Unknown changelog state: %+v" , changelogState )
223
- }
224
- }
225
- log .Debugf ("Received state %+v" , changelogState )
226
- return nil
227
- }
228
-
229
- func (this * Migrator ) onChangelogState (stateValue string ) (err error ) {
230
- log .Fatalf ("I shouldn't be here" )
231
- if this .handledChangelogStates [stateValue ] {
232
- return nil
233
- }
234
- this .handledChangelogStates [stateValue ] = true
235
-
236
- changelogState := ChangelogState (stateValue )
237
- switch changelogState {
238
- case TablesInPlace :
239
- {
240
- this .tablesInPlace <- true
241
- }
242
- case AllEventsUpToLockProcessed :
243
- {
244
- this .allEventsUpToLockProcessed <- true
231
+ applyEventFunc := func () error {
232
+ this .allEventsUpToLockProcessed <- true
233
+ return nil
234
+ }
235
+ // at this point we know all events up to lock have been read from the streamer,
236
+ // because the streamer works sequentially. So those events are either already handled,
237
+ // or have event functions in applyEventsQueue.
238
+ // So as not to create a potential deadlock, we write this func to applyEventsQueue
239
+ // asynchronously, understanding it doesn't really matter.
240
+ go func () {
241
+ this .applyEventsQueue <- applyEventFunc
242
+ }()
245
243
}
246
244
default :
247
245
{
@@ -295,6 +293,9 @@ func (this *Migrator) Migrate() (err error) {
295
293
if err := this .inspector .InspectOriginalAndGhostTables (); err != nil {
296
294
return err
297
295
}
296
+ if err := this .addDMLEventsListener (); err != nil {
297
+ return err
298
+ }
298
299
go this .initiateHeartbeatListener ()
299
300
300
301
if err := this .applier .ReadMigrationRangeValues (); err != nil {
@@ -307,7 +308,7 @@ func (this *Migrator) Migrate() (err error) {
307
308
go this .initiateStatus ()
308
309
309
310
log .Debugf ("Operating until row copy is complete" )
310
- <- this .rowCopyComplete
311
+ this .consumeRowCopyComplete ()
311
312
log .Debugf ("Row copy complete" )
312
313
this .printStatus ()
313
314
@@ -336,18 +337,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
336
337
if this .migrationContext .QuickAndBumpySwapTables {
337
338
return this .stopWritesAndCompleteMigrationOnMasterQuickAndBumpy ()
338
339
}
339
- // Lock-based solution: we use low timeout and multiple attempts. But for
340
- // each failed attempt, we throttle until replication lag is back to normal
341
- if err := this .retryOperation (
342
- func () error {
343
- return this .executeAndThrottleOnError (this .stopWritesAndCompleteMigrationOnMasterViaLock )
344
- }); err != nil {
345
- return err
346
- }
347
- if err := this .dropOldTableIfRequired (); err != nil {
348
- return err
349
- }
350
340
341
+ {
342
+ // Lock-based solution: we use low timeout and multiple attempts. But for
343
+ // each failed attempt, we throttle until replication lag is back to normal
344
+ if err := this .retryOperation (
345
+ func () error {
346
+ return this .executeAndThrottleOnError (this .stopWritesAndCompleteMigrationOnMasterViaLock )
347
+ }); err != nil {
348
+ return err
349
+ }
350
+ if err := this .dropOldTableIfRequired (); err != nil {
351
+ return err
352
+ }
353
+ }
351
354
return
352
355
}
353
356
@@ -364,6 +367,21 @@ func (this *Migrator) dropOldTableIfRequired() (err error) {
364
367
return nil
365
368
}
366
369
370
+ // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
371
+ // make sure the queue is drained.
372
+ func (this * Migrator ) waitForEventsUpToLock () (err error ) {
373
+ if _ , err := this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed )); err != nil {
374
+ return err
375
+ }
376
+ log .Debugf ("Waiting for events up to lock" )
377
+ <- this .allEventsUpToLockProcessed
378
+ atomic .StoreInt64 (& this .allEventsUpToLockProcessedFlag , 1 )
379
+ log .Debugf ("Done waiting for events up to lock" )
380
+ this .printStatus ()
381
+
382
+ return nil
383
+ }
384
+
367
385
// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
368
386
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
369
387
// There is a point in time where the "original" table does not exist and queries are non-blocked
@@ -373,11 +391,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
373
391
return err
374
392
}
375
393
376
- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
377
- log .Debugf ("Waiting for events up to lock" )
378
- <- this .allEventsUpToLockProcessed
379
- log .Debugf ("Done waiting for events up to lock" )
380
-
394
+ if err := this .retryOperation (this .waitForEventsUpToLock ); err != nil {
395
+ return err
396
+ }
381
397
if err := this .retryOperation (this .applier .SwapTablesQuickAndBumpy ); err != nil {
382
398
return err
383
399
}
@@ -438,10 +454,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
438
454
log .Infof ("Found RENAME to be executing" )
439
455
440
456
// OK, at this time we know any newly incoming DML on original table is blocked.
441
- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
442
- log .Debugf ("Waiting for events up to lock" )
443
- <- this .allEventsUpToLockProcessed
444
- log .Debugf ("Done waiting for events up to lock" )
457
+ this .waitForEventsUpToLock ()
445
458
446
459
okToReleaseLock <- true
447
460
// BAM: voluntary lock is released, blocking query is released, rename is released.
@@ -466,14 +479,11 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
466
479
// in sync. There is no table swap.
467
480
func (this * Migrator ) stopWritesAndCompleteMigrationOnReplica () (err error ) {
468
481
log .Debugf ("testing on replica. Instead of LOCK tables I will STOP SLAVE" )
469
- if err := this .retryOperation (this .applier .StopSlaveIOThread ); err != nil {
482
+ if err := this .retryOperation (this .applier .StopSlaveNicely ); err != nil {
470
483
return err
471
484
}
472
485
473
- this .applier .WriteChangelogState (string (AllEventsUpToLockProcessed ))
474
- log .Debugf ("Waiting for events up to lock" )
475
- <- this .allEventsUpToLockProcessed
476
- log .Debugf ("Done waiting for events up to lock" )
486
+ this .waitForEventsUpToLock ()
477
487
478
488
log .Info ("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation" )
479
489
return nil
@@ -612,7 +622,17 @@ func (this *Migrator) initiateStreaming() error {
612
622
return this .onChangelogStateEvent (dmlEvent )
613
623
},
614
624
)
615
- this .eventsStreamer .AddListener (
625
+
626
+ go func () {
627
+ log .Debugf ("Beginning streaming" )
628
+ this .eventsStreamer .StreamEvents (func () bool { return this .canStopStreaming () })
629
+ }()
630
+ return nil
631
+ }
632
+
633
+ // addDMLEventsListener
634
+ func (this * Migrator ) addDMLEventsListener () error {
635
+ err := this .eventsStreamer .AddListener (
616
636
false ,
617
637
this .migrationContext .DatabaseName ,
618
638
this .migrationContext .OriginalTableName ,
@@ -624,12 +644,7 @@ func (this *Migrator) initiateStreaming() error {
624
644
return nil
625
645
},
626
646
)
627
-
628
- go func () {
629
- log .Debugf ("Beginning streaming" )
630
- this .eventsStreamer .StreamEvents (func () bool { return this .canStopStreaming () })
631
- }()
632
- return nil
647
+ return err
633
648
}
634
649
635
650
func (this * Migrator ) initiateApplier () error {
@@ -680,13 +695,16 @@ func (this *Migrator) iterateChunks() error {
680
695
if ! hasFurtherRange {
681
696
return terminateRowIteration (nil )
682
697
}
683
- _ , rowsAffected , _ , err := this .applier .ApplyIterationInsertQuery ()
684
- if err != nil {
685
- return terminateRowIteration (err )
698
+ applyCopyRowsFunc := func () error {
699
+ _ , rowsAffected , _ , err := this .applier .ApplyIterationInsertQuery ()
700
+ if err != nil {
701
+ return terminateRowIteration (err )
702
+ }
703
+ atomic .AddInt64 (& this .migrationContext .TotalRowsCopied , rowsAffected )
704
+ atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
705
+ return nil
686
706
}
687
- atomic .AddInt64 (& this .migrationContext .TotalRowsCopied , rowsAffected )
688
- atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
689
- return nil
707
+ return this .retryOperation (applyCopyRowsFunc )
690
708
}
691
709
this .copyRowsQueue <- copyRowsFunc
692
710
}
@@ -714,7 +732,8 @@ func (this *Migrator) executeWriteFuncs() error {
714
732
select {
715
733
case copyRowsFunc := <- this .copyRowsQueue :
716
734
{
717
- if err := this .retryOperation (copyRowsFunc ); err != nil {
735
+ // Retries are handled within the copyRowsFunc
736
+ if err := copyRowsFunc (); err != nil {
718
737
return log .Errore (err )
719
738
}
720
739
}
0 commit comments