Skip to content

Commit b34f2e2

Browse files
committed
Merge branch 'master' into meiji163/parallel-repl
2 parents e1ca9cd + 0fe1190 commit b34f2e2

File tree

150 files changed

+3099
-1670
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

150 files changed

+3099
-1670
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
/libexec/
44
/.vendor/
55
.idea/
6+
*.tmp

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ require (
1111
github.com/stretchr/testify v1.9.0
1212
github.com/testcontainers/testcontainers-go v0.34.0
1313
golang.org/x/net v0.24.0
14-
golang.org/x/sync v0.8.0
15-
golang.org/x/term v0.19.0
16-
golang.org/x/text v0.14.0
14+
golang.org/x/sync v0.10.0
15+
golang.org/x/term v0.27.0
16+
golang.org/x/text v0.21.0
1717
)
1818

1919
require (
@@ -66,7 +66,7 @@ require (
6666
go.opentelemetry.io/otel/metric v1.24.0 // indirect
6767
go.opentelemetry.io/otel/trace v1.24.0 // indirect
6868
go.uber.org/atomic v1.7.0 // indirect
69-
golang.org/x/crypto v0.22.0 // indirect
70-
golang.org/x/sys v0.21.0 // indirect
69+
golang.org/x/crypto v0.31.0 // indirect
70+
golang.org/x/sys v0.28.0 // indirect
7171
gopkg.in/yaml.v3 v3.0.1 // indirect
7272
)

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
183183
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
184184
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
185185
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
186-
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
187-
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
186+
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
187+
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
188188
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
189189
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
190190
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -199,8 +199,8 @@ golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
199199
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
200200
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
201201
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
202-
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
203-
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
202+
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
203+
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
204204
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
205205
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
206206
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -211,15 +211,15 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
211211
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
212212
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
213213
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
214-
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
215-
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
216-
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
217-
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
214+
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
215+
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
216+
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
217+
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
218218
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
219219
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
220220
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
221-
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
222-
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
221+
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
222+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
223223
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
224224
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
225225
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ type MigrationContext struct {
197197
CurrentLag int64
198198
currentProgress uint64
199199
etaNanoseonds int64
200+
EtaRowsPerSecond int64
200201
ThrottleHTTPIntervalMillis int64
201202
ThrottleHTTPStatusCode int64
202203
ThrottleHTTPTimeoutMillis int64

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func main() {
107107
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
108108
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
109109
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
110-
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
110+
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
111111
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
112112
flag.IntVar(&migrationContext.NumWorkers, "workers", 8, "Number of concurrent workers for applying DML events. Each worker uses one goroutine.")
113113

go/logic/applier.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,15 @@ func (this *Applier) ValidateOrDropExistingTables() error {
257257
func (this *Applier) AttemptInstantDDL() error {
258258
query := this.generateInstantDDLQuery()
259259
this.migrationContext.Log.Infof("INSTANT DDL query is: %s", query)
260+
261+
// Reuse cut-over-lock-timeout from regular migration process to reduce risk
262+
// in situations where there may be long-running transactions.
263+
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
264+
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
265+
lockTimeoutQuery := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
266+
if _, err := this.db.Exec(lockTimeoutQuery); err != nil {
267+
return err
268+
}
260269
// We don't need a trx, because for instant DDL the SQL mode doesn't matter.
261270
_, err := this.db.Exec(query)
262271
return err

go/logic/inspect.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import (
2222
"github.com/openark/golib/sqlutils"
2323
)
2424

25-
const startSlavePostWaitMilliseconds = 500 * time.Millisecond
25+
const startReplicationPostWait = 250 * time.Millisecond
26+
const startReplicationMaxWait = 2 * time.Second
2627

2728
// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
2829
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
@@ -302,12 +303,50 @@ func (this *Inspector) restartReplication() error {
302303
if startError != nil {
303304
return startError
304305
}
305-
time.Sleep(startSlavePostWaitMilliseconds)
306+
307+
// loop until replication is running unless we hit a max timeout.
308+
startTime := time.Now()
309+
for {
310+
replicationRunning, err := this.validateReplicationRestarted()
311+
if err != nil {
312+
return fmt.Errorf("Failed to validate if replication had been restarted: %w", err)
313+
}
314+
if replicationRunning {
315+
break
316+
}
317+
if time.Since(startTime) > startReplicationMaxWait {
318+
return fmt.Errorf("Replication did not restart within the maximum wait time of %s", startReplicationMaxWait)
319+
}
320+
this.migrationContext.Log.Debugf("Replication not yet restarted, waiting...")
321+
time.Sleep(startReplicationPostWait)
322+
}
306323

307324
this.migrationContext.Log.Debugf("Replication restarted")
308325
return nil
309326
}
310327

328+
// validateReplicationRestarted checks that Slave_IO_Running and Slave_SQL_Running are both 'Yes'.
329+
// It returns true if both are 'Yes', and false otherwise.
330+
func (this *Inspector) validateReplicationRestarted() (bool, error) {
331+
errNotRunning := fmt.Errorf("Replication not running on %s", this.connectionConfig.Key.String())
332+
query := `show /* gh-ost */ slave status`
333+
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
334+
if rowMap.GetString("Slave_IO_Running") != "Yes" || rowMap.GetString("Slave_SQL_Running") != "Yes" {
335+
return errNotRunning
336+
}
337+
return nil
338+
})
339+
340+
if err != nil {
341+
// If the only problem is that replication is not running yet, report false rather than an error
342+
if errors.Is(err, errNotRunning) {
343+
return false, nil
344+
}
345+
return false, err
346+
}
347+
return true, nil
348+
}
349+
311350
// applyBinlogFormat sets ROW binlog format and restarts replication to make
312351
// the replication thread apply it.
313352
func (this *Inspector) applyBinlogFormat() error {
@@ -589,6 +628,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
589628
columnName := m.GetString("COLUMN_NAME")
590629
columnType := m.GetString("COLUMN_TYPE")
591630
columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH")
631+
extra := m.GetString("EXTRA")
592632
for _, columnsList := range columnsLists {
593633
column := columnsList.GetColumn(columnName)
594634
if column == nil {
@@ -621,6 +661,9 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
621661
column.Type = sql.BinaryColumnType
622662
column.BinaryOctetLength = columnOctetLength
623663
}
664+
if strings.Contains(extra, " GENERATED") {
665+
column.IsVirtual = true
666+
}
624667
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {
625668
column.Charset = charset
626669
}

go/logic/migrator.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,11 +826,18 @@ func (this *Migrator) initiateStatus() {
826826
this.printStatus(ForcePrintStatusAndHintRule)
827827
ticker := time.NewTicker(time.Second)
828828
defer ticker.Stop()
829+
var previousCount int64
829830
for range ticker.C {
830831
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
831832
return
832833
}
833834
go this.printStatus(HeuristicPrintStatusRule)
835+
totalCopied := atomic.LoadInt64(&this.migrationContext.TotalRowsCopied)
836+
if previousCount > 0 {
837+
copiedThisLoop := totalCopied - previousCount
838+
atomic.StoreInt64(&this.migrationContext.EtaRowsPerSecond, copiedThisLoop)
839+
}
840+
previousCount = totalCopied
834841
}
835842
}
836843

@@ -932,9 +939,20 @@ func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration
932939
duration = 0
933940
} else if progressPct >= 0.1 {
934941
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
935-
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
936-
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
937-
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
942+
etaRowsPerSecond := atomic.LoadInt64(&this.migrationContext.EtaRowsPerSecond)
943+
var etaSeconds float64
944+
// If there is data available on our current row-copies-per-second rate, use it.
945+
// Otherwise we can fall back to the total elapsed time and extrapolate.
946+
// This is going to be less accurate on a longer copy as the insert rate
947+
// will tend to slow down.
948+
if etaRowsPerSecond > 0 {
949+
remainingRows := float64(rowsEstimate) - float64(totalRowsCopied)
950+
etaSeconds = remainingRows / float64(etaRowsPerSecond)
951+
} else {
952+
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
953+
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
954+
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
955+
}
938956
if etaSeconds >= 0 {
939957
duration = time.Duration(etaSeconds) * time.Second
940958
} else {

go/logic/migrator_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,15 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
215215
require.Equal(t, "4h29m44s", eta)
216216
require.Equal(t, "4h29m44s", etaDuration.String())
217217
}
218+
{
219+
// Test using rows-per-second added data.
220+
migrationContext.TotalRowsCopied = 456
221+
migrationContext.EtaRowsPerSecond = 100
222+
state, eta, etaDuration := migrator.getMigrationStateAndETA(123456)
223+
require.Equal(t, "migrating", state)
224+
require.Equal(t, "20m30s", eta)
225+
require.Equal(t, "20m30s", etaDuration.String())
226+
}
218227
{
219228
migrationContext.TotalRowsCopied = 456
220229
state, eta, etaDuration := migrator.getMigrationStateAndETA(456)

go/sql/builder.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,12 @@ func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, shar
546546
if uniqueKeyColumns.Len() == 0 {
547547
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
548548
}
549+
// If the unique key contains virtual columns, those columns won't be in sharedColumns
550+
// which only contains non-virtual columns
551+
nonVirtualUniqueKeyColumns := uniqueKeyColumns.FilterBy(func(column Column) bool { return !column.IsVirtual })
552+
if !nonVirtualUniqueKeyColumns.IsSubsetOf(sharedColumns) {
553+
return nil, fmt.Errorf("unique key columns is not a subset of shared columns in NewDMLUpdateQueryBuilder")
554+
}
549555
databaseName = EscapeName(databaseName)
550556
tableName = EscapeName(tableName)
551557
setClause, err := BuildSetPreparedClause(mappedSharedColumns)
@@ -580,11 +586,6 @@ func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, shar
580586
// BuildQuery builds the arguments array for a DML event UPDATE query.
581587
// It returns the query string, the shared arguments array, and the unique key arguments array.
582588
func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
583-
// TODO: move this check back to `NewDMLUpdateQueryBuilder()`, needs fix on generated columns.
584-
if !b.uniqueKeyColumns.IsSubsetOf(b.sharedColumns) {
585-
return "", nil, nil, fmt.Errorf("unique key columns is not a subset of shared columns in DMLUpdateQueryBuilder")
586-
}
587-
588589
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
589590
for _, column := range b.sharedColumns.Columns() {
590591
tableOrdinal := b.tableColumns.Ordinals[column.Name]

0 commit comments

Comments
 (0)