Skip to content

Commit 58a3f6f

Browse files
craig[bot], miraradeva, kyle-a-wong, aerfrei
committed
150141: kvnemesis: add network partitions r=stevendanna a=miraradeva **rpc: extend the test partitioner to dynamic partitions** The `Partitioner` is a testing tool that uses the unary and stream client interceptor knobs to induce network partitions between nodes at the gRPC level. This commit extends the `Partitioner` to be able to add and remove partitions dynamically during a test, and to be able to create asymmetric partitions. Release note: None ---- **kvnemesis: add network partitions** This commit adds support for symmetric, asymmetric, partial and full partitions to kvnemesis, using the `rpc.Partitioner`. Release note: None ---- **kvnemesis: introduce safety and liveness modes** Currently, all kvnemesis test variants test both safety and liveness properties: safety in the sense of validating serializability, and liveness in the sense of failing if unavailability is encountered. With the introduction of various faults (e.g. network partitions), kvnemesis tests can continue to operate the same way only if we carefully craft the fault patterns in order to avoid unavailability. While this is valuable, it is also important to test safety in the presence of unavailability. This commit introduces two new modes of running kvnemesis tests, both of which still validate serializability: - Safety mode: all fault patterns are allowed; unavailability errors are ignored. - Liveness mode: faults are introduced carefully to ensure a well-connected quorum is preserved; unavailability errors fail the test. 
Fixes: #114814 Release note: None 154662: sql: fix transaction diagnostics for implicit txns r=kyle-a-wong a=kyle-a-wong Fixes transaction diagnostic collection to work for implicit transactions Epic: [CRDB-53541](https://cockroachlabs.atlassian.net/browse/CRDB-53541) Release note: None ---- This is a stacked PR, only the last commit needs to be reviewed 154863: changefeedccl: prevent panic in per-table PTS mixed-version r=asg0451 a=aerfrei Fixes a panic that occurred when evaluating per-table protected timestamp settings. The fix adds a nil check on the change aggregator spec ProgressConfig before accessing the per-table PTS flag. Fixes: #154830 Epic: CRDB-1421 Release note: None Co-authored-by: Mira Radeva <[email protected]> Co-authored-by: Kyle Wong <[email protected]> Co-authored-by: Aerin Freilich <[email protected]>
4 parents 52833a4 + bb2f6ed + d36b323 + 83e01aa commit 58a3f6f

17 files changed

+730
-154
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1975,7 +1975,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19751975
return cf.frontier.Frontier()
19761976
}()
19771977

