Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
- `gh-ost-on-before-cut-over`
- `gh-ost-on-success`
- `gh-ost-on-failure`
- `gh-ost-on-batch-copy-retry`

### Context

Expand Down Expand Up @@ -81,6 +82,7 @@ The following variables are available on particular hooks:

- `GH_OST_COMMAND` is only available in `gh-ost-on-interactive-command`
- `GH_OST_STATUS` is only available in `gh-ost-on-status`
- `GH_OST_LAST_BATCH_COPY_ERROR` is only available in `gh-ost-on-batch-copy-retry`

### Examples

Expand Down
7 changes: 7 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,13 @@ func (this *MigrationContext) GetIteration() int64 {
return atomic.LoadInt64(&this.Iteration)
}

// SetNextIterationRangeMinValues sets MigrationIterationRangeMinValues to the
// current MigrationIterationRangeMaxValues. When no iteration max values have
// been recorded yet (nil), it uses MigrationRangeMinValues instead.
func (this *MigrationContext) SetNextIterationRangeMinValues() {
	nextMinValues := this.MigrationIterationRangeMaxValues
	if nextMinValues == nil {
		nextMinValues = this.MigrationRangeMinValues
	}
	this.MigrationIterationRangeMinValues = nextMinValues
}

func (this *MigrationContext) MarkPointOfInterest() int64 {
this.pointOfInterestTimeMutex.Lock()
defer this.pointOfInterestTimeMutex.Unlock()
Expand Down
4 changes: 0 additions & 4 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and thus done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
}
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
Expand Down
3 changes: 3 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

migrationContext.SetNextIterationRangeMinValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()

suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)

Expand Down Expand Up @@ -618,6 +620,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai
err = applier.AlterGhost()
suite.Require().NoError(err)

migrationContext.SetNextIterationRangeMinValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)
Expand Down
7 changes: 7 additions & 0 deletions go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
onInteractiveCommand = "gh-ost-on-interactive-command"
onSuccess = "gh-ost-on-success"
onFailure = "gh-ost-on-failure"
onBatchCopyRetry = "gh-ost-on-batch-copy-retry"
onStatus = "gh-ost-on-status"
onStopReplication = "gh-ost-on-stop-replication"
onStartReplication = "gh-ost-on-start-replication"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
// executeHook executes a command, and sets relevant environment variables
// combined output & error are printed to the configured writer.
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
this.migrationContext.Log.Infof("executing hook: %+v", hook)
Copy link
Preview

Copilot AI Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The added logging statement in executeHook duplicates the logging in executeHooks and may lead to redundant log entries. Consider removing one of these log statements to avoid unnecessary duplication.

Suggested change
this.migrationContext.Log.Infof("executing hook: %+v", hook)

Copilot uses AI. Check for mistakes.

cmd := exec.Command(hook)
cmd.Env = this.applyEnvironmentVariables(extraVariables...)

Expand Down Expand Up @@ -123,6 +125,11 @@ func (this *HooksExecutor) onBeforeRowCopy() error {
return this.executeHooks(onBeforeRowCopy)
}

// onBatchCopyRetry runs the gh-ost-on-batch-copy-retry hooks, passing them the
// given error text in the GH_OST_LAST_BATCH_COPY_ERROR environment variable.
func (this *HooksExecutor) onBatchCopyRetry(errorMessage string) error {
	lastErrorVariable := fmt.Sprintf("GH_OST_LAST_BATCH_COPY_ERROR=%s", errorMessage)
	return this.executeHooks(onBatchCopyRetry, lastErrorVariable)
}

// onRowCopyComplete calls executeHooks with the onRowCopyComplete hook name and
// returns whatever error that call reports.
func (this *HooksExecutor) onRowCopyComplete() error {
return this.executeHooks(onRowCopyComplete)
}
Expand Down
49 changes: 29 additions & 20 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
}
}

// retryBatchCopyWithHooks runs operation through retryOperation. Each time an
// attempt fails, the error text is first handed to the onBatchCopyRetry hook,
// and the attempt's own error is then returned to retryOperation unchanged.
//
// A failure of the hook itself does not replace the operation's error; it is
// logged so that a broken hook does not go unnoticed.
// notFatalHint is forwarded to retryOperation as-is.
func (this *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHint ...bool) (err error) {
	wrappedOperation := func() error {
		opErr := operation()
		if opErr == nil {
			return nil
		}
		if hookErr := this.hooksExecutor.onBatchCopyRetry(opErr.Error()); hookErr != nil {
			this.migrationContext.Log.Warningf("%s hook failed: %+v", onBatchCopyRetry, hookErr)
		}
		return opErr
	}

	return this.retryOperation(wrappedOperation, notFatalHint...)
}

// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
Expand Down Expand Up @@ -1238,27 +1250,24 @@ func (this *Migrator) iterateChunks() error {
return nil
}
copyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
// Done.
// There's another such check down the line
return nil
}

// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hang forever

