Skip to content

Commit 7ffcca0

Browse files
author
Shlomi Noach
authored
Merge branch 'master' into patch-1
2 parents 5eec741 + 0d773f7 commit 7ffcca0

File tree

8 files changed

+20
-22
lines changed

8 files changed

+20
-22
lines changed

go/binlog/binlog_entry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
2626
return binlogEntry
2727
}
2828

29-
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
29+
// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
3030
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
3131
binlogEntry := &BinlogEntry{
3232
Coordinates: coordinates,
@@ -41,7 +41,7 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry {
4141
return binlogEntry
4242
}
4343

44-
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
44+
// String() returns a string representation of this binlog entry
4545
func (this *BinlogEntry) String() string {
4646
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
4747
}

go/binlog/gomysql_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
112112
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
113113
}
114114
}
115-
// The channel will do the throttling. Whoever is reding from the channel
115+
// The channel will do the throttling. Whoever is reading from the channel
116116
// decides whether action is taken synchronously (meaning we wait before
117117
// next iteration) or asynchronously (we keep pushing more events)
118118
// In reality, reads will be synchronous

go/logic/applier.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ func (this *Applier) readTableColumns() (err error) {
126126

127127
// showTableStatus returns the output of `show table status like '...'` command
128128
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
129-
rowMap = nil
130129
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
131130
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
132131
rowMap = m
@@ -482,6 +481,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
482481
if err != nil {
483482
return nil, err
484483
}
484+
defer tx.Rollback()
485485
sessionQuery := fmt.Sprintf(`SET
486486
SESSION time_zone = '%s',
487487
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
@@ -1001,15 +1001,19 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
10011001
if err != nil {
10021002
return err
10031003
}
1004+
rollback := func(err error) error {
1005+
tx.Rollback()
1006+
return err
1007+
}
10041008
sessionQuery := `SET
10051009
SESSION time_zone = '+00:00',
10061010
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
10071011
`
10081012
if _, err := tx.Exec(sessionQuery); err != nil {
1009-
return err
1013+
return rollback(err)
10101014
}
10111015
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
1012-
return err
1016+
return rollback(err)
10131017
}
10141018
if err := tx.Commit(); err != nil {
10151019
return err

go/logic/inspect.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,6 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
622622
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
623623
) AS UNIQUES
624624
ON (
625-
COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA AND
626-
COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME AND
627625
COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
628626
)
629627
WHERE
@@ -692,14 +690,17 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
692690
for _, ghostColumn := range ghostColumns.Names() {
693691
if strings.EqualFold(originalColumn, ghostColumn) {
694692
isSharedColumn = true
693+
break
695694
}
696695
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
697696
isSharedColumn = true
697+
break
698698
}
699699
}
700700
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
701701
if strings.EqualFold(originalColumn, droppedColumn) {
702702
isSharedColumn = false
703+
break
703704
}
704705
}
705706
for _, virtualColumn := range originalVirtualColumns.Names() {
@@ -758,9 +759,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
758759
}
759760

760761
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
761-
replicationLag, err = mysql.GetReplicationLag(
762+
replicationLag, err = mysql.GetReplicationLagFromSlaveStatus(
762763
this.informationSchemaDb,
763-
this.migrationContext.InspectorConnectionConfig,
764764
)
765765
return replicationLag, err
766766
}

go/logic/migrator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type Migrator struct {
7878

7979
rowCopyCompleteFlag int64
8080
// copyRowsQueue should not be buffered; if buffered some non-damaging but
81-
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
81+
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
8282
copyRowsQueue chan tableWriteFunc
8383
applyEventsQueue chan *applyEventStruct
8484

go/logic/throttler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
140140
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
141141
// when running on replica, the heartbeat injection is also done on the replica.
142142
// This means we will always get a good heartbeat value.
143-
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
144-
if lag, err := mysql.GetReplicationLag(this.inspector.informationSchemaDb, this.inspector.connectionConfig); err != nil {
143+
// When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
144+
if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
145145
return log.Errore(err)
146146
} else {
147147
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))

go/mysql/utils.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
5858
return knownDBs[cacheKey], exists, nil
5959
}
6060

61-
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
62-
// or via SHOW SLAVE STATUS
63-
func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
61+
// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
62+
func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) {
6463
err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error {
6564
slaveIORunning := m.GetString("Slave_IO_Running")
6665
slaveSQLRunning := m.GetString("Slave_SQL_Running")
@@ -84,9 +83,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
8483
}
8584
defer db.Close()
8685

87-
if err != nil {
88-
return nil, err
89-
}
9086
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
9187
// We wish to recognize the case where the topology's master actually has replication configuration.
9288
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
@@ -99,7 +95,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
9995
slaveIORunning := rowMap.GetString("Slave_IO_Running")
10096
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
10197

102-
//
10398
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
10499
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
105100
connectionConfig.Key,

go/sql/builder.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},
140140
comparisons := []string{}
141141

142142
for i, column := range columns {
143-
//
144143
value := values[i]
145144
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
146145
if err != nil {
147146
return "", explodedArgs, err
148147
}
149-
if len(columns[0:i]) > 0 {
148+
if i > 0 {
150149
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
151150
if err != nil {
152151
return "", explodedArgs, err

0 commit comments

Comments
 (0)