1978-
if cf.spec.ProgressConfig.PerTableProtectedTimestamps {
1978+
if cf.spec.ProgressConfig != nil && cf.spec.ProgressConfig.PerTableProtectedTimestamps {
19791979
return cf.managePerTableProtectedTimestamps(ctx, txn, &ptsEntries, highwater)
19801980
}
19811981

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4229,10 +4229,7 @@ func TestProxyTracing(t *testing.T) {
42294229
ctx := context.Background()
42304230

42314231
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
4232-
if leaseType == roachpb.LeaseExpiration {
4233-
skip.UnderRace(t, "too slow")
4234-
skip.UnderDeadlock(t, "too slow")
4235-
} else if leaseType == roachpb.LeaseEpoch {
4232+
if leaseType == roachpb.LeaseEpoch {
42364233
// With epoch leases this test doesn't work reliably. It passes
42374234
// in cases where it should fail and fails in cases where it
42384235
// should pass.
@@ -4241,6 +4238,9 @@ func TestProxyTracing(t *testing.T) {
42414238
skip.IgnoreLint(t, "flaky with epoch leases")
42424239
}
42434240

4241+
skip.UnderRace(t, "too slow")
4242+
skip.UnderDeadlock(t, "too slow")
4243+
42444244
const numServers = 3
42454245
const numRanges = 3
42464246
st := cluster.MakeTestingClusterSettings()
@@ -4254,6 +4254,9 @@ func TestProxyTracing(t *testing.T) {
42544254
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond)
42554255

42564256
var p rpc.Partitioner
4257+
// Partition between n1 and n3.
4258+
require.NoError(t, p.AddPartition(roachpb.NodeID(1), roachpb.NodeID(3)))
4259+
require.NoError(t, p.AddPartition(roachpb.NodeID(3), roachpb.NodeID(1)))
42574260
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
42584261
ServerArgs: base.TestServerArgs{
42594262
DefaultDRPCOption: base.TestDRPCDisabled,
@@ -4262,8 +4265,7 @@ func TestProxyTracing(t *testing.T) {
42624265
perNode := make(map[int]base.TestServerArgs)
42634266
for i := 0; i < numServers; i++ {
42644267
ctk := rpc.ContextTestingKnobs{}
4265-
// Partition between n1 and n3.
4266-
p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk)
4268+
p.RegisterTestingKnobs(roachpb.NodeID(i+1), &ctk)
42674269
perNode[i] = base.TestServerArgs{
42684270
Settings: st,
42694271
Knobs: base.TestingKnobs{
@@ -4343,7 +4345,7 @@ func TestProxyTracing(t *testing.T) {
43434345
return checkLeaseCount(3, numRanges)
43444346
})
43454347

4346-
p.EnablePartition(true)
4348+
p.EnablePartitions(true)
43474349

43484350
_, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off")
43494351
require.NoError(t, err)

pkg/kv/kvclient/kvcoord/partial_partition_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ func TestPartialPartitionDirectFiveFail(t *testing.T) {
5858
// Additionally validate that a rangefeed sees the update.
5959
func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
6060
skip.UnderDuress(t, "test does heavy lifting")
61-
partition := [][2]roachpb.NodeID{{1, 2}}
6261
ctx := context.Background()
6362

6463
t.Run(fmt.Sprintf("%t-%d", useProxy, numServers), func(t *testing.T) {
@@ -86,6 +85,9 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
8685
zoneConfig.NumVoters = &numNodes
8786

8887
var p rpc.Partitioner
88+
// Partition between n1 and n2.
89+
require.NoError(t, p.AddPartition(roachpb.NodeID(1), roachpb.NodeID(2)))
90+
require.NoError(t, p.AddPartition(roachpb.NodeID(2), roachpb.NodeID(1)))
8991
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
9092
ServerArgs: base.TestServerArgs{
9193
DefaultDRPCOption: base.TestDRPCDisabled,
@@ -94,7 +96,7 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
9496
perNode := make(map[int]base.TestServerArgs)
9597
for i := 0; i < numServers; i++ {
9698
ctk := rpc.ContextTestingKnobs{}
97-
p.RegisterTestingKnobs(roachpb.NodeID(i+1), partition, &ctk)
99+
p.RegisterTestingKnobs(roachpb.NodeID(i+1), &ctk)
98100
perNode[i] = base.TestServerArgs{
99101
Settings: st,
100102
DisableSQLServer: true,
@@ -148,7 +150,7 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
148150
return nil
149151
})
150152

151-
p.EnablePartition(true)
153+
p.EnablePartitions(true)
152154

153155
txn := tc.ApplicationLayer(0).DB().NewTxn(ctx, "test")
154156
// DistSender will retry forever. For the failure cases we want

pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ go_library(
3232
"//pkg/kv/kvserver/liveness",
3333
"//pkg/kv/kvtestutils",
3434
"//pkg/roachpb",
35+
"//pkg/rpc",
3536
"//pkg/settings/cluster",
3637
"//pkg/sql/catalog/bootstrap",
3738
"//pkg/storage",
@@ -77,6 +78,7 @@ go_test(
7778
deps = [
7879
"//pkg/base",
7980
"//pkg/config/zonepb",
81+
"//pkg/gossip",
8082
"//pkg/kv",
8183
"//pkg/kv/kvclient/kvcoord",
8284
"//pkg/kv/kvnemesis/kvnemesisutil",
@@ -89,12 +91,15 @@ go_test(
8991
"//pkg/kv/kvserver/kvserverpb",
9092
"//pkg/kv/kvtestutils",
9193
"//pkg/roachpb",
94+
"//pkg/rpc",
9295
"//pkg/security/securityassets",
9396
"//pkg/security/securitytest",
9497
"//pkg/server",
9598
"//pkg/settings/cluster",
99+
"//pkg/spanconfig",
96100
"//pkg/storage",
97101
"//pkg/storage/enginepb",
102+
"//pkg/testutils",
98103
"//pkg/testutils/datapathutils",
99104
"//pkg/testutils/echotest",
100105
"//pkg/testutils/serverutils",

pkg/kv/kvnemesis/applier.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kvnemesis
77

88
import (
99
"context"
10+
"strings"
1011
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
@@ -98,7 +99,8 @@ func exceptUnhandledRetry(err error) bool {
9899
}
99100

100101
func exceptAmbiguous(err error) bool { // true if ambiguous result
101-
return errors.HasInterface(err, (*kvpb.ClientVisibleAmbiguousError)(nil))
102+
return errors.HasInterface(err, (*kvpb.ClientVisibleAmbiguousError)(nil)) ||
103+
strings.Contains(err.Error(), "result is ambiguous")
102104
}
103105

104106
func exceptDelRangeUsingTombstoneStraddlesRangeBoundary(err error) bool {
@@ -109,6 +111,15 @@ func exceptConditionFailed(err error) bool {
109111
return errors.HasType(err, (*kvpb.ConditionFailedError)(nil))
110112
}
111113

114+
func exceptReplicaUnavailable(err error) bool {
115+
return errors.HasType(err, (*kvpb.ReplicaUnavailableError)(nil))
116+
}
117+
118+
func exceptContextCanceled(err error) bool {
119+
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) ||
120+
strings.Contains(err.Error(), "query execution canceled")
121+
}
122+
112123
func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
113124
switch o := op.GetValue().(type) {
114125
case *GetOperation,
@@ -150,6 +161,12 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
150161
o.Result = resultInit(ctx, err)
151162
case *FlushLockTableOperation:
152163
o.Result = resultInit(ctx, db.FlushLockTable(ctx, o.Key, o.EndKey))
164+
case *AddNetworkPartitionOperation:
165+
err := env.Partitioner.AddPartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
166+
o.Result = resultInit(ctx, err)
167+
case *RemoveNetworkPartitionOperation:
168+
err := env.Partitioner.RemovePartition(roachpb.NodeID(o.FromNode), roachpb.NodeID(o.ToNode))
169+
o.Result = resultInit(ctx, err)
153170
case *ClosureTxnOperation:
154171
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
155172
// epochs of the same transaction to avoid waiting while holding locks.
@@ -745,7 +762,12 @@ func getRangeDesc(ctx context.Context, key roachpb.Key, dbs ...*kv.DB) roachpb.R
745762
var opts = retry.Options{}
746763
for r := retry.StartWithCtx(ctx, opts); r.Next(); dbIdx = (dbIdx + 1) % len(dbs) {
747764
sender := dbs[dbIdx].NonTransactionalSender()
748-
descs, _, err := kv.RangeLookup(ctx, sender, key, kvpb.CONSISTENT, 0, false)
765+
// Use kvpb.INCONSISTENT because kvpb.CONSISTENT requires a transactional
766+
// sender. In the generator, range lookups are usually used for finding
767+
// replica/lease change targets, so it's ok if these are not consistent.
768+
// Using kvpb.CONSISTENT with a non-transactional sender and in the presence
769+
// of network partitions can lead to infinitely stuck lookups.
770+
descs, _, err := kv.RangeLookup(ctx, sender, key, kvpb.INCONSISTENT, 0, false)
749771
if err != nil {
750772
log.Dev.Infof(ctx, "looking up descriptor for %s: %+v", key, err)
751773
continue
@@ -756,12 +778,11 @@ func getRangeDesc(ctx context.Context, key roachpb.Key, dbs ...*kv.DB) roachpb.R
756778
}
757779
return descs[0]
758780
}
759-
panic(`unreachable`)
781+
return roachpb.RangeDescriptor{}
760782
}
761783

762784
func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
763-
ctx := context.Background()
764-
return func(key roachpb.Key) ([]roachpb.ReplicationTarget, []roachpb.ReplicationTarget) {
785+
return func(ctx context.Context, key roachpb.Key) ([]roachpb.ReplicationTarget, []roachpb.ReplicationTarget) {
765786
desc := getRangeDesc(ctx, key, dbs...)
766787
replicas := desc.Replicas().Descriptors()
767788
var voters []roachpb.ReplicationTarget

pkg/kv/kvnemesis/env.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvtestutils"
1717
"github.com/cockroachdb/cockroach/pkg/roachpb"
18+
"github.com/cockroachdb/cockroach/pkg/rpc"
1819
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
1920
"github.com/cockroachdb/errors"
2021
)
@@ -29,9 +30,10 @@ type Logger interface {
2930
// Env manipulates the environment (cluster settings, zone configurations) that
3031
// the Applier operates in.
3132
type Env struct {
32-
SQLDBs []*gosql.DB
33-
Tracker *SeqTracker
34-
L Logger
33+
SQLDBs []*gosql.DB
34+
Tracker *SeqTracker
35+
L Logger
36+
Partitioner *rpc.Partitioner
3537
}
3638

3739
func (e *Env) anyNode() *gosql.DB {

0 commit comments

Comments
 (0)