
Commit 8c93138

kvnemesis: add network partitions
This commit adds support for symmetric, asymmetric, partial, and full partitions to kvnemesis, using the `rpc.Partitioner`.

Release note: None
1 parent b43afef commit 8c93138
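
The `rpc.Partitioner` operates on directed node pairs, so the richer shapes are compositions: one `AddPartition(from, to)` call yields an asymmetric partition, a mirrored pair of calls yields a symmetric one, and partial versus full is a question of how many node pairs are covered. A minimal sketch of that composition, assuming only the `AddPartition` signature visible in this diff (the helper itself is hypothetical, not part of the commit):

```go
package kvnemesis

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/rpc"
)

// symmetricPartition is an illustrative helper: a symmetric partition
// between a and b is just two asymmetric (directed) partitions.
func symmetricPartition(p *rpc.Partitioner, a, b roachpb.NodeID) error {
	// Drop a's traffic to b; b can still reach a (asymmetric so far).
	if err := p.AddPartition(a, b); err != nil {
		return err
	}
	// Drop the reverse direction too, making the partition symmetric.
	return p.AddPartition(b, a)
}
```

This is also why the generator below tracks each direction as its own connection: a symmetric partition takes two generated operations to set up and two `RemovePartition` operations to heal.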

File tree

10 files changed (+204, -27 lines)


pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 3 additions & 0 deletions
```diff
@@ -32,6 +32,7 @@ go_library(
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvtestutils",
         "//pkg/roachpb",
+        "//pkg/rpc",
         "//pkg/settings/cluster",
         "//pkg/sql/catalog/bootstrap",
         "//pkg/storage",
@@ -77,6 +78,7 @@ go_test(
     deps = [
         "//pkg/base",
         "//pkg/config/zonepb",
+        "//pkg/gossip",
         "//pkg/kv",
         "//pkg/kv/kvclient/kvcoord",
         "//pkg/kv/kvnemesis/kvnemesisutil",
@@ -89,6 +91,7 @@ go_test(
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvtestutils",
         "//pkg/roachpb",
+        "//pkg/rpc",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/server",
```

pkg/kv/kvnemesis/applier.go

Lines changed: 6 additions & 0 deletions
```diff
@@ -150,6 +150,12 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 		o.Result = resultInit(ctx, err)
 	case *FlushLockTableOperation:
 		o.Result = resultInit(ctx, db.FlushLockTable(ctx, o.Key, o.EndKey))
+	case *AddNetworkPartitionOperation:
+		err := env.Partitioner.AddPartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
+		o.Result = resultInit(ctx, err)
+	case *RemoveNetworkPartitionOperation:
+		err := env.Partitioner.RemovePartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
+		o.Result = resultInit(ctx, err)
 	case *ClosureTxnOperation:
 		// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
 		// epochs of the same transaction to avoid waiting while holding locks.
```

pkg/kv/kvnemesis/env.go

Lines changed: 5 additions & 3 deletions
```diff
@@ -15,6 +15,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/errors"
 )
@@ -29,9 +30,10 @@ type Logger interface {
 // Env manipulates the environment (cluster settings, zone configurations) that
 // the Applier operates in.
 type Env struct {
-	SQLDBs  []*gosql.DB
-	Tracker *SeqTracker
-	L       Logger
+	SQLDBs      []*gosql.DB
+	Tracker     *SeqTracker
+	L           Logger
+	Partitioner *rpc.Partitioner
 }
 
 func (e *Env) anyNode() *gosql.DB {
```

pkg/kv/kvnemesis/generator.go

Lines changed: 94 additions & 0 deletions
```diff
@@ -60,6 +60,7 @@ type OperationConfig struct {
 	ChangeLease   ChangeLeaseConfig
 	ChangeSetting ChangeSettingConfig
 	ChangeZone    ChangeZoneConfig
+	Fault         FaultConfig
 }
 
 // ClosureTxnConfig configures the relative probability of running some
@@ -392,6 +393,20 @@ type SavepointConfig struct {
 	SavepointRollback int
 }
 
+// FaultConfig configures the relative probabilities of generating different
+// types of faults. Network partitions can be symmetric or asymmetric, partial
+// or full, but they may need multiple operations to set up; e.g. a symmetric
+// partition between node A and node B requires two partitions: one from A to
+// B, and one from B to A.
+type FaultConfig struct {
+	// AddNetworkPartition is an operation that simulates a network partition.
+	AddNetworkPartition int
+	// RemoveNetworkPartition is an operation that simulates healing a network
+	// partition.
+	RemoveNetworkPartition int
+	// Disk stalls and node crashes belong here.
+}
+
 // newAllOperationsConfig returns a GeneratorConfig that exercises *all*
 // options. You probably want NewDefaultConfig. Most of the time, these will be
 // the same, but having both allows us to merge code for operations that do not
@@ -511,6 +526,10 @@ func newAllOperationsConfig() GeneratorConfig {
 		ChangeZone: ChangeZoneConfig{
 			ToggleGlobalReads: 1,
 		},
+		Fault: FaultConfig{
+			AddNetworkPartition:    1,
+			RemoveNetworkPartition: 1,
+		},
 	}}
 }
 
@@ -603,6 +622,11 @@ func NewDefaultConfig() GeneratorConfig {
 	config.Ops.ClosureTxn.CommitBatchOps.FlushLockTable = 0
 	config.Ops.ClosureTxn.TxnClientOps.FlushLockTable = 0
 	config.Ops.ClosureTxn.TxnBatchOps.Ops.FlushLockTable = 0
+
+	// Network partitions can result in unavailability and need to be enabled with
+	// care by specific test variants.
+	config.Ops.Fault.AddNetworkPartition = 0
+	config.Ops.Fault.RemoveNetworkPartition = 0
 	return config
 }
 
@@ -663,13 +687,27 @@ func MakeGenerator(config GeneratorConfig, replicasFn GetReplicasFn) (*Generator
 		return nil, errors.Errorf(`NumReplicas (%d) must <= NumNodes (%d)`,
 			config.NumReplicas, config.NumNodes)
 	}
+	p := partitions{
+		healthy:     make(map[connection]struct{}),
+		partitioned: make(map[connection]struct{}),
+	}
+	for i := 1; i <= config.NumNodes; i++ {
+		for j := 1; j <= config.NumNodes; j++ {
+			if i == j {
+				continue
+			}
+			conn := connection{from: i, to: j}
+			p.healthy[conn] = struct{}{}
+		}
+	}
 	g := &Generator{}
 	g.mu.generator = generator{
 		Config:           config,
 		replicasFn:       replicasFn,
 		keys:             make(map[string]string),
 		currentSplits:    make(map[string]struct{}),
 		historicalSplits: make(map[string]struct{}),
+		partitions:       p,
 	}
 	return g, nil
 }
@@ -703,6 +741,20 @@ type generator struct {
 	// emitted, regardless of whether the split has since been applied or been
 	// merged away again.
 	historicalSplits map[string]struct{}
+
+	// partitions contains the sets of healthy and partitioned connections
+	// between nodes.
+	partitions
+}
+
+type connection struct {
+	from int // node ID
+	to   int // node ID
+}
+
+type partitions struct {
+	healthy     map[connection]struct{}
+	partitioned map[connection]struct{}
 }
 
 // RandStep returns a single randomly generated next operation to execute.
@@ -763,6 +815,8 @@ func (g *generator) RandStep(rng *rand.Rand) Step {
 
 	addOpGen(&allowed, setLeaseType, g.Config.Ops.ChangeSetting.SetLeaseType)
 	addOpGen(&allowed, toggleGlobalReads, g.Config.Ops.ChangeZone.ToggleGlobalReads)
+	addOpGen(&allowed, addRandNetworkPartition, g.Config.Ops.Fault.AddNetworkPartition)
+	addOpGen(&allowed, removeRandNetworkPartition, g.Config.Ops.Fault.RemoveNetworkPartition)
 
 	return step(g.selectOp(rng, allowed))
 }
@@ -1643,6 +1697,34 @@ func toggleGlobalReads(_ *generator, _ *rand.Rand) Operation {
 	return changeZone(ChangeZoneType_ToggleGlobalReads)
 }
 
+func addRandNetworkPartition(g *generator, rng *rand.Rand) Operation {
+	if len(g.partitions.healthy) == 0 {
+		return addNetworkPartition(0, 0)
+	}
+	all := make([]connection, 0, len(g.partitions.healthy))
+	for conn := range g.partitions.healthy {
+		all = append(all, conn)
+	}
+	randConn := all[rng.Intn(len(all))]
+	delete(g.partitions.healthy, randConn)
+	g.partitions.partitioned[randConn] = struct{}{}
+	return addNetworkPartition(randConn.from, randConn.to)
+}
+
+func removeRandNetworkPartition(g *generator, rng *rand.Rand) Operation {
+	if len(g.partitions.partitioned) == 0 {
+		return removeNetworkPartition(0, 0)
+	}
+	all := make([]connection, 0, len(g.partitions.partitioned))
+	for conn := range g.partitions.partitioned {
+		all = append(all, conn)
+	}
+	randConn := all[rng.Intn(len(all))]
+	delete(g.partitions.partitioned, randConn)
+	g.partitions.healthy[randConn] = struct{}{}
+	return removeNetworkPartition(randConn.from, randConn.to)
+}
+
 func makeRandBatch(c *ClientOperationConfig) opGenFunc {
 	return func(g *generator, rng *rand.Rand) Operation {
 		var allowed []opGen
@@ -2252,6 +2334,18 @@ func rollbackSavepoint(id int) Operation {
 	return Operation{SavepointRollback: &SavepointRollbackOperation{ID: int32(id)}}
 }
 
+func addNetworkPartition(from int, to int) Operation {
+	return Operation{
+		AddNetworkPartition: &AddNetworkPartitionOperation{FromNode: int32(from), ToNode: int32(to)},
+	}
+}
+
+func removeNetworkPartition(from int, to int) Operation {
+	return Operation{
+		RemoveNetworkPartition: &RemoveNetworkPartitionOperation{FromNode: int32(from), ToNode: int32(to)},
+	}
+}
+
 type countingRandSource struct {
 	count atomic.Uint64
 	inner rand.Source64
```
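
Since `NewDefaultConfig` zeroes both fault weights, partitions are opt-in: a test variant has to restore the weights itself. A hypothetical opt-in, using only the config fields added above (the weight values are arbitrary):

```go
// newPartitionEnabledConfig is a hypothetical variant constructor; the
// Fault fields are from this diff, the weight of 1 is arbitrary.
func newPartitionEnabledConfig() GeneratorConfig {
	config := NewDefaultConfig()
	config.Ops.Fault.AddNetworkPartition = 1
	config.Ops.Fault.RemoveNetworkPartition = 1
	return config
}
```

Note also that `MakeGenerator` seeds `healthy` with every ordered node pair, so a 4-node cluster starts with 4 × 3 = 12 healthy directed connections for `addRandNetworkPartition` to draw from, and each add/remove operation moves exactly one connection between the two disjoint sets.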

pkg/kv/kvnemesis/generator_test.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -418,6 +418,10 @@ func TestRandStep(t *testing.T) {
 			case ChangeZoneType_ToggleGlobalReads:
 				counts.ChangeZone.ToggleGlobalReads++
 			}
+		case *AddNetworkPartitionOperation:
+			counts.Fault.AddNetworkPartition++
+		case *RemoveNetworkPartitionOperation:
+			counts.Fault.RemoveNetworkPartition++
 		default:
 			t.Fatalf("%T", o)
 		}
```

pkg/kv/kvnemesis/kvnemesis.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -142,9 +142,11 @@ func RunNemesis(
 		}
 		return nil
 	}
+	env.Partitioner.EnablePartitions(true)
 	if err := ctxgroup.GroupWorkers(ctx, concurrency, workerFn); err != nil {
 		return nil, err
 	}
+	env.Partitioner.EnablePartitions(false)
 
 	allSteps := make(steps, 0, numSteps)
 	for _, steps := range stepsByWorker {
```
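
The two `EnablePartitions` calls bracket the concurrent workers, so partitions injected by the appliers only drop traffic during the workload phase; by the time the generated steps are validated, the cluster is fully connected again. A sketch of the bracketing pattern (the `runWorkload` parameter is a hypothetical stand-in for the ctxgroup worker pool above):

```go
package kvnemesis

import "context"

// runWithPartitions confines injected partitions to the workload phase.
func runWithPartitions(
	ctx context.Context, env *Env, runWorkload func(context.Context, *Env) error,
) error {
	env.Partitioner.EnablePartitions(true)
	// Partitions added by appliers only drop traffic inside this window.
	defer env.Partitioner.EnablePartitions(false)
	return runWorkload(ctx, env)
}
```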

pkg/kv/kvnemesis/kvnemesis_test.go

Lines changed: 49 additions & 24 deletions
```diff
@@ -15,6 +15,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -25,6 +26,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb",
 	"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
@@ -46,7 +49,7 @@ import (
 var defaultNumSteps = envutil.EnvOrDefaultInt("COCKROACH_KVNEMESIS_STEPS", 100)
 
 func (cfg kvnemesisTestCfg) testClusterArgs(
-	ctx context.Context, tr *SeqTracker,
+	ctx context.Context, tr *SeqTracker, partitioner *rpc.Partitioner,
 ) base.TestClusterArgs {
 	storeKnobs := &kvserver.StoreTestingKnobs{
 		DisableRaftLogQueue: true,
@@ -173,31 +176,46 @@ func (cfg kvnemesisTestCfg) testClusterArgs(
 		cfg.testSettings(ctx, st)
 	}
 
-	args := base.TestClusterArgs{
-		ServerArgs: base.TestServerArgs{
-			Knobs: base.TestingKnobs{
-				Store: storeKnobs,
-				KVClient: &kvcoord.ClientTestingKnobs{
-					// Don't let DistSender split DeleteRangeUsingTombstone across range boundaries.
-					// This does happen in real CRDB, but leads to separate atomic subunits, which
-					// would add complexity to kvnemesis that isn't worth it. Instead, the operation
-					// generator for the most part tries to avoid range-spanning requests, and the
-					// ones that do end up happening get a hard error.
-					OnRangeSpanningNonTxnalBatch: func(ba *kvpb.BatchRequest) *kvpb.Error {
-						for _, req := range ba.Requests {
-							if req.GetInner().Method() != kvpb.DeleteRange {
-								continue
-							}
-							if req.GetDeleteRange().UseRangeTombstone == true {
-								return kvpb.NewError(errDelRangeUsingTombstoneStraddlesRangeBoundary)
-							}
+	commonServerArgs := base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: storeKnobs,
+			KVClient: &kvcoord.ClientTestingKnobs{
+				// Don't let DistSender split DeleteRangeUsingTombstone across range boundaries.
+				// This does happen in real CRDB, but leads to separate atomic subunits, which
+				// would add complexity to kvnemesis that isn't worth it. Instead, the operation
+				// generator for the most part tries to avoid range-spanning requests, and the
+				// ones that do end up happening get a hard error.
+				OnRangeSpanningNonTxnalBatch: func(ba *kvpb.BatchRequest) *kvpb.Error {
+					for _, req := range ba.Requests {
+						if req.GetInner().Method() != kvpb.DeleteRange {
+							continue
+						}
+						if req.GetDeleteRange().UseRangeTombstone == true {
+							return kvpb.NewError(errDelRangeUsingTombstoneStraddlesRangeBoundary)
 						}
-						return nil
-					},
+					}
+					return nil
 				},
 			},
-			Settings: st,
 		},
+		Settings: st,
+	}
+
+	args := base.TestClusterArgs{
+		ServerArgs: commonServerArgs,
+		ServerArgsPerNode: func() map[int]base.TestServerArgs {
+			perNode := make(map[int]base.TestServerArgs)
+			for i := 0; i < cfg.numNodes; i++ {
+				ctk := rpc.ContextTestingKnobs{}
+				partitioner.RegisterTestingKnobs(roachpb.NodeID(i+1), &ctk)
+				perNodeServerArgs := commonServerArgs
+				perNodeServerArgs.Knobs.Server = &server.TestingKnobs{
+					ContextTestingKnobs: ctk,
+				}
+				perNode[i] = perNodeServerArgs
+			}
+			return perNode
+		}(),
 	}
 
 	if cfg.testArgs != nil {
@@ -470,8 +488,15 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
 	// 4 nodes so we have somewhere to move 3x replicated ranges to.
 	ctx := context.Background()
 	tr := &SeqTracker{}
-	tc := testcluster.StartTestCluster(t, cfg.numNodes, cfg.testClusterArgs(ctx, tr))
+	var partitioner rpc.Partitioner
+	tc := testcluster.StartTestCluster(t, cfg.numNodes, cfg.testClusterArgs(ctx, tr, &partitioner))
 	defer tc.Stopper().Stop(ctx)
+	for i := 0; i < cfg.numNodes; i++ {
+		g := tc.Servers[i].StorageLayer().GossipI().(*gossip.Gossip)
+		addr := g.GetNodeAddr().String()
+		nodeID := g.NodeID.Get()
+		partitioner.RegisterNodeAddr(addr, nodeID)
+	}
 	dbs, sqlDBs := make([]*kv.DB, cfg.numNodes), make([]*gosql.DB, cfg.numNodes)
 	for i := 0; i < cfg.numNodes; i++ {
 		dbs[i] = tc.Server(i).DB()
@@ -500,7 +525,7 @@ func testKVNemesisImpl(t testing.TB, cfg kvnemesisTestCfg) {
 
 	logger := newTBridge(t)
 	defer dumpRaftLogsOnFailure(t, logger.ll.dir, tc.Servers)
-	env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger}
+	env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger, Partitioner: &partitioner}
 	failures, err := RunNemesis(ctx, rng, env, config, cfg.concurrency, cfg.numSteps, dbs...)
 
 	for i := 0; i < cfg.numNodes; i++ {
```
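One wiring detail in the test changes: the partitioner is filled in from both sides of cluster startup. `RegisterTestingKnobs` runs while the server args are built, before the servers exist, since the knobs have to be baked into each node's RPC context; `RegisterNodeAddr` can only run after startup, presumably because node addresses are not known until gossip reports them. The `rpc.Partitioner` is therefore declared up front so the same instance can be threaded through both phases and into the `Env`.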