Skip to content

Commit 70f12e3

Browse files
craig[bot]dodeca12
andcommitted
Merge #159853
159853: kvnemesis: enable node crashes to kvnemesis r=dodeca12 a=dodeca12 Previously, `kvnemesis` could only simulate graceful node stops and restarts, which limited its ability to test crash recovery scenarios. This was inadequate because real-world failures often involve abrupt crashes of nodes, leaving data in an inconsistent state that must be recovered on restart. To address this, this PR adds crash operation support to `kvnemesis` through three commits: 1. Renames the `Restarter` interface to `ServerController` to better reflect its broader purpose of controlling server lifecycle operations, preparing for the addition of crash operations. 2. Extends `testcluster.TestCluster` with a `CrashServer` method that emulates a crash by stopping a server and creating a snapshot of its in-memory filesystems at the last sync point using `vfs.MemFS.CrashClone`. This simulates what would persist on disk after a real crash. The method also isolates the crashed node from peers by tripping circuit breakers, simulating network partition behavior. Adds `CrashNodeOperation` to the kvnemesis protobuf schema, integrates it into the generator to randomly crash nodes during test runs, implements crash application in the applier, and updates the validator to handle crash scenarios. The generator now tracks crashed nodes separately from stopped nodes. 3. Enables crash operations in kvnemesis test configurations by adding `removeRandCrashed` and `removeRandStoppedOrCrashed` methods to the nodes tracker. The `restartRandNode` function now randomly selects from both stopped and crashed nodes. Adds a new test `TestKVNemesisMultiNode_Crash_Liveness` that exercises crash operations with strict in-memory filesystem mode, which is required for proper crash simulation. Fixes #64828 Co-authored-by: Swapneeth Gorantla <[email protected]>
2 parents e97bac2 + 0b2a658 commit 70f12e3

File tree

11 files changed

+215
-18
lines changed

11 files changed

+215
-18
lines changed

pkg/kv/kvnemesis/applier.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,19 @@ func (a *Applier) applyOp(ctx context.Context, db *kv.DB, op *Operation) {
182182
o.Result = resultInit(ctx, err)
183183
case *StopNodeOperation:
184184
serverID := int(o.NodeId) - 1
185-
a.env.Restarter.StopServer(serverID)
185+
a.env.ServerController.StopServer(serverID)
186186
a.nodes.setStopped(int(o.NodeId))
187187
o.Result = resultInit(ctx, nil)
188188
case *RestartNodeOperation:
189189
serverID := int(o.NodeId) - 1
190-
err := a.env.Restarter.RestartServer(serverID)
190+
err := a.env.ServerController.RestartServer(serverID)
191191
a.nodes.setRunning(int(o.NodeId))
192192
o.Result = resultInit(ctx, err)
193+
case *CrashNodeOperation:
194+
serverID := int(o.NodeId) - 1
195+
a.env.ServerController.CrashNode(serverID)
196+
a.nodes.setCrashed(int(o.NodeId))
197+
o.Result = resultInit(ctx, nil)
193198
case *ClosureTxnOperation:
194199
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
195200
// epochs of the same transaction to avoid waiting while holding locks.

pkg/kv/kvnemesis/env.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,20 @@ type Logger interface {
2727
WriteFile(basename string, contents string) string
2828
}
2929

30-
type Restarter interface {
30+
type ServerController interface {
3131
StopServer(idx int)
3232
RestartServer(idx int) error
33+
CrashNode(idx int)
3334
}
3435

3536
// Env manipulates the environment (cluster settings, zone configurations) that
3637
// the Applier operates in.
3738
type Env struct {
38-
SQLDBs []*gosql.DB
39-
Tracker *SeqTracker
40-
L Logger
41-
Partitioner *rpc.Partitioner
42-
Restarter Restarter
39+
SQLDBs []*gosql.DB
40+
Tracker *SeqTracker
41+
L Logger
42+
Partitioner *rpc.Partitioner
43+
ServerController ServerController
4344
}
4445

4546
func (e *Env) anyNode() *gosql.DB {

pkg/kv/kvnemesis/generator.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,8 @@ type FaultConfig struct {
423423
StopNode int
424424
// RestartNode is an operation that restarts a randomly chosen node.
425425
RestartNode int
426+
// CrashNode is an operation that crashes a randomly chosen node.
427+
CrashNode int
426428
// Disk stalls and other faults belong here.
427429
}
428430

@@ -554,6 +556,7 @@ func newAllOperationsConfig() GeneratorConfig {
554556
RemoveNetworkPartition: 1,
555557
StopNode: 1,
556558
RestartNode: 1,
559+
CrashNode: 1,
557560
},
558561
}}
559562
}
@@ -654,6 +657,7 @@ func NewDefaultConfig() GeneratorConfig {
654657
config.Ops.Fault.RemoveNetworkPartition = 0
655658
config.Ops.Fault.StopNode = 0
656659
config.Ops.Fault.RestartNode = 0
660+
config.Ops.Fault.CrashNode = 0
657661
return config
658662
}
659663

