Skip to content

Commit 8520f0a

Browse files
committed
fix(badgerd/gossip): wait for leader version catchup during init
Avoid transient init failures during sync replication rolling restarts. A follower can observe a replicated commit before the leader's MaxVersion has advanced, which caused "invalid replication version" errors in nightly stress runs. Add a bounded wait in Init and cover it with a regression test. Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent 1d06f48 commit 8520f0a

File tree

4 files changed

+58
-8
lines changed

4 files changed

+58
-8
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,10 @@ ci-replication-smoke:
162162

163163
.PHONY: ci-replication-stress
164164
ci-replication-stress:
165-
@go test -v -count 2 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/leader_churn_under_writes$$' ./tests
166-
@go test -v -count 2 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/follower_offline_catchup_under_load$$' ./tests
167-
@go test -v -count 1 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/rolling_restart_no_data_loss$$' ./tests
168-
@go test -v -count 1 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/delete_propagation_no_resurrection$$' ./tests
165+
@go test -v -count 10 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/leader_churn_under_writes$$' ./tests
166+
@go test -v -count 10 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/follower_offline_catchup_under_load$$' ./tests
167+
@go test -v -count 5 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/rolling_restart_no_data_loss$$' ./tests
168+
@go test -v -count 5 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/delete_propagation_no_resurrection$$' ./tests
169169

170170
.PHONY: ci-integration
171171
ci-integration:

internal/badgerd/replication/gossip/service.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,20 @@ func (r *Gossip) Init(req *pb2.InitRequest, ss pb2.ReplicationService_InitServer
6161
return gerrs.Abortedf("node %s is not in the cluster", addr)
6262
}
6363
m := r.db.MaxVersion()
64+
if req.Since > m {
65+
// In sync mode a follower can observe an incoming replicated commit just before
66+
// the leader's local MaxVersion advances. Give the leader a short window to catch up
67+
// before treating the follower as genuinely ahead.
68+
deadline := time.Now().Add(250 * time.Millisecond)
69+
for req.Since > m && time.Now().Before(deadline) {
70+
select {
71+
case <-ss.Context().Done():
72+
return ss.Context().Err()
73+
case <-time.After(10 * time.Millisecond):
74+
}
75+
m = r.db.MaxVersion()
76+
}
77+
}
6478
if req.Since > m {
6579
return gerrs.FailedPreconditionf("invalid replication version %d, max is %d", req.Since, m)
6680
}

internal/badgerd/replication/gossip/service_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,27 @@ func TestInitWaitsForNodeToAppear(t *testing.T) {
9999
assert.True(t, called)
100100
}
101101

102+
func TestInitWaitsForLeaderVersionToCatchUp(t *testing.T) {
103+
called := false
104+
db := &fakeDB{maxVersion: 0, streamFn: func(_ context.Context, at, since uint64, _ io.Writer) error {
105+
called = true
106+
assert.EqualValues(t, 2, at)
107+
assert.EqualValues(t, 1, since)
108+
return nil
109+
}}
110+
r := &Gossip{leading: NewAtomic(true), db: db, nodes: Map[*node]{}}
111+
r.nodes.Store("n1", &node{name: "n1", addr: net.ParseIP("127.0.0.1")})
112+
113+
go func() {
114+
time.Sleep(15 * time.Millisecond)
115+
db.SetMaxVersion(2)
116+
}()
117+
118+
err := r.Init(&pb.InitRequest{Since: 1}, &fakeInitSrv{ctx: peerCtx("127.0.0.1", 7000)})
119+
require.NoError(t, err)
120+
assert.True(t, called)
121+
}
122+
102123
func TestAliveTable(t *testing.T) {
103124
r := &Gossip{}
104125
err := r.Alive(&fakeAliveSrv{ctx: context.Background(), recvErrs: []error{io.EOF}})

internal/badgerd/replication/gossip/test_helpers_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"io"
66
"net"
7+
"sync"
78

89
"github.com/dgraph-io/badger/v3"
910
"google.golang.org/grpc"
@@ -16,6 +17,7 @@ import (
1617
)
1718

1819
type fakeDB struct {
20+
mu sync.RWMutex
1921
path string
2022
inMemory bool
2123
maxVersion uint64
@@ -27,10 +29,23 @@ type fakeDB struct {
2729
batch *fakeWriteBatch
2830
}
2931

30-
func (f *fakeDB) Path() string { return f.path }
31-
func (f *fakeDB) InMemory() bool { return f.inMemory }
32-
func (f *fakeDB) MaxVersion() uint64 { return f.maxVersion }
33-
func (f *fakeDB) SetVersion(v uint64) { f.version = v }
32+
func (f *fakeDB) Path() string { return f.path }
33+
func (f *fakeDB) InMemory() bool { return f.inMemory }
34+
func (f *fakeDB) MaxVersion() uint64 {
35+
f.mu.RLock()
36+
defer f.mu.RUnlock()
37+
return f.maxVersion
38+
}
39+
func (f *fakeDB) SetVersion(v uint64) {
40+
f.mu.Lock()
41+
defer f.mu.Unlock()
42+
f.version = v
43+
}
44+
func (f *fakeDB) SetMaxVersion(v uint64) {
45+
f.mu.Lock()
46+
defer f.mu.Unlock()
47+
f.maxVersion = v
48+
}
3449
func (f *fakeDB) Drop() error { return nil }
3550
func (f *fakeDB) Load(context.Context, io.Reader) (uint64, error) { return 0, nil }
3651
func (f *fakeDB) Stream(ctx context.Context, at, since uint64, w io.Writer) error {

0 commit comments

Comments
 (0)