Skip to content

Commit 443c85e

Browse files
author
Deepak Gupta
committed
Enhancement to allow syncing binlogs using a specified GTID set
1 parent 4a082cf commit 443c85e

File tree

2 files changed

+167
-26
lines changed

2 files changed

+167
-26
lines changed

replication/backup.go

Lines changed: 90 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p mysql.Position, timeout t
2727
}
2828
}
2929

30+
func (b *BinlogSyncer) StartBackupGTID(backupDir string, gset mysql.GTIDSet, timeout time.Duration) error {
31+
err := os.MkdirAll(backupDir, 0o755)
32+
if err != nil {
33+
return errors.Trace(err)
34+
}
35+
if b.cfg.SynchronousEventHandler == nil {
36+
return b.StartBackupWithHandlerAndGTID(gset, timeout, func(filename string) (io.WriteCloser, error) {
37+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
38+
})
39+
} else {
40+
return b.StartSynchronousBackupWithGTID(gset, timeout)
41+
}
42+
}
43+
3044
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
3145
// The process will continue until the timeout is reached or an error occurs.
3246
// This method should not be used together with SynchronousEventHandler.
@@ -54,52 +68,73 @@ func (b *BinlogSyncer) StartBackupWithHandler(p mysql.Position, timeout time.Dur
5468
backupHandler := &BackupEventHandler{
5569
handler: handler,
5670
}
57-
5871
s, err := b.StartSync(p)
5972
if err != nil {
6073
return errors.Trace(err)
6174
}
75+
return processWithHandler(b, s, backupHandler, timeout)
76+
}
6277

63-
defer func() {
64-
if backupHandler.w != nil {
65-
closeErr := backupHandler.w.Close()
66-
if retErr == nil {
67-
retErr = closeErr
68-
}
69-
}
70-
}()
78+
// StartBackupWithHandlerAndGTID starts the backup process for the binary log using the specified GTID set and handler.
79+
// - gset: The GTID set from which to begin the backup.
80+
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
81+
// If set to 0, a default very long timeout (30 days) is used instead.
82+
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
83+
func (b *BinlogSyncer) StartBackupWithHandlerAndGTID(gset mysql.GTIDSet, timeout time.Duration,
84+
handler func(binlogFilename string) (io.WriteCloser, error),
85+
) (retErr error) {
86+
if timeout == 0 {
87+
// a very long timeout here
88+
timeout = 30 * 3600 * 24 * time.Second
89+
}
90+
if b.cfg.SynchronousEventHandler != nil {
91+
return errors.New("StartBackupWithHandlerAndGTID cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackupWithGTID instead.")
92+
}
7193

72-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
73-
defer cancel()
94+
// Force use raw mode
95+
b.parser.SetRawMode(true)
7496

75-
for {
76-
select {
77-
case <-ctx.Done():
78-
return nil
79-
case <-b.ctx.Done():
80-
return nil
81-
case err := <-s.ech:
82-
return errors.Trace(err)
83-
case e := <-s.ch:
84-
err = backupHandler.HandleEvent(e)
85-
if err != nil {
86-
return errors.Trace(err)
87-
}
88-
}
97+
// Set up the backup event handler
98+
backupHandler := &BackupEventHandler{
99+
handler: handler,
100+
}
101+
102+
s, err := b.StartSyncGTID(gset)
103+
if err != nil {
104+
return errors.Trace(err)
89105
}
106+
return processWithHandler(b, s, backupHandler, timeout)
90107
}
91108

92109
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
93110
func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Duration) error {
94111
if b.cfg.SynchronousEventHandler == nil {
95112
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
96113
}
97-
98114
s, err := b.StartSync(p)
99115
if err != nil {
100116
return errors.Trace(err)
101117
}
102118

119+
return process(b, s, timeout)
120+
}
121+
122+
// StartSynchronousBackupWithGTID starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig with a specified GTID set.
123+
func (b *BinlogSyncer) StartSynchronousBackupWithGTID(gset mysql.GTIDSet, timeout time.Duration) error {
124+
125+
if b.cfg.SynchronousEventHandler == nil {
126+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackupWithGTID")
127+
}
128+
129+
s, err := b.StartSyncGTID(gset)
130+
if err != nil {
131+
return errors.Trace(err)
132+
}
133+
134+
return process(b, s, timeout)
135+
}
136+
137+
func process(b *BinlogSyncer, s *BinlogStreamer, timeout time.Duration) error {
103138
var ctx context.Context
104139
var cancel context.CancelFunc
105140

@@ -123,6 +158,35 @@ func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Dur
123158
}
124159
}
125160

