Skip to content

Commit a0b2049

Browse files
craig[bot]cthumuluru-crdb
andcommitted
Merge #157161
157161: rpc: return DRPC status from ConnHealth and GetBreaker when enabled r=cthumuluru-crdb a=cthumuluru-crdb Currently, ConnHealth and GetBreakerForAddr methods in nodedialer always return gRPC status even when DRPC is enabled. When DRPC is enabled, they should return the connection health and breaker status of DRPC instead. This change fixes that behavior. Fixes: #153860 Epic: CRDB-51459 Release note: None Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents 616f0dd + d6c7d74 commit a0b2049

File tree

6 files changed

+333
-138
lines changed

6 files changed

+333
-138
lines changed

pkg/kv/kvserver/client_raft_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2850,6 +2850,10 @@ func TestReportUnreachableHeartbeats(t *testing.T) {
28502850
ctx := context.Background()
28512851
tc := testcluster.StartTestCluster(t, 3,
28522852
base.TestClusterArgs{
2853+
ServerArgs: base.TestServerArgs{
2854+
// TODO(server): enabled DRPC once serverutils adds support for DRPC.
2855+
DefaultDRPCOption: base.TestDRPCDisabled,
2856+
},
28532857
ReplicationMode: base.ReplicationManual,
28542858
})
28552859
defer tc.Stopper().Stop(ctx)
@@ -2911,6 +2915,10 @@ func TestReportUnreachableRemoveRace(t *testing.T) {
29112915
ctx := context.Background()
29122916
tc := testcluster.StartTestCluster(t, 3,
29132917
base.TestClusterArgs{
2918+
ServerArgs: base.TestServerArgs{
2919+
// TODO(server): enabled DRPC once serverutils adds support for DRPC.
2920+
DefaultDRPCOption: base.TestDRPCDisabled,
2921+
},
29142922
ReplicationMode: base.ReplicationManual,
29152923
})
29162924
defer tc.Stopper().Stop(ctx)

pkg/rpc/context.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,12 +1392,31 @@ func (rpcCtx *Context) ConnHealth(
13921392
if rpcCtx.GetLocalInternalClientForAddr(nodeID) != nil {
13931393
return nil
13941394
}
1395+
1396+
if !rpcbase.DRPCEnabled(context.Background(), rpcCtx.Settings) {
1397+
return rpcCtx.grpcConnHealth(target, nodeID, class)
1398+
}
1399+
return rpcCtx.drpcConnHealth(target, nodeID, class)
1400+
}
1401+
1402+
func (rpcCtx *Context) grpcConnHealth(
1403+
target string, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
1404+
) error {
13951405
if p, ok := rpcCtx.peers.get(peerKey{target, nodeID, class}); ok {
13961406
return p.c.Health()
13971407
}
13981408
return ErrNotHeartbeated
13991409
}
14001410

1411+
func (rpcCtx *Context) drpcConnHealth(
1412+
target string, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
1413+
) error {
1414+
if p, ok := rpcCtx.drpcPeers.get(peerKey{target, nodeID, class}); ok {
1415+
return p.c.Health()
1416+
}
1417+
return ErrNotHeartbeated
1418+
}
1419+
14011420
type transportType bool
14021421

14031422
const (
@@ -1499,14 +1518,32 @@ func (rpcCtx *Context) dialOptsLocal() ([]grpc.DialOption, error) {
14991518
func (rpcCtx *Context) GetBreakerForAddr(
15001519
nodeID roachpb.NodeID, class rpcbase.ConnectionClass, addr net.Addr,
15011520
) (*circuitbreaker.Breaker, bool) {
1502-
sAddr := addr.String()
1503-
rpcCtx.peers.mu.RLock()
1504-
defer rpcCtx.peers.mu.RUnlock()
1505-
p, ok := rpcCtx.peers.mu.m[peerKey{
1506-
TargetAddr: sAddr,
1521+
k := peerKey{
1522+
TargetAddr: addr.String(),
15071523
NodeID: nodeID,
15081524
Class: class,
1509-
}]
1525+
}
1526+
1527+
if !rpcbase.DRPCEnabled(context.Background(), rpcCtx.Settings) {
1528+
return rpcCtx.grpcGetBreakerForAddr(k)
1529+
}
1530+
return rpcCtx.drpcGetBreakerForAddr(k)
1531+
}
1532+
1533+
func (rpcCtx *Context) grpcGetBreakerForAddr(k peerKey) (*circuitbreaker.Breaker, bool) {
1534+
rpcCtx.peers.mu.RLock()
1535+
defer rpcCtx.peers.mu.RUnlock()
1536+
p, ok := rpcCtx.peers.mu.m[k]
1537+
if !ok {
1538+
return nil, false
1539+
}
1540+
return p.b, true
1541+
}
1542+
1543+
func (rpcCtx *Context) drpcGetBreakerForAddr(k peerKey) (*circuitbreaker.Breaker, bool) {
1544+
rpcCtx.drpcPeers.mu.RLock()
1545+
defer rpcCtx.drpcPeers.mu.RUnlock()
1546+
p, ok := rpcCtx.drpcPeers.mu.m[k]
15101547
if !ok {
15111548
return nil, false
15121549
}

pkg/rpc/nodedialer/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ go_test(
5252
"@com_github_cockroachdb_errors//:errors",
5353
"@com_github_stretchr_testify//assert",
5454
"@com_github_stretchr_testify//require",
55+
"@io_storj_drpc//drpcmigrate",
56+
"@io_storj_drpc//drpcmux",
57+
"@io_storj_drpc//drpcserver",
5558
"@org_golang_google_grpc//:grpc",
5659
"@org_golang_google_grpc//codes",
5760
],

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,11 @@ func (n *Dialer) ConnHealthTryDial(nodeID roachpb.NodeID, class rpcbase.Connecti
282282
return err
283283
}
284284
// NB: This will always return `ErrNotHeartbeated` since the heartbeat will
285-
// not be done by the time `Health` is called since GRPCDialNode is async.
286-
return n.rpcContext.GRPCDialNode(addr.String(), nodeID, locality, class).Health()
285+
// not be done by the time `Health` is called since DialNode is async.
286+
if !rpcbase.DRPCEnabled(context.Background(), n.rpcContext.Settings) {
287+
return n.rpcContext.GRPCDialNode(addr.String(), nodeID, locality, class).Health()
288+
}
289+
return n.rpcContext.DRPCDialNode(addr.String(), nodeID, locality, class).Health()
287290
}
288291

289292
// ConnHealthTryDialInstance returns nil if we have an open connection of the
@@ -298,7 +301,11 @@ func (n *Dialer) ConnHealthTryDialInstance(id base.SQLInstanceID, addr string) e
298301
addr, roachpb.NodeID(id), rpcbase.DefaultClass); err == nil {
299302
return nil
300303
}
301-
return n.rpcContext.GRPCDialPod(addr, id, roachpb.Locality{}, rpcbase.DefaultClass).Health()
304+
305+
if !rpcbase.DRPCEnabled(context.Background(), n.rpcContext.Settings) {
306+
return n.rpcContext.GRPCDialPod(addr, id, roachpb.Locality{}, rpcbase.DefaultClass).Health()
307+
}
308+
return n.rpcContext.DRPCDialPod(addr, id, roachpb.Locality{}, rpcbase.DefaultClass).Health()
302309
}
303310

304311
// GetCircuitBreaker retrieves the circuit breaker for connections to the

0 commit comments

Comments
 (0)