hasFurtherRange := false
if err := this.retryOperation(func() (e error) {
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
return e
}); err != nil {
return terminateRowIteration(err)
}
if !hasFurtherRange {
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
return terminateRowIteration(nil)
}
this.migrationContext.SetNextIterationRangeMinValues()
// Copy task:
applyCopyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
// Done.
// There's another such check down the line
return nil
}

// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hang forever
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
if err != nil {
return err // wrapping call will retry
}
if !hasFurtherRange {
atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
return terminateRowIteration(nil)
}
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// No need for more writes.
// This is the de-facto place where we avoid writing in the event of completed cut-over.
Expand Down Expand Up @@ -1289,7 +1298,7 @@ func (this *Migrator) iterateChunks() error {
atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil
}
if err := this.retryOperation(applyCopyRowsFunc); err != nil {
if err := this.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil {
return terminateRowIteration(err)
}
return nil
Expand Down
124 changes: 124 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package logic

import (
"bytes"
"context"
gosql "database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -312,6 +314,8 @@ func (suite *MigratorTestSuite) SetupTest() {

_, err := suite.db.ExecContext(ctx, "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase)
suite.Require().NoError(err)

os.Remove("/tmp/gh-ost.sock")
}

func (suite *MigratorTestSuite) TearDownTest() {
Expand Down Expand Up @@ -367,6 +371,126 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
suite.Require().Equal("_testing_del", tableName)
}

// TestRetryBatchCopyWithHooks runs a full migration with a tiny
// max_binlog_cache_size and a gh-ost-on-batch-copy-retry hook script that
// lowers the chunk size over the interactive socket. It asserts that the hook's
// messages and the resulting chunk-size values show up in the captured output
// and that the original table still holds every inserted row.
func (suite *MigratorTestSuite) TestRetryBatchCopyWithHooks() {
	ctx := context.Background()

	_, err := suite.db.ExecContext(ctx, "CREATE TABLE test.test_retry_batch (id INT PRIMARY KEY AUTO_INCREMENT, name TEXT)")
	suite.Require().NoError(err)

	const initStride = 1000
	const totalBatches = 3
	for i := 0; i < totalBatches; i++ {
		// Every batch of rows carries a larger payload than the previous one.
		dataSize := 50 * i
		insertQuery := fmt.Sprintf("INSERT INTO test.test_retry_batch (name) VALUES ('%s')", strings.Repeat("a", dataSize))
		for j := 0; j < initStride; j++ {
			_, err = suite.db.ExecContext(ctx, insertQuery)
			suite.Require().NoError(err)
		}
	}

	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*8))
	suite.Require().NoError(err)
	defer func() {
		_, resetErr := suite.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL max_binlog_cache_size = %d", 1024*1024*1024))
		suite.Require().NoError(resetErr)
	}()

	tmpDir, err := os.MkdirTemp("", "gh-ost-hooks")
	suite.Require().NoError(err)
	defer os.RemoveAll(tmpDir)

	hookScript := filepath.Join(tmpDir, "gh-ost-on-batch-copy-retry")
	hookContent := `#!/bin/bash
# Mock hook that reduces chunk size on binlog cache error
ERROR_MSG="$GH_OST_LAST_BATCH_COPY_ERROR"
SOCKET_PATH="/tmp/gh-ost.sock"

if ! [[ "$ERROR_MSG" =~ "max_binlog_cache_size" ]]; then
echo "Nothing to do for error: $ERROR_MSG"
exit 0
fi

CHUNK_SIZE=$(echo "chunk-size=?" | nc -U $SOCKET_PATH | tr -d '\n')

MIN_CHUNK_SIZE=10
NEW_CHUNK_SIZE=$(( CHUNK_SIZE * 8 / 10 ))
if [ $NEW_CHUNK_SIZE -lt $MIN_CHUNK_SIZE ]; then
NEW_CHUNK_SIZE=$MIN_CHUNK_SIZE
fi

if [ $CHUNK_SIZE -eq $NEW_CHUNK_SIZE ]; then
echo "Chunk size unchanged: $CHUNK_SIZE"
exit 0
fi

echo "[gh-ost-on-batch-copy-retry]: Changing chunk size from $CHUNK_SIZE to $NEW_CHUNK_SIZE"
echo "chunk-size=$NEW_CHUNK_SIZE" | nc -U $SOCKET_PATH
echo "[gh-ost-on-batch-copy-retry]: Done, exiting..."
`
	err = os.WriteFile(hookScript, []byte(hookContent), 0755)
	suite.Require().NoError(err)

	origStdout := os.Stdout
	origStderr := os.Stderr

	rOut, wOut, err := os.Pipe()
	suite.Require().NoError(err)
	rErr, wErr, err := os.Pipe()
	suite.Require().NoError(err)

	// Drain both pipes while the migration runs. Reading only after Migrate()
	// returns would block the writers once a pipe buffer fills up and hang the test.
	var bufOut, bufErr bytes.Buffer
	drain := func(dst *bytes.Buffer, src *os.File) <-chan struct{} {
		done := make(chan struct{})
		go func() {
			defer close(done)
			defer src.Close()
			// Best effort: a read error merely truncates the captured output,
			// which the assertions below would then report.
			_, _ = io.Copy(dst, src)
		}()
		return done
	}
	outDone := drain(&bufOut, rOut)
	errDone := drain(&bufErr, rErr)

	os.Stdout = wOut
	os.Stderr = wErr

	// restoreStdio is idempotent. The deferred call covers early exits from a
	// failed Require, so later output is not swallowed by the pipes.
	stdioRestored := false
	restoreStdio := func() {
		if stdioRestored {
			return
		}
		stdioRestored = true
		os.Stdout = origStdout
		os.Stderr = origStderr
		wOut.Close()
		wErr.Close()
	}
	defer restoreStdio()

	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
	suite.Require().NoError(err)

	migrationContext := base.NewMigrationContext()
	migrationContext.AllowedRunningOnMaster = true
	migrationContext.ApplierConnectionConfig = connectionConfig
	migrationContext.InspectorConnectionConfig = connectionConfig
	migrationContext.DatabaseName = "test"
	migrationContext.SkipPortValidation = true
	migrationContext.OriginalTableName = "test_retry_batch"
	migrationContext.SetConnectionConfig("innodb")
	migrationContext.AlterStatementOptions = "MODIFY name LONGTEXT, ENGINE=InnoDB"
	migrationContext.ReplicaServerId = 99999
	migrationContext.HeartbeatIntervalMilliseconds = 100
	migrationContext.ThrottleHTTPIntervalMillis = 100
	migrationContext.ThrottleHTTPTimeoutMillis = 1000
	migrationContext.HooksPath = tmpDir
	migrationContext.ChunkSize = 1000
	migrationContext.SetDefaultNumRetries(10)
	migrationContext.ServeSocketFile = "/tmp/gh-ost.sock"

	migrator := NewMigrator(migrationContext, "0.0.0")

	migrateErr := migrator.Migrate()

	// Put the real stdout/stderr back before asserting, so that a failure
	// message is actually visible, then wait for the captured output.
	restoreStdio()
	<-outDone
	<-errDone

	suite.Require().NoError(migrateErr)

	outStr := bufOut.String()
	errStr := bufErr.String()

	suite.Assert().Contains(outStr, "chunk-size: 1000")
	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 1000 to 800")
	suite.Assert().Contains(outStr, "chunk-size: 800")

	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 800 to 640")
	suite.Assert().Contains(outStr, "chunk-size: 640")

	suite.Assert().Contains(errStr, "[gh-ost-on-batch-copy-retry]: Changing chunk size from 640 to 512")
	suite.Assert().Contains(outStr, "chunk-size: 512")

	var count int
	err = suite.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test.test_retry_batch").Scan(&count)
	suite.Require().NoError(err)
	suite.Assert().Equal(initStride*totalBatches, count)
}

