Skip to content

Commit 7787aa9

Browse files
Execute hook on every batch insert retry
Co-authored-by: Bastian Bartmann <[email protected]>
1 parent 4502796 commit 7787aa9

File tree

2 files changed

+39
-21
lines changed

2 files changed

+39
-21
lines changed

go/logic/hooks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
onInteractiveCommand = "gh-ost-on-interactive-command"
2929
onSuccess = "gh-ost-on-success"
3030
onFailure = "gh-ost-on-failure"
31+
onBatchCopyRetry = "gh-ost-on-batch-copy-retry"
3132
onStatus = "gh-ost-on-status"
3233
onStopReplication = "gh-ost-on-stop-replication"
3334
onStartReplication = "gh-ost-on-start-replication"
@@ -77,6 +78,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
7778
// executeHook executes a command, and sets relevant environment variables
7879
// combined output & error are printed to the configured writer.
7980
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
81+
this.migrationContext.Log.Infof("executing hook: %+v", hook)
8082
cmd := exec.Command(hook)
8183
cmd.Env = this.applyEnvironmentVariables(extraVariables...)
8284

@@ -123,6 +125,10 @@ func (this *HooksExecutor) onBeforeRowCopy() error {
123125
return this.executeHooks(onBeforeRowCopy)
124126
}
125127

128+
func (this *HooksExecutor) onBatchCopyRetry() error {
129+
return this.executeHooks(onBatchCopyRetry)
130+
}
131+
126132
func (this *HooksExecutor) onRowCopyComplete() error {
127133
return this.executeHooks(onRowCopyComplete)
128134
}

go/logic/migrator.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
130130
}
131131
}
132132

133+
func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) {
134+
wrappedOperation := func() error {
135+
if err := operation(); err != nil {
136+
this.hooksExecutor.onBatchCopyRetry()
137+
return err
138+
}
139+
return nil
140+
}
141+
142+
return this.retryOperation(wrappedOperation, notFatalHint...)
143+
}
144+
133145
// retryOperation attempts up to `count` attempts at running given function,
134146
// exiting as soon as it returns with non-error.
135147
func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
@@ -1232,28 +1244,28 @@ func (this *Migrator) iterateChunks() error {
12321244
return nil
12331245
}
12341246
copyRowsFunc := func() error {
1235-
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
1236-
// Done.
1237-
// There's another such check down the line
1238-
return nil
1239-
}
1240-
1241-
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1242-
1243-
hasFurtherRange := false
1244-
expectedRangeSize := int64(0)
1245-
if err := this.retryOperation(func() (e error) {
1246-
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
1247-
return e
1248-
}); err != nil {
1249-
return terminateRowIteration(err)
1250-
}
1251-
if !hasFurtherRange {
1252-
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
1253-
return terminateRowIteration(nil)
1254-
}
12551247
// Copy task:
12561248
applyCopyRowsFunc := func() error {
1249+
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
1250+
// Done.
1251+
// There's another such check down the line
1252+
return nil
1253+
}
1254+
1255+
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
1256+
1257+
hasFurtherRange := false
1258+
// TODO: figure out how to rewrite this double retry?
1259+
if err := this.retryOperation(func() (e error) {
1260+
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
1261+
return e
1262+
}); err != nil {
1263+
return terminateRowIteration(err)
1264+
}
1265+
if !hasFurtherRange {
1266+
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
1267+
return terminateRowIteration(nil)
1268+
}
12571269
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
12581270
// No need for more writes.
12591271
// This is the de-facto place where we avoid writing in the event of completed cut-over.
@@ -1286,7 +1298,7 @@ func (this *Migrator) iterateChunks() error {
12861298
atomic.AddInt64(&this.migrationContext.Iteration, 1)
12871299
return nil
12881300
}
1289-
if err := this.retryOperation(applyCopyRowsFunc); err != nil {
1301+
if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil {
12901302
return terminateRowIteration(err)
12911303
}
12921304
return nil

0 commit comments

Comments
 (0)