Skip to content

Commit 26f6d35

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #148136
148136: kvserver: register `StoreLiveness` service with DRPC server r=cthumuluru-crdb a=shubhamdhama Enable the `StoreLiveness` service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents f645ff3 + a2ee825 commit 26f6d35

File tree

6 files changed

+48
-9
lines changed

6 files changed

+48
-9
lines changed

pkg/kv/kvserver/store_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,10 @@ func createTestStoreWithoutStart(
248248
livenessInterval, heartbeatInterval := cfg.StoreLivenessDurations()
249249
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
250250
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
251-
transport := storeliveness.NewTransport(
252-
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, nil, /* knobs */
251+
transport, err := storeliveness.NewTransport(
252+
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, nil, /* knobs */
253253
)
254+
require.NoError(t, err)
254255
knobs := cfg.TestingKnobs.StoreLivenessKnobs
255256
cfg.StoreLiveness = storeliveness.NewNodeContainer(stopper, options, transport, knobs)
256257
}

pkg/kv/kvserver/storeliveness/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"//pkg/util/timeutil",
3838
"@com_github_cockroachdb_errors//:errors",
3939
"@com_github_cockroachdb_redact//:redact",
40+
"@io_storj_drpc//:drpc",
4041
"@org_golang_google_grpc//:grpc",
4142
"@org_golang_x_exp//maps",
4243
],

pkg/kv/kvserver/storeliveness/transport.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2222
"github.com/cockroachdb/errors"
2323
"google.golang.org/grpc"
24+
"storj.io/drpc"
2425
)
2526

2627
const (
@@ -89,8 +90,9 @@ func NewTransport(
8990
clock *hlc.Clock,
9091
dialer *nodedialer.Dialer,
9192
grpcServer *grpc.Server,
93+
drpcMux drpc.Mux,
9294
knobs *TransportKnobs,
93-
) *Transport {
95+
) (*Transport, error) {
9496
if knobs == nil {
9597
knobs = &TransportKnobs{}
9698
}
@@ -105,7 +107,19 @@ func NewTransport(
105107
if grpcServer != nil {
106108
slpb.RegisterStoreLivenessServer(grpcServer, t)
107109
}
108-
return t
110+
if drpcMux != nil {
111+
if err := slpb.DRPCRegisterStoreLiveness(drpcMux, t.AsDRPCServer()); err != nil {
112+
return nil, err
113+
}
114+
}
115+
return t, nil
116+
}
117+
118+
type drpcTransport Transport
119+
120+
// AsDRPCServer returns the DRPC server implementation for the StoreLiveness service.
121+
func (t *Transport) AsDRPCServer() slpb.DRPCStoreLivenessServer {
122+
return (*drpcTransport)(t)
109123
}
110124

111125
// Metrics returns metrics tracking this transport.
@@ -123,6 +137,18 @@ func (t *Transport) ListenMessages(storeID roachpb.StoreID, handler MessageHandl
123137
// only within the same RPC call; in other words, this guarantee does not hold
124138
// across stream re-connects.
125139
func (t *Transport) Stream(stream slpb.StoreLiveness_StreamServer) error {
140+
return t.stream(stream)
141+
}
142+
143+
// Stream proxies the incoming requests to the corresponding store's
144+
// MessageHandler. Messages between a pair of nodes are delivered in order
145+
// only within the same RPC call; in other words, this guarantee does not hold
146+
// across stream re-connects.
147+
func (t *drpcTransport) Stream(stream slpb.DRPCStoreLiveness_StreamStream) error {
148+
return (*Transport)(t).stream(stream)
149+
}
150+
151+
func (t *Transport) stream(stream slpb.RPCStoreLiveness_StreamStream) error {
126152
errCh := make(chan error, 1)
127153

128154
// Node stopping error is caught below in the select.

pkg/kv/kvserver/storeliveness/transport_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,18 @@ func (tt *transportTester) AddNodeWithoutGossip(
119119
tt.clocks[nodeID] = clockWithManualSource{manual: manual, clock: clock}
120120
grpcServer, err := rpc.NewServer(context.Background(), tt.nodeRPCContext)
121121
require.NoError(tt.t, err)
122-
transport := NewTransport(
122+
drpcServer, err := rpc.NewDRPCServer(context.Background(), tt.nodeRPCContext)
123+
require.NoError(tt.t, err)
124+
transport, err := NewTransport(
123125
log.MakeTestingAmbientCtxWithNewTracer(),
124126
tt.stopper,
125127
clock,
126128
nodedialer.New(tt.nodeRPCContext, gossip.AddressResolver(tt.gossip)),
127129
grpcServer,
130+
drpcServer,
128131
nil, /* knobs */
129132
)
133+
require.NoError(tt.t, err)
130134
tt.transports[nodeID] = transport
131135

132136
listener, err := netutil.ListenAndServeGRPC(stopper, grpcServer, util.TestAddr)

pkg/server/server.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,9 +660,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
660660
livenessInterval, heartbeatInterval := cfg.StoreLivenessDurations()
661661
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
662662
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
663-
transport := storeliveness.NewTransport(
664-
cfg.AmbientCtx, stopper, clock, kvNodeDialer, grpcServer.Server, nil, /* knobs */
663+
transport, err := storeliveness.NewTransport(
664+
cfg.AmbientCtx, stopper, clock, kvNodeDialer,
665+
grpcServer.Server, drpcServer.DRPCServer, nil, /* knobs */
665666
)
667+
if err != nil {
668+
return nil, err
669+
}
666670
nodeRegistry.AddMetricStruct(transport.Metrics())
667671
var knobs *storeliveness.TestingKnobs
668672
if storeKnobs := cfg.TestingKnobs.Store; storeKnobs != nil {

pkg/testutils/localtestcluster/local_test_cluster.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,13 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {
208208
livenessInterval, heartbeatInterval := cfg.StoreLivenessDurations()
209209
supportGracePeriod := cfg.RPCContext.StoreLivenessWithdrawalGracePeriod()
210210
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
211-
transport := storeliveness.NewTransport(
211+
transport, err := storeliveness.NewTransport(
212212
cfg.AmbientCtx, ltc.stopper, ltc.Clock,
213-
nil /* dialer */, nil /* grpcServer */, nil, /* knobs */
213+
nil /* dialer */, nil /* grpcServer */, nil /* drpcServer */, nil, /* knobs */
214214
)
215+
if err != nil {
216+
t.Fatal(err)
217+
}
215218
knobs := cfg.TestingKnobs.StoreLivenessKnobs
216219
cfg.StoreLiveness = storeliveness.NewNodeContainer(ltc.stopper, options, transport, knobs)
217220
}

0 commit comments

Comments
 (0)