@@ -811,6 +815,7 @@ type nodes struct {
811815
mu syncutil.RWMutex
812816
running map[int]struct{}
813817
stopped map[int]struct{}
818+
crashed map[int]struct{}
814819
}
815820

816821
func randNodeFromMap(m map[int]struct{}, rng *rand.Rand) int {
@@ -825,11 +830,26 @@ func (n *nodes) removeRandRunning(rng *rand.Rand) int {
825830
return nodeID
826831
}
827832

828-
func (n *nodes) removeRandStopped(rng *rand.Rand) int {
833+
// removeRandStoppedOrCrashed randomly picks a node from either the stopped or
834+
// crashed sets with uniform probability across all nodes.
835+
func (n *nodes) removeRandStoppedOrCrashed(rng *rand.Rand) int {
829836
n.mu.Lock()
830837
defer n.mu.Unlock()
831-
nodeID := randNodeFromMap(n.stopped, rng)
832-
delete(n.stopped, nodeID)
838+
stopped := len(n.stopped)
839+
crashed := len(n.crashed)
840+
841+
if stopped == 0 && crashed == 0 {
842+
panic("no stopped or crashed nodes available")
843+
}
844+
845+
var nodeID int
846+
if rng.Intn(stopped+crashed) < stopped {
847+
nodeID = randNodeFromMap(n.stopped, rng)
848+
delete(n.stopped, nodeID)
849+
} else {
850+
nodeID = randNodeFromMap(n.crashed, rng)
851+
delete(n.crashed, nodeID)
852+
}
833853
return nodeID
834854
}
835855

@@ -845,6 +865,12 @@ func (n *nodes) setStopped(nodeID int) {
845865
n.stopped[nodeID] = struct{}{}
846866
}
847867

868+
func (n *nodes) setCrashed(nodeID int) {
869+
n.mu.Lock()
870+
defer n.mu.Unlock()
871+
n.crashed[nodeID] = struct{}{}
872+
}
873+
848874
// RandStep returns a single randomly generated next operation to execute.
849875
//
850876
// RandStep is not concurrency safe.
@@ -929,9 +955,12 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
929955
if len(g.nodes.running) > 0 {
930956
addOpGen(&allowed, stopRandNode, g.Config.Ops.Fault.StopNode)
931957
}
932-
if len(g.nodes.stopped) > 0 {
958+
if len(g.nodes.stopped) > 0 || len(g.nodes.crashed) > 0 {
933959
addOpGen(&allowed, restartRandNode, g.Config.Ops.Fault.RestartNode)
934960
}
961+
if len(g.nodes.running) > 0 {
962+
addOpGen(&allowed, crashRandNode, g.Config.Ops.Fault.CrashNode)
963+
}
935964

936965
return step(g.selectOp(rng, allowed))
937966
}
@@ -1877,10 +1906,15 @@ func stopRandNode(g *generator, rng *rand.Rand) Operation {
18771906
}
18781907

18791908
func restartRandNode(g *generator, rng *rand.Rand) Operation {
1880-
randNode := g.nodes.removeRandStopped(rng)
1909+
randNode := g.nodes.removeRandStoppedOrCrashed(rng)
18811910
return restartNode(randNode)
18821911
}
18831912

1913+
func crashRandNode(g *generator, rng *rand.Rand) Operation {
1914+
randNode := g.nodes.removeRandRunning(rng)
1915+
return crashNode(randNode)
1916+
}
1917+
18841918
func isFollowerReadEligibleOp(op Operation) bool {
18851919
if op.Get != nil && op.Get.FollowerReadEligible {
18861920
return true
@@ -2529,6 +2563,10 @@ func restartNode(nodeID int) Operation {
25292563
return Operation{RestartNode: &RestartNodeOperation{NodeId: int32(nodeID)}}
25302564
}
25312565

2566+
func crashNode(nodeID int) Operation {
2567+
return Operation{CrashNode: &CrashNodeOperation{NodeId: int32(nodeID)}}
2568+
}
2569+
25322570
type countingRandSource struct {
25332571
count atomic.Uint64
25342572
inner rand.Source64

pkg/kv/kvnemesis/generator_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func TestRandStep(t *testing.T) {
8080
n := nodes{
8181
running: map[int]struct{}{1: {}, 2: {}, 3: {}},
8282
stopped: make(map[int]struct{}),
83+
crashed: make(map[int]struct{}),
8384
}
8485
g, err := MakeGenerator(config, getReplicasFn, 0, &n)
8586
require.NoError(t, err)
@@ -444,6 +445,11 @@ func TestRandStep(t *testing.T) {
444445
n.mu.Lock()
445446
n.running[int(o.NodeId)] = struct{}{}
446447
n.mu.Unlock()
448+
case *CrashNodeOperation:
449+
counts.Fault.CrashNode++
450+
n.mu.Lock()
451+
n.crashed[int(o.NodeId)] = struct{}{}
452+
n.mu.Unlock()
447453
default:
448454
t.Fatalf("%T", o)
449455
}

pkg/kv/kvnemesis/kvnemesis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func RunNemesis(
109109
n := nodes{
110110
running: make(map[int]struct{}),
111111
stopped: make(map[int]struct{}),
112+
crashed: make(map[int]struct{}),
112113
}
113114
for i := 1; i <= config.NumNodes; i++ {
114115
// In liveness mode, we don't allow stopping and restarting the two
@@ -197,7 +198,7 @@ func RunNemesis(
197198
}
198199
env.Partitioner.EnablePartitions(false)
199200
for i := 0; i < config.NumNodes; i++ {
200-
_ = env.Restarter.RestartServer(i)
201+
_ = env.ServerController.RestartServer(i)
201202
}
202203

203204
allSteps := make(steps, 0, numSteps)

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func (cfg kvnemesisTestCfg) testClusterArgs(
270270
}
271271
}
272272

273-
reg := fs.NewStickyRegistry()
273+
reg := fs.NewStickyRegistry(fs.UseStrictMemFS)
274274
lisReg := listenerutil.NewListenerRegistry()
275275
args := base.TestClusterArgs{
276276
ReusableListenerReg: lisReg,
@@ -583,6 +583,47 @@ func TestKVNemesisMultiNode_Restart_Liveness(t *testing.T) {
583583
})
584584
}
585585

586+
func TestKVNemesisMultiNode_Crash(t *testing.T) {
587+
defer leaktest.AfterTest(t)()
588+
defer log.Scope(t).Close(t)
589+
590+
testKVNemesisImpl(t, kvnemesisTestCfg{
591+
numNodes: 4,
592+
numSteps: defaultNumSteps,
593+
concurrency: 5,
594+
seedOverride: 0,
595+
invalidLeaseAppliedIndexProb: 0.2,
596+
injectReproposalErrorProb: 0.2,
597+
// assertRaftApply is disabled because the asserter assumes no node
598+
// restarts (see asserter.go:29).
599+
//
600+
// Crashing nodes violates this assumption since node crashes differ
601+
// from graceful restarts (abrupt stop vs a graceful shutdown).
602+
//
603+
// There's a race condition in crash simulation where
604+
// CrashNode/CrashClone() can run while the server is still active, so
605+
// WAL syncs and Raft applies can complete between the snapshot and
606+
// stopServerLocked(). When assertRaftApply is true, the Asserter may
607+
// record applies that aren't in the crash snapshot; after restart, the
608+
// replica recovers from an earlier snapshot and re-applies, causing the
609+
// Asserter to flag a false regression. This is a bug in the crash
610+
// simulation code, not a bug in the database logic.
611+
// TODO(dodeca12): resolve crash simulation bug for kvnemesis crashes for
612+
// when assertRaftApply is true.
613+
assertRaftApply: false,
614+
mode: Liveness,
615+
testGeneratorConfig: func(cfg *GeneratorConfig) {
616+
cfg.Ops.Fault.CrashNode = 1
617+
cfg.Ops.Fault.RestartNode = 1
618+
619+
// Disallow replica changes because they interfere with the zone config
620+
// constraints (at least one replica on nodes 1 and 2).
621+
cfg.Ops.ChangeReplicas = ChangeReplicasConfig{}
622+
},
623+
},
624+
)
625+
}
626+
586627
func TestKVNemesisMultiNode(t *testing.T) {
587628
defer leaktest.AfterTest(t)()
588629
defer log.Scope(t).Close(t)
@@ -697,7 +738,7 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
697738

698739
logger := newTBridge(t)
699740
defer dumpRaftLogsOnFailure(t, logger.ll.dir, tc.Servers)
700-
env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger, Partitioner: &partitioner, Restarter: tc}
741+
env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger, Partitioner: &partitioner, ServerController: tc}
701742
failures, err := RunNemesis(ctx, rng, env, config, cfg.concurrency, cfg.numSteps, cfg.mode, dbs...)
702743

703744
logMetricsReport(t, tc)

pkg/kv/kvnemesis/operations.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ func (op Operation) Result() *Result {
7474
return &o.Result
7575
case *RestartNodeOperation:
7676
return &o.Result
77+
case *CrashNodeOperation:
78+
return &o.Result
7779
default:
7880
panic(errors.AssertionFailedf(`unknown operation: %T %v`, o, o))
7981
}
@@ -254,6 +256,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
254256
o.format(w, fctx)
255257
case *RestartNodeOperation:
256258
o.format(w, fctx)
259+
case *CrashNodeOperation:
260+
o.format(w, fctx)
257261
default:
258262
fmt.Fprintf(w, "%v", op.GetValue())
259263
}
@@ -526,12 +530,17 @@ func (op RemoveNetworkPartitionOperation) format(w *strings.Builder, fctx format
526530
}
527531

528532
func (op StopNodeOperation) format(w *strings.Builder, fctx formatCtx) {
529-
fmt.Fprintf(w, `env.Restarter.StopNode(%d)`, int(op.NodeId))
533+
fmt.Fprintf(w, `env.ServerController.StopNode(%d)`, int(op.NodeId))
530534
op.Result.format(w)
531535
}
532536

533537
func (op RestartNodeOperation) format(w *strings.Builder, fctx formatCtx) {
534-
fmt.Fprintf(w, `env.Restarter.RestartNode(%d)`, int(op.NodeId))
538+
fmt.Fprintf(w, `env.ServerController.RestartNode(%d)`, int(op.NodeId))
539+
op.Result.format(w)
540+
}
541+
542+
func (op CrashNodeOperation) format(w *strings.Builder, fctx formatCtx) {
543+
fmt.Fprintf(w, `env.ServerController.CrashNode(%d)`, int(op.NodeId))
535544
op.Result.format(w)
536545
}
537546

pkg/kv/kvnemesis/operations.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ message RestartNodeOperation {
206206
Result result = 2 [(gogoproto.nullable) = false];
207207
}
208208

209+
message CrashNodeOperation {
210+
int32 node_id = 1;
211+
Result result = 2 [(gogoproto.nullable) = false];
212+
}
213+
209214
message Operation {
210215
option (gogoproto.goproto_stringer) = false;
211216
option (gogoproto.onlyone) = true;
@@ -242,6 +247,7 @@ message Operation {
242247
RemoveNetworkPartitionOperation removeNetworkPartition = 28;
243248
StopNodeOperation stopNode = 29;
244249
RestartNodeOperation restartNode = 30;
250+
CrashNodeOperation crashNode = 31;
245251
}
246252

247253
enum ResultType {

pkg/kv/kvnemesis/validator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,9 @@ func (v *validator) processOp(op Operation) {
10241024
case *RestartNodeOperation:
10251025
execTimestampStrictlyOptional = true
10261026
v.checkError(op, t.Result)
1027+
case *CrashNodeOperation:
1028+
execTimestampStrictlyOptional = true
1029+
v.checkError(op, t.Result)
10271030
default:
10281031
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
10291032
}

pkg/testutils/testcluster/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ go_library(
4848
"//pkg/util/tracing/tracingpb",
4949
"@com_github_cockroachdb_errors//:errors",
5050
"@com_github_cockroachdb_logtags//:logtags",
51+
"@com_github_cockroachdb_pebble//vfs",
5152
"@com_github_stretchr_testify//require",
5253
],
5354
)

0 commit comments

Comments
 (0)