Skip to content

Commit b43afef

Browse files
committed
rpc: extend the test partitioner to dynamic partitions
The `Partitioner` is a testing tool that uses the unary and stream client interceptor knobs to induce network partitions between nodes at the gRPC level. This commit extends the `Partitioner` to be able to add and remove partitions dynamically during a test, and to be able to create asymmetric partitions. Release note: None
1 parent b0f5e8c commit b43afef

File tree

3 files changed

+93
-40
lines changed

3 files changed

+93
-40
lines changed

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4229,10 +4229,7 @@ func TestProxyTracing(t *testing.T) {
42294229
ctx := context.Background()
42304230

42314231
testutils.RunValues(t, "lease-type", roachpb.TestingAllLeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
4232-
if leaseType == roachpb.LeaseExpiration {
4233-
skip.UnderRace(t, "too slow")
4234-
skip.UnderDeadlock(t, "too slow")
4235-
} else if leaseType == roachpb.LeaseEpoch {
4232+
if leaseType == roachpb.LeaseEpoch {
42364233
// With epoch leases this test doesn't work reliably. It passes
42374234
// in cases where it should fail and fails in cases where it
42384235
// should pass.
@@ -4241,6 +4238,9 @@ func TestProxyTracing(t *testing.T) {
42414238
skip.IgnoreLint(t, "flaky with epoch leases")
42424239
}
42434240

4241+
skip.UnderRace(t, "too slow")
4242+
skip.UnderDeadlock(t, "too slow")
4243+
42444244
const numServers = 3
42454245
const numRanges = 3
42464246
st := cluster.MakeTestingClusterSettings()
@@ -4254,6 +4254,9 @@ func TestProxyTracing(t *testing.T) {
42544254
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond)
42554255

42564256
var p rpc.Partitioner
4257+
// Partition between n1 and n3.
4258+
require.NoError(t, p.AddPartition(roachpb.NodeID(1), roachpb.NodeID(3)))
4259+
require.NoError(t, p.AddPartition(roachpb.NodeID(3), roachpb.NodeID(1)))
42574260
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
42584261
ServerArgs: base.TestServerArgs{
42594262
DefaultDRPCOption: base.TestDRPCDisabled,
@@ -4262,8 +4265,7 @@ func TestProxyTracing(t *testing.T) {
42624265
perNode := make(map[int]base.TestServerArgs)
42634266
for i := 0; i < numServers; i++ {
42644267
ctk := rpc.ContextTestingKnobs{}
4265-
// Partition between n1 and n3.
4266-
p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk)
4268+
p.RegisterTestingKnobs(roachpb.NodeID(i+1), &ctk)
42674269
perNode[i] = base.TestServerArgs{
42684270
Settings: st,
42694271
Knobs: base.TestingKnobs{
@@ -4343,7 +4345,7 @@ func TestProxyTracing(t *testing.T) {
43434345
return checkLeaseCount(3, numRanges)
43444346
})
43454347

4346-
p.EnablePartition(true)
4348+
p.EnablePartitions(true)
43474349

43484350
_, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off")
43494351
require.NoError(t, err)

pkg/kv/kvclient/kvcoord/partial_partition_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ func TestPartialPartitionDirectFiveFail(t *testing.T) {
5858
// Additionally validate that a rangefeed sees the update.
5959
func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
6060
skip.UnderDuress(t, "test does heavy lifting")
61-
partition := [][2]roachpb.NodeID{{1, 2}}
6261
ctx := context.Background()
6362

6463
t.Run(fmt.Sprintf("%t-%d", useProxy, numServers), func(t *testing.T) {
@@ -86,6 +85,9 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
8685
zoneConfig.NumVoters = &numNodes
8786

8887
var p rpc.Partitioner
88+
// Partition between n1 and n2.
89+
require.NoError(t, p.AddPartition(roachpb.NodeID(1), roachpb.NodeID(2)))
90+
require.NoError(t, p.AddPartition(roachpb.NodeID(2), roachpb.NodeID(1)))
8991
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
9092
ServerArgs: base.TestServerArgs{
9193
DefaultDRPCOption: base.TestDRPCDisabled,
@@ -94,7 +96,7 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
9496
perNode := make(map[int]base.TestServerArgs)
9597
for i := 0; i < numServers; i++ {
9698
ctk := rpc.ContextTestingKnobs{}
97-
p.RegisterTestingKnobs(roachpb.NodeID(i+1), partition, &ctk)
99+
p.RegisterTestingKnobs(roachpb.NodeID(i+1), &ctk)
98100
perNode[i] = base.TestServerArgs{
99101
Settings: st,
100102
DisableSQLServer: true,
@@ -148,7 +150,7 @@ func testPartialPartition(t *testing.T, useProxy bool, numServers int) {
148150
return nil
149151
})
150152

151-
p.EnablePartition(true)
153+
p.EnablePartitions(true)
152154

153155
txn := tc.ApplicationLayer(0).DB().NewTxn(ctx, "test")
154156
// DistSender will retry forever. For the failure cases we want

pkg/rpc/context_testutils.go

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,16 @@ func (d disablingClientStream) RecvMsg(m interface{}) error {
129129

130130
// Partitioner is used to create partial partitions between nodes at the gRPC
131131
// layer. It uses unary and stream client interceptors to fail requests to nodes that are not
132-
// connected. Usage of it is something like the following:
132+
// connected. Node addresses need to be registered before enabling the
133+
// partition, but partitions can be added and removed at any point (before or
134+
// after starting the cluster or enabling the partition).
135+
//
136+
// Usage of it is something like the following:
133137
//
134138
// var p rpc.Partitioner
135139
//
136140
// for i := 0; i < numServers; i++ {
137-
// p.RegisterTestingKnobs(id, partitions, ContextTestingKnobs{})
141+
// p.RegisterTestingKnobs(id, &knobs) // knobs: that node's ContextTestingKnobs
138142
// }
139143
//
140144
// TestCluster.Start()
@@ -143,57 +147,102 @@ func (d disablingClientStream) RecvMsg(m interface{}) error {
143147
// p.RegisterNodeAddr(addr, id)
144148
// }
145149
//
146-
// p.EnablePartition(true)
147-
// ... run operations
150+
// p.AddPartition(from, to)
151+
//
152+
// p.EnablePartitions(true)
148153
//
149-
// TODO(baptist): This could be enhanced to allow dynamic partition injection.
154+
// p.{Add,Remove}Partition(from, to)
155+
// ... run operations
156+
// p.{Add,Remove}Partition(from, to)
150157
type Partitioner struct {
151-
partitionEnabled atomic.Bool
152-
nodeAddrMap syncutil.Map[string, roachpb.NodeID]
158+
partitionsEnabled atomic.Bool // when false, the interceptors let every request through
159+
nodeAddrMap syncutil.Map[string, roachpb.NodeID] // node address -> NodeID, filled in by RegisterNodeAddr
160+
mu struct { // mu guards partitions, which can change while the cluster is running
161+
syncutil.Mutex
162+
// partitions is a map from NodeID to a set of NodeIDs that the node should
163+
// not be able to connect to.
164+
partitions map[roachpb.NodeID]map[roachpb.NodeID]struct{}
165+
}
153166
}
154167

155-
// EnablePartition will enable or disable the partition.
156-
func (p *Partitioner) EnablePartition(enable bool) {
157-
p.partitionEnabled.Store(enable)
168+
// EnablePartitions turns enforcement of the configured partitions on or off.
// While disabled, the interceptors let every request through regardless of
// which partitions have been added.
169+
func (p *Partitioner) EnablePartitions(enable bool) {
170+
p.partitionsEnabled.Store(enable)
158171
}
159172

160173
// RegisterNodeAddr must be called for every node after the cluster is started, but before
161-
// EnablePartition is called on every node to register the mapping from the
174+
// EnablePartitions(true) is called, to register the mapping from the
162175
// address of the node to its NodeID. It panics if partitions are already enabled.
163176
func (p *Partitioner) RegisterNodeAddr(addr string, id roachpb.NodeID) {
164-
if p.partitionEnabled.Load() {
177+
if p.partitionsEnabled.Load() {
165178
panic("Can not register node addresses with a partition enabled")
166179
}
167180
p.nodeAddrMap.Store(addr, &id) // keyed by address: the interceptors look up the destination NodeID by addr
168181
}
169182

170-
// RegisterTestingKnobs creates the testing knobs for this node. It will
171-
// override both the Unary and Stream Interceptors to return errors once
172-
// EnablePartition is called.
173-
func (p *Partitioner) RegisterTestingKnobs(
174-
id roachpb.NodeID, partition [][2]roachpb.NodeID, knobs *ContextTestingKnobs,
175-
) {
176-
// Structure the partition list for indexed lookup. We are partitioned from
177-
// the other node if we are found on either side of the pair.
178-
partitionedServers := make(map[roachpb.NodeID]bool)
179-
for _, p := range partition {
180-
if p[0] == id {
181-
partitionedServers[p[1]] = true
183+
// AddPartition records that node `from` should not be able to connect to node
// `to`. The partition is directional: only the from->to direction is recorded,
// so a symmetric partition requires a second call with the arguments swapped.
// Adding a partition that already exists is a no-op. It returns an error if
// from and to are the same node.
func (p *Partitioner) AddPartition(from roachpb.NodeID, to roachpb.NodeID) error {
184+
if from == to {
185+
return errors.Newf("cannot add partition from node %d to itself", from)
186+
}
187+
p.mu.Lock()
188+
defer p.mu.Unlock()
189+
if p.mu.partitions == nil { // allocate lazily so a zero-value Partitioner is usable
190+
p.mu.partitions = make(map[roachpb.NodeID]map[roachpb.NodeID]struct{})
191+
}
192+
if p.mu.partitions[from] == nil {
193+
p.mu.partitions[from] = make(map[roachpb.NodeID]struct{})
194+
}
195+
p.mu.partitions[from][to] = struct{}{}
196+
return nil
197+
}
198+
199+
// RemovePartition removes the directional partition from node `from` to node
// `to`. The reverse direction, if present, is left in place. It returns an
// error if no such partition was previously added.
func (p *Partitioner) RemovePartition(from roachpb.NodeID, to roachpb.NodeID) error {
200+
err := errors.Newf("cannot remove partition from node %d to %d; it doesn't exist", from, to) // built up front; returned on every not-found path
201+
p.mu.Lock()
202+
defer p.mu.Unlock()
203+
if p.mu.partitions == nil {
204+
return err
205+
}
206+
if toNodes, ok := p.mu.partitions[from]; ok {
207+
if _, ok = toNodes[to]; ok {
208+
delete(toNodes, to)
209+
if len(toNodes) == 0 { // drop the now-empty set for this source node
210+
delete(p.mu.partitions, from)
211+
}
212+
return nil
182213
}
183-
if p[1] == id {
184-
partitionedServers[p[0]] = true
214+
}
215+
return err
216+
}
217+
218+
// isPartitioned reports whether a partition from node `from` to node `to` is
// currently recorded. The lookup is directional and is done under p.mu. It
// does not consult partitionsEnabled.
func (p *Partitioner) isPartitioned(from roachpb.NodeID, to roachpb.NodeID) bool {
219+
p.mu.Lock()
220+
defer p.mu.Unlock()
221+
if p.mu.partitions == nil { // nothing has been added yet
222+
return false
223+
}
224+
if toPartitions, ok := p.mu.partitions[from]; ok {
225+
if _, ok := toPartitions[to]; ok {
226+
return true
185227
}
186228
}
229+
return false
230+
}
231+
232+
// RegisterTestingKnobs creates the testing knobs for this node. It will
233+
// override both the Unary and Stream Interceptors to return errors once
234+
// EnablePartitions is called.
235+
func (p *Partitioner) RegisterTestingKnobs(id roachpb.NodeID, knobs *ContextTestingKnobs) {
187236
isPartitioned := func(addr string) error {
188-
if !p.partitionEnabled.Load() {
237+
if !p.partitionsEnabled.Load() {
189238
return nil
190239
}
191-
idPtr, ok := p.nodeAddrMap.Load(addr)
240+
toNodePtr, ok := p.nodeAddrMap.Load(addr)
192241
if !ok {
193242
panic("address not mapped, call RegisterNodeAddr before enabling the partition" + addr)
194243
}
195-
id := *idPtr
196-
if partitionedServers[id] {
244+
toNodeId := *toNodePtr
245+
if p.isPartitioned(id, toNodeId) {
197246
return errors.Newf("rpc error: partitioned from %s, n%d", addr, id)
198247
}
199248
return nil

0 commit comments

Comments
 (0)