Skip to content

Commit 09e4059

Browse files
authored
PBM-1551 PBM reissues the previously executed command (#1146)
* remember commands by their ts and prune when possible * add tests
1 parent b208fd2 commit 09e4059

File tree

2 files changed

+234
-3
lines changed

2 files changed

+234
-3
lines changed

pbm/ctrl/recv.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func ListenCmd(ctx context.Context, m connect.Client, cl <-chan struct{}) (<-cha
4141

4242
ts := time.Now().UTC().Unix()
4343
var lastTS int64
44-
var lastCmd Command
44+
cmdBuckets := make(map[int64][]Cmd)
4545
for {
4646
select {
4747
case <-ctx.Done():
@@ -68,7 +68,7 @@ func ListenCmd(ctx context.Context, m connect.Client, cl <-chan struct{}) (<-cha
6868
continue
6969
}
7070

71-
if c.Cmd == lastCmd && c.TS == lastTS {
71+
if checkDuplicateCmd(cmdBuckets, c) {
7272
continue
7373
}
7474

@@ -80,7 +80,7 @@ func ListenCmd(ctx context.Context, m connect.Client, cl <-chan struct{}) (<-cha
8080

8181
c.OPID = OPID(opid)
8282

83-
lastCmd = c.Cmd
83+
cmdBuckets[c.TS] = append(cmdBuckets[c.TS], c)
8484
lastTS = c.TS
8585
cmd <- c
8686
ts = time.Now().UTC().Unix()
@@ -90,10 +90,39 @@ func ListenCmd(ctx context.Context, m connect.Client, cl <-chan struct{}) (<-cha
9090
cur.Close(ctx)
9191
return
9292
}
93+
94+
cleanupOldCmdBuckets(cmdBuckets, lastTS)
95+
9396
cur.Close(ctx)
9497
time.Sleep(time.Second * 1)
9598
}
9699
}()
97100

98101
return cmd, errc
99102
}
103+
104+
// checkDuplicateCmd returns true if the command already exists in the bucket for its timestamp.
105+
func checkDuplicateCmd(cmdBuckets map[int64][]Cmd, c Cmd) bool {
106+
cmds, ok := cmdBuckets[c.TS]
107+
108+
if !ok {
109+
return false
110+
}
111+
112+
for _, cmd := range cmds {
113+
if cmd.Cmd == c.Cmd && cmd.TS == c.TS {
114+
return true
115+
}
116+
}
117+
118+
return false
119+
}
120+
121+
// cleanupOldCmdBuckets deletes buckets older than the lastTS.
122+
func cleanupOldCmdBuckets(cmdBuckets map[int64][]Cmd, lastTS int64) {
123+
for ts := range cmdBuckets {
124+
if ts < lastTS {
125+
delete(cmdBuckets, ts)
126+
}
127+
}
128+
}

pbm/ctrl/recv_test.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package ctrl
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"testing"
9+
"time"
10+
11+
"github.com/testcontainers/testcontainers-go"
12+
"github.com/testcontainers/testcontainers-go/modules/mongodb"
13+
"go.mongodb.org/mongo-driver/bson"
14+
"go.mongodb.org/mongo-driver/bson/primitive"
15+
"go.mongodb.org/mongo-driver/mongo"
16+
"go.mongodb.org/mongo-driver/mongo/options"
17+
18+
"github.com/percona/percona-backup-mongodb/pbm/connect"
19+
)
20+
21+
var (
22+
mClient *mongo.Client
23+
connClient connect.Client
24+
)
25+
26+
func TestMain(m *testing.M) {
27+
ctx := context.Background()
28+
mongodbContainer, err := mongodb.Run(ctx, "perconalab/percona-server-mongodb:8.0.4-multi")
29+
if err != nil {
30+
log.Fatalf("error while creating mongo test container: %v", err)
31+
}
32+
connStr, err := mongodbContainer.ConnectionString(ctx)
33+
if err != nil {
34+
log.Fatalf("conn string error: %v", err)
35+
}
36+
mClient, err = mongo.Connect(ctx, options.Client().ApplyURI(connStr))
37+
if err != nil {
38+
log.Fatalf("mongo client connect error: %v", err)
39+
}
40+
41+
connClient = connect.UnsafeClient(mClient)
42+
43+
code := m.Run()
44+
45+
err = mClient.Disconnect(ctx)
46+
if err != nil {
47+
log.Fatalf("mongo client disconnect error: %v", err)
48+
}
49+
if err := testcontainers.TerminateContainer(mongodbContainer); err != nil {
50+
log.Fatalf("failed to terminate container: %s", err)
51+
}
52+
53+
os.Exit(code)
54+
}
55+
56+
func TestListenCmd(t *testing.T) {
57+
ctx := context.Background()
58+
coll := connClient.CmdStreamCollection()
59+
60+
testdata := [][]struct {
61+
cmd string
62+
offset int
63+
}{
64+
{
65+
{"restore", 0},
66+
{"backup", 1},
67+
},
68+
69+
{
70+
{"backup", 0},
71+
{"backup", 0},
72+
},
73+
{
74+
{"resync", 0},
75+
{"backup", 0},
76+
{"restore", 0},
77+
{"backup", 1},
78+
{"restore", 1},
79+
{"resync", 2},
80+
{"backup", 2},
81+
{"restore", 2},
82+
},
83+
}
84+
85+
for index, entries := range testdata {
86+
_ = coll.Drop(ctx)
87+
88+
stopCh := make(chan struct{})
89+
cmdC, errC := ListenCmd(ctx, connClient, stopCh)
90+
91+
partTS := time.Now().UTC().Unix()
92+
93+
var commands []struct {
94+
cmd string
95+
ts int64
96+
}
97+
for _, t := range entries {
98+
commands = append(commands, struct {
99+
cmd string
100+
ts int64
101+
}{t.cmd, partTS + int64(t.offset)})
102+
}
103+
104+
for _, e := range commands {
105+
doc := bson.D{{"_id", primitive.NewObjectID()}, {"cmd", e.cmd}, {"ts", e.ts}}
106+
if _, err := coll.InsertOne(ctx, doc); err != nil {
107+
t.Fatalf("insert %s@%d: %v", e.cmd, e.ts, err)
108+
}
109+
}
110+
111+
want := make(map[string]struct{})
112+
for _, e := range commands {
113+
want[fmt.Sprintf("%d/%s", e.ts, e.cmd)] = struct{}{}
114+
}
115+
116+
idle := 3 * time.Second
117+
timer := time.NewTimer(idle)
118+
got := make(map[string]struct{})
119+
120+
func() {
121+
for {
122+
select {
123+
case cmd := <-cmdC:
124+
key := fmt.Sprintf("%d/%s", cmd.TS, string(cmd.Cmd))
125+
got[key] = struct{}{}
126+
if !timer.Stop() {
127+
<-timer.C
128+
}
129+
timer.Reset(idle)
130+
case err := <-errC:
131+
t.Fatalf("listener error: %v", err)
132+
case <-timer.C:
133+
return
134+
}
135+
}
136+
}()
137+
138+
close(stopCh)
139+
140+
if len(got) != len(want) {
141+
t.Errorf("part %d: expected %d unique commands, got %d", index+1, len(want), len(got))
142+
}
143+
144+
for key := range want {
145+
if _, ok := got[key]; !ok {
146+
t.Errorf("part %d: missing expected command: %s", index+1, key)
147+
}
148+
}
149+
150+
for key := range got {
151+
if _, ok := want[key]; !ok {
152+
t.Errorf("part %d: unexpected extra command: %s", index+1, key)
153+
}
154+
}
155+
}
156+
}
157+
158+
func TestCheckDuplicateCmd(t *testing.T) {
159+
c1 := Cmd{Cmd: "backup", TS: 100}
160+
c2 := Cmd{Cmd: "restore", TS: 100}
161+
c3 := Cmd{Cmd: "backup", TS: 200}
162+
163+
buckets := map[int64][]Cmd{
164+
100: {c1},
165+
}
166+
167+
tests := []struct {
168+
name string
169+
cmd Cmd
170+
expected bool
171+
}{
172+
{"first command, new bucket", c3, false},
173+
{"different cmd, same ts", c2, false},
174+
{"exact duplicate", c1, true},
175+
}
176+
177+
for _, tt := range tests {
178+
if got := checkDuplicateCmd(buckets, tt.cmd); got != tt.expected {
179+
t.Errorf("%s: want %v, got %v", tt.name, tt.expected, got)
180+
}
181+
}
182+
}
183+
184+
func TestCleanupOldCmdBuckets(t *testing.T) {
185+
buckets := map[int64][]Cmd{
186+
90: {{Cmd: "backup", TS: 90}},
187+
100: {{Cmd: "backup", TS: 100}},
188+
110: {{Cmd: "backup", TS: 110}},
189+
}
190+
191+
cleanupOldCmdBuckets(buckets, 100)
192+
193+
if _, ok := buckets[90]; ok {
194+
t.Errorf("bucket 90 should have been deleted")
195+
}
196+
if _, ok := buckets[100]; !ok {
197+
t.Errorf("bucket 100 should be kept (ts == lastTS)")
198+
}
199+
if _, ok := buckets[110]; !ok {
200+
t.Errorf("bucket 110 should be kept (ts > lastTS)")
201+
}
202+
}

0 commit comments

Comments
 (0)