Skip to content

Commit 9b54d02

Browse files
author
Shlomi Noach
committed
- Handling gomysql.replication connection timeouts: reconnecting on last known position
- `printStatus()` takes ETA into account - More info around `master_pos_wait()`
1 parent ec34a5e commit 9b54d02

File tree

8 files changed

+174
-244
lines changed

8 files changed

+174
-244
lines changed

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
#
33
#
4-
RELEASE_VERSION="0.7.5"
4+
RELEASE_VERSION="0.7.13"
55

66
buildpath=/tmp/gh-ost
77
target=gh-ost

go/binlog/binlog.go

Lines changed: 0 additions & 147 deletions
This file was deleted.

go/binlog/binlog_reader.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@
55

66
package binlog
77

8+
import (
9+
"github.com/github/gh-ost/go/mysql"
10+
)
11+
812
// BinlogReader is a general interface whose implementations can choose their methods of reading
913
// a binary log file and parsing it into binlog entries
1014
type BinlogReader interface {
1115
StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error
16+
GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
17+
Reconnect() error
1218
}

go/binlog/gomysql_reader.go

Lines changed: 83 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ const (
2323
)
2424

2525
type GoMySQLReader struct {
26-
connectionConfig *mysql.ConnectionConfig
27-
binlogSyncer *replication.BinlogSyncer
28-
binlogStreamer *replication.BinlogStreamer
29-
tableMap map[uint64]string
30-
currentCoordinates mysql.BinlogCoordinates
26+
connectionConfig *mysql.ConnectionConfig
27+
binlogSyncer *replication.BinlogSyncer
28+
binlogStreamer *replication.BinlogStreamer
29+
tableMap map[uint64]string
30+
currentCoordinates mysql.BinlogCoordinates
31+
lastHandledCoordinates mysql.BinlogCoordinates
3132
}
3233

3334
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
@@ -39,24 +40,91 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
3940
}
4041
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
4142

42-
// Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
43-
err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Key.Hostname, uint16(connectionConfig.Key.Port), connectionConfig.User, connectionConfig.Password)
44-
if err != nil {
45-
return binlogReader, err
46-
}
47-
4843
return binlogReader, err
4944
}
5045

5146
// ConnectBinlogStreamer
5247
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
48+
if coordinates.IsEmpty() {
49+
return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()")
50+
}
51+
log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port))
52+
if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil {
53+
return err
54+
}
55+
5356
this.currentCoordinates = coordinates
57+
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
5458
// Start sync with sepcified binlog file and position
55-
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{coordinates.LogFile, uint32(coordinates.LogPos)})
59+
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
5660

5761
return err
5862
}
5963

64+
func (this *GoMySQLReader) Reconnect() error {
65+
this.binlogSyncer.Close()
66+
67+
connectCoordinates := &this.lastHandledCoordinates
68+
if connectCoordinates.IsEmpty() {
69+
connectCoordinates = &this.currentCoordinates
70+
}
71+
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
72+
return err
73+
}
74+
return nil
75+
}
76+
77+
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
78+
return &this.currentCoordinates
79+
}
80+
81+
// StreamEvents
82+
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
83+
if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() {
84+
log.Infof("Skipping handled query at %+v", this.currentCoordinates)
85+
return nil
86+
}
87+
88+
dml := ToEventDML(ev.Header.EventType.String())
89+
if dml == NotDML {
90+
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
91+
}
92+
for i, row := range rowsEvent.Rows {
93+
if dml == UpdateDML && i%2 == 1 {
94+
// An update has two rows (WHERE+SET)
95+
// We do both at the same time
96+
continue
97+
}
98+
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
99+
binlogEntry.DmlEvent = NewBinlogDMLEvent(
100+
string(rowsEvent.Table.Schema),
101+
string(rowsEvent.Table.Table),
102+
dml,
103+
)
104+
switch dml {
105+
case InsertDML:
106+
{
107+
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
108+
}
109+
case UpdateDML:
110+
{
111+
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
112+
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
113+
}
114+
case DeleteDML:
115+
{
116+
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
117+
}
118+
}
119+
// The channel will do the throttling. Whoever is reding from the channel
120+
// decides whether action is taken sycnhronously (meaning we wait before
121+
// next iteration) or asynchronously (we keep pushing more events)
122+
// In reality, reads will be synchronous
123+
entriesChannel <- binlogEntry
124+
}
125+
return nil
126+
}
127+
60128
// StreamEvents
61129
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
62130
for {
@@ -77,44 +145,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
77145
// future I should remove this.
78146
this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table)
79147
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
80-
dml := ToEventDML(ev.Header.EventType.String())
81-
if dml == NotDML {
82-
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
83-
}
84-
for i, row := range rowsEvent.Rows {
85-
if dml == UpdateDML && i%2 == 1 {
86-
// An update has two rows (WHERE+SET)
87-
// We do both at the same time
88-
continue
89-
}
90-
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
91-
binlogEntry.DmlEvent = NewBinlogDMLEvent(
92-
string(rowsEvent.Table.Schema),
93-
string(rowsEvent.Table.Table),
94-
dml,
95-
)
96-
switch dml {
97-
case InsertDML:
98-
{
99-
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
100-
}
101-
case UpdateDML:
102-
{
103-
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
104-
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
105-
}
106-
case DeleteDML:
107-
{
108-
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
109-
}
110-
}
111-
// The channel will do the throttling. Whoever is reding from the channel
112-
// decides whether action is taken sycnhronously (meaning we wait before
113-
// next iteration) or asynchronously (we keep pushing more events)
114-
// In reality, reads will be synchronous
115-
entriesChannel <- binlogEntry
148+
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
149+
return err
116150
}
117151
}
152+
this.lastHandledCoordinates = this.currentCoordinates
118153
}
119154
log.Debugf("done streaming events")
120155

go/logic/applier.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -552,8 +552,8 @@ func (this *Applier) StopSlaveIOThread() error {
552552
// MasterPosWait is applicable with --test-on-replica
553553
func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
554554
var appliedRows int64
555-
if err := this.db.QueryRow(`select ifnull(master_pos_wait(?, ?, ?), 0)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&appliedRows); err != nil {
556-
return err
555+
if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 3).Scan(&appliedRows); err != nil {
556+
return log.Errore(err)
557557
}
558558
if appliedRows < 0 {
559559
return fmt.Errorf("Timeout waiting on master_pos_wait()")
@@ -565,15 +565,17 @@ func (this *Applier) StopSlaveNicely() error {
565565
if err := this.StopSlaveIOThread(); err != nil {
566566
return err
567567
}
568-
binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db)
568+
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
569569
if err != nil {
570570
return err
571571
}
572-
log.Infof("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates)
573-
if err := this.MasterPosWait(binlogCoordinates); err != nil {
572+
log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
573+
log.Infof("Will wait for SQL thread to catch up with IO thread")
574+
if err := this.MasterPosWait(readBinlogCoordinates); err != nil {
575+
log.Errorf("Error waiting for SQL thread to catch up. Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
574576
return err
575577
}
576-
log.Infof("Replication SQL thread applied all events")
578+
log.Infof("Replication SQL thread applied all events up to %+v", *readBinlogCoordinates)
577579
if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil {
578580
return err
579581
} else {

0 commit comments

Comments
 (0)