func (suite *MigratorTestSuite) TestCopierIntPK() {
ctx := context.Background()

Expand Down
1 change: 1 addition & 0 deletions localtests/copy-retries-exhausted/after.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
set global max_binlog_cache_size = 1073741824; -- 1GB
1 change: 1 addition & 0 deletions localtests/copy-retries-exhausted/before.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
set global max_binlog_cache_size = 1024;
12 changes: 12 additions & 0 deletions localtests/copy-retries-exhausted/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Recreate the test table from scratch.
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
name mediumtext not null,
primary key (id)
) auto_increment=1;

-- Seed up to 1000 rows, each with a 1500-character name. The cross join of the
-- two information_schema tables is only used as a row generator.
insert into gh_ost_test (name)
select repeat('a', 1500)
from information_schema.columns
cross join information_schema.tables
limit 1000;
1 change: 1 addition & 0 deletions localtests/copy-retries-exhausted/expect_failure
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Multi-statement transaction required more than 'max_binlog_cache_size' bytes of storage
1 change: 1 addition & 0 deletions localtests/copy-retries-exhausted/extra_args
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--alter "modify column name mediumtext" --default-retries=1 --chunk-size=1000
11 changes: 11 additions & 0 deletions localtests/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ test_single() {
fi

gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql

if [ -f $tests_path/$test_name/before.sql ]; then
gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql
gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/before.sql
fi

test_create_result=$?

if [ $test_create_result -ne 0 ] ; then
Expand Down Expand Up @@ -208,6 +214,11 @@ test_single() {
gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "set @@global.sql_mode='${original_sql_mode}'"
fi

if [ -f $tests_path/$test_name/after.sql ]; then
gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql
gh-ost-test-mysql-replica --default-character-set=utf8mb4 test < $tests_path/$test_name/after.sql
fi

if [ -f $tests_path/$test_name/destroy.sql ] ; then
gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/destroy.sql
fi
Expand Down