Skip to content

Commit 7f9290d

Browse files
chideatclaude
andauthored
fix: prevent data loss during cross-version rolling upgrade (#66)
* fix: use SHUTDOWN NOSAVE when dbsize is 0 to prevent data loss during cross-version rolling upgrade During a rolling upgrade (e.g. 7.2 → 9.0), a node demoted to replica may attempt a fullsync from the new-version master. If the RDB format is incompatible, the load fails after memory has already been cleared, leaving an empty in-memory state. A subsequent SHUTDOWN (with save) would then overwrite the valid dump.rdb with empty data. Fix: re-check Dbsize right before SHUTDOWN. If 0, use SHUTDOWN NOSAVE to preserve the existing dump.rdb (which holds valid pre-upgrade data). If >0, use normal SHUTDOWN. Applies to both failover/sentinel and cluster shutdown paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: improve test coverage for shutdown dbsize check to fix Coveralls failure Extract shutdownNode helper in cluster and failover packages, switch from Info().Dbsize to DBSIZE command for testability with miniredis, and replace no-op tests with real tests that exercise both branches. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 30119a9 commit 7f9290d

File tree

4 files changed

+154
-4
lines changed

4 files changed

+154
-4
lines changed

cmd/helper/commands/cluster/shutdown.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,33 @@ func Shutdown(ctx context.Context, c *cli.Context, client *kubernetes.Clientset,
203203
time.Sleep(time.Second * 10)
204204

205205
logger.Info("do shutdown node")
206-
if _, err = valkeyClient.Do(ctx, "SHUTDOWN"); err != nil && !errors.Is(err, io.EOF) {
206+
if err = shutdownNode(ctx, valkeyClient, logger); err != nil {
207207
logger.Error(err, "graceful shutdown failed")
208208
}
209209
return nil
210210
}
211211

212+
// shutdownNode re-checks dbsize before issuing SHUTDOWN.
213+
// If dbsize == 0 (e.g. memory was cleared by a failed cross-version fullsync),
214+
// SHUTDOWN NOSAVE is used to preserve the existing dump.rdb on disk.
215+
// Otherwise, normal SHUTDOWN is issued to persist the current dataset.
216+
func shutdownNode(ctx context.Context, valkeyClient valkey.ValkeyClient, logger logr.Logger) error {
217+
dbsize, _ := valkey.Int64(valkeyClient.Do(ctx, "DBSIZE"))
218+
if dbsize == 0 {
219+
logger.Info("dbsize is 0, using SHUTDOWN NOSAVE to preserve existing dump.rdb")
220+
_, err := valkeyClient.Do(ctx, "SHUTDOWN", "NOSAVE")
221+
if err != nil && !errors.Is(err, io.EOF) {
222+
return err
223+
}
224+
return nil
225+
}
226+
_, err := valkeyClient.Do(ctx, "SHUTDOWN")
227+
if err != nil && !errors.Is(err, io.EOF) {
228+
return err
229+
}
230+
return nil
231+
}
232+
212233
func getPodAccessAddr(pod *v1.Pod) string {
213234
addr := net.JoinHostPort(pod.Status.PodIP, "6379")
214235
ipFamilyPrefer := os.Getenv("IP_FAMILY_PREFER")
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2024 chideat.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/alicebob/miniredis/v2"
24+
"github.com/chideat/valkey-operator/pkg/valkey"
25+
"github.com/go-logr/logr"
26+
)
27+
28+
// Test_shutdownNode verifies the SHUTDOWN/SHUTDOWN NOSAVE decision in shutdownNode.
29+
// miniredis doesn't support SHUTDOWN, so both cases return an error — but each exercises
30+
// the correct branch (NOSAVE for dbsize==0, normal SHUTDOWN for dbsize>0).
31+
func Test_shutdownNode(t *testing.T) {
32+
tests := []struct {
33+
name string
34+
setup func(s *miniredis.Miniredis)
35+
wantErr bool
36+
}{
37+
{
38+
name: "empty database uses SHUTDOWN NOSAVE",
39+
setup: func(_ *miniredis.Miniredis) {},
40+
wantErr: true, // miniredis returns ERR for SHUTDOWN NOSAVE
41+
},
42+
{
43+
name: "non-empty database uses normal SHUTDOWN",
44+
setup: func(s *miniredis.Miniredis) {
45+
_ = s.Set("key1", "value1")
46+
_ = s.Set("key2", "value2")
47+
},
48+
wantErr: true, // miniredis returns ERR for SHUTDOWN
49+
},
50+
}
51+
52+
for _, tt := range tests {
53+
t.Run(tt.name, func(t *testing.T) {
54+
s := miniredis.RunT(t)
55+
tt.setup(s)
56+
client := valkey.NewValkeyClient(s.Addr(), valkey.AuthInfo{})
57+
defer client.Close()
58+
59+
err := shutdownNode(context.Background(), client, logr.Discard())
60+
if (err != nil) != tt.wantErr {
61+
t.Errorf("shutdownNode() error = %v, wantErr %v", err, tt.wantErr)
62+
}
63+
})
64+
}
65+
}

cmd/helper/commands/failover/shutdown.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,10 +365,32 @@ func Shutdown(ctx context.Context, c *cli.Context, client *kubernetes.Clientset,
365365
time.Sleep(time.Second * 30)
366366

367367
logger.Info("do shutdown node")
368+
if shutdownErr := shutdownNode(ctx, valkeyClient, logger); shutdownErr != nil {
369+
logger.Error(shutdownErr, "graceful shutdown failed")
370+
}
371+
return err
372+
}
373+
374+
// shutdownNode re-checks dbsize before issuing SHUTDOWN.
375+
// If dbsize == 0 (e.g. memory was cleared by a failed cross-version fullsync),
376+
// SHUTDOWN NOSAVE is used to preserve the existing dump.rdb on disk.
377+
// Otherwise, normal SHUTDOWN is issued to persist the current dataset.
378+
// Uses a 300s timeout to accommodate large RDB snapshots.
379+
func shutdownNode(ctx context.Context, valkeyClient valkey.ValkeyClient, logger logr.Logger) error {
380+
dbsize, _ := valkey.Int64(valkeyClient.Do(ctx, "DBSIZE"))
381+
if dbsize == 0 {
382+
logger.Info("dbsize is 0, using SHUTDOWN NOSAVE to preserve existing dump.rdb")
383+
_, err := valkeyClient.DoWithTimeout(ctx, time.Second*300, "SHUTDOWN", "NOSAVE")
384+
if err != nil && !errors.Is(err, io.EOF) {
385+
return err
386+
}
387+
return nil
388+
}
368389
// NOTE: here set timeout to 300s, which will try best to do a shutdown snapshot
369390
// if the data is too large, this snapshot may not be completed
370-
if _, err := valkeyClient.DoWithTimeout(ctx, time.Second*300, "SHUTDOWN"); err != nil && !errors.Is(err, io.EOF) {
371-
logger.Error(err, "graceful shutdown failed")
391+
_, err := valkeyClient.DoWithTimeout(ctx, time.Second*300, "SHUTDOWN")
392+
if err != nil && !errors.Is(err, io.EOF) {
393+
return err
372394
}
373-
return err
395+
return nil
374396
}

cmd/helper/commands/failover/shutdown_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ limitations under the License.
1717
package failover
1818

1919
import (
20+
"context"
2021
"os"
2122
"testing"
2223

24+
"github.com/alicebob/miniredis/v2"
25+
"github.com/chideat/valkey-operator/pkg/valkey"
2326
"github.com/go-logr/logr"
2427
)
2528

@@ -76,3 +79,42 @@ replica-announce-port 31095`,
7679
})
7780
}
7881
}
82+
83+
// Test_shutdownNode verifies the SHUTDOWN/SHUTDOWN NOSAVE decision in shutdownNode.
84+
// miniredis doesn't support SHUTDOWN, so both cases return an error — but each exercises
85+
// the correct branch (NOSAVE for dbsize==0, normal SHUTDOWN for dbsize>0).
86+
func Test_shutdownNode(t *testing.T) {
87+
tests := []struct {
88+
name string
89+
setup func(s *miniredis.Miniredis)
90+
wantErr bool
91+
}{
92+
{
93+
name: "empty database uses SHUTDOWN NOSAVE",
94+
setup: func(_ *miniredis.Miniredis) {},
95+
wantErr: true, // miniredis returns ERR for SHUTDOWN NOSAVE
96+
},
97+
{
98+
name: "non-empty database uses normal SHUTDOWN",
99+
setup: func(s *miniredis.Miniredis) {
100+
_ = s.Set("key1", "value1")
101+
_ = s.Set("key2", "value2")
102+
},
103+
wantErr: true, // miniredis returns ERR for SHUTDOWN
104+
},
105+
}
106+
107+
for _, tt := range tests {
108+
t.Run(tt.name, func(t *testing.T) {
109+
s := miniredis.RunT(t)
110+
tt.setup(s)
111+
client := valkey.NewValkeyClient(s.Addr(), valkey.AuthInfo{})
112+
defer client.Close()
113+
114+
err := shutdownNode(context.Background(), client, logr.Discard())
115+
if (err != nil) != tt.wantErr {
116+
t.Errorf("shutdownNode() error = %v, wantErr %v", err, tt.wantErr)
117+
}
118+
})
119+
}
120+
}

0 commit comments

Comments
 (0)