Skip to content

Commit 071817a

Browse files
author
Shlomi Noach
authored
Merge pull request #103 from github/limited-streamer-retries
capped streamer retries
2 parents a74c1c0 + 1d77425 commit 071817a

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

go/binlog/gomysql_reader.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
6363
return err
6464
}
6565

66-
func (this *GoMySQLReader) Reconnect() error {
67-
this.binlogSyncer.Close()
68-
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
69-
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
70-
return err
71-
}
72-
return nil
73-
}
74-
7566
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
7667
this.currentCoordinatesMutex.Lock()
7768
defer this.currentCoordinatesMutex.Unlock()

go/logic/streamer.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,18 +189,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
189189
}
190190
}()
191191
// The next should block and execute forever, unless there's a serious error
192+
var successiveFailures int64
193+
var lastAppliedRowsEventHint mysql.BinlogCoordinates
192194
for {
193195
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
194196
log.Infof("StreamEvents encountered unexpected error: %+v", err)
195197
this.migrationContext.MarkPointOfInterest()
196198
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
197199

198-
// Reposition at same binlog file. Single attempt (TODO: make multiple attempts?)
199-
lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint
200+
// See if there's retry overflow
201+
if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) {
202+
successiveFailures += 1
203+
} else {
204+
successiveFailures = 0
205+
}
206+
if successiveFailures > this.migrationContext.MaxRetries() {
207+
return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", lastAppliedRowsEventHint)
208+
}
209+
210+
// Reposition at same binlog file.
211+
lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint
200212
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
201-
// if err := this.binlogReader.Reconnect(); err != nil {
202-
// return err
203-
// }
204213
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
205214
return err
206215
}

0 commit comments

Comments
 (0)