
Commit 5be50d0

rpc: remove DRPC dependency from Peer and Connection generics
Although `Peer` and `Connection` are generic and intended to support both gRPC and DRPC connections, the current implementation has a hardcoded dependency on DRPC. As a result, a DRPC connection is always dialed, regardless of the intended type. This PR removes the direct dependency on DRPC, allowing the appropriate connection type (gRPC or DRPC) to be used based on the generic parameter.

Epic: CRDB-48923
Fixes: none
Release note: none
1 parent 67d56fa commit 5be50d0
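
The gist of the change, as a minimal self-contained Go sketch (the type and field names below are simplified stand-ins, not the real pkg/rpc definitions): the dial function travels inside the generic ConnectionOptions[Conn], so a Connection[Conn] only ever dials the connection type named by its type parameter instead of unconditionally dialing DRPC on the side.

package main

import (
    "context"
    "fmt"
)

// rpcConn stands in for the interface that both grpc.ClientConn and drpc.Conn
// satisfy in the real code.
type rpcConn interface {
    Close() error
}

// ConnectionOptions carries the type-specific behavior, including how to dial.
type ConnectionOptions[Conn rpcConn] struct {
    dial func(ctx context.Context, target string) (Conn, error)
}

// Connection is generic over the connection type and has no DRPC-specific fields.
type Connection[Conn rpcConn] struct {
    opts   *ConnectionOptions[Conn]
    target string
}

// Connect dials using only the dial function supplied for this Conn type.
func (c *Connection[Conn]) Connect(ctx context.Context) (Conn, error) {
    return c.opts.dial(ctx, c.target)
}

// fakeGRPCConn is a stand-in for *grpc.ClientConn.
type fakeGRPCConn struct{}

func (fakeGRPCConn) Close() error { return nil }

func main() {
    grpcOpts := &ConnectionOptions[fakeGRPCConn]{
        dial: func(ctx context.Context, target string) (fakeGRPCConn, error) {
            fmt.Println("dialing gRPC to", target)
            return fakeGRPCConn{}, nil
        },
    }
    c := &Connection[fakeGRPCConn]{opts: grpcOpts, target: "n1:26257"}
    if _, err := c.Connect(context.Background()); err != nil {
        panic(err)
    }
}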

8 files changed (+91, -231 lines)

pkg/kv/kvpb/api.proto

Lines changed: 6 additions & 6 deletions
@@ -3755,6 +3755,11 @@ service Cluster {
   rpc Join (JoinNodeRequest) returns (JoinNodeResponse) {}
 }
 
+// RangeFeed service implemented by nodes for KV API requests.
+service RangeFeed {
+  rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
+}
+
 // Batch and RangeFeed service implemented by nodes for KV API requests.
 service Internal {
   rpc Batch (BatchRequest) returns (BatchResponse) {}
@@ -3882,9 +3887,4 @@ message ScanStats {
 
 // UsedFollowerRead indicates whether at least some reads were served by the
 // follower replicas.
-message UsedFollowerRead {}
-
-// RangeFeed service implemented by nodes for KV API requests.
-service RangeFeed {
-  rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
-}
+message UsedFollowerRead {}

pkg/rpc/connection.go

Lines changed: 14 additions & 44 deletions
@@ -15,7 +15,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/errors"
     "google.golang.org/grpc"
-    "storj.io/drpc"
 )
 
 // rpcConn defines a lightweight interface that both grpc.ClientConn and drpc.Conn
@@ -91,9 +90,6 @@ type Connection[Conn rpcConn] struct {
     //
     // The pool is only initialized once the rpc connection is resolved.
     batchStreamPool streamPool[*kvpb.BatchRequest, *kvpb.BatchResponse, Conn]
-    // TODO(server): remove this once the code is consolidated to use generic
-    // Connection and Pool for drpc.Conn.
-    drpcBatchStreamPool DRPCBatchStreamPool
 }
 
 // newConnectionToNodeID makes a Connection for the given node, class, and nontrivial Signal
@@ -104,15 +100,13 @@ func newConnectionToNodeID[Conn rpcConn](
     breakerSignal func() circuit.Signal,
     connOptions *ConnectionOptions[Conn],
 ) *Connection[Conn] {
-    drpcConnEquals := func(a, b drpc.Conn) bool { return a == b }
     c := &Connection[Conn]{
         breakerSignalFn: breakerSignal,
         k:               k,
         connFuture: connFuture[Conn]{
             ready: make(chan struct{}),
         },
-        batchStreamPool:     makeStreamPool(opts.Stopper, connOptions.newBatchStreamClient, connOptions.connEquals),
-        drpcBatchStreamPool: makeStreamPool(opts.Stopper, newDRPCBatchStream, drpcConnEquals),
+        batchStreamPool: makeStreamPool(opts.Stopper, connOptions.newBatchStreamClient, connOptions.connEquals),
     }
     return c
 }
@@ -124,15 +118,15 @@ func newConnectionToNodeID[Conn rpcConn](
 // block but fall back to defErr in this case.
 func (c *Connection[Conn]) waitOrDefault(
     ctx context.Context, defErr error, sig circuit.Signal,
-) (Conn, drpc.Conn, error) {
+) (Conn, error) {
     // Check the circuit breaker first. If it is already tripped now, we
     // want it to take precedence over connFuture below (which is closed in
     // the common case of a connection going bad after having been healthy
     // for a while).
     var nilConn Conn
     select {
     case <-sig.C():
-        return nilConn, nil, sig.Err()
+        return nilConn, sig.Err()
     default:
     }
 
@@ -143,26 +137,26 @@ func (c *Connection[Conn]) waitOrDefault(
         select {
         case <-c.connFuture.C():
         case <-sig.C():
-            return nilConn, nil, sig.Err()
+            return nilConn, sig.Err()
         case <-ctx.Done():
-            return nilConn, nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
+            return nilConn, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
         }
     } else {
         select {
         case <-c.connFuture.C():
         case <-sig.C():
-            return nilConn, nil, sig.Err()
+            return nilConn, sig.Err()
         case <-ctx.Done():
-            return nilConn, nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
+            return nilConn, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
         default:
-            return nilConn, nil, defErr
+            return nilConn, defErr
         }
     }
 
     // Done waiting, c.connFuture has resolved, return the result. Note that this
     // conn could be unhealthy (or there may not even be a conn, i.e. Err() !=
     // nil), if that's what the caller wanted (ConnectNoBreaker).
-    return c.connFuture.Conn(), c.connFuture.DRPCConn(), c.connFuture.Err()
+    return c.connFuture.Conn(), c.connFuture.Err()
 }
 
 // Connect returns the underlying rpc connection after it has been validated,
@@ -172,18 +166,10 @@ func (c *Connection[Conn]) waitOrDefault(
 // an error. In rare cases, this behavior is undesired and ConnectNoBreaker may
 // be used instead.
 func (c *Connection[Conn]) Connect(ctx context.Context) (Conn, error) {
-    cc, _, err := c.waitOrDefault(ctx, nil /* defErr */, c.breakerSignalFn())
+    cc, err := c.waitOrDefault(ctx, nil /* defErr */, c.breakerSignalFn())
     return cc, err
 }
 
-// ConnectEx is similar to Connect but it addition to gRPC connection, it also
-// returns underlying drpc connection after it has been validated.
-// TODO(server): remove this once the code is consolidated to use generic
-// Connection and Pool for drpc.Conn.
-func (c *Connection[Conn]) ConnectEx(ctx context.Context) (Conn, drpc.Conn, error) {
-    return c.waitOrDefault(ctx, nil /* defErr */, c.breakerSignalFn())
-}
-
 type neverTripSignal struct{}
 
 func (s *neverTripSignal) Err() error {
@@ -202,7 +188,7 @@ func (s *neverTripSignal) IsTripped() bool {
 // that it will latch onto (or start) an existing connection attempt even if
 // previous attempts have not succeeded. This may be preferable to Connect
 // if the caller is already certain that a peer is available.
-func (c *Connection[Conn]) ConnectNoBreaker(ctx context.Context) (Conn, drpc.Conn, error) {
+func (c *Connection[Conn]) ConnectNoBreaker(ctx context.Context) (Conn, error) {
     // For ConnectNoBreaker we don't use the default Signal but pass a dummy one
     // that never trips. (The probe tears down the Conn on quiesce so we don't rely
     // on the Signal for that).
@@ -226,7 +212,7 @@ func (c *Connection[Conn]) ConnectNoBreaker(ctx context.Context) (Conn, drpc.Con
 // latest heartbeat. Returns ErrNotHeartbeated if the peer was just contacted for
 // the first time and the first heartbeat has not occurred yet.
 func (c *Connection[Conn]) Health() error {
-    _, _, err := c.waitOrDefault(context.Background(), ErrNotHeartbeated, c.breakerSignalFn())
+    _, err := c.waitOrDefault(context.Background(), ErrNotHeartbeated, c.breakerSignalFn())
     return err
 }
 
@@ -241,17 +227,9 @@ func (c *Connection[Conn]) BatchStreamPool() *streamPool[*kvpb.BatchRequest, *kv
     return &c.batchStreamPool
 }
 
-func (c *Connection[Conn]) DRPCBatchStreamPool() *DRPCBatchStreamPool {
-    if !c.connFuture.Resolved() {
-        panic("DRPCBatchStreamPool called on unresolved connection")
-    }
-    return &c.drpcBatchStreamPool
-}
-
 type connFuture[Conn rpcConn] struct {
     ready chan struct{}
     cc    Conn
-    dc    drpc.Conn
     err   error
 }
 
@@ -279,14 +257,6 @@ func (s *connFuture[Conn]) Conn() Conn {
     return s.cc
 }
 
-// DRPCConn must only be called after C() has been closed.
-func (s *connFuture[Conn]) DRPCConn() drpc.Conn {
-    if s.err != nil {
-        return nil
-    }
-    return s.dc
-}
-
 func (s *connFuture[Conn]) Resolved() bool {
     select {
     case <-s.ready:
@@ -298,12 +268,12 @@ func (s *connFuture[Conn]) Resolved() bool {
 
 // Resolve is idempotent. Only the first call has any effect.
 // Not thread safe.
-func (s *connFuture[Conn]) Resolve(cc Conn, dc drpc.Conn, err error) {
+func (s *connFuture[Conn]) Resolve(cc Conn, err error) {
     select {
     case <-s.ready:
         // Already resolved, noop.
     default:
-        s.cc, s.dc, s.err = cc, dc, err
+        s.cc, s.err = cc, err
         close(s.ready)
     }
 }
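
The connFuture changes above drop the separate drpc.Conn slot but keep the same resolve-once pattern. As a standalone sketch (simplified stand-in types, not the real pkg/rpc code), the future resolves at most once and readiness is signaled by closing a channel:

package main

import "fmt"

// connFuture holds the eventual connection (or error) for a peer. This is a
// simplified stand-in for the generic connFuture shown in the diff above.
type connFuture[Conn any] struct {
    ready chan struct{}
    cc    Conn
    err   error
}

func newConnFuture[Conn any]() *connFuture[Conn] {
    return &connFuture[Conn]{ready: make(chan struct{})}
}

// C returns a channel that is closed once the future has resolved.
func (s *connFuture[Conn]) C() <-chan struct{} { return s.ready }

// Resolve is idempotent: only the first call records a result.
func (s *connFuture[Conn]) Resolve(cc Conn, err error) {
    select {
    case <-s.ready:
        // Already resolved, noop.
    default:
        s.cc, s.err = cc, err
        close(s.ready)
    }
}

func main() {
    f := newConnFuture[string]()
    f.Resolve("conn-1", nil)
    f.Resolve("conn-2", nil) // ignored: the future is already resolved
    <-f.C()
    fmt.Println(f.cc) // prints conn-1
}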

pkg/rpc/drpc.go

Lines changed: 3 additions & 55 deletions
@@ -41,7 +41,7 @@ func (d *drpcCloseNotifier) CloseNotify(ctx context.Context) <-chan struct{} {
     return d.conn.Closed()
 }
 
-// TODO DB Server: unexport this once dial methods are added in rpccontext.
+// TODO(server): unexport this once dial methods are added in rpccontext.
 func DialDRPC(
     rpcCtx *Context,
 ) func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
@@ -211,64 +211,12 @@ func newDRPCPeerOptions(
         locality: locality,
         peers:    &rpcCtx.drpcPeers,
         connOptions: &ConnectionOptions[drpc.Conn]{
-            dial: func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
-                // TODO(server): could use connection class instead of empty key here.
-                pool := drpcpool.New[struct{}, drpcpool.Conn](drpcpool.Options{
-                    Expiration: defaultDRPCConnIdleTimeout,
-                })
-                pooledConn := pool.Get(ctx /* unused */, struct{}{}, func(ctx context.Context,
-                    _ struct{}) (drpcpool.Conn, error) {
-
-                    netConn, err := drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
-                    if err != nil {
-                        return nil, err
-                    }
-
-                    opts := drpcconn.Options{
-                        Manager: drpcmanager.Options{
-                            Reader: drpcwire.ReaderOptions{
-                                MaximumBufferSize: math.MaxInt,
-                            },
-                            Stream: drpcstream.Options{
-                                MaximumBufferSize: 0, // unlimited
-                            },
-                            SoftCancel: true, // don't close the transport when stream context is canceled
-                        },
-                    }
-                    var conn *drpcconn.Conn
-                    if rpcCtx.ContextOptions.Insecure {
-                        conn = drpcconn.NewWithOptions(netConn, opts)
-                    } else {
-                        tlsConfig, err := rpcCtx.GetClientTLSConfig()
-                        if err != nil {
-                            return nil, err
-                        }
-                        // Clone TLS config to avoid modifying a cached TLS config.
-                        tlsConfig = tlsConfig.Clone()
-                        // TODO(server): remove this hack which is necessary at least in
-                        // testing to get TestDRPCSelectQuery to pass.
-                        tlsConfig.InsecureSkipVerify = true
-                        tlsConn := tls.Client(netConn, tlsConfig)
-                        conn = drpcconn.NewWithOptions(tlsConn, opts)
-                    }
-
-                    return conn, nil
-                })
-                // `pooledConn.Close` doesn't tear down any of the underlying TCP
-                // connections but simply marks the pooledConn handle as returning
-                // errors. When we "close" this conn, we want to tear down all of
-                // the connections in the pool (in effect mirroring the behavior of
-                // gRPC where a single conn is shared).
-                return &closeEntirePoolConn{
-                    Conn: pooledConn,
-                    pool: pool,
-                }, nil
-            },
+            dial: DialDRPC(rpcCtx),
             connEquals: func(a, b drpc.Conn) bool {
                 return a == b
             },
             newBatchStreamClient: func(ctx context.Context, cc drpc.Conn) (BatchStreamClient, error) {
-                return kvpb.NewDRPCInternalClientAdapter(cc).BatchStream(ctx)
+                return kvpb.NewDRPCKVBatchClientAdapter(cc).BatchStream(ctx)
             },
             newCloseNotifier: func(_ *stop.Stopper, cc drpc.Conn) closeNotifier {
                 return &drpcCloseNotifier{conn: cc}
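
The hunk above replaces a roughly 50-line inline dial closure with the already-exported DialDRPC(rpcCtx), so the drpcmigrate header dial, connection pooling, and TLS setup live in one place. A hedged sketch of that factoring pattern follows; names like dialDRPC, rpcContext, and conn are illustrative stand-ins, not the real pkg/rpc API:

package main

import (
    "context"
    "fmt"
)

// conn and rpcContext are illustrative stand-ins for drpc.Conn and *rpc.Context.
type conn struct{ target string }

type rpcContext struct{ insecure bool }

// dialDRPC captures the shared context once and returns the dial function that
// the connection options can embed directly, mirroring dial: DialDRPC(rpcCtx)
// in the diff. The real DialDRPC also sets up the drpcmigrate TCP header, a
// connection pool, and TLS; this stand-in only shows the shape of the factoring.
func dialDRPC(rpcCtx *rpcContext) func(ctx context.Context, target string) (*conn, error) {
    return func(ctx context.Context, target string) (*conn, error) {
        fmt.Printf("dialing DRPC to %s (insecure=%t)\n", target, rpcCtx.insecure)
        return &conn{target: target}, nil
    }
}

// connectionOptions is a trimmed-down stand-in for ConnectionOptions[drpc.Conn].
type connectionOptions struct {
    dial func(ctx context.Context, target string) (*conn, error)
}

func main() {
    opts := &connectionOptions{dial: dialDRPC(&rpcContext{insecure: true})}
    if _, err := opts.dial(context.Background(), "n2:26258"); err != nil {
        panic(err)
    }
}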
