Skip to content

Commit d3bf3cd

Browse files
authored
Merge branch 'master' into cathal/safer_cut_over
2 parents 3135a25 + d4c91e6 commit d3bf3cd

File tree

6 files changed

+46
-15
lines changed

6 files changed

+46
-15
lines changed

go/base/utils.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
gosql "database/sql"
16+
1617
"github.com/github/gh-ost/go/mysql"
1718
)
1819

@@ -62,7 +63,7 @@ func StringContainsAll(s string, substrings ...string) bool {
6263
return nonEmptyStringsFound
6364
}
6465

65-
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext) (string, error) {
66+
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
6667
versionQuery := `select @@global.version`
6768
var port, extraPort int
6869
var version string
@@ -86,7 +87,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
8687
}
8788

8889
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
89-
migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key)
90+
migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
9091
return version, nil
9192
} else if extraPort == 0 {
9293
return "", fmt.Errorf("Unexpected database port reported: %+v", port)

go/logic/applier.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ type Applier struct {
5757
singletonDB *gosql.DB
5858
migrationContext *base.MigrationContext
5959
finishedMigrating int64
60+
name string
6061
}
6162

6263
func NewApplier(migrationContext *base.MigrationContext) *Applier {
6364
return &Applier{
6465
connectionConfig: migrationContext.ApplierConnectionConfig,
6566
migrationContext: migrationContext,
6667
finishedMigrating: 0,
68+
name: "applier",
6769
}
6870
}
6971

@@ -78,11 +80,11 @@ func (this *Applier) InitDBConnections() (err error) {
7880
return err
7981
}
8082
this.singletonDB.SetMaxOpenConns(1)
81-
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
83+
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
8284
if err != nil {
8385
return err
8486
}
85-
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil {
87+
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
8688
return err
8789
}
8890
this.migrationContext.ApplierMySQLVersion = version

go/logic/inspect.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ type Inspector struct {
2929
db *gosql.DB
3030
informationSchemaDb *gosql.DB
3131
migrationContext *base.MigrationContext
32+
name string
3233
}
3334

3435
func NewInspector(migrationContext *base.MigrationContext) *Inspector {
3536
return &Inspector{
3637
connectionConfig: migrationContext.InspectorConnectionConfig,
3738
migrationContext: migrationContext,
39+
name: "inspector",
3840
}
3941
}
4042

@@ -198,7 +200,7 @@ func (this *Inspector) validateConnection() error {
198200
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
199201
}
200202

201-
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
203+
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
202204
this.migrationContext.InspectorMySQLVersion = version
203205
return err
204206
}
@@ -553,6 +555,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
553555
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
554556
columnName := m.GetString("COLUMN_NAME")
555557
columnType := m.GetString("COLUMN_TYPE")
558+
columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH")
556559
for _, columnsList := range columnsLists {
557560
column := columnsList.GetColumn(columnName)
558561
if column == nil {
@@ -580,6 +583,10 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
580583
if strings.HasPrefix(columnType, "enum") {
581584
column.Type = sql.EnumColumnType
582585
}
586+
if strings.HasPrefix(columnType, "binary") {
587+
column.Type = sql.BinaryColumnType
588+
column.BinaryOctetLength = columnOctetLength
589+
}
583590
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {
584591
column.Charset = charset
585592
}

go/logic/streamer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type EventsStreamer struct {
4242
listenersMutex *sync.Mutex
4343
eventsChannel chan *binlog.BinlogEntry
4444
binlogReader *binlog.GoMySQLReader
45+
name string
4546
}
4647

4748
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
@@ -51,6 +52,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
5152
listeners: [](*BinlogEventListener){},
5253
listenersMutex: &sync.Mutex{},
5354
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
55+
name: "streamer",
5456
}
5557
}
5658

@@ -106,7 +108,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
106108
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
107109
return err
108110
}
109-
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); err != nil {
111+
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
110112
return err
111113
}
112114
if err := this.readCurrentBinlogCoordinates(); err != nil {

go/sql/builder.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
396396
}
397397
for _, column := range uniqueKeyColumns.Columns() {
398398
tableOrdinal := tableColumns.Ordinals[column.Name]
399-
arg := column.convertArg(args[tableOrdinal])
399+
arg := column.convertArg(args[tableOrdinal], true)
400400
uniqueKeyArgs = append(uniqueKeyArgs, arg)
401401
}
402402
databaseName = EscapeName(databaseName)
@@ -433,7 +433,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
433433

434434
for _, column := range sharedColumns.Columns() {
435435
tableOrdinal := tableColumns.Ordinals[column.Name]
436-
arg := column.convertArg(args[tableOrdinal])
436+
arg := column.convertArg(args[tableOrdinal], false)
437437
sharedArgs = append(sharedArgs, arg)
438438
}
439439

@@ -481,13 +481,13 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
481481

482482
for _, column := range sharedColumns.Columns() {
483483
tableOrdinal := tableColumns.Ordinals[column.Name]
484-
arg := column.convertArg(valueArgs[tableOrdinal])
484+
arg := column.convertArg(valueArgs[tableOrdinal], false)
485485
sharedArgs = append(sharedArgs, arg)
486486
}
487487

488488
for _, column := range uniqueKeyColumns.Columns() {
489489
tableOrdinal := tableColumns.Ordinals[column.Name]
490-
arg := column.convertArg(whereArgs[tableOrdinal])
490+
arg := column.convertArg(whereArgs[tableOrdinal], true)
491491
uniqueKeyArgs = append(uniqueKeyArgs, arg)
492492
}
493493

go/sql/types.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package sql
77

88
import (
9+
"bytes"
910
"fmt"
1011
"reflect"
1112
"strconv"
@@ -22,6 +23,7 @@ const (
2223
MediumIntColumnType
2324
JSONColumnType
2425
FloatColumnType
26+
BinaryColumnType
2527
)
2628

2729
const maxMediumintUnsigned int32 = 16777215
@@ -31,19 +33,36 @@ type TimezoneConversion struct {
3133
}
3234

3335
type Column struct {
34-
Name string
35-
IsUnsigned bool
36-
Charset string
37-
Type ColumnType
36+
Name string
37+
IsUnsigned bool
38+
Charset string
39+
Type ColumnType
40+
41+
// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
42+
// https://github.com/github/gh-ost/issues/909
43+
BinaryOctetLength uint
3844
timezoneConversion *TimezoneConversion
3945
}
4046

41-
func (this *Column) convertArg(arg interface{}) interface{} {
47+
func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} {
4248
if s, ok := arg.(string); ok {
4349
// string, charset conversion
4450
if encoding, ok := charsetEncodingMap[this.Charset]; ok {
4551
arg, _ = encoding.NewDecoder().String(s)
4652
}
53+
54+
if this.Type == BinaryColumnType && isUniqueKeyColumn {
55+
arg2Bytes := []byte(arg.(string))
56+
size := len(arg2Bytes)
57+
if uint(size) < this.BinaryOctetLength {
58+
buf := bytes.NewBuffer(arg2Bytes)
59+
for i := uint(0); i < (this.BinaryOctetLength - uint(size)); i++ {
60+
buf.Write([]byte{0})
61+
}
62+
arg = buf.String()
63+
}
64+
}
65+
4766
return arg
4867
}
4968

0 commit comments

Comments
 (0)