Skip to content

Commit c37f669

Browse files
rpc: wire drpc client interceptors in dialer
As part of this change, we use clientConn object to wire in client interceptors. clientConn has a supplier which provides `drpc.Conn` connection from a drpc connection pool. The pooled connection object previously returned, encapsulated within closeEntirePoolConn is now replaced with drpcclient.ClientConn so that client interceptors are applied to it. Epic: CRDB-51616 Fixes: #148430 Part of: CRDB-49359 Release note: None
1 parent 64dc647 commit c37f669

File tree

11 files changed

+144
-16
lines changed

11 files changed

+144
-16
lines changed

DEPS.bzl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11129,10 +11129,10 @@ def go_deps():
1112911129
name = "io_storj_drpc",
1113011130
build_file_proto_mode = "disable_global",
1113111131
importpath = "storj.io/drpc",
11132-
sha256 = "67a517d0a6c90586f265135ab60619a06a3be5b5c871c15093865716a6de5b39",
11133-
strip_prefix = "github.com/cockroachdb/[email protected]20250603054748-5b0c5d2c7b38",
11132+
sha256 = "612016b7a145f386a2163d788e3376cb0f63410b8423f2ef3f723f7aa24c1971",
11133+
strip_prefix = "github.com/cockroachdb/[email protected]20250618091105-e79a954a2193",
1113411134
urls = [
11135-
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20250603054748-5b0c5d2c7b38.zip",
11135+
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20250618091105-e79a954a2193.zip",
1113611136
],
1113711137
)
1113811138
go_repository(

build/bazelutil/distdir_files.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ DISTDIR_FILES = {
349349
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/crlfmt/com_github_cockroachdb_crlfmt-v0.0.0-20221214225007-b2fc5c302548.zip": "fedc01bdd6d964da0425d5eaac8efadc951e78e13f102292cc0774197f09ab63",
350350
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/crlib/com_github_cockroachdb_crlib-v0.0.0-20241205160938-4a90b184f49c.zip": "1afc910b4ff270de79eecb42ab7bd5e6404e6128666c6c55e96db9e27d28e69e",
351351
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/datadriven/com_github_cockroachdb_datadriven-v1.0.3-0.20250407164829-2945557346d5.zip": "251593cd9c040fe84a99a3919de7ce6f85030d522159a37d625dc2dea7a4d17f",
352-
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20250603054748-5b0c5d2c7b38.zip": "67a517d0a6c90586f265135ab60619a06a3be5b5c871c15093865716a6de5b39",
352+
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20250618091105-e79a954a2193.zip": "612016b7a145f386a2163d788e3376cb0f63410b8423f2ef3f723f7aa24c1971",
353353
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/errors/com_github_cockroachdb_errors-v1.12.0.zip": "f73d8a5f4d8fcbc4ed61db2b47f17e2601d8b32e9a49c0665667489d9d9d6e7c",
354354
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/go-test-teamcity/com_github_cockroachdb_go_test_teamcity-v0.0.0-20191211140407-cff980ad0a55.zip": "bac30148e525b79d004da84d16453ddd2d5cd20528e9187f1d7dac708335674b",
355355
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gogoproto/com_github_cockroachdb_gogoproto-v1.3.3-0.20241216150617-2358cdb156a1.zip": "bf052c9a7f9e23fb3ec7e9f3b7201cfc264c18ed6da0d662952d276dbc339003",

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ replace golang.org/x/time => github.com/cockroachdb/x-time v0.3.1-0.202305251236
501501

502502
replace github.com/gogo/protobuf => github.com/cockroachdb/gogoproto v1.3.3-0.20241216150617-2358cdb156a1
503503

504-
replace storj.io/drpc => github.com/cockroachdb/drpc v0.0.0-20250603054748-5b0c5d2c7b38
504+
replace storj.io/drpc => github.com/cockroachdb/drpc v0.0.0-20250618091105-e79a954a2193
505505

506506
// Note: This forked dependency adds a commit that opens up some
507507
// private APIs to enable us to make some perf improvements to

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,8 +562,8 @@ github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:z
562562
github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
563563
github.com/cockroachdb/datadriven v1.0.3-0.20250407164829-2945557346d5 h1:UycK/E0TkisVrQbSoxvU827FwgBBcZ95nRRmpj/12QI=
564564
github.com/cockroachdb/datadriven v1.0.3-0.20250407164829-2945557346d5/go.mod h1:jsaKMvD3RBCATk1/jbUZM8C9idWBJME9+VRZ5+Liq1g=
565-
github.com/cockroachdb/drpc v0.0.0-20250603054748-5b0c5d2c7b38 h1:7FZw1FKZIOabgsOYY/D8cSj60E7B7h9Tm7xSos/lWF8=
566-
github.com/cockroachdb/drpc v0.0.0-20250603054748-5b0c5d2c7b38/go.mod h1:UWP+upGv1Z+4nWxcdwhv3/wQXSOgCZteytaRVez5PDc=
565+
github.com/cockroachdb/drpc v0.0.0-20250618091105-e79a954a2193 h1:c/lx52g2cnFUVPkHqYX+6Nyh+3L/l5ae0SSrkvrUM4M=
566+
github.com/cockroachdb/drpc v0.0.0-20250618091105-e79a954a2193/go.mod h1:UWP+upGv1Z+4nWxcdwhv3/wQXSOgCZteytaRVez5PDc=
567567
github.com/cockroachdb/errors v1.9.1/go.mod h1:2sxOtL2WIc096WSZqZ5h8fa17rdDq9HZOZLBCor4mBk=
568568
github.com/cockroachdb/errors v1.12.0 h1:d7oCs6vuIMUQRVbi6jWWWEJZahLCfJpnJSVobd1/sUo=
569569
github.com/cockroachdb/errors v1.12.0/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g=

pkg/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ go_library(
7777
"@com_github_vividcortex_ewma//:ewma",
7878
"@io_opentelemetry_go_otel//attribute",
7979
"@io_storj_drpc//:drpc",
80+
"@io_storj_drpc//drpcclient",
8081
"@io_storj_drpc//drpcconn",
8182
"@io_storj_drpc//drpcmanager",
8283
"@io_storj_drpc//drpcmigrate",

pkg/rpc/context.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"google.golang.org/grpc/metadata"
5353
"google.golang.org/grpc/stats"
5454
"storj.io/drpc"
55+
"storj.io/drpc/drpcclient"
5556
)
5657

5758
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -251,6 +252,9 @@ type Context struct {
251252
clientUnaryInterceptors []grpc.UnaryClientInterceptor
252253
clientStreamInterceptors []grpc.StreamClientInterceptor
253254

255+
clientUnaryInterceptorsDRPC []drpcclient.UnaryClientInterceptor
256+
clientStreamInterceptorsDRPC []drpcclient.StreamClientInterceptor
257+
254258
// loopbackDialFn, when non-nil, is used when the target of the dial
255259
// is ourselves (== AdvertiseAddr).
256260
//

pkg/rpc/context_testutils.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/uuid"
2020
"github.com/cockroachdb/errors"
2121
"google.golang.org/grpc"
22+
"storj.io/drpc/drpcclient"
2223
)
2324

2425
// ContextTestingKnobs provides hooks to aid in testing the system. The testing
@@ -38,6 +39,10 @@ type ContextTestingKnobs struct {
3839
// unary RPC.
3940
UnaryClientInterceptor func(target string, class rpcbase.ConnectionClass) grpc.UnaryClientInterceptor
4041

42+
UnaryClientInterceptorDRPC func(target string, class rpcbase.ConnectionClass) drpcclient.UnaryClientInterceptor
43+
44+
StreamClientInterceptorDRPC func(target string, class rpcbase.ConnectionClass) drpcclient.StreamClientInterceptor
45+
4146
// InjectedLatencyOracle if non-nil contains a map from target address
4247
// (server.RPCServingAddr() of a remote node) to artificial latency in
4348
// milliseconds to inject. Setting this will cause the server to pause for

pkg/rpc/drpc.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/stop"
2020
"github.com/cockroachdb/errors"
2121
"storj.io/drpc"
22+
"storj.io/drpc/drpcclient"
2223
"storj.io/drpc/drpcconn"
2324
"storj.io/drpc/drpcmanager"
2425
"storj.io/drpc/drpcmigrate"
@@ -40,7 +41,8 @@ func (d *drpcCloseNotifier) CloseNotify(ctx context.Context) <-chan struct{} {
4041
return d.conn.Closed()
4142
}
4243

43-
func dialDRPC(
44+
// TODO DB Server: unexport this once dial methods are added in rpccontext.
45+
func DialDRPC(
4446
rpcCtx *Context,
4547
) func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
4648
return func(ctx context.Context, target string, _ rpcbase.ConnectionClass) (drpc.Conn, error) {
@@ -86,13 +88,27 @@ func dialDRPC(
8688

8789
return conn, nil
8890
})
89-
// `pooledConn.Close` doesn't tear down any of the underlying TCP
90-
// connections but simply marks the pooledConn handle as returning
91-
// errors. When we "close" this conn, we want to tear down all of
92-
// the connections in the pool (in effect mirroring the behavior of
93-
// gRPC where a single conn is shared).
91+
92+
if rpcCtx.Knobs.UnaryClientInterceptorDRPC != nil {
93+
if interceptor := rpcCtx.Knobs.UnaryClientInterceptorDRPC(target, rpcbase.DefaultClass); interceptor != nil {
94+
rpcCtx.clientUnaryInterceptorsDRPC = append(rpcCtx.clientUnaryInterceptorsDRPC, interceptor)
95+
}
96+
}
97+
if rpcCtx.Knobs.StreamClientInterceptorDRPC != nil {
98+
if interceptor := rpcCtx.Knobs.StreamClientInterceptorDRPC(target, rpcbase.DefaultClass); interceptor != nil {
99+
rpcCtx.clientStreamInterceptorsDRPC = append(rpcCtx.clientStreamInterceptorsDRPC, interceptor)
100+
}
101+
}
102+
clientConn, _ := drpcclient.NewClientConnWithOptions(
103+
ctx,
104+
pooledConn,
105+
drpcclient.WithChainUnaryInterceptor(rpcCtx.clientUnaryInterceptorsDRPC...),
106+
drpcclient.WithChainStreamInterceptor(rpcCtx.clientStreamInterceptorsDRPC...),
107+
)
108+
109+
// Wrap the clientConn to ensure the entire pool is closed when this connection handle is closed.
94110
return &closeEntirePoolConn{
95-
Conn: pooledConn,
111+
Conn: clientConn,
96112
pool: pool,
97113
}, nil
98114
}
@@ -129,7 +145,7 @@ type drpcServer struct {
129145
}
130146

131147
// NewDRPCServer creates a new DRPCServer with the provided rpc context.
132-
func NewDRPCServer(_ context.Context, rpcCtx *Context) (DRPCServer, error) {
148+
func NewDRPCServer(_ context.Context, _ *Context) (DRPCServer, error) {
133149
d := &drpcServer{}
134150
mux := drpcmux.New()
135151
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{

pkg/rpc/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func newPeer[Conn rpcConn](rpcCtx *Context, k peerKey, peerOpts *peerOptions[Con
248248
opts: &rpcCtx.ContextOptions,
249249
peers: peerOpts.peers,
250250
connOptions: peerOpts.connOptions,
251-
drpcDial: dialDRPC(rpcCtx),
251+
drpcDial: DialDRPC(rpcCtx),
252252
newHeartbeatClient: peerOpts.newHeartbeatClient,
253253
heartbeatInterval: rpcCtx.RPCHeartbeatInterval,
254254
heartbeatTimeout: rpcCtx.RPCHeartbeatTimeout,

pkg/server/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,8 @@ go_test(
619619
"@com_github_stretchr_testify//require",
620620
"@in_gopkg_yaml_v2//:yaml_v2",
621621
"@io_opentelemetry_go_otel//attribute",
622+
"@io_storj_drpc//:drpc",
623+
"@io_storj_drpc//drpcclient",
622624
"@io_storj_drpc//drpcconn",
623625
"@io_storj_drpc//drpcmanager",
624626
"@io_storj_drpc//drpcmigrate",

0 commit comments

Comments
 (0)