
Commit 339120e

craig[bot], Dev-Kyle, and jeffswenson committed
149478: roachtest: add mutator to inject a network partition r=DarrylWong a=Dev-Kyle

This change adds a new mutator that can randomly inject a network partition between one node and a random subset of the other nodes. The partition is safely recovered a random number of steps later, restricted by any steps that are incompatible with a network partition.

Epic: None
Release note: None
Fixes: None

150935: backupsink: fix a resource leak in backupsink r=jeffswenson a=jeffswenson

This fixes a bug in backupsink's Close implementation. If Close was called with an un-flushed write (i.e. s.out != nil), it would return early and leak the sstwriter.

Fixes: #150831

Release note: none

Co-authored-by: Kyle <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
3 parents 4713a4f + c47c22d + 126e1ab commit 339120e
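The first change (149478) boils down to a two-step selection over the test plan: pick a random step at which to inject the partition, then pick a random later step at which to recover it, skipping any steps that cannot run while the cluster is partitioned and recovering immediately if none qualify. The following is a minimal, self-contained sketch of that selection logic only; it uses plain slice indices and a hypothetical pickPartitionWindow helper rather than the real plan/step types, which are internal to the mixedversion package.

package main

import (
    "fmt"
    "math/rand"
)

// pickPartitionWindow sketches the selection described in the commit message:
// choose a random injection step, then a random later recovery step, skipping
// steps flagged as incompatible with a partition. It operates on a plain
// []bool; the real mutator works on *singleStep selectors instead.
func pickPartitionWindow(rng *rand.Rand, incompatible []bool) (inject, recoverAt int) {
    n := len(incompatible)
    inject = rng.Intn(n)

    // Candidate recovery points are the steps strictly after the injection
    // point, up to (but not including) the first incompatible step: the
    // partition must be healed before such a step runs.
    var candidates []int
    for i := inject + 1; i < n; i++ {
        if incompatible[i] {
            break
        }
        candidates = append(candidates, i)
    }
    if len(candidates) == 0 {
        // No compatible later step: recover immediately after injecting.
        // (inject+1 may equal n, meaning recovery is appended at the end.)
        return inject, inject + 1
    }
    return inject, candidates[rng.Intn(len(candidates))]
}

func main() {
    rng := rand.New(rand.NewSource(1))
    // true marks a step that cannot run while the cluster is partitioned.
    steps := []bool{false, false, true, false, false}
    in, out := pickPartitionWindow(rng, steps)
    fmt.Printf("inject before step %d, recover before step %d\n", in, out)
}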

File tree

11 files changed (+373, -32 lines)


pkg/backup/backupsink/file_sst_sink.go

Lines changed: 4 additions & 2 deletions
@@ -231,11 +231,13 @@ func (s *FileSSTSink) Close() error {
     if s.cancel != nil {
         s.cancel()
     }
+
+    var err error
     if s.out != nil {
-        return s.out.Close()
+        err = s.out.Close()
     }
     s.sst.Close()
-    return nil
+    return err
 }
 
 func (s *FileSSTSink) Flush(ctx context.Context) error {
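The fix above follows a common Go cleanup pattern: rather than returning as soon as the first resource is closed, remember that error, keep closing the remaining resources, and return the saved error at the end. Below is a simplified, self-contained sketch of the before-and-after behavior; leakySink and fixedSink are hypothetical types used only for illustration and are not the real FileSSTSink.

package main

import (
    "errors"
    "fmt"
    "io"
)

// leakySink mirrors the bug: returning early from Close skips closing w.
type leakySink struct {
    out io.Closer // buffered output, non-nil when a write has not been flushed
    w   io.Closer // writer that must always be closed
}

func (s *leakySink) Close() error {
    if s.out != nil {
        return s.out.Close() // early return: s.w is never closed (leaked)
    }
    s.w.Close()
    return nil
}

// fixedSink mirrors the fix: remember the error, keep closing, return it last.
type fixedSink struct {
    out io.Closer
    w   io.Closer
}

func (s *fixedSink) Close() error {
    var err error
    if s.out != nil {
        err = s.out.Close()
    }
    s.w.Close()
    return err
}

// tracked is a toy Closer that reports when it is closed.
type tracked struct{ name string }

func (t *tracked) Close() error {
    fmt.Println("closed", t.name)
    return errors.New("close error from " + t.name)
}

func main() {
    leaky := &leakySink{out: &tracked{"out"}, w: &tracked{"writer"}}
    fmt.Println("leaky:", leaky.Close()) // "writer" is never closed
    fixed := &fixedSink{out: &tracked{"out"}, w: &tracked{"writer"}}
    fmt.Println("fixed:", fixed.Close()) // both are closed, out's error is returned
}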

pkg/backup/backupsink/file_sst_sink_test.go

Lines changed: 19 additions & 0 deletions
@@ -441,6 +441,25 @@ func TestFileSSTSinkWrite(t *testing.T) {
     }
 }
 
+func TestFileSSTSink_CloseLeakRegression(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    // This test is a regression unit test for a resource leak in
+    // FileSSTSink.Close. Previously, if Close was called with an in-complete
+    // flush, it would leak the SST writer.
+
+    ctx := context.Background()
+    st := cluster.MakeTestingClusterSettings()
+    sink, _ := fileSSTSinkTestSetup(t, st, execinfrapb.ElidePrefix_None)
+
+    // Write a span to initialize s.out and s.sst.
+    es := newExportedSpanBuilder("a", "b").withKVs([]kvAndTS{{key: "a", timestamp: 1}}).build()
+    _, err := sink.Write(ctx, es)
+    require.NoError(t, err)
+    _ = sink.Close()
+}
+
 func s2k(s string) roachpb.Key {
     tbl := 1
     k := []byte(s)

pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ go_library(
         "//pkg/cmd/roachtest/spec",
         "//pkg/cmd/roachtest/test",
         "//pkg/roachpb",
+        "//pkg/roachprod/failureinjection/failures",
         "//pkg/roachprod/install",
         "//pkg/roachprod/logger",
         "//pkg/roachprod/vm",

pkg/cmd/roachtest/roachtestutil/mixedversion/context.go

Lines changed: 3 additions & 0 deletions
@@ -34,6 +34,9 @@ type (
         // upgraded to a certain version, and the migrations are being
         // executed).
         Finalizing bool
+        // hasUnavailableNodes indicates whether this step has any nodes
+        // that are currently marked as unavailable.
+        hasUnavailableNodes bool
 
         // nodesByVersion maps released versions to which nodes are
         // currently running that version.

pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go

Lines changed: 6 additions & 1 deletion
@@ -934,9 +934,14 @@ func (t *Test) plan() (plan *TestPlan, retErr error) {
             hooks:   t.hooks,
             prng:    t.prng,
             bgChans: t.bgChans,
+            logger:  t.logger,
+            cluster: t.cluster,
         }
         // Let's generate a plan.
-        plan = planner.Plan()
+        plan, err = planner.Plan()
+        if err != nil {
+            return nil, errors.Wrapf(err, "error generating test plan")
+        }
         if plan.length <= t.options.maxNumPlanSteps {
             break
         }
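For context, the loop this hunk lives in regenerates the plan until it is no longer than maxNumPlanSteps; with this change, an error from Plan() now aborts planning instead of being dropped. The sketch below shows that control flow in isolation, using hypothetical fakePlanner and fakePlan stand-ins rather than the real testPlanner, so the details are illustrative only.

package main

import (
    "errors"
    "fmt"
    "math/rand"
)

// fakePlanner stands in for the mixedversion testPlanner; Plan can now fail,
// for example when a mutator cannot obtain a failure injector.
type fakePlanner struct {
    rng *rand.Rand
}

type fakePlan struct {
    length int
}

func (p *fakePlanner) Plan() (*fakePlan, error) {
    if p.rng.Intn(10) == 0 {
        return nil, errors.New("mutator failed to generate steps")
    }
    return &fakePlan{length: 5 + p.rng.Intn(20)}, nil
}

// plan keeps regenerating until the plan is short enough, and now propagates
// any error from an individual attempt instead of ignoring it.
func plan(rng *rand.Rand, maxSteps, maxAttempts int) (*fakePlan, error) {
    planner := &fakePlanner{rng: rng}
    var last *fakePlan
    for i := 0; i < maxAttempts; i++ {
        p, err := planner.Plan()
        if err != nil {
            return nil, fmt.Errorf("error generating test plan: %w", err)
        }
        last = p
        if p.length <= maxSteps {
            break
        }
    }
    return last, nil
}

func main() {
    rng := rand.New(rand.NewSource(42))
    p, err := plan(rng, 15, 5)
    if err != nil {
        fmt.Println("planning failed:", err)
        return
    }
    fmt.Println("plan length:", p.length)
}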

pkg/cmd/roachtest/roachtestutil/mixedversion/mutators.go

Lines changed: 178 additions & 10 deletions
@@ -10,7 +10,9 @@ import (
     "math/rand"
     "sort"
 
+    "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
     "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+    "github.com/cockroachdb/cockroach/pkg/roachprod/failureinjection/failures"
     "github.com/cockroachdb/cockroach/pkg/roachprod/install"
     "golang.org/x/exp/maps"
 )
@@ -54,7 +56,7 @@ func (m preserveDowngradeOptionRandomizerMutator) Probability() float64 {
 // mutations is always even.
 func (m preserveDowngradeOptionRandomizerMutator) Generate(
     rng *rand.Rand, plan *TestPlan, planner *testPlanner,
-) []mutation {
+) ([]mutation, error) {
     var mutations []mutation
     for _, upgradeSelector := range randomUpgrades(rng, plan) {
         removeExistingStep := upgradeSelector.
@@ -99,7 +101,7 @@ func (m preserveDowngradeOptionRandomizerMutator) Generate(
         mutations = append(mutations, addRandomly...)
     }
 
-    return mutations
+    return mutations, nil
 }
 
 // randomUpgrades returns selectors for the steps of a random subset
@@ -223,7 +225,7 @@ func (m clusterSettingMutator) Probability() float64 {
 // happen any time after cluster setup.
 func (m clusterSettingMutator) Generate(
     rng *rand.Rand, plan *TestPlan, planner *testPlanner,
-) []mutation {
+) ([]mutation, error) {
     var mutations []mutation
 
     // possiblePointsInTime is the list of steps in the plan that are
@@ -264,7 +266,7 @@ func (m clusterSettingMutator) Generate(
         mutations = append(mutations, applyChange...)
     }
 
-    return mutations
+    return mutations, nil
 }
 
 // clusterSettingChangeStep encapsulates the information necessary to
@@ -401,7 +403,7 @@ func (m panicNodeMutator) Probability() float64 {
 
 func (m panicNodeMutator) Generate(
     rng *rand.Rand, plan *TestPlan, planner *testPlanner,
-) []mutation {
+) ([]mutation, error) {
     var mutations []mutation
     upgrades := randomUpgrades(rng, plan)
     idx := newStepIndex(plan)
@@ -411,8 +413,10 @@ func (m panicNodeMutator) Generate(
         possiblePointsInTime := upgrade.
             // We don't want to panic concurrently with other steps, and inserting before a concurrent step
             // causes the step to run concurrently with that step, so we filter out any concurrent steps.
+            // We don't want to panic the system on a node while a system node is already down, as that could cause
+            // the cluster to lose quorum, so we filter out any steps with unavailable system nodes.
             Filter(func(s *singleStep) bool {
-                return s.context.System.Stage >= InitUpgradeStage && !idx.IsConcurrent(s)
+                return s.context.System.Stage >= InitUpgradeStage && !idx.IsConcurrent(s) && !s.context.System.hasUnavailableNodes
             })
 
         targetNode := nodeList.SeededRandNode(rng)
@@ -441,7 +445,7 @@ func (m panicNodeMutator) Generate(
             firstStepInConcurrentBlock = nil
         }
 
-        return restart || waitForStable || runHook
+        return restart || waitForStable || runHook || s.context.System.hasUnavailableNodes
     }
 
     // The node should be restarted after the panic, but before any steps that are
@@ -468,19 +472,183 @@ func (m panicNodeMutator) Generate(
         addPanicStep := stepToPanic.
             InsertBefore(panicNodeStep{planner.currentContext.System.Descriptor.Nodes[0], targetNode})
         var addRestartStep []mutation
+        var restartStep stepSelector
         // If validEndStep is nil, it means that there are no steps after the panic step that
         // are compatible with a dead node, so we immediately restart the node after the panic.
         if validEndStep == nil {
+            restartStep = cutStep
             addRestartStep = cutStep.InsertBefore(restartNodeStep{planner.currentContext.System.Descriptor.Nodes[0], targetNode, planner.rt, restartDesc})
         } else {
-            addRestartStep = validEndStep.
-                RandomStep(rng).
+            restartStep = validEndStep.RandomStep(rng)
+            addRestartStep = restartStep.
                 Insert(rng, restartNodeStep{planner.currentContext.System.Descriptor.Nodes[0], targetNode, planner.rt, restartDesc})
         }
 
+        failureContextSteps, _ := validStartStep.CutBefore(func(s *singleStep) bool {
+            return s == restartStep[0]
+        })
+        failureContextSteps.MarkNodesUnavailable(true, false)
+
         mutations = append(mutations, addPanicStep...)
         mutations = append(mutations, addRestartStep...)
     }
 
-    return mutations
+    return mutations, nil
+}
+
+type networkPartitionMutator struct{}
+
+func (m networkPartitionMutator) Name() string { return failures.IPTablesNetworkPartitionName }
+
+func (m networkPartitionMutator) Probability() float64 {
+    return 0.3
+}
+
+func (m networkPartitionMutator) Generate(
+    rng *rand.Rand, plan *TestPlan, planner *testPlanner,
+) ([]mutation, error) {
+    var mutations []mutation
+    upgrades := randomUpgrades(rng, plan)
+    idx := newStepIndex(plan)
+    nodeList := planner.currentContext.System.Descriptor.Nodes
+
+    failure := failures.GetFailureRegistry()
+    f, err := failure.GetFailer(planner.cluster.Name(), failures.IPTablesNetworkPartitionName, planner.logger)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get failer for %s: %w", failures.IPTablesNetworkPartitionName, err)
+    }
+
+    for _, upgrade := range upgrades {
+        possiblePointsInTime := upgrade.
+            Filter(func(s *singleStep) bool {
+                // We don't want to set up a partition concurrently with other steps, and inserting
+                // before a concurrent step causes the step to run concurrently with that step, so
+                // we filter out any concurrent steps. We don't want to set up a partition while
+                // nodes are unavailable, as that could cause the cluster to lose quorum,
+                // so we filter out steps with unavailable nodes.
+                var unavailableNodes bool
+                if planner.isMultitenant() {
+                    unavailableNodes = s.context.Tenant.hasUnavailableNodes || s.context.System.hasUnavailableNodes
+                } else {
+                    unavailableNodes = s.context.System.hasUnavailableNodes
+                }
+                return s.context.System.Stage >= InitUpgradeStage && !idx.IsConcurrent(s) && !unavailableNodes
+            })
+
+        stepToPartition := possiblePointsInTime.RandomStep(rng)
+        hasInvalidConcurrentStep := false
+        var firstStepInConcurrentBlock *singleStep
+
+        isInvalidRecoverStep := func(s *singleStep) bool {
+            // Restarting a node in the middle of a network partition has a chance of
+            // loss of quorum, so we do should recover the network partition before this
+            // if the restarted node is not the node being partitioned.
+            // e.g. In a 4-node cluster, if node 1 is partitioned from nodes 2, 3, and
+            // 4, then restarting node 2 would cause a loss of quorum since 3 and 4
+            // cannot talk to 1.
+
+            // TODO: The partitioned node should be able to restart safely, provided
+            // the necessary steps are altered to allow it.
+
+            _, restartSystem := s.impl.(restartWithNewBinaryStep)
+            _, restartTenant := s.impl.(restartVirtualClusterStep)
+            // Many hook steps require communication between specific nodes, so we
+            // should recover the network partition before running them.
+            _, runHook := s.impl.(runHookStep)
+
+            if idx.IsConcurrent(s) {
+                if firstStepInConcurrentBlock == nil {
+                    firstStepInConcurrentBlock = s
+                }
+                hasInvalidConcurrentStep = true
+            } else {
+                hasInvalidConcurrentStep = false
+                firstStepInConcurrentBlock = nil
+            }
+
+            var unavailableNodes bool
+            if planner.isMultitenant() {
+                unavailableNodes = s.context.Tenant.hasUnavailableNodes || s.context.System.hasUnavailableNodes
+            } else {
+                unavailableNodes = s.context.System.hasUnavailableNodes
+            }
+            return unavailableNodes || restartTenant || restartSystem || runHook
+        }
+
+        _, validStartStep := upgrade.CutAfter(func(s *singleStep) bool {
+            return s == stepToPartition[0]
+        })
+
+        validEndStep, _, cutStep := validStartStep.Cut(func(s *singleStep) bool {
+            return isInvalidRecoverStep(s)
+        })
+
+        // Inserting before a concurrent step will cause the step to run concurrently with that step,
+        // so we remove the concurrent steps from the list of possible insertions if they contain
+        // any invalid steps.
+        if hasInvalidConcurrentStep {
+            validEndStep, _ = validEndStep.CutAfter(func(s *singleStep) bool {
+                return s == firstStepInConcurrentBlock
+            })
+        }
+
+        partitionedNode, leftPartition, rightPartition := selectPartitions(rng, nodeList)
+        partitionType := failures.AllPartitionTypes[rng.Intn(len(failures.AllPartitionTypes))]
+
+        partition := failures.NetworkPartition{Source: leftPartition, Destination: rightPartition, Type: partitionType}
+
+        addPartition := stepToPartition.
+            InsertBefore(networkPartitionInjectStep{f, partition, partitionedNode})
+        var addRecoveryStep []mutation
+        var recoveryStep stepSelector
+        // If validEndStep is nil, it means that there are no steps after the partition step that are
+        // compatible with a network partition, so we immediately restart the node after the partition.
+        if validEndStep == nil {
+            recoveryStep = cutStep
+            addRecoveryStep = cutStep.InsertBefore(networkPartitionRecoveryStep{f, partition, partitionedNode})
+        } else {
+            recoveryStep = validEndStep.RandomStep(rng)
+            addRecoveryStep = recoveryStep.
+                Insert(rng, networkPartitionRecoveryStep{f, partition, partitionedNode})
+        }
+
+        failureContextSteps, _ := validStartStep.CutBefore(func(s *singleStep) bool {
+            return s == recoveryStep[0]
+        })
+
+        failureContextSteps.MarkNodesUnavailable(true, true)
+
+        mutations = append(mutations, addPartition...)
+        mutations = append(mutations, addRecoveryStep...)
+    }
+
+    return mutations, nil
+}
+
+func selectPartitions(
+    rng *rand.Rand, nodeList option.NodeListOption,
+) (option.NodeListOption, []install.Node, []install.Node) {
+    rand.Shuffle(len(nodeList), func(i, j int) {
+        nodeList[i], nodeList[j] = nodeList[j], nodeList[i]
+    })
+    partitionedNode := nodeList[0]
+
+    leftPartition := []install.Node{install.Node(partitionedNode)}
+    var rightPartition []install.Node
+    // To make an even distribution of partial vs total partitions, 50% of the
+    // time we will default to a total partition, and the other 50% we will
+    // randomly choose which nodes to partition.
+    isTotalPartition := rng.Float64() < 0.5
+    if isTotalPartition {
+        for _, n := range nodeList[1:] {
+            rightPartition = append(rightPartition, install.Node(n))
+        }
+    } else {
+        rightPartition = append(rightPartition, install.Node(nodeList[1]))
+        for _, n := range nodeList[2:] {
+            if rng.Float64() < 0.5 {
+                rightPartition = append(rightPartition, install.Node(n))
+            }
+        }
+    }
+    return option.NodeListOption{partitionedNode}, leftPartition, rightPartition
 }
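The selectPartitions helper added above always puts the partitioned node alone on the "left" side; the "right" side is either every other node (a total partition, chosen 50% of the time) or a random non-empty subset of them. Here is a self-contained sketch of the same selection over plain integer node IDs, which are hypothetical stand-ins for option.NodeListOption and install.Node.

package main

import (
    "fmt"
    "math/rand"
)

// selectPartitions mirrors the helper added in mutators.go, but over plain
// ints so it can run standalone: node IDs here are illustrative only.
func selectPartitions(rng *rand.Rand, nodes []int) (partitioned int, left, right []int) {
    rng.Shuffle(len(nodes), func(i, j int) {
        nodes[i], nodes[j] = nodes[j], nodes[i]
    })
    partitioned = nodes[0]
    left = []int{partitioned}

    // 50% of the time partition the node from every other node (total
    // partition); otherwise partition it from a random non-empty subset.
    if rng.Float64() < 0.5 {
        right = append(right, nodes[1:]...)
        return partitioned, left, right
    }
    right = append(right, nodes[1]) // always at least one node on the right
    for _, n := range nodes[2:] {
        if rng.Float64() < 0.5 {
            right = append(right, n)
        }
    }
    return partitioned, left, right
}

func main() {
    rng := rand.New(rand.NewSource(7))
    nodes := []int{1, 2, 3, 4}
    p, l, r := selectPartitions(rng, nodes)
    fmt.Printf("partition node %d: left=%v right=%v\n", p, l, r)
}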

pkg/cmd/roachtest/roachtestutil/mixedversion/mutators_test.go

Lines changed: 4 additions & 2 deletions
@@ -29,7 +29,8 @@ func TestPreserveDowngradeOptionRandomizerMutator(t *testing.T) {
     require.NoError(t, err)
 
     var mut preserveDowngradeOptionRandomizerMutator
-    mutations := mut.Generate(newRand(), plan, nil)
+    mutations, err := mut.Generate(newRand(), plan, nil)
+    require.NoError(t, err)
     require.NotEmpty(t, mutations)
     require.True(t, len(mutations)%2 == 0, "should produce even number of mutations") // one removal and one insertion per upgrade
 
@@ -97,7 +98,8 @@ func TestClusterSettingMutator(t *testing.T) {
 
     const settingName = "test_cluster_setting"
     mut := newClusterSettingMutator(settingName, possibleValues, options...)
-    mutations := mut.Generate(newRand(), plan, nil)
+    mutations, err := mut.Generate(newRand(), plan, nil)
+    require.NoError(t, err)
 
     // Number of mutations should be 1 <= n <= maxChanges
     require.GreaterOrEqual(t, len(mutations), 1, "plan:\n%s", plan.PrettyPrint())
