Skip to content

Commit 7c55880

Browse files
rpc: add DRPC dial methods
This commit adds DRPC dial methods to `rpc.nodedialer` and `rpc.Context`. In the future commits, helper functions for creation RPC clients are update to create gRPC or DRPC clients base on `rpc.experimental_drpc.enabled` cluster setting. Epic: CRDB-48923 Fixes: #148383 Release note: none
1 parent 7304665 commit 7c55880

File tree

3 files changed

+247
-4
lines changed

3 files changed

+247
-4
lines changed

pkg/rpc/context.go

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"google.golang.org/grpc/encoding"
5252
"google.golang.org/grpc/metadata"
5353
"google.golang.org/grpc/stats"
54+
"storj.io/drpc"
5455
)
5556

5657
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -226,7 +227,10 @@ type Context struct {
226227

227228
localInternalClient RestrictedInternalClient
228229

230+
// peers is a map of gRPC connections to other nodes.
229231
peers peerMap[*grpc.ClientConn]
232+
// drpcPeers is a map of DRPC connections to other nodes.
233+
drpcPeers peerMap[drpc.Conn]
230234

231235
// dialbackMap is a map of currently executing dialback connections. This map
232236
// is typically empty or close to empty. It only holds entries that are being
@@ -1998,6 +2002,16 @@ func (rpcCtx *Context) GRPCUnvalidatedDial(
19982002
return rpcCtx.grpcDialNodeInternal(target, 0, locality, rpcbase.SystemClass)
19992003
}
20002004

2005+
// DRPCUnvalidatedDial uses DRPCDialNode and disables validation of the
2006+
// node ID between client and server. This function should only be
2007+
// used with the gossip client and CLI commands which can talk to any
2008+
// node. This method implies a SystemClass.
2009+
func (rpcCtx *Context) DRPCUnvalidatedDial(
2010+
target string, locality roachpb.Locality,
2011+
) *DRPCConnection {
2012+
return rpcCtx.drpcDialNodeInternal(target, 0, locality, rpcbase.SystemClass)
2013+
}
2014+
20012015
// GRPCDialNode calls grpc.Dial with options appropriate for the
20022016
// context and class (see the comment on ConnectionClass).
20032017
//
@@ -2019,6 +2033,27 @@ func (rpcCtx *Context) GRPCDialNode(
20192033
return rpcCtx.grpcDialNodeInternal(target, remoteNodeID, remoteLocality, class)
20202034
}
20212035

2036+
// DRPCDialNode dials a DRPC connection with options appropriate for the
2037+
// context and class (see the comment on ConnectionClass).
2038+
//
2039+
// The remoteNodeID becomes a constraint on the expected node ID of
2040+
// the remote node; this is checked during heartbeats. The caller is
2041+
// responsible for ensuring the remote node ID is known prior to using
2042+
// this function.
2043+
func (rpcCtx *Context) DRPCDialNode(
2044+
target string,
2045+
remoteNodeID roachpb.NodeID,
2046+
remoteLocality roachpb.Locality,
2047+
class rpcbase.ConnectionClass,
2048+
) *DRPCConnection {
2049+
if remoteNodeID == 0 && !rpcCtx.TestingAllowNamedRPCToAnonymousServer {
2050+
log.Fatalf(
2051+
rpcCtx.makeDialCtx(target, remoteNodeID, class),
2052+
"%v", errors.AssertionFailedf("invalid node ID 0 in DRPCDialNode()"))
2053+
}
2054+
return rpcCtx.drpcDialNodeInternal(target, remoteNodeID, remoteLocality, class)
2055+
}
2056+
20222057
// GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID`
20232058
// argument as a `NodeID` which it converts. This works because the
20242059
// tenant gRPC server is initialized using the `InstanceID` so it
@@ -2035,6 +2070,22 @@ func (rpcCtx *Context) GRPCDialPod(
20352070
return rpcCtx.GRPCDialNode(target, roachpb.NodeID(remoteInstanceID), remoteLocality, class)
20362071
}
20372072

2073+
// DRPCDialPod wraps DRPCDialNode and treats the `remoteInstanceID`
2074+
// argument as a `NodeID` which it converts. This works because the
2075+
// tenant DRPC server is initialized using the `InstanceID` so it
2076+
// accepts our connection as matching the ID we're dialing.
2077+
//
2078+
// Since DRPCDialNode accepts a separate `target` and `NodeID` it
2079+
// requires no further modification to work between pods.
2080+
func (rpcCtx *Context) DRPCDialPod(
2081+
target string,
2082+
remoteInstanceID base.SQLInstanceID,
2083+
remoteLocality roachpb.Locality,
2084+
class rpcbase.ConnectionClass,
2085+
) *DRPCConnection {
2086+
return rpcCtx.DRPCDialNode(target, roachpb.NodeID(remoteInstanceID), remoteLocality, class)
2087+
}
2088+
20382089
// grpcDialNodeInternal connects to the remote node and sets up the async heartbeater.
20392090
// This intentionally takes no `context.Context`; it uses one derived from rpcCtx.masterCtx.
20402091
func (rpcCtx *Context) grpcDialNodeInternal(
@@ -2050,8 +2101,37 @@ func (rpcCtx *Context) grpcDialNodeInternal(
20502101
}
20512102

20522103
// Slow path. Race to create a peer.
2053-
conns := &rpcCtx.peers
2104+
return rpcDialNodeInternal(
2105+
rpcCtx, k, newGRPCPeerOptions(rpcCtx, k, remoteLocality),
2106+
)
2107+
}
2108+
2109+
// drpcDialNodeInternal connects to the remote node and sets up the async heartbeater.
2110+
// This intentionally takes no `context.Context`; it uses one derived from rpcCtx.masterCtx.
2111+
func (rpcCtx *Context) drpcDialNodeInternal(
2112+
target string,
2113+
remoteNodeID roachpb.NodeID,
2114+
remoteLocality roachpb.Locality,
2115+
class rpcbase.ConnectionClass,
2116+
) *DRPCConnection {
2117+
k := peerKey{TargetAddr: target, NodeID: remoteNodeID, Class: class}
2118+
if p, ok := rpcCtx.drpcPeers.get(k); ok {
2119+
// There's a cached peer, so we have a cached connection, use it.
2120+
return p.c
2121+
}
2122+
2123+
// Slow path. Race to create a peer.
2124+
return rpcDialNodeInternal(
2125+
rpcCtx, k, newDRPCPeerOptions(rpcCtx, k, remoteLocality),
2126+
)
2127+
}
20542128

2129+
// rpcDialNodeInternal is used to dial a new connection to the peer if one
2130+
// doesn't already exist.
2131+
func rpcDialNodeInternal[Conn rpcConn](
2132+
rpcCtx *Context, k peerKey, peerOpts *peerOptions[Conn],
2133+
) *Connection[Conn] {
2134+
conns := peerOpts.peers
20552135
conns.mu.Lock()
20562136
defer conns.mu.Unlock()
20572137

@@ -2060,12 +2140,11 @@ func (rpcCtx *Context) grpcDialNodeInternal(
20602140
}
20612141

20622142
// Won race. Actually create a peer.
2063-
20642143
if conns.mu.m == nil {
2065-
conns.mu.m = map[peerKey]*peer[*grpc.ClientConn]{}
2144+
conns.mu.m = map[peerKey]*peer[Conn]{}
20662145
}
20672146

2068-
p := newPeer(rpcCtx, k, newGRPCPeerOptions(rpcCtx, k, remoteLocality))
2147+
p := newPeer(rpcCtx, k, peerOpts)
20692148
// (Asynchronously) Start the probe (= heartbeat loop). The breaker is healthy
20702149
// right now (it was just created) but the call to `.Probe` will launch the
20712150
// probe[1] regardless.

pkg/rpc/drpc.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ import (
1212
"net"
1313
"time"
1414

15+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16+
"github.com/cockroachdb/cockroach/pkg/roachpb"
1517
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
1618
"github.com/cockroachdb/cockroach/pkg/util/log"
19+
"github.com/cockroachdb/cockroach/pkg/util/stop"
1720
"github.com/cockroachdb/errors"
1821
"storj.io/drpc"
1922
"storj.io/drpc/drpcconn"
@@ -29,6 +32,14 @@ import (
2932
// Default idle connection timeout for DRPC connections in the pool.
3033
var defaultDRPCConnIdleTimeout = 5 * time.Minute
3134

35+
type drpcCloseNotifier struct {
36+
conn drpc.Conn
37+
}
38+
39+
func (d *drpcCloseNotifier) CloseNotify(ctx context.Context) <-chan struct{} {
40+
return d.conn.Closed()
41+
}
42+
3243
func dialDRPC(
3344
rpcCtx *Context,
3445
) func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
@@ -174,3 +185,82 @@ func (srv *drpcOffServer) Serve(_ context.Context, lis net.Listener) error {
174185
func (srv *drpcOffServer) Register(interface{}, drpc.Description) error {
175186
return nil
176187
}
188+
189+
// newDRPDCPeerOptions creates peerOptions for DRPC peers.
190+
func newDRPCPeerOptions(
191+
rpcCtx *Context, k peerKey, locality roachpb.Locality,
192+
) *peerOptions[drpc.Conn] {
193+
pm, _ := rpcCtx.metrics.acquire(k, locality)
194+
return &peerOptions[drpc.Conn]{
195+
locality: locality,
196+
peers: &rpcCtx.drpcPeers,
197+
connOptions: &ConnectionOptions[drpc.Conn]{
198+
dial: func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
199+
// TODO(server): could use connection class instead of empty key here.
200+
pool := drpcpool.New[struct{}, drpcpool.Conn](drpcpool.Options{
201+
Expiration: defaultDRPCConnIdleTimeout,
202+
})
203+
pooledConn := pool.Get(ctx /* unused */, struct{}{}, func(ctx context.Context,
204+
_ struct{}) (drpcpool.Conn, error) {
205+
206+
netConn, err := drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
207+
if err != nil {
208+
return nil, err
209+
}
210+
211+
opts := drpcconn.Options{
212+
Manager: drpcmanager.Options{
213+
Reader: drpcwire.ReaderOptions{
214+
MaximumBufferSize: math.MaxInt,
215+
},
216+
Stream: drpcstream.Options{
217+
MaximumBufferSize: 0, // unlimited
218+
},
219+
SoftCancel: true, // don't close the transport when stream context is canceled
220+
},
221+
}
222+
var conn *drpcconn.Conn
223+
if rpcCtx.ContextOptions.Insecure {
224+
conn = drpcconn.NewWithOptions(netConn, opts)
225+
} else {
226+
tlsConfig, err := rpcCtx.GetClientTLSConfig()
227+
if err != nil {
228+
return nil, err
229+
}
230+
// Clone TLS config to avoid modifying a cached TLS config.
231+
tlsConfig = tlsConfig.Clone()
232+
// TODO(server): remove this hack which is necessary at least in
233+
// testing to get TestDRPCSelectQuery to pass.
234+
tlsConfig.InsecureSkipVerify = true
235+
tlsConn := tls.Client(netConn, tlsConfig)
236+
conn = drpcconn.NewWithOptions(tlsConn, opts)
237+
}
238+
239+
return conn, nil
240+
})
241+
// `pooledConn.Close` doesn't tear down any of the underlying TCP
242+
// connections but simply marks the pooledConn handle as returning
243+
// errors. When we "close" this conn, we want to tear down all of
244+
// the connections in the pool (in effect mirroring the behavior of
245+
// gRPC where a single conn is shared).
246+
return &closeEntirePoolConn{
247+
Conn: pooledConn,
248+
pool: pool,
249+
}, nil
250+
},
251+
connEquals: func(a, b drpc.Conn) bool {
252+
return a == b
253+
},
254+
newBatchStreamClient: func(ctx context.Context, cc drpc.Conn) (BatchStreamClient, error) {
255+
return kvpb.NewDRPCInternalClientAdapter(cc).BatchStream(ctx)
256+
},
257+
newCloseNotifier: func(_ *stop.Stopper, cc drpc.Conn) closeNotifier {
258+
return &drpcCloseNotifier{conn: cc}
259+
},
260+
},
261+
newHeartbeatClient: func(cc drpc.Conn) RPCHeartbeatClient {
262+
return NewDRPCHeartbeatClientAdapter(cc)
263+
},
264+
pm: pm,
265+
}
266+
}

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,28 @@ func (n *Dialer) Dial(
106106
return conn, err
107107
}
108108

109+
// DRPCDial returns a drpc connection to the given node. It logs whenever the
110+
// node first becomes unreachable or reachable. This method is similar to
111+
// Dial, but it dials a DRPC connection instead of a gRPC connection.
112+
func (n *Dialer) DRPCDial(
113+
ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
114+
) (drpc.Conn, error) {
115+
if n == nil || n.resolver == nil {
116+
return nil, errors.New("no node dialer configured")
117+
}
118+
// Don't trip the breaker if we're already canceled.
119+
if ctxErr := ctx.Err(); ctxErr != nil {
120+
return nil, errors.Wrap(ctxErr, "dial")
121+
}
122+
addr, locality, err := n.resolver(nodeID)
123+
if err != nil {
124+
err = errors.Wrapf(err, "failed to resolve n%d", nodeID)
125+
return nil, err
126+
}
127+
conn, _, err := n.drpcDial(ctx, nodeID, addr, locality, true, class)
128+
return conn, err
129+
}
130+
109131
// DialNoBreaker is like Dial, but will not check the circuit breaker before
110132
// trying to connect. This function should only be used when there is good
111133
// reason to believe that the node is reachable.
@@ -123,6 +145,23 @@ func (n *Dialer) DialNoBreaker(
123145
return conn, err
124146
}
125147

148+
// DRPCDialNoBreaker is like DRPCDial, but will not check the circuit breaker
149+
// before trying to connect. This function should only be used when there is
150+
// good reason to believe that the node is reachable.
151+
func (n *Dialer) DRPCDialNoBreaker(
152+
ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
153+
) (drpc.Conn, error) {
154+
if n == nil || n.resolver == nil {
155+
return nil, errors.New("no node dialer configured")
156+
}
157+
addr, locality, err := n.resolver(nodeID)
158+
if err != nil {
159+
return nil, err
160+
}
161+
conn, _, err := n.drpcDial(ctx, nodeID, addr, locality, false, class)
162+
return conn, err
163+
}
164+
126165
// DialInternalClient is a specialization of DialClass for callers that
127166
// want a kvpb.InternalClient. This supports an optimization to bypass the
128167
// network for the local node.
@@ -213,6 +252,41 @@ func (n *Dialer) dial(
213252
return conn, pool, dconn, drpcStreamPool, nil
214253
}
215254

255+
// drcpDial performs the dialing of the remote connection. If checkBreaker
256+
// is set (which it usually is), circuit breakers for the peer will be
257+
// checked. This method is similar to dial, but it dials a DRPC
258+
// connection instead of a gRPC connection.
259+
func (n *Dialer) drpcDial(
260+
ctx context.Context,
261+
nodeID roachpb.NodeID,
262+
addr net.Addr,
263+
locality roachpb.Locality,
264+
checkBreaker bool,
265+
class rpcbase.ConnectionClass,
266+
) (drpc.Conn, *rpc.DRPCBatchStreamPool, error) {
267+
const ctxWrapMsg = "drpcDial"
268+
// Don't trip the breaker if we're already canceled.
269+
if ctxErr := ctx.Err(); ctxErr != nil {
270+
return nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
271+
}
272+
rpcConn := n.rpcContext.DRPCDialNode(addr.String(), nodeID, locality, class)
273+
connect := rpcConn.ConnectEx
274+
if !checkBreaker {
275+
connect = rpcConn.ConnectNoBreaker
276+
}
277+
_, dconn, err := connect(ctx)
278+
if err != nil {
279+
// If we were canceled during the dial, don't trip the breaker.
280+
if ctxErr := ctx.Err(); ctxErr != nil {
281+
return nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
282+
}
283+
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
284+
return nil, nil, err
285+
}
286+
drpcStreamPool := rpcConn.DRPCBatchStreamPool()
287+
return dconn, drpcStreamPool, nil
288+
}
289+
216290
// ConnHealth returns nil if we have an open connection of the request
217291
// class to the given node that succeeded on its most recent heartbeat.
218292
// Returns circuit.ErrBreakerOpen if the breaker is tripped, otherwise

0 commit comments

Comments
 (0)