Skip to content

Commit 6d9a8ba

Browse files
author
Shlomi Noach
committed
Merge pull request #44 from github/reconnect-streamer
Handling gh-ost replication timeouts
2 parents ae899bd + 896f560 commit 6d9a8ba

File tree

4 files changed

+65
-38
lines changed

4 files changed

+65
-38
lines changed

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22
#
33
#
4-
RELEASE_VERSION="0.7.16"
4+
RELEASE_VERSION="0.7.17"
55

66
buildpath=/tmp/gh-ost
77
target=gh-ost

go/binlog/binlog_reader.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55

66
package binlog
77

8-
import (
9-
"github.com/github/gh-ost/go/mysql"
10-
)
8+
import ()
119

1210
// BinlogReader is a general interface whose implementations can choose their methods of reading
1311
// a binary log file and parsing it into binlog entries
1412
type BinlogReader interface {
1513
StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error
16-
GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
1714
Reconnect() error
1815
}

go/binlog/gomysql_reader.go

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package binlog
77

88
import (
99
"fmt"
10+
"sync"
1011

1112
"github.com/github/gh-ost/go/mysql"
1213
"github.com/github/gh-ost/go/sql"
@@ -23,20 +24,21 @@ const (
2324
)
2425

2526
type GoMySQLReader struct {
26-
connectionConfig *mysql.ConnectionConfig
27-
binlogSyncer *replication.BinlogSyncer
28-
binlogStreamer *replication.BinlogStreamer
29-
tableMap map[uint64]string
30-
currentCoordinates mysql.BinlogCoordinates
31-
lastHandledCoordinates mysql.BinlogCoordinates
27+
connectionConfig *mysql.ConnectionConfig
28+
binlogSyncer *replication.BinlogSyncer
29+
binlogStreamer *replication.BinlogStreamer
30+
currentCoordinates mysql.BinlogCoordinates
31+
currentCoordinatesMutex *sync.Mutex
32+
LastAppliedRowsEventHint mysql.BinlogCoordinates
3233
}
3334

3435
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
3536
binlogReader = &GoMySQLReader{
36-
connectionConfig: connectionConfig,
37-
tableMap: make(map[uint64]string),
38-
currentCoordinates: mysql.BinlogCoordinates{},
39-
binlogStreamer: nil,
37+
connectionConfig: connectionConfig,
38+
currentCoordinates: mysql.BinlogCoordinates{},
39+
currentCoordinatesMutex: &sync.Mutex{},
40+
binlogSyncer: nil,
41+
binlogStreamer: nil,
4042
}
4143
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
4244

@@ -63,25 +65,24 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
6365

6466
func (this *GoMySQLReader) Reconnect() error {
6567
this.binlogSyncer.Close()
66-
67-
connectCoordinates := &this.lastHandledCoordinates
68-
if connectCoordinates.IsEmpty() {
69-
connectCoordinates = &this.currentCoordinates
70-
}
68+
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
7169
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
7270
return err
7371
}
7472
return nil
7573
}
7674

7775
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
78-
return &this.currentCoordinates
76+
this.currentCoordinatesMutex.Lock()
77+
defer this.currentCoordinatesMutex.Unlock()
78+
returnCoordinates := this.currentCoordinates
79+
return &returnCoordinates
7980
}
8081

8182
// StreamEvents
8283
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
83-
if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() {
84-
log.Infof("Skipping handled query at %+v", this.currentCoordinates)
84+
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
85+
log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
8586
return nil
8687
}
8788

@@ -122,6 +123,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
122123
// In reality, reads will be synchronous
123124
entriesChannel <- binlogEntry
124125
}
126+
this.LastAppliedRowsEventHint = this.currentCoordinates
125127
return nil
126128
}
127129

@@ -135,21 +137,33 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
135137
if err != nil {
136138
return err
137139
}
138-
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
140+
// if rand.Intn(1000) == 0 {
141+
// this.binlogSyncer.Close()
142+
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
143+
// return log.Errorf(".............haha got random error")
144+
// }
145+
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
146+
func() {
147+
this.currentCoordinatesMutex.Lock()
148+
defer this.currentCoordinatesMutex.Unlock()
149+
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
150+
}()
139151
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
140-
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
152+
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
153+
// ev.Dump(os.Stdout)
154+
func() {
155+
this.currentCoordinatesMutex.Lock()
156+
defer this.currentCoordinatesMutex.Unlock()
157+
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
158+
}()
159+
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
141160
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
142-
} else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok {
143-
// Actually not being used, since Table is available in RowsEvent.
144-
// Keeping this here in case I'm wrong about this. Sometime in the near
145-
// future I should remove this.
146-
this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table)
147161
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
148162
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
149163
return err
150164
}
151165
}
152-
this.lastHandledCoordinates = this.currentCoordinates
166+
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
153167
}
154168
log.Debugf("done streaming events")
155169

go/logic/streamer.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type EventsStreamer struct {
4242
listeners [](*BinlogEventListener)
4343
listenersMutex *sync.Mutex
4444
eventsChannel chan *binlog.BinlogEntry
45-
binlogReader binlog.BinlogReader
45+
binlogReader *binlog.GoMySQLReader
4646
}
4747

4848
func NewEventsStreamer() *EventsStreamer {
@@ -110,15 +110,22 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
110110
if err := this.readCurrentBinlogCoordinates(); err != nil {
111111
return err
112112
}
113+
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
114+
return err
115+
}
116+
117+
return nil
118+
}
119+
120+
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
113121
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
114122
if err != nil {
115123
return err
116124
}
117-
if err := goMySQLReader.ConnectBinlogStreamer(*this.initialBinlogCoordinates); err != nil {
125+
if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil {
118126
return err
119127
}
120128
this.binlogReader = goMySQLReader
121-
122129
return nil
123130
}
124131

@@ -140,6 +147,10 @@ func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinat
140147
return this.binlogReader.GetCurrentBinlogCoordinates()
141148
}
142149

150+
func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates {
151+
return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4}
152+
}
153+
143154
// validateGrants verifies the user by which we're executing has necessary grants
144155
// to do its thang.
145156
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
@@ -177,14 +188,19 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
177188
// The next should block and execute forever, unless there's a serious error
178189
for {
179190
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
180-
// Reposition at same coordinates. Single attempt (TODO: make multiple attempts?)
181191
log.Infof("StreamEvents encountered unexpected error: %+v", err)
182192
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
183-
log.Infof("Reconnecting...")
184-
err = this.binlogReader.Reconnect()
185-
if err != nil {
193+
194+
// Reposition at same binlog file. Single attempt (TODO: make multiple attempts?)
195+
lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint
196+
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
197+
// if err := this.binlogReader.Reconnect(); err != nil {
198+
// return err
199+
// }
200+
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
186201
return err
187202
}
203+
this.binlogReader.LastAppliedRowsEventHint = lastAppliedRowsEventHint
188204
}
189205
}
190206
}

0 commit comments

Comments
 (0)