Skip to content

Commit a8bcf4c

Browse files
author
Dylan Terry
committed
Remove the timeout for synchronous backup, revert the timeout move to return the behavior to 30 days _between_ events, restore some comments, use struct instead of bool as recommended, add a note about SynchronousEventHandler and the parseEvent return values
1 parent ca576c9 commit a8bcf4c

File tree

3 files changed

+16
-21
lines changed

3 files changed

+16
-21
lines changed

replication/backup.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
2323
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
2424
})
2525
} else {
26-
return b.StartSynchronousBackup(p, timeout)
26+
return b.StartSynchronousBackup(p)
2727
}
2828
}
2929

@@ -68,10 +68,10 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
6868
}
6969
}()
7070

71-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
72-
defer cancel()
73-
7471
for {
72+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
73+
defer cancel()
74+
7575
select {
7676
case <-ctx.Done():
7777
return nil
@@ -89,27 +89,17 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
8989
}
9090

9191
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
92-
func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error {
92+
func (b *BinlogSyncer) StartSynchronousBackup(p Position) error {
9393
if b.cfg.SynchronousEventHandler == nil {
9494
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
9595
}
9696

97-
if timeout == 0 {
98-
timeout = 30 * 3600 * 24 * time.Second // Long timeout by default
99-
}
100-
101-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
102-
defer cancel()
103-
10497
s, err := b.StartSync(p)
10598
if err != nil {
10699
return errors.Trace(err)
107100
}
108101

109-
// Wait for the binlog syncer to finish or encounter an error
110102
select {
111-
case <-ctx.Done():
112-
return nil
113103
case <-b.ctx.Done():
114104
return nil
115105
case err := <-s.ech:
@@ -138,6 +128,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
138128
rotateEvent := e.Event.(*RotateEvent)
139129
h.filename = string(rotateEvent.NextLogName)
140130
if e.Header.Timestamp == 0 || offset == 0 {
131+
// fake rotate event
141132
return nil
142133
}
143134
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
@@ -157,6 +148,7 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
157148
return errors.Trace(err)
158149
}
159150

151+
// Write binlog header 0xfebin
160152
_, err = h.w.Write(BinLogFileHeader)
161153
if err != nil {
162154
return errors.Trace(err)

replication/backup_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
3232
os.RemoveAll(binlogDir)
3333
timeout := 2 * time.Second
3434

35-
done := make(chan bool)
35+
done := make(chan struct{})
3636

3737
go func() {
3838
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
3939
require.NoError(t.T(), err)
40-
done <- true
40+
close(done)
4141
}()
4242
failTimeout := 5 * timeout
4343
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)

replication/binlogsyncer.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ type BinlogSyncerConfig struct {
128128

129129
// SynchronousEventHandler is used for synchronous event handling.
130130
// This should not be used together with StartBackupWithHandler.
131+
// If this is not nil, GetEvent does not need to be called.
131132
SynchronousEventHandler EventHandler
132133
}
133134

@@ -805,24 +806,26 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
805806

806807
// parseEvent parses the raw data into a BinlogEvent.
807808
// It only handles parsing and does not perform any side effects.
808-
func (b *BinlogSyncer) parseEvent(data []byte) (*BinlogEvent, bool, error) {
809+
// Returns the parsed BinlogEvent, a boolean indicating if an ACK is needed, and an error if the
810+
// parsing fails
811+
func (b *BinlogSyncer) parseEvent(data []byte) (event *BinlogEvent, needACK bool, err error) {
809812
// Skip OK byte (0x00)
810813
data = data[1:]
811814

812-
needACK := false
815+
needACK = false
813816
if b.cfg.SemiSyncEnabled && data[0] == SemiSyncIndicator {
814817
needACK = data[1] == 0x01
815818
// Skip semi-sync header
816819
data = data[2:]
817820
}
818821

819822
// Parse the event using the BinlogParser
820-
e, err := b.parser.Parse(data)
823+
event, err = b.parser.Parse(data)
821824
if err != nil {
822825
return nil, false, errors.Trace(err)
823826
}
824827

825-
return e, needACK, nil
828+
return event, needACK, nil
826829
}
827830

828831
// handleEventAndACK processes an event and sends an ACK if necessary.

0 commit comments

Comments
 (0)