Skip to content

Commit 93923b1

Browse files
authored
[PBM-907] slice oplog during logical backup (#902)
1 parent 534ede6 commit 93923b1

File tree

33 files changed

+837
-542
lines changed

33 files changed

+837
-542
lines changed

.github/workflows/reviewdog.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ jobs:
7575
with:
7676
go-version: "1.19"
7777
- run: go install golang.org/x/tools/cmd/goimports@latest
78-
- run: go install mvdan.cc/gofumpt@latest
78+
- run: go install mvdan.cc/gofumpt@v0.5.0
7979
- run: goimports -w -local "github.com/percona" $(find . -not -path "*/vendor/*" -name "*.go")
8080
- run: gofumpt -w -extra $(find . -not -path "*/vendor/*" -name "*.go")
8181
- uses: reviewdog/action-suggester@v1

cmd/pbm-agent/agent.go

Lines changed: 6 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -182,7 +182,7 @@ func (a *Agent) Resync(ctx context.Context, opid ctrl.OPID, ep config.Epoch) {
182182
Epoch: &epts,
183183
})
184184

185-
got, err := a.acquireLock(ctx, lock, l, nil)
185+
got, err := a.acquireLock(ctx, lock, l)
186186
if err != nil {
187187
l.Error("acquiring lock: %v", err)
188188
return
@@ -194,7 +194,7 @@ func (a *Agent) Resync(ctx context.Context, opid ctrl.OPID, ep config.Epoch) {
194194

195195
defer func() {
196196
if err := lock.Release(); err != nil {
197-
l.Error("reslase lock %v: %v", lock, err)
197+
l.Error("release lock %v: %v", lock, err)
198198
}
199199
}()
200200

@@ -215,16 +215,10 @@ func (a *Agent) Resync(ctx context.Context, opid ctrl.OPID, ep config.Epoch) {
215215
l.Debug("epoch set to %v", epch)
216216
}
217217

218-
type lockAquireFn func(context.Context) (bool, error)
219-
220218
// acquireLock tries to acquire the lock. If there is a stale lock
221219
// it tries to mark op that held the lock (backup, [pitr]restore) as failed.
222-
func (a *Agent) acquireLock(ctx context.Context, l *lock.Lock, lg log.LogEvent, acquireFn lockAquireFn) (bool, error) {
223-
if acquireFn == nil {
224-
acquireFn = l.Acquire
225-
}
226-
227-
got, err := acquireFn(ctx)
220+
func (a *Agent) acquireLock(ctx context.Context, l *lock.Lock, lg log.LogEvent) (bool, error) {
221+
got, err := l.Acquire(ctx)
228222
if err == nil {
229223
return got, nil
230224
}
@@ -248,14 +242,14 @@ func (a *Agent) acquireLock(ctx context.Context, l *lock.Lock, lg log.LogEvent,
248242
case ctrl.CmdRestore:
249243
fn = markRestoreStale
250244
default:
251-
return acquireFn(ctx)
245+
return l.Acquire(ctx)
252246
}
253247

254248
if err := fn(ctx, l, lck.OPID); err != nil {
255249
lg.Warning("failed to mark stale op '%s' as failed: %v", lck.OPID, err)
256250
}
257251

258-
return acquireFn(ctx)
252+
return l.Acquire(ctx)
259253
}
260254

261255
func (a *Agent) HbPause() {

cmd/pbm-agent/backup.go

Lines changed: 46 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"time"
66

7+
"golang.org/x/sync/errgroup"
8+
79
"github.com/percona/percona-backup-mongodb/internal/backup"
810
"github.com/percona/percona-backup-mongodb/internal/config"
911
"github.com/percona/percona-backup-mongodb/internal/ctrl"
@@ -17,36 +19,27 @@ import (
1719
)
1820

1921
type currentBackup struct {
20-
header *ctrl.BackupCmd
2122
cancel context.CancelFunc
2223
}
2324

24-
func (a *Agent) setBcp(b *currentBackup) bool {
25+
func (a *Agent) setBcp(b *currentBackup) {
2526
a.mx.Lock()
2627
defer a.mx.Unlock()
27-
if a.bcp != nil {
28-
return false
29-
}
3028

3129
a.bcp = b
32-
return true
33-
}
34-
35-
func (a *Agent) unsetBcp() {
36-
a.mx.Lock()
37-
a.bcp = nil
38-
a.mx.Unlock()
3930
}
4031

4132
// CancelBackup cancels current backup
4233
func (a *Agent) CancelBackup() {
4334
a.mx.Lock()
4435
defer a.mx.Unlock()
36+
4537
if a.bcp == nil {
4638
return
4739
}
4840

4941
a.bcp.cancel()
42+
a.bcp = nil
5043
}
5144

5245
// Backup starts backup
@@ -67,6 +60,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
6760
l.Error("get node info: %v", err)
6861
return
6962
}
63+
// TODO: do the check on the agent start only
7064
if nodeInfo.IsStandalone() {
7165
l.Error("mongod node can not be used to fetch a consistent backup because it has no oplog. " +
7266
"Please restart it as a primary in a single-node replicaset " +
@@ -78,7 +72,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
7872
canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
7973
if err != nil {
8074
l.Error("node check: %v", err)
81-
if !isClusterLeader {
75+
if errors.Is(err, context.Canceled) || !isClusterLeader {
8276
return
8377
}
8478
}
@@ -89,10 +83,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
8983
}
9084
}
9185

92-
// wakeup the slicer not to wait for the tick
93-
if p := a.getPitr(); p != nil {
94-
p.w <- &opid
95-
}
86+
// wakeup the slicer to not wait for the tick
87+
go a.sliceNow(opid)
9688

9789
var bcp *backup.Backup
9890
switch cmd.Type {
@@ -117,6 +109,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
117109
l.Error("backups cannot be saved because PBM storage configuration hasn't been set yet")
118110
return
119111
}
112+
113+
bcp.SetSlicerInterval(cfg.BackupSlicerInterval())
120114
bcp.SetTimeouts(cfg.Backup.Timeouts)
121115

122116
if isClusterLeader {
@@ -187,21 +181,28 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
187181
l.Error("get cluster members: %v", err)
188182
return
189183
}
190-
for _, sh := range shards {
191-
go func(rs string) {
192-
err := a.nominateRS(ctx, cmd.Name, rs, nodes.RS(rs), l)
193-
if err != nil {
194-
l.Error("nodes nomination for %s: %v", rs, err)
195-
}
196-
}(sh.RS)
184+
185+
errGrp, grpCtx := errgroup.WithContext(ctx)
186+
for i := range shards {
187+
rs := shards[i].RS
188+
189+
errGrp.Go(func() error {
190+
err := a.nominateRS(grpCtx, cmd.Name, rs, nodes.RS(rs))
191+
return errors.Wrapf(err, "nodes nomination for %s", rs)
192+
})
193+
}
194+
195+
err = errGrp.Wait()
196+
if err != nil {
197+
l.Error(err.Error())
198+
return
197199
}
198200
}
199201

200-
nominated, err := a.waitNomination(ctx, cmd.Name, nodeInfo.SetName, nodeInfo.Me, l)
202+
nominated, err := a.waitNomination(ctx, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
201203
if err != nil {
202204
l.Error("wait for nomination: %v", err)
203205
}
204-
205206
if !nominated {
206207
l.Debug("skip after nomination, probably started by another node")
207208
return
@@ -216,13 +217,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
216217
Epoch: &epoch,
217218
})
218219

219-
// install a backup lock despite having PITR one
220-
got, err := a.acquireLock(ctx, lck, l, func(ctx context.Context) (bool, error) {
221-
return lck.Rewrite(ctx, &lock.LockHeader{
222-
Replset: a.brief.SetName,
223-
Type: ctrl.CmdPITR,
224-
})
225-
})
220+
got, err := a.acquireLock(ctx, lck, l)
226221
if err != nil {
227222
l.Error("acquiring lock: %v", err)
228223
return
@@ -231,20 +226,27 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
231226
l.Debug("skip: lock not acquired")
232227
return
233228
}
229+
defer func() {
230+
l.Debug("releasing lock")
231+
err = lck.Release()
232+
if err != nil {
233+
l.Error("unable to release backup lock %v: %v", lck, err)
234+
}
235+
}()
234236

235237
err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
236238
if err != nil {
237239
l.Warning("set nominee ack: %v", err)
238240
}
239241

240242
bcpCtx, cancel := context.WithCancel(ctx)
241-
a.setBcp(&currentBackup{
242-
header: cmd,
243-
cancel: cancel,
244-
})
243+
defer cancel()
244+
245+
a.setBcp(&currentBackup{cancel: cancel})
246+
defer a.setBcp(nil)
247+
245248
l.Info("backup started")
246249
err = bcp.Run(bcpCtx, cmd, opid, l)
247-
a.unsetBcp()
248250
if err != nil {
249251
if errors.Is(err, storage.ErrCancelled) || errors.Is(err, context.Canceled) {
250252
l.Info("backup was canceled")
@@ -254,18 +256,14 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
254256
} else {
255257
l.Info("backup finished")
256258
}
257-
258-
l.Debug("releasing lock")
259-
err = lck.Release()
260-
if err != nil {
261-
l.Error("unable to release backup lock %v: %v", lck, err)
262-
}
263259
}
264260

265261
const renominationFrame = 5 * time.Second
266262

267-
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l log.LogEvent) error {
263+
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string) error {
264+
l := log.LogEventFromContext(ctx)
268265
l.Debug("nomination list for %s: %v", rs, nodes)
266+
269267
err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
270268
if err != nil {
271269
return errors.Wrap(err, "set nomination meta")
@@ -298,7 +296,9 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
298296
return nil
299297
}
300298

301-
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l log.LogEvent) (bool, error) {
299+
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string) (bool, error) {
300+
l := log.LogEventFromContext(ctx)
301+
302302
tk := time.NewTicker(time.Millisecond * 500)
303303
defer tk.Stop()
304304

cmd/pbm-agent/delete.go

Lines changed: 9 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -43,15 +43,15 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
4343
}
4444

4545
epts := ep.TS()
46-
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
46+
lock := lock.NewLock(a.leadConn, lock.LockHeader{
4747
Replset: a.brief.SetName,
4848
Node: a.brief.Me,
4949
Type: ctrl.CmdDeleteBackup,
5050
OPID: opid.String(),
5151
Epoch: &epts,
5252
})
5353

54-
got, err := a.acquireLock(ctx, lock, l, nil)
54+
got, err := a.acquireLock(ctx, lock, l)
5555
if err != nil {
5656
l.Error("acquire lock: %v", err)
5757
return
@@ -70,8 +70,10 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
7070
case d.OlderThan > 0:
7171
t := time.Unix(d.OlderThan, 0).UTC()
7272
obj := t.Format("2006-01-02T15:04:05Z")
73+
7374
l = logger.NewEvent(string(ctrl.CmdDeleteBackup), obj, opid.String(), ep.TS())
7475
ctx := log.SetLogEventToContext(ctx, l)
76+
7577
l.Info("deleting backups older than %v", t)
7678
err := backup.DeleteBackupBefore(ctx, a.leadConn, t, "")
7779
if err != nil {
@@ -81,6 +83,7 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
8183
case d.Backup != "":
8284
l = logger.NewEvent(string(ctrl.CmdDeleteBackup), d.Backup, opid.String(), ep.TS())
8385
ctx := log.SetLogEventToContext(ctx, l)
86+
8487
l.Info("deleting backup")
8588
err := backup.DeleteBackup(ctx, a.leadConn, d.Backup)
8689
if err != nil {
@@ -119,15 +122,15 @@ func (a *Agent) DeletePITR(ctx context.Context, d *ctrl.DeletePITRCmd, opid ctrl
119122
}
120123

121124
epts := ep.TS()
122-
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
125+
lock := lock.NewLock(a.leadConn, lock.LockHeader{
123126
Replset: a.brief.SetName,
124127
Node: a.brief.Me,
125128
Type: ctrl.CmdDeletePITR,
126129
OPID: opid.String(),
127130
Epoch: &epts,
128131
})
129132

130-
got, err := a.acquireLock(ctx, lock, l, nil)
133+
got, err := a.acquireLock(ctx, lock, l)
131134
if err != nil {
132135
l.Error("acquire lock: %v", err)
133136
return
@@ -173,11 +176,6 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
173176

174177
ctx = log.SetLogEventToContext(ctx, l)
175178

176-
if d == nil {
177-
l.Error("missed command")
178-
return
179-
}
180-
181179
nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
182180
if err != nil {
183181
l.Error("get node info data: %v", err)
@@ -189,15 +187,15 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
189187
}
190188

191189
epts := ep.TS()
192-
lock := lock.NewOpLock(a.leadConn, lock.LockHeader{
190+
lock := lock.NewLock(a.leadConn, lock.LockHeader{
193191
Replset: a.brief.SetName,
194192
Node: a.brief.Me,
195193
Type: ctrl.CmdCleanup,
196194
OPID: opid.String(),
197195
Epoch: &epts,
198196
})
199197

200-
got, err := a.acquireLock(ctx, lock, l, nil)
198+
got, err := a.acquireLock(ctx, lock, l)
201199
if err != nil {
202200
l.Error("acquire lock: %v", err)
203201
return

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
5050
Epoch: &epoch,
5151
})
5252

53-
nominated, err := a.acquireLock(ctx, lck, l, nil)
53+
nominated, err := a.acquireLock(ctx, lck, l)
5454
if err != nil {
5555
l.Error("acquiring lock: %s", err.Error())
5656
return

0 commit comments

Comments
 (0)