161+
func processWithHandler(b *BinlogSyncer, s *BinlogStreamer, backupHandler *BackupEventHandler, timeout time.Duration) (retErr error) {
162+
defer func() {
163+
if backupHandler.w != nil {
164+
closeErr := backupHandler.w.Close()
165+
if retErr == nil {
166+
retErr = closeErr
167+
}
168+
}
169+
}()
170+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
171+
defer cancel()
172+
173+
for {
174+
select {
175+
case <-ctx.Done():
176+
return nil
177+
case <-b.ctx.Done():
178+
return nil
179+
case err := <-s.ech:
180+
return errors.Trace(err)
181+
case e := <-s.ch:
182+
err := backupHandler.HandleEvent(e)
183+
if err != nil {
184+
return errors.Trace(err)
185+
}
186+
}
187+
}
188+
}
189+
126190
// BackupEventHandler handles writing events for backup
127191
type BackupEventHandler struct {
128192
handler func(binlogFilename string) (io.WriteCloser, error)

replication/backup_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package replication
22

33
import (
44
"context"
5+
"fmt"
6+
"github.com/google/uuid"
57
"io"
68
"os"
79
"path"
@@ -61,6 +63,16 @@ func (t *testSyncerSuite) TestSyncBackup() {
6163
testBackup(t, true) // true indicates synchronous mode
6264
}
6365

66+
// TestAsyncBackupWithGTID runs the backup process in asynchronous mode with GTID and verifies binlog file creation.
67+
func (t *testSyncerSuite) TestAsyncBackupWithGTID() {
68+
testBackUpWithGTID(t, false) // false indicates asynchronous mode
69+
}
70+
71+
// TestSyncBackupWithGTID runs the backup process in synchronous mode with GTID and verifies binlog file creation.
72+
func (t *testSyncerSuite) TestSyncBackupWithGTID() {
73+
testBackUpWithGTID(t, true) // true indicates synchronous mode
74+
}
75+
6476
// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
6577
func testBackup(t *testSyncerSuite, isSynchronous bool) {
6678
t.setupTest(mysql.MySQLFlavor)
@@ -111,6 +123,71 @@ func testBackup(t *testSyncerSuite, isSynchronous bool) {
111123
}
112124
}
113125

126+
func testBackUpWithGTID(t *testSyncerSuite, isSynchronous bool) {
127+
t.setupTest(mysql.MySQLFlavor)
128+
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled
129+
130+
binlogDir := "./var"
131+
os.RemoveAll(binlogDir)
132+
timeout := 3 * time.Second
133+
134+
if isSynchronous {
135+
// Set up a BackupEventHandler for synchronous mode
136+
backupHandler := NewBackupEventHandler(
137+
func(filename string) (io.WriteCloser, error) {
138+
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
139+
},
140+
)
141+
t.b.cfg.SynchronousEventHandler = backupHandler
142+
} else {
143+
// Ensure SynchronousEventHandler is nil for asynchronous mode
144+
t.b.cfg.SynchronousEventHandler = nil
145+
}
146+
147+
r, err := t.c.Execute("SELECT @@gtid_mode")
148+
require.NoError(t.T(), err)
149+
modeOn, _ := r.GetString(0, 0)
150+
if modeOn != "ON" {
151+
t.T().Skip("GTID mode is not ON")
152+
}
153+
154+
r, err = t.c.Execute("SHOW GLOBAL VARIABLES LIKE 'SERVER_UUID'")
155+
require.NoError(t.T(), err)
156+
157+
var masterUuid uuid.UUID
158+
if s, _ := r.GetString(0, 1); len(s) > 0 && s != "NONE" {
159+
masterUuid, err = uuid.Parse(s)
160+
require.NoError(t.T(), err)
161+
}
162+
163+
set, _ := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d-%d", masterUuid.String(), 1, 2))
164+
done := make(chan bool)
165+
166+
// Start the backup process in a goroutine
167+
go func() {
168+
err := t.b.StartBackupGTID(binlogDir, set, timeout)
169+
require.NoError(t.T(), err)
170+
done <- true
171+
}()
172+
173+
failTimeout := 2 * timeout
174+
ctx, cancel := context.WithTimeout(context.Background(), failTimeout)
175+
defer cancel()
176+
177+
// Wait for the backup to complete or timeout
178+
select {
179+
case <-done:
180+
files, err := os.ReadDir(binlogDir)
181+
require.NoError(t.T(), err, "Failed to read binlog directory")
182+
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
183+
mode := modeLabel(isSynchronous)
184+
t.T().Logf("Backup completed successfully in %s mode using GTID with %d binlog file(s).", mode, len(files))
185+
case <-ctx.Done():
186+
mode := modeLabel(isSynchronous)
187+
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
188+
}
189+
}
190+
114191
func modeLabel(isSynchronous bool) string {
115192
if isSynchronous {
116193
return "synchronous"

0 commit comments

Comments
 (0)