Skip to content

Commit ccea072

Browse files
Fix retry issue where MigrationIterationRangeMinValues advances before insert completes
- extract MigrationContext.SetNextIterationRangeValues outside of applyCopyRowsFunc, so that it doesn't run on retries - add an integration test for Migrator with retry hooks Co-authored-by: Bastian Bartmann <[email protected]>
1 parent f9af28b commit ccea072

File tree

5 files changed

+134
-4
lines changed

5 files changed

+134
-4
lines changed

go/base/context.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,13 @@ func (this *MigrationContext) GetIteration() int64 {
584584
return atomic.LoadInt64(&this.Iteration)
585585
}
586586

587+
func (this *MigrationContext) SetNextIterationRangeValues() {
588+
this.MigrationIterationRangeMinValues = this.MigrationIterationRangeMaxValues
589+
if this.MigrationIterationRangeMinValues == nil {
590+
this.MigrationIterationRangeMinValues = this.MigrationRangeMinValues
591+
}
592+
}
593+
587594
func (this *MigrationContext) MarkPointOfInterest() int64 {
588595
this.pointOfInterestTimeMutex.Lock()
589596
defer this.pointOfInterestTimeMutex.Unlock()

go/logic/applier.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -664,10 +664,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
664664
// no further chunk to work through, i.e. we're past the last chunk and are done with
665665
// iterating the range (and this done with copying row chunks)
666666
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
667-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
668-
if this.migrationContext.MigrationIterationRangeMinValues == nil {
669-
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
670-
}
671667
for i := 0; i < 2; i++ {
672668
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
673669
if i == 1 {

go/logic/applier_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
562562
err = applier.ReadMigrationRangeValues()
563563
suite.Require().NoError(err)
564564

565+
migrationContext.SetNextIterationRangeValues()
565566
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
566567
suite.Require().NoError(err)
567568
suite.Require().True(hasFurtherRange)

go/logic/migrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,7 @@ func (this *Migrator) iterateChunks() error {
12441244
return nil
12451245
}
12461246
copyRowsFunc := func() error {
1247+
this.migrationContext.SetNextIterationRangeValues()
12471248
// Copy task:
12481249
applyCopyRowsFunc := func() error {
12491250
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {

go/logic/migrator_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
package logic
77

88
import (
9+
"bytes"
910
"context"
1011
gosql "database/sql"
1112
"errors"
13+
"fmt"
14+
"io"
1215
"os"
1316
"path/filepath"
1417
"runtime"
@@ -316,6 +319,8 @@ func (suite *MigratorTestSuite) SetupTest() {
316319

317320
_, err := suite.db.ExecContext(ctx, "CREATE DATABASE test")
318321
suite.Require().NoError(err)
322+
323+
os.Remove("/tmp/gh-ost.sock")
319324
}
320325

321326
func (suite *MigratorTestSuite) TearDownTest() {
@@ -379,6 +384,126 @@ func (suite *MigratorTestSuite) TestFoo() {
379384
suite.Require().Equal("_testing_del", tableName)
380385
}
381386

387+
func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() {
388+
ctx := context.Background()
389+
390+
_, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)")
391+
suite.Require().NoError(err)
392+
393+
const initStride = 1000
394+
const totalBatches = 3
395+
for i := 0; i < totalBatches; i++ {
396+
dataSize := 50 * i
397+
for j := 0; j < initStride; j++ {
398+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize)))
399+
suite.Require().NoError(err)
400+
}
401+
}
402+
403+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8))
404+
suite.Require().NoError(err)
405+
defer func() {
406+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024))
407+
suite.Require().NoError(err)
408+
}()
409+
410+
tmpDir, err := os.MkdirTemp("", "gh-ost-hooks")
411+
suite.Require().NoError(err)
412+
defer os.RemoveAll(tmpDir)
413+
414+
hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry")
415+
hookContent := `#!/bin/bash
416+
# Mock hook that reduces chunk size on binlog cache error
417+
ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR"
418+
SOCKET_PATH="/tmp/gh-ost.sock"
419+
420+
if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then
421+
echo "Nothing to do for error: $ERROR_MSG"
422+
exit 0
423+
fi
424+
425+
CHUNK_SIZE=$(echo "chunk-size=?" | nc -U $SOCKET_PATH | tr -d '\n')
426+
427+
MIN_CHUNK_SIZE=10
428+
NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 ))
429+
if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then
430+
NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE
431+
fi
432+
433+
if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then
434+
echo "Chunk size unchanged: $CHUNK_SIZE"
435+
exit 0
436+
fi
437+
438+
echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE"
439+
echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH
440+
echo "[gh-ost-on-batch-copy-retry]: Done, exiting..."
441+
`
442+
err = os.WriteFile(hookScript, []byte(hookContent), 0755)
443+
suite.Require().NoError(err)
444+
445+
origStdout := os.Stdout
446+
origStderr := os.Stderr
447+
448+
rOut, wOut, _ := os.Pipe()
449+
rErr, wErr, _ := os.Pipe()
450+
os.Stdout = wOut
451+
os.Stderr = wErr
452+
453+
connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
454+
suite.Require().NoError(err)
455+
456+
migrationContext := base.NewMigrationContext()
457+
migrationContext.AllowedRunningOnMaster = true
458+
migrationContext.ApplierConnectionConfig = connectionConfig
459+
migrationContext.InspectorConnectionConfig = connectionConfig
460+
migrationContext.DatabaseName = "test"
461+
migrationContext.SkipPortValidation = true
462+
migrationContext.OriginalTableName = "test_retry_batch"
463+
migrationContext.SetConnectionConfig("innodb")
464+
migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB"
465+
migrationContext.ReplicaServerId = 99999
466+
migrationContext.HeartbeatIntervalMilliseconds = 100
467+
migrationContext.ThrottleHTTPIntervalMillis = 100
468+
migrationContext.ThrottleHTTPTimeoutMillis = 1000
469+
migrationContext.HooksPath = tmpDir
470+
migrationContext.ChunkSize = 1000
471+
migrationContext.SetDefaultNumRetries(10)
472+
migrationContext.ServeSocketFile = "/tmp/gh-ost.sock"
473+
474+
migrator := NewMigrator(migrationContext, "0.0.0")
475+
476+
err = migrator.Migrate()
477+
suite.Require().NoError(err)
478+
479+
wOut.Close()
480+
wErr.Close()
481+
os.Stdout = origStdout
482+
os.Stderr = origStderr
483+
484+
var bufOut, bufErr bytes.Buffer
485+
io.Copy(&bufOut, rOut)
486+
io.Copy(&bufErr, rErr)
487+
488+
outStr := bufOut.String()
489+
errStr := bufErr.String()
490+
491+
suite.Assert().Contains(outStr, "chunk-size: 1000")
492+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800")
493+
suite.Assert().Contains(outStr, "chunk-size: 800")
494+
495+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640")
496+
suite.Assert().Contains(outStr, "chunk-size: 640")
497+
498+
suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512")
499+
suite.Assert().Contains(outStr, "chunk-size: 512")
500+
501+
var count int
502+
err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count)
503+
suite.Require().NoError(err)
504+
suite.Assert().Equal(3000, count)
505+
}
506+
382507
func TestMigratorRetry(t *testing.T) {
383508
oldRetrySleepFn := RetrySleepFn
384509
defer func() { RetrySleepFn = oldRetrySleepFn }()

0 commit comments

Comments
 (0)