Skip to content

Commit 5657eaa

Browse files
authored
[PBM-1223] backup oplog from the oldest active transaction timestamp (#900)
1 parent 3607a38 commit 5657eaa

File tree

3 files changed

+74
-3
lines changed

3 files changed

+74
-3
lines changed

pbm/backup/backup.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/percona/percona-backup-mongodb/pbm/errors"
1717
"github.com/percona/percona-backup-mongodb/pbm/lock"
1818
"github.com/percona/percona-backup-mongodb/pbm/log"
19+
"github.com/percona/percona-backup-mongodb/pbm/oplog"
1920
"github.com/percona/percona-backup-mongodb/pbm/storage"
2021
"github.com/percona/percona-backup-mongodb/pbm/topo"
2122
"github.com/percona/percona-backup-mongodb/pbm/util"
@@ -174,7 +175,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
174175
return errors.Wrap(err, "get cluster info")
175176
}
176177

177-
oplogTS, err := topo.OpTimeFromNodeInfo(inf, true)
178+
oplogTS, err := oplog.GetOplogStartTime(ctx, b.nodeConn)
178179
if err != nil {
179180
return errors.Wrap(err, "define oplog start position")
180181
}
@@ -656,7 +657,7 @@ func (b *Backup) setClusterFirstWrite(ctx context.Context, bcpName string) error
656657
return errors.New("no replset metadata")
657658
}
658659

659-
if condAll(bcp.Replsets, func(br *BackupReplset) bool { return !br.FirstWriteTS.IsZero() }) {
660+
if condAll(bcp.Replsets, func(br *BackupReplset) bool { return br.FirstWriteTS.T > 1 }) {
660661
break
661662
}
662663

pbm/oplog/oplog.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package oplog
2+
3+
import (
4+
"context"
5+
6+
"go.mongodb.org/mongo-driver/bson"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
"go.mongodb.org/mongo-driver/mongo"
9+
"go.mongodb.org/mongo-driver/mongo/options"
10+
"go.mongodb.org/mongo-driver/mongo/readconcern"
11+
12+
"github.com/percona/percona-backup-mongodb/pbm/errors"
13+
)
14+
15+
var errNoTransaction = errors.New("no transaction found")
16+
17+
func GetOplogStartTime(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
18+
ts, err := findTransactionStartTime(ctx, m)
19+
if errors.Is(err, errNoTransaction) {
20+
ts, err = findLastOplogTS(ctx, m)
21+
}
22+
23+
return ts, err
24+
}
25+
26+
func findTransactionStartTime(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
27+
coll := m.Database("config").Collection("transactions", options.Collection().SetReadConcern(readconcern.Local()))
28+
f := bson.D{{"state", bson.D{{"$in", bson.A{"prepared", "inProgress"}}}}}
29+
o := options.FindOne().SetSort(bson.D{{"startOpTime", 1}})
30+
doc, err := coll.FindOne(ctx, f, o).Raw()
31+
if err != nil {
32+
if errors.Is(err, mongo.ErrNoDocuments) {
33+
return primitive.Timestamp{}, errNoTransaction
34+
}
35+
return primitive.Timestamp{}, errors.Wrap(err, "query transactions")
36+
}
37+
38+
rawTS, err := doc.LookupErr("startOpTime", "ts")
39+
if err != nil {
40+
return primitive.Timestamp{}, errors.Wrap(err, "lookup timestamp")
41+
}
42+
43+
t, i, ok := rawTS.TimestampOK()
44+
if !ok {
45+
return primitive.Timestamp{}, errors.Wrap(err, "parse timestamp")
46+
}
47+
48+
return primitive.Timestamp{T: t, I: i}, nil
49+
}
50+
51+
func findLastOplogTS(ctx context.Context, m *mongo.Client) (primitive.Timestamp, error) {
52+
coll := m.Database("local").Collection("oplog.rs")
53+
o := options.FindOne().SetSort(bson.M{"$natural": -1})
54+
doc, err := coll.FindOne(ctx, bson.D{}, o).Raw()
55+
if err != nil {
56+
return primitive.Timestamp{}, errors.Wrap(err, "query oplog")
57+
}
58+
59+
rawTS, err := doc.LookupErr("ts")
60+
if err != nil {
61+
return primitive.Timestamp{}, errors.Wrap(err, "lookup oplog ts")
62+
}
63+
64+
t, i, ok := rawTS.TimestampOK()
65+
if !ok {
66+
return primitive.Timestamp{}, errors.Wrap(err, "parse oplog ts")
67+
}
68+
69+
return primitive.Timestamp{T: t, I: i}, nil
70+
}

pbm/oplog/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ func bsonDocToOplog(doc bson.D) (*db.Oplog, error) {
574574
func (o *OplogRestore) applyTxn(id string) error {
575575
t, ok := o.txnData[id]
576576
if !ok {
577-
return errors.Errorf("unknown transaction id %s", id)
577+
return nil
578578
}
579579

580580
for _, op := range t.applyOps {

0 commit comments

Comments
 (0)