File tree Expand file tree Collapse file tree 2 files changed +14
-14
lines changed Expand file tree Collapse file tree 2 files changed +14
-14
lines changed Original file line number Diff line number Diff line change @@ -63,15 +63,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
63
63
return err
64
64
}
65
65
66
- func (this * GoMySQLReader ) Reconnect () error {
67
- this .binlogSyncer .Close ()
68
- connectCoordinates := & mysql.BinlogCoordinates {LogFile : this .currentCoordinates .LogFile , LogPos : 4 }
69
- if err := this .ConnectBinlogStreamer (* connectCoordinates ); err != nil {
70
- return err
71
- }
72
- return nil
73
- }
74
-
75
66
func (this * GoMySQLReader ) GetCurrentBinlogCoordinates () * mysql.BinlogCoordinates {
76
67
this .currentCoordinatesMutex .Lock ()
77
68
defer this .currentCoordinatesMutex .Unlock ()
Original file line number Diff line number Diff line change @@ -189,18 +189,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
189
189
}
190
190
}()
191
191
// The next should block and execute forever, unless there's a serious error
192
+ var successiveFailures int64
193
+ var lastAppliedRowsEventHint mysql.BinlogCoordinates
192
194
for {
193
195
if err := this .binlogReader .StreamEvents (canStopStreaming , this .eventsChannel ); err != nil {
194
196
log .Infof ("StreamEvents encountered unexpected error: %+v" , err )
195
197
this .migrationContext .MarkPointOfInterest ()
196
198
time .Sleep (ReconnectStreamerSleepSeconds * time .Second )
197
199
198
- // Reposition at same binlog file. Single attempt (TODO: make multiple attempts?)
199
- lastAppliedRowsEventHint := this .binlogReader .LastAppliedRowsEventHint
200
+ // See if there's retry overflow
201
+ if this .binlogReader .LastAppliedRowsEventHint .Equals (& lastAppliedRowsEventHint ) {
202
+ successiveFailures += 1
203
+ } else {
204
+ successiveFailures = 0
205
+ }
206
+ if successiveFailures > this .migrationContext .MaxRetries () {
207
+ return fmt .Errorf ("%d successive failures in streamer reconnect at coordinates %+v" , lastAppliedRowsEventHint )
208
+ }
209
+
210
+ // Reposition at same binlog file.
211
+ lastAppliedRowsEventHint = this .binlogReader .LastAppliedRowsEventHint
200
212
log .Infof ("Reconnecting... Will resume at %+v" , lastAppliedRowsEventHint )
201
- // if err := this.binlogReader.Reconnect(); err != nil {
202
- // return err
203
- // }
204
213
if err := this .initBinlogReader (this .GetReconnectBinlogCoordinates ()); err != nil {
205
214
return err
206
215
}
You can’t perform that action at this time.
0 commit comments