Skip to content

Commit a27e432

Browse files
author
xiehaopeng
committed
feat(ignore-binlog-events): ignore-over-iteration-range-max-binlog
1 parent d5ab048 commit a27e432

File tree

10 files changed

+238
-32
lines changed

10 files changed

+238
-32
lines changed

doc/command-line-flags.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ Why is this behavior configurable? Different workloads have different characteri
125125

126126
Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
127127

128+
129+
### ignore-over-iteration-range-max-binlog
130+
131+
Defaults to false. When enabled, a binlog event whose unique key value is greater than `MigrationIterationRangeMaxValues` and less than or equal to `MigrationRangeMaxValues` is ignored, because that row will be synced later by the row-copy chunks. A binlog event whose unique key value is less than or equal to `MigrationIterationRangeMaxValues`, or greater than `MigrationRangeMaxValues`, is applied as usual. Currently this only takes effect for a single-column unique index of an integer type.
132+
133+
128134
### exact-rowcount
129135

130136
A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is?

go/base/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ type MigrationContext struct {
201201
controlReplicasLagResult mysql.ReplicationLagResult
202202
TotalRowsCopied int64
203203
TotalDMLEventsApplied int64
204+
TotalDMLEventsIgnored int64
204205
DMLBatchSize int64
206+
IgnoreOverIterationRangeMaxBinlog bool
205207
isThrottled bool
206208
throttleReason string
207209
throttleReasonHint ThrottleReasonHint

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func main() {
109109
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
110110
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
111111
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
112+
flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. Because the data will be synced by copy chunk")
112113

113114
maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
114115
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")

go/logic/applier.go

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ const (
3030
)
3131

3232
type dmlBuildResult struct {
33-
query string
34-
args []interface{}
35-
rowsDelta int64
36-
err error
33+
query string
34+
args []interface{}
35+
uniqueKeyArgs []interface{}
36+
rowsDelta int64
37+
err error
3738
}
3839

39-
func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
40+
func newDmlBuildResult(query string, args []interface{}, uniqueKeyArgs []interface{}, rowsDelta int64, err error) *dmlBuildResult {
4041
return &dmlBuildResult{
41-
query: query,
42-
args: args,
43-
rowsDelta: rowsDelta,
44-
err: err,
42+
query: query,
43+
args: args,
44+
uniqueKeyArgs: uniqueKeyArgs,
45+
rowsDelta: rowsDelta,
46+
err: err,
4547
}
4648
}
4749

@@ -129,6 +131,7 @@ func (this *Applier) prepareQueries() (err error) {
129131
this.migrationContext.OriginalTableColumns,
130132
this.migrationContext.SharedColumns,
131133
this.migrationContext.MappedSharedColumns,
134+
&this.migrationContext.UniqueKey.Columns,
132135
); err != nil {
133136
return err
134137
}
@@ -1190,12 +1193,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
11901193
case binlog.DeleteDML:
11911194
{
11921195
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
1193-
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
1196+
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, uniqueKeyArgs, -1, err)}
11941197
}
11951198
case binlog.InsertDML:
11961199
{
1197-
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
1198-
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
1200+
query, sharedArgs, uniqueKeyArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
1201+
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, uniqueKeyArgs, 1, err)}
11991202
}
12001203
case binlog.UpdateDML:
12011204
{
@@ -1211,12 +1214,87 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
12111214
args := sqlutils.Args()
12121215
args = append(args, sharedArgs...)
12131216
args = append(args, uniqueKeyArgs...)
1214-
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
1217+
return []*dmlBuildResult{newDmlBuildResult(query, args, uniqueKeyArgs, 0, err)}
12151218
}
12161219
}
12171220
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
12181221
}
12191222

1223+
// IsIgnoreOverMaxChunkRangeEvent returns true if this event can be ignored, because the data will be synced by copy chunk
1224+
// min rangeMax max
1225+
// the value > rangeMax and value < max, ignore = true
1226+
// otherwise ignore = false
1227+
func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) {
1228+
if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog {
1229+
return false, nil
1230+
}
1231+
1232+
// Currently only supports single-column unique index of int type
1233+
uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns()
1234+
if len(uniqueKeyCols) != 1 {
1235+
return false, nil
1236+
}
1237+
uniqueKeyCol := uniqueKeyCols[0]
1238+
if uniqueKeyCol.CompareValueFunc == nil {
1239+
return false, nil
1240+
}
1241+
1242+
// Compare whether it is less than the MigrationIterationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied.
1243+
ignore, err := func() (bool, error) {
1244+
compareValues := this.migrationContext.MigrationIterationRangeMaxValues
1245+
if compareValues == nil {
1246+
// It means that the migration has not started yet, use MigrationRangeMinValues instead
1247+
compareValues = this.migrationContext.MigrationRangeMinValues
1248+
}
1249+
1250+
than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues.StringColumn(0))
1251+
if err != nil {
1252+
return false, err
1253+
}
1254+
1255+
switch {
1256+
case than > 0:
1257+
return true, nil
1258+
case than < 0:
1259+
return false, nil
1260+
default:
1261+
// Since rowcopy is left-open-right-closed, when it is equal to the MigrationIterationRangeMaxValues boundary value, it cannot be ignored.
1262+
return false, nil
1263+
}
1264+
}()
1265+
if err != nil {
1266+
return false, err
1267+
}
1268+
1269+
if !ignore {
1270+
return false, nil
1271+
}
1272+
1273+
// Compare whether it is greater than the MigrationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied.
1274+
ignore, err = func() (bool, error) {
1275+
compareValues := this.migrationContext.MigrationRangeMaxValues
1276+
than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues)
1277+
if err != nil {
1278+
return false, err
1279+
}
1280+
1281+
switch {
1282+
case than < 0:
1283+
return true, nil
1284+
case than > 0:
1285+
return false, nil
1286+
default:
1287+
// Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored.
1288+
return true, nil
1289+
}
1290+
}()
1291+
if err != nil {
1292+
return false, err
1293+
}
1294+
1295+
return ignore, nil
1296+
}
1297+
12201298
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
12211299
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
12221300
var totalDelta int64
@@ -1244,17 +1322,33 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
12441322
return err
12451323
}
12461324

1325+
var ignoredEventSize int64
12471326
buildResults := make([]*dmlBuildResult, 0, len(dmlEvents))
12481327
nArgs := 0
12491328
for _, dmlEvent := range dmlEvents {
12501329
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
12511330
if buildResult.err != nil {
12521331
return rollback(buildResult.err)
12531332
}
1333+
if ignore, err := this.IsIgnoreOverMaxChunkRangeEvent(buildResult.uniqueKeyArgs); err != nil {
1334+
return rollback(err)
1335+
} else if ignore {
1336+
ignoredEventSize++
1337+
continue
1338+
}
12541339
nArgs += len(buildResult.args)
12551340
buildResults = append(buildResults, buildResult)
12561341
}
12571342
}
1343+
atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize)
1344+
1345+
// If there are no statements to execute, return directly
1346+
if len(buildResults) == 0 {
1347+
if err := tx.Commit(); err != nil {
1348+
return err
1349+
}
1350+
return nil
1351+
}
12581352

12591353
// We batch together the DML queries into multi-statements to minimize network trips.
12601354
// We have to use the raw driver connection to access the rows affected

go/logic/applier_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ package logic
88
import (
99
"context"
1010
gosql "database/sql"
11+
"fmt"
12+
"math/big"
1113
"strings"
1214
"testing"
1315

@@ -181,6 +183,63 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
181183
})
182184
}
183185

186+
func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) {
187+
migrationContext := base.NewMigrationContext()
188+
migrationContext.IgnoreOverIterationRangeMaxBinlog = true
189+
uniqueColumns := sql.NewColumnList([]string{"id"})
190+
uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, error) {
191+
_a := new(big.Int)
192+
if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil {
193+
return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a)
194+
}
195+
_b := new(big.Int)
196+
if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil {
197+
return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b)
198+
}
199+
return _a.Cmp(_b), nil
200+
})
201+
202+
migrationContext.UniqueKey = &sql.UniqueKey{
203+
Name: "PRIMARY KEY",
204+
Columns: *uniqueColumns,
205+
}
206+
migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10})
207+
migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456})
208+
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111})
209+
210+
applier := NewApplier(migrationContext)
211+
212+
t.Run("less than MigrationRangeMinValues", func(t *testing.T) {
213+
ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{5})
214+
require.NoError(t, err)
215+
require.False(t, ignore)
216+
})
217+
218+
t.Run("equal to MigrationIterationRangeMaxValues", func(t *testing.T) {
219+
ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{11111})
220+
require.NoError(t, err)
221+
require.False(t, ignore)
222+
})
223+
224+
t.Run("ignore event", func(t *testing.T) {
225+
ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{88888})
226+
require.NoError(t, err)
227+
require.True(t, ignore)
228+
})
229+
230+
t.Run("equal to MigrationRangeMaxValues", func(t *testing.T) {
231+
ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123456})
232+
require.NoError(t, err)
233+
require.True(t, ignore)
234+
})
235+
236+
t.Run("larger than MigrationRangeMaxValues", func(t *testing.T) {
237+
ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123457})
238+
require.NoError(t, err)
239+
require.False(t, ignore)
240+
})
241+
}
242+
184243
func TestApplierInstantDDL(t *testing.T) {
185244
migrationContext := base.NewMigrationContext()
186245
migrationContext.DatabaseName = "test"

go/logic/inspect.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
gosql "database/sql"
1111
"errors"
1212
"fmt"
13+
"math/big"
1314
"reflect"
1415
"strings"
1516
"sync/atomic"
@@ -635,6 +636,19 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
635636
continue
636637
}
637638

639+
// Integer-typed columns get a comparator that orders two values numerically.
// Both operands are rendered with %+v and parsed as base-10 big integers, so
// values that arrive as different Go types (ints, strings) compare consistently.
// NOTE(review): the substring match on "int" also matches type names such as
// "point" -- confirm such columns can never reach a comparison.
if strings.Contains(columnType, "int") {
	column.CompareValueFunc = func(a interface{}, b interface{}) (int, error) {
		// SetString signals a parse failure via its boolean result (and a nil
		// *big.Int); the check must be on that result, not on the input, or a
		// non-numeric value would lead to Cmp being called on a nil pointer.
		_a, ok := new(big.Int).SetString(fmt.Sprintf("%+v", a), 10)
		if !ok {
			return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a)
		}
		_b, ok := new(big.Int).SetString(fmt.Sprintf("%+v", b), 10)
		if !ok {
			return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b)
		}
		return _a.Cmp(_b), nil
	}
}
638652
if strings.Contains(columnType, "unsigned") {
639653
column.IsUnsigned = true
640654
}

go/logic/migrator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,9 +1054,10 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
10541054

10551055
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
10561056

1057-
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
1057+
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Ignored: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
10581058
totalRowsCopied, rowsEstimate, progressPct,
10591059
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
1060+
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsIgnored),
10601061
len(this.applyEventsQueue), cap(this.applyEventsQueue),
10611062
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
10621063
currentBinlogCoordinates,

go/sql/builder.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -466,15 +466,15 @@ func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interf
466466
// DMLInsertQueryBuilder can build INSERT queries for DML events.
467467
// It holds the prepared query statement so it doesn't need to be recreated every time.
468468
type DMLInsertQueryBuilder struct {
469-
tableColumns, sharedColumns *ColumnList
470-
preparedStatement string
469+
tableColumns, sharedColumns, uniqueKeyColumns *ColumnList
470+
preparedStatement string
471471
}
472472

473473
// NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder.
474474
// It prepares the INSERT query statement.
475475
// Returns an error if no shared columns are given, the shared columns are not a subset of the table columns,
476476
// or the prepared statement cannot be built.
477-
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
477+
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, uniqueKeyColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
478478
if !sharedColumns.IsSubsetOf(tableColumns) {
479479
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
480480
}
@@ -505,24 +505,33 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar
505505
return &DMLInsertQueryBuilder{
506506
tableColumns: tableColumns,
507507
sharedColumns: sharedColumns,
508+
uniqueKeyColumns: uniqueKeyColumns,
508509
preparedStatement: stmt,
509510
}, nil
510511
}
511512

512513
// BuildQuery builds the arguments array for a DML event INSERT query.
513514
// It returns the query string and the shared arguments array.
514515
// Returns an error if the number of arguments differs from the number of table columns.
515-
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
516+
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, []interface{}, error) {
516517
if len(args) != b.tableColumns.Len() {
517-
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
518+
return "", nil, nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
518519
}
520+
519521
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
520522
for _, column := range b.sharedColumns.Columns() {
521523
tableOrdinal := b.tableColumns.Ordinals[column.Name]
522524
arg := column.convertArg(args[tableOrdinal], false)
523525
sharedArgs = append(sharedArgs, arg)
524526
}
525-
return b.preparedStatement, sharedArgs, nil
527+
528+
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
529+
for _, column := range b.uniqueKeyColumns.Columns() {
530+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
531+
arg := column.convertArg(args[tableOrdinal], true)
532+
uniqueKeyArgs = append(uniqueKeyArgs, arg)
533+
}
534+
return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
526535
}
527536

528537
// DMLUpdateQueryBuilder can build UPDATE queries for DML events.

0 commit comments

Comments
 (0)