diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 7b5efd9fb..079c104e8 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -125,6 +125,12 @@ Why is this behavior configurable? Different workloads have different characteri Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. + +### ignore-over-iteration-range-max-binlog + +Defaults to false. When binlog unique key value is over `MigrationIterationRangeMaxValues`, and less than `MigrationRangeMaxValues`, the binlog will be ignored. Because the data will be synced by copy chunk. When binlog unique key value is over `MigrationRangeMaxValues` or less than `MigrationIterationRangeMaxValues`, the binlog will be applied. Currently when enabled, this only takes effect for single-column unique index of int type. + + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? diff --git a/go/base/context.go b/go/base/context.go index 0a1cae739..c421c871c 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -204,7 +204,11 @@ type MigrationContext struct { controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 + TotalDMLEventsIgnored int64 DMLBatchSize int64 + IgnoreOverIterationRangeMaxBinlog bool + IsMigrationRangeMaxValuesLocked bool + MigrationRangeMaxValuesInitial *sql.ColumnValues isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 6391cf4fb..4580f6723 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -112,6 +112,7 @@ func main() { flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") + flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. Because the data will be synced by copy chunk") maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") diff --git a/go/logic/applier.go b/go/logic/applier.go index 30ac97695..5bb0db7de 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -32,18 +32,20 @@ const ( ) type dmlBuildResult struct { - query string - args []interface{} - rowsDelta int64 - err error + query string + args []interface{} + uniqueKeyArgs []interface{} + rowsDelta int64 + err error } -func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { +func newDmlBuildResult(query string, args []interface{}, uniqueKeyArgs []interface{}, rowsDelta int64, err error) *dmlBuildResult { return &dmlBuildResult{ - query: query, - args: args, - rowsDelta: rowsDelta, - err: err, + query: query, + args: args, + uniqueKeyArgs: uniqueKeyArgs, + rowsDelta: rowsDelta, + err: err, } } @@ -131,6 +133,7 @@ func (this *Applier) prepareQueries() (err error) { this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, + &this.migrationContext.UniqueKey.Columns, ); err != nil { return err } @@ -640,7 +643,15 @@ func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueK return err } } - this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) + + // Save a snapshot copy of the initial MigrationRangeMaxValues + if this.migrationContext.MigrationRangeMaxValues == nil { + this.migrationContext.MigrationRangeMaxValuesInitial = nil + } else { + abstractValues := make([]interface{}, len(this.migrationContext.MigrationRangeMaxValues.AbstractValues())) + copy(abstractValues, this.migrationContext.MigrationRangeMaxValues.AbstractValues()) + this.migrationContext.MigrationRangeMaxValuesInitial = sql.ToColumnValues(abstractValues) + } return rows.Err() } @@ -683,6 +694,63 @@ func (this *Applier) ReadMigrationRangeValues() error { return tx.Commit() } +// ResetMigrationRangeMaxValues updates the MigrationRangeMaxValues with new values +func (this *Applier) ResetMigrationRangeMaxValues(uniqueKeyAbstractValues []interface{}) { + abstractValues := make([]interface{}, len(uniqueKeyAbstractValues)) + copy(abstractValues, uniqueKeyAbstractValues) + this.migrationContext.MigrationRangeMaxValues = sql.ToColumnValues(abstractValues) + this.migrationContext.Log.Debugf("Reset migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) +} + +// LockMigrationRangeMaxValues locks the MigrationRangeMaxValues to prevent further updates +func (this *Applier) LockMigrationRangeMaxValues() { + if this.migrationContext.IsMigrationRangeMaxValuesLocked { + return + } + this.migrationContext.IsMigrationRangeMaxValuesLocked = true + this.migrationContext.Log.Infof("Lock migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) +} + +// AttemptToLockMigrationRangeMaxValues attempts to lock MigrationRangeMaxValues to prevent endless copying. +// To avoid infinite updates of MigrationRangeMaxValues causing the copy to never end, +// we need a strategy to stop updates. When the initial copy target is achieved, +// MigrationRangeMaxValues will be locked. +func (this *Applier) AttemptToLockMigrationRangeMaxValues() { + if this.migrationContext.IsMigrationRangeMaxValuesLocked { + return + } + + // Currently only supports single-column unique index of int type + uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns() + if len(uniqueKeyCols) != 1 { + this.LockMigrationRangeMaxValues() + return + } + uniqueKeyCol := uniqueKeyCols[0] + if uniqueKeyCol.CompareValueFunc == nil { + this.LockMigrationRangeMaxValues() + return + } + + // Compare MigrationIterationRangeMinValues with MigrationRangeMaxValuesInitial to determine copy progress + if this.migrationContext.MigrationIterationRangeMinValues == nil { + return + } + than, err := uniqueKeyCol.CompareValueFunc( + this.migrationContext.MigrationIterationRangeMinValues.AbstractValues()[0], + this.migrationContext.MigrationRangeMaxValuesInitial.AbstractValues()[0], + ) + if err != nil { + // If comparison fails, fallback to locking MigrationRangeMaxValues + this.migrationContext.Log.Errore(err) + this.LockMigrationRangeMaxValues() + return + } + if than >= 0 { + this.LockMigrationRangeMaxValues() + } +} + // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with @@ -692,6 +760,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo if this.migrationContext.MigrationIterationRangeMinValues == nil { this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues } + this.AttemptToLockMigrationRangeMaxValues() for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -733,6 +802,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo } } this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") + // Ensure MigrationRangeMaxValues is locked after iteration is complete + this.LockMigrationRangeMaxValues() return hasFurtherRange, nil } @@ -1315,12 +1386,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB case binlog.DeleteDML: { query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues()) - return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)} + return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, uniqueKeyArgs, -1, err)} } case binlog.InsertDML: { - query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues()) - return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)} + query, sharedArgs, uniqueKeyArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues()) + return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, uniqueKeyArgs, 1, err)} } case binlog.UpdateDML: { @@ -1336,12 +1407,94 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)} + return []*dmlBuildResult{newDmlBuildResult(query, args, uniqueKeyArgs, 0, err)} } } return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))} } +// IsIgnoreOverMaxChunkRangeEvent returns true if this event can be ignored, because the data will be synced by copy chunk +// min rangeMax max +// the value > rangeMax and value < max, ignore = true +// otherwise ignore = false +func (this *Applier) IsIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) { + if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog { + return false, nil + } + + // Currently only supports single-column unique index of int type + uniqueKeyCols := this.migrationContext.UniqueKey.Columns.Columns() + if len(uniqueKeyCols) != 1 { + return false, nil + } + uniqueKeyCol := uniqueKeyCols[0] + if uniqueKeyCol.CompareValueFunc == nil { + return false, nil + } + + // Compare whether it is less than the MigrationIterationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. + ignore, err := func() (bool, error) { + compareValues := this.migrationContext.MigrationIterationRangeMaxValues + if compareValues == nil { + // It means that the migration has not started yet, use MigrationRangeMinValues instead + compareValues = this.migrationContext.MigrationRangeMinValues + } + + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues.StringColumn(0)) + if err != nil { + return false, err + } + + switch { + case than > 0: + return true, nil + case than < 0: + return false, nil + default: + // Since rowcopy is left-open-right-closed, when it is equal to the MigrationIterationRangeMaxValues boundary value, it cannot be ignored. + return false, nil + } + }() + if err != nil { + return false, err + } + + if !ignore { + return false, nil + } + + // Compare whether it is greater than the MigrationRangeMaxValues boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. + ignore, err = func() (bool, error) { + compareValues := this.migrationContext.MigrationRangeMaxValues + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[0], compareValues) + if err != nil { + return false, err + } + + switch { + case than < 0: + return true, nil + case than > 0: + // When the value is greater than MigrationRangeMaxValues boundary, attempt to dynamically expand MigrationRangeMaxValues + // After expand, treat this comparison as equal, otherwise it cannot be ignored + if !this.migrationContext.IsMigrationRangeMaxValuesLocked { + this.ResetMigrationRangeMaxValues(uniqueKeyArgs) + return true, nil + } else { + return false, nil + } + default: + // Since rowcopy is left-open-right-closed, when it is equal to the MigrationRangeMaxValues boundary value, it can be ignored. + return true, nil + } + }() + if err != nil { + return false, err + } + + return ignore, nil +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 @@ -1369,6 +1522,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return err } + var ignoredEventSize int64 buildResults := make([]*dmlBuildResult, 0, len(dmlEvents)) nArgs := 0 for _, dmlEvent := range dmlEvents { @@ -1376,10 +1530,25 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if buildResult.err != nil { return rollback(buildResult.err) } + if ignore, err := this.IsIgnoreOverMaxChunkRangeEvent(buildResult.uniqueKeyArgs); err != nil { + return rollback(err) + } else if ignore { + ignoredEventSize++ + continue + } nArgs += len(buildResult.args) buildResults = append(buildResults, buildResult) } } + atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize) + + // If there are no statements to execute, return directly + if len(buildResults) == 0 { + if err := tx.Commit(); err != nil { + return err + } + return nil + } // We batch together the DML queries into multi-statements to minimize network trips. // We have to use the raw driver connection to access the rows affected diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 13e8a4d3b..d31d4b4e8 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -8,6 +8,8 @@ package logic import ( "context" gosql "database/sql" + "fmt" + "math/big" "strings" "testing" @@ -184,6 +186,63 @@ func TestApplierBuildDMLEventQuery(t *testing.T) { }) } +func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + uniqueColumns := sql.NewColumnList([]string{"id"}) + uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10}) + migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111}) + + applier := NewApplier(migrationContext) + + t.Run("less than MigrationRangeMinValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{5}) + require.NoError(t, err) + require.False(t, ignore) + }) + + t.Run("equal to MigrationIterationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{11111}) + require.NoError(t, err) + require.False(t, ignore) + }) + + t.Run("ignore event", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{88888}) + require.NoError(t, err) + require.True(t, ignore) + }) + + t.Run("equal to MigrationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123456}) + require.NoError(t, err) + require.True(t, ignore) + }) + + t.Run("larger than MigrationRangeMaxValues", func(t *testing.T) { + ignore, err := applier.IsIgnoreOverMaxChunkRangeEvent([]interface{}{123457}) + require.NoError(t, err) + require.True(t, ignore) + }) +} + func TestApplierInstantDDL(t *testing.T) { migrationContext := base.NewMigrationContext() migrationContext.DatabaseName = "test" diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 7a7dc8424..a1308a6d3 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "errors" "fmt" + "math/big" "reflect" "strings" "sync/atomic" @@ -718,6 +719,19 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL continue } + if strings.Contains(columnType, "int") { + column.CompareValueFunc = func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _a, _ = _a.SetString(fmt.Sprintf("%+v", a), 10); a == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _b, _ = _b.SetString(fmt.Sprintf("%+v", b), 10); b == nil { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + } + } if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4d7074b22..81d8cefe6 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1077,9 +1077,10 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Ignored: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + atomic.LoadInt64(&this.migrationContext.TotalDMLEventsIgnored), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, diff --git a/go/sql/builder.go b/go/sql/builder.go index f2683181f..b5617ede4 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -464,15 +464,15 @@ func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interf // DMLInsertQueryBuilder can build INSERT queries for DML events. // It holds the prepared query statement so it doesn't need to be recreated every time. type DMLInsertQueryBuilder struct { - tableColumns, sharedColumns *ColumnList - preparedStatement string + tableColumns, sharedColumns, uniqueKeyColumns *ColumnList + preparedStatement string } // NewDMLInsertQueryBuilder creates a new DMLInsertQueryBuilder. // It prepares the INSERT query statement. // Returns an error if no shared columns are given, the shared columns are not a subset of the table columns, // or the prepared statement cannot be built. -func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) { +func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, uniqueKeyColumns *ColumnList) (*DMLInsertQueryBuilder, error) { if !sharedColumns.IsSubsetOf(tableColumns) { return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder") } @@ -503,6 +503,7 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar return &DMLInsertQueryBuilder{ tableColumns: tableColumns, sharedColumns: sharedColumns, + uniqueKeyColumns: uniqueKeyColumns, preparedStatement: stmt, }, nil } @@ -510,17 +511,25 @@ func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, shar // BuildQuery builds the arguments array for a DML event INSERT query. // It returns the query string and the shared arguments array. // Returns an error if the number of arguments differs from the number of table columns. -func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) { +func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, []interface{}, error) { if len(args) != b.tableColumns.Len() { - return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") + return "", nil, nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") } + sharedArgs := make([]interface{}, 0, b.sharedColumns.Len()) for _, column := range b.sharedColumns.Columns() { tableOrdinal := b.tableColumns.Ordinals[column.Name] arg := column.convertArg(args[tableOrdinal], false) sharedArgs = append(sharedArgs, arg) } - return b.preparedStatement, sharedArgs, nil + + uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len()) + for _, column := range b.uniqueKeyColumns.Columns() { + tableOrdinal := b.tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal], true) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil } // DMLUpdateQueryBuilder can build UPDATE queries for DML events. diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a6735a324..0f5404dc4 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -533,9 +533,10 @@ func TestBuildDMLInsertQuery(t *testing.T) { args := []interface{}{3, "testname", "first", 17, 23} { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"position"}) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -546,12 +547,14 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", 17, 23}, sharedArgs) + require.Equal(t, []interface{}{17}, uniqueKeyArgs) } { sharedColumns := NewColumnList([]string{"position", "name", "age", "id"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"position", "name"}) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -562,15 +565,18 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{17, "testname", 23, 3}, sharedArgs) + require.Equal(t, []interface{}{17, "testname"}, uniqueKeyArgs) } { sharedColumns := NewColumnList([]string{"position", "name", "surprise", "id"}) - _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"age"}) + _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.Error(t, err) } { sharedColumns := NewColumnList([]string{}) - _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + uniqueKeyColumns := NewColumnList([]string{"age", "name"}) + _, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.Error(t, err) } } @@ -580,13 +586,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { tableName := "tbl" tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"}) sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) + uniqueKeyColumns := NewColumnList([]string{"name", "age"}) { // testing signed args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -597,14 +604,15 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", int8(-1), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } { // testing unsigned args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns.SetUnsigned("position") - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -615,14 +623,15 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", uint8(255), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } { // testing unsigned args := []interface{}{3, "testname", "first", int32(-1), 23} sharedColumns.SetUnsigned("position") - builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns) + builder, err := NewDMLInsertQueryBuilder(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns) require.NoError(t, err) - query, sharedArgs, err := builder.BuildQuery(args) + query, sharedArgs, uniqueKeyArgs, err := builder.BuildQuery(args) require.NoError(t, err) expected := ` replace /* gh-ost mydb.tbl */ @@ -633,6 +642,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{3, "testname", uint32(4294967295), 23}, sharedArgs) + require.Equal(t, []interface{}{"testname", 23}, uniqueKeyArgs) } } diff --git a/go/sql/types.go b/go/sql/types.go index aac52bc32..55fac947e 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -50,6 +50,8 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + // compare a and b using this function, when a equal b, return 0, when a > b, return 1, when a < b, return -1 + CompareValueFunc func(a interface{}, b interface{}) (int, error) } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -226,6 +228,14 @@ func (this *ColumnList) IsEnumToTextConversion(columnName string) bool { return this.GetColumn(columnName).enumToTextConversion } +func (this *ColumnList) SetColumnCompareValueFunc(columnName string, f func(a interface{}, b interface{}) (int, error)) { + this.GetColumn(columnName).CompareValueFunc = f +} + +func (this *ColumnList) GetColumnCompareValueFunc(columnName string) func(a interface{}, b interface{}) (int, error) { + return this.GetColumn(columnName).CompareValueFunc +} + func (this *ColumnList) SetEnumValues(columnName string, enumValues string) { this.GetColumn(columnName).EnumValues = enumValues }