@@ -79,7 +79,7 @@ type Migrator struct {
79
79
80
80
firstThrottlingCollected chan bool
81
81
ghostTableMigrated chan bool
82
- rowCopyComplete chan bool
82
+ rowCopyComplete chan error
83
83
allEventsUpToLockProcessed chan string
84
84
85
85
rowCopyCompleteFlag int64
@@ -97,7 +97,7 @@ func NewMigrator() *Migrator {
97
97
parser : sql .NewParser (),
98
98
ghostTableMigrated : make (chan bool ),
99
99
firstThrottlingCollected : make (chan bool , 3 ),
100
- rowCopyComplete : make (chan bool ),
100
+ rowCopyComplete : make (chan error ),
101
101
allEventsUpToLockProcessed : make (chan string ),
102
102
103
103
copyRowsQueue : make (chan tableWriteFunc ),
@@ -180,11 +180,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
180
180
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
181
181
// consumes and drops any further incoming events that may be left hanging.
182
182
func (this * Migrator ) consumeRowCopyComplete () {
183
- <- this .rowCopyComplete
183
+ if err := <- this .rowCopyComplete ; err != nil {
184
+ this .migrationContext .PanicAbort <- err
185
+ }
184
186
atomic .StoreInt64 (& this .rowCopyCompleteFlag , 1 )
185
187
this .migrationContext .MarkRowCopyEndTime ()
186
188
go func () {
187
- for <- this .rowCopyComplete {
189
+ for err := range this .rowCopyComplete {
190
+ if err != nil {
191
+ this .migrationContext .PanicAbort <- err
192
+ }
188
193
}
189
194
}()
190
195
}
@@ -1024,7 +1029,7 @@ func (this *Migrator) initiateApplier() error {
1024
1029
// a chunk of rows onto the ghost table.
1025
1030
func (this * Migrator ) iterateChunks () error {
1026
1031
terminateRowIteration := func (err error ) error {
1027
- this .rowCopyComplete <- true
1032
+ this .rowCopyComplete <- err
1028
1033
return log .Errore (err )
1029
1034
}
1030
1035
if this .migrationContext .Noop {
@@ -1076,7 +1081,10 @@ func (this *Migrator) iterateChunks() error {
1076
1081
atomic .AddInt64 (& this .migrationContext .Iteration , 1 )
1077
1082
return nil
1078
1083
}
1079
- return this .retryOperation (applyCopyRowsFunc )
1084
+ if err := this .retryOperation (applyCopyRowsFunc ); err != nil {
1085
+ return terminateRowIteration (err )
1086
+ }
1087
+ return nil
1080
1088
}
1081
1089
// Enqueue copy operation; to be executed by executeWriteFuncs()
1082
1090
this .copyRowsQueue <- copyRowsFunc
0 commit comments