Skip to content

Commit c7ebb90

Browse files
rpc, server: add loopback dialer support for DRPC
Observed a performance regression in the sysbench `oltp_read_only` microbenchmark when DRPC was enabled by default (ref: #151312). No such regression was seen in `oltp_read_write`. Root cause analysis traced it to missing loopback dialer support in DRPC. Epic: CRDB-51459 Fixes: #148493 Release note: None
1 parent ded3a0d commit c7ebb90

File tree

8 files changed

+144
-23
lines changed

8 files changed

+144
-23
lines changed

pkg/rpc/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ go_test(
183183
"@com_github_stretchr_testify//require",
184184
"@io_storj_drpc//drpcctx",
185185
"@io_storj_drpc//drpcmetadata",
186+
"@io_storj_drpc//drpcmux",
187+
"@io_storj_drpc//drpcserver",
186188
"@org_golang_google_grpc//:grpc",
187189
"@org_golang_google_grpc//codes",
188190
"@org_golang_google_grpc//credentials",

pkg/rpc/context.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"google.golang.org/grpc/stats"
5555
"storj.io/drpc"
5656
"storj.io/drpc/drpcclient"
57+
"storj.io/drpc/drpcmigrate"
5758
)
5859

5960
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -287,6 +288,9 @@ type Context struct {
287288
// the gRPC protocol over an in-memory pipe.
288289
loopbackDialFn func(context.Context) (net.Conn, error)
289290

291+
// This is similar to the loopbackDialFn above, but for DRPC connections.
292+
loopbackDRPCDialFn func(context.Context) (net.Conn, error)
293+
290294
// clientCreds is used to pass additional headers to called RPCs.
291295
clientCreds credentials.PerRPCCredentials
292296

@@ -295,7 +299,7 @@ type Context struct {
295299
windowSizeSettings
296300
}
297301

298-
// SetLoopbackDialer configures the loopback dialer function.
302+
// SetLoopbackDialer configures the loopback dialer function to dial gRPC connections.
299303
func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Conn, error)) {
300304
if c.ContextOptions.Knobs.NoLoopbackDialer {
301305
// A test has decided it is opting out of the special loopback
@@ -306,6 +310,17 @@ func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Co
306310
c.loopbackDialFn = loopbackDialFn
307311
}
308312

313+
// SetLoopbackDRPCDialer configures the loopback dialer function to dial DRPC connections.
314+
func (c *Context) SetLoopbackDRPCDialer(loopbackDialFn func(context.Context) (net.Conn, error)) {
315+
if c.ContextOptions.Knobs.NoLoopbackDialer {
316+
// A test has decided it is opting out of the special loopback
317+
// dialing mechanism. Obey it. We already have defined
318+
// loopbackDialFn in that case in NewContext().
319+
return
320+
}
321+
c.loopbackDRPCDialFn = loopbackDialFn
322+
}
323+
309324
// StoreLivenessGracePeriod computes the grace period after a store restarts before which it will
310325
// not withdraw support from other stores.
311326
func (c *Context) StoreLivenessWithdrawalGracePeriod() time.Duration {
@@ -596,6 +611,9 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
596611
d := onlyOnceDialer{}
597612
return d.dial(ctx, opts.AdvertiseAddr)
598613
}
614+
rpcCtx.loopbackDRPCDialFn = func(ctx context.Context) (net.Conn, error) {
615+
return drpcmigrate.DialWithHeader(ctx, "tcp", opts.AdvertiseAddr, drpcmigrate.DRPCHeader)
616+
}
599617
}
600618

601619
// We only monitor remote clocks in server-to-server connections.

pkg/rpc/context_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package rpc
88
import (
99
"context"
1010
"crypto/rand"
11+
"crypto/tls"
1112
"fmt"
1213
"io"
1314
"net"
@@ -50,6 +51,8 @@ import (
5051
"google.golang.org/grpc/credentials"
5152
"google.golang.org/grpc/metadata"
5253
"google.golang.org/grpc/status"
54+
"storj.io/drpc/drpcmux"
55+
"storj.io/drpc/drpcserver"
5356
)
5457

5558
// TestingConnHealth returns nil if we have an open connection to the given
@@ -807,6 +810,86 @@ func TestConnectLoopback(t *testing.T) {
807810
require.Equal(t, codes.DataLoss, gogostatus.Code(errors.UnwrapAll(err)), "%+v", err)
808811
}
809812

813+
// TestDRPCConnectLoopback verifies that we correctly go through the internal
814+
// server when dialing the local address.
815+
func TestDRPCConnectLoopback(t *testing.T) {
816+
defer leaktest.AfterTest(t)()
817+
818+
stopper := stop.NewStopper()
819+
defer stopper.Stop(context.Background())
820+
821+
clock := timeutil.NewManualTime(timeutil.Unix(0, 1))
822+
maxOffset := time.Duration(0)
823+
824+
clusterID := uuid.MakeV4()
825+
const nodeID = 1
826+
827+
ctx := context.Background()
828+
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
829+
clientCtx.NodeID.Set(context.Background(), nodeID)
830+
drpcLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
831+
clientCtx.loopbackDRPCDialFn = func(ctx context.Context) (net.Conn, error) {
832+
return drpcLoopbackL.Connect(ctx)
833+
}
834+
// Ensure that there's no error connecting to the local node when an internal
835+
// server has been registered.
836+
clientCtx.SetLocalInternalServer(
837+
&internalServer{},
838+
ServerInterceptorInfo{}, ClientInterceptorInfo{})
839+
840+
// Create a DRPC server to listen on loopback.
841+
newTestServer := func() *drpcServer {
842+
d := &drpcServer{}
843+
mux := drpcmux.New()
844+
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{})
845+
d.Mux = mux
846+
847+
return d
848+
}
849+
drpcS := newTestServer()
850+
851+
// Register a heartbeat service that always returns an error.
852+
errLoopback := gogostatus.Newf(codes.Unknown, "loopback!").Err() // TODO(server): replace code with DRPC error codes
853+
require.NoError(t, DRPCRegisterHeartbeat(drpcS, &ManualHeartbeatService{readyFn: func() error {
854+
return errLoopback
855+
}}))
856+
857+
tlsConfig, err := clientCtx.GetServerTLSConfig()
858+
require.NoError(t, err)
859+
860+
// Start the DRPC server on loopback.
861+
require.NoError(t, stopper.RunAsyncTask(ctx, "listen-server-loopback", func(ctx context.Context) {
862+
drpcLoopbackTLSL := tls.NewListener(drpcLoopbackL, tlsConfig)
863+
netutil.FatalIfUnexpected(drpcS.Serve(ctx, drpcLoopbackTLSL))
864+
}))
865+
866+
serveDRPC := func(stopper *stop.Stopper, drpcS *drpcServer, drpcTLSL net.Listener) error {
867+
waitQuiesce := func(context.Context) {
868+
<-stopper.ShouldQuiesce()
869+
netutil.FatalIfUnexpected(drpcTLSL.Close())
870+
}
871+
if err := stopper.RunAsyncTask(ctx, "listen-quiesce", waitQuiesce); err != nil {
872+
waitQuiesce(ctx)
873+
return err
874+
}
875+
876+
return stopper.RunAsyncTask(ctx, "listen-serve", func(context.Context) {
877+
netutil.FatalIfUnexpected(drpcS.Serve(ctx, drpcTLSL))
878+
})
879+
}
880+
drpcL, err := net.Listen("tcp", util.TestAddr.String())
881+
require.NoError(t, err)
882+
drpcTLSL := tls.NewListener(drpcL, tlsConfig)
883+
require.NoError(t, serveDRPC(stopper, drpcS, drpcTLSL))
884+
885+
addr := drpcL.Addr().String()
886+
clientCtx.AdvertiseAddr = addr
887+
888+
// Connect and get the error that comes from loopbackLn, proving that we were routed there.
889+
_, err = clientCtx.DRPCDialNode(addr, nodeID, roachpb.Locality{}, rpcbase.DefaultClass).Connect(ctx)
890+
require.Equal(t, codes.Unknown, gogostatus.Code(errors.UnwrapAll(err)), "%+v", err)
891+
}
892+
810893
func TestOffsetMeasurement(t *testing.T) {
811894
defer leaktest.AfterTest(t)()
812895

pkg/rpc/drpc.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ func DialDRPC(
5353
pooledConn := pool.Get(ctx /* unused */, struct{}{}, func(ctx context.Context,
5454
_ struct{}) (drpcpool.Conn, error) {
5555

56-
netConn, err := drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
56+
netConn, err := func(ctx context.Context) (net.Conn, error) {
57+
if rpcCtx.ContextOptions.AdvertiseAddr == target && !rpcCtx.ClientOnly {
58+
return rpcCtx.loopbackDRPCDialFn(ctx)
59+
}
60+
return drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
61+
}(ctx)
5762
if err != nil {
5863
return nil, err
5964
}

pkg/server/drpc_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,16 +220,13 @@ func TestDialDRPC_InterceptorsAreSet(t *testing.T) {
220220
args := base.TestClusterArgs{
221221
ReplicationMode: base.ReplicationManual,
222222
ServerArgs: base.TestServerArgs{
223-
Settings: cluster.MakeClusterSettings(),
224-
Insecure: true,
223+
Settings: cluster.MakeClusterSettings(),
224+
DefaultDRPCOption: base.TestDRPCEnabled,
225225
},
226226
}
227227

228-
rpc.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
229228
c := testcluster.StartTestCluster(t, numNodes, args)
230229
defer c.Stopper().Stop(ctx)
231-
require.True(t, c.Server(0).RPCContext().Insecure)
232-
233230
rpcAddr := c.Server(0).RPCAddr()
234231

235232
// Client setup
@@ -238,9 +235,10 @@ func TestDialDRPC_InterceptorsAreSet(t *testing.T) {
238235
rpcContextOptions.Stopper = c.Stopper()
239236
rpcContextOptions.Settings = c.Server(0).ClusterSettings()
240237
rpcCtx := rpc.NewContext(ctx, rpcContextOptions)
241-
rpcCtx.ContextOptions = rpc.ContextOptions{Insecure: true}
238+
242239
// Adding test interceptors
243240
rpcCtx.Knobs = rpc.ContextTestingKnobs{
241+
NoLoopbackDialer: true,
244242
UnaryClientInterceptorDRPC: func(target string, class rpcbase.ConnectionClass) drpcclient.UnaryClientInterceptor {
245243
return mockUnaryInterceptor
246244
},

pkg/server/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,7 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
16591659
// and dispatches the server worker for the RPC.
16601660
// The SQL listener is returned, to start the SQL server later
16611661
// below when the server has initialized.
1662-
pgL, loopbackPgL, rpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
1662+
pgL, loopbackPgL, grpcLoopbackDialFn, drpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
16631663
ctx, workersCtx, s.cfg.BaseConfig,
16641664
s.stopper, s.grpc, s.drpc, ListenAndUpdateAddrs, true /* enableSQLListener */, s.cfg.AcceptProxyProtocolHeaders)
16651665
if err != nil {
@@ -1669,7 +1669,8 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
16691669
s.loopbackPgL = loopbackPgL
16701670

16711671
// Tell the RPC context how to connect in-memory.
1672-
s.rpcContext.SetLoopbackDialer(rpcLoopbackDialFn)
1672+
s.rpcContext.SetLoopbackDialer(grpcLoopbackDialFn)
1673+
s.rpcContext.SetLoopbackDRPCDialer(drpcLoopbackDialFn)
16731674

16741675
if s.cfg.TestingKnobs.Server != nil {
16751676
knobs := s.cfg.TestingKnobs.Server.(*TestingKnobs)

pkg/server/start_listen.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ func startListenRPCAndSQL(
4949
) (
5050
sqlListener net.Listener,
5151
pgLoopbackListener *netutil.LoopbackListener,
52-
rpcLoopbackDial func(context.Context) (net.Conn, error),
52+
grpcLoopbackDial func(context.Context) (net.Conn, error),
53+
drpcLoopbackDial func(context.Context) (net.Conn, error),
5354
startRPCServer func(ctx context.Context),
5455
err error,
5556
) {
@@ -66,7 +67,7 @@ func startListenRPCAndSQL(
6667
var err error
6768
ln, err = rpcListenerFactory(ctx, &cfg.Addr, &cfg.AdvertiseAddr, rpcChanName, acceptProxyProtocolHeaders)
6869
if err != nil {
69-
return nil, nil, nil, nil, err
70+
return nil, nil, nil, nil, nil, err
7071
}
7172
log.Eventf(ctx, "listening on port %s", cfg.Addr)
7273
}
@@ -79,7 +80,7 @@ func startListenRPCAndSQL(
7980
pgL = cfg.SQLAddrListener
8081
}
8182
if err != nil {
82-
return nil, nil, nil, nil, err
83+
return nil, nil, nil, nil, nil, err
8384
}
8485
// The SQL listener shutdown worker, which closes everything under
8586
// the SQL port when the stopper indicates we are shutting down.
@@ -95,7 +96,7 @@ func startListenRPCAndSQL(
9596
}
9697
if err := stopper.RunAsyncTask(workersCtx, "wait-quiesce", waitQuiesce); err != nil {
9798
waitQuiesce(workersCtx)
98-
return nil, nil, nil, nil, err
99+
return nil, nil, nil, nil, nil, err
99100
}
100101
log.Eventf(ctx, "listening on sql port %s", cfg.SQLAddr)
101102
}
@@ -129,7 +130,7 @@ func startListenRPCAndSQL(
129130
// Then we update the advertised addr with the right port, if
130131
// the port had been auto-allocated.
131132
if err := UpdateAddrs(ctx, &cfg.SQLAddr, &cfg.SQLAdvertiseAddr, ln.Addr()); err != nil {
132-
return nil, nil, nil, nil, errors.Wrapf(err, "internal error")
133+
return nil, nil, nil, nil, nil, errors.Wrapf(err, "internal error")
133134
}
134135
}
135136

@@ -158,18 +159,22 @@ func startListenRPCAndSQL(
158159
}
159160
}
160161

161-
rpcLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
162+
grpcLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
162163
sqlLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
163164
drpcCtx, drpcCancel := context.WithCancel(workersCtx)
165+
// Create a dedicated DRPC loopback listener. Since this listener is exclusively for DRPC,
166+
// no protocol header inspection is needed.
167+
drpcLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
164168

165169
// The remainder shutdown worker.
166170
waitForQuiesce := func(context.Context) {
167171
<-stopper.ShouldQuiesce()
168172
drpcCancel()
169173
// TODO(bdarnell): Do we need to also close the other listeners?
170174
netutil.FatalIfUnexpected(grpcL.Close())
171-
netutil.FatalIfUnexpected(drpcL.Close())
172-
netutil.FatalIfUnexpected(rpcLoopbackL.Close())
175+
netutil.FatalIfUnexpected(grpcLoopbackL.Close())
176+
netutil.FatalIfUnexpected(drpcL.Close()) // Closing this listener is as good as closing drpcTLSL
177+
netutil.FatalIfUnexpected(drpcLoopbackL.Close()) // Closing this listener is as good as closing drpcLoopbackTLSL
173178
netutil.FatalIfUnexpected(sqlLoopbackL.Close())
174179
netutil.FatalIfUnexpected(ln.Close())
175180
}
@@ -191,7 +196,7 @@ func startListenRPCAndSQL(
191196
waitForQuiesce(ctx)
192197
stopGRPC()
193198
drpcCancel()
194-
return nil, nil, nil, nil, err
199+
return nil, nil, nil, nil, nil, err
195200
}
196201
stopper.AddCloser(stop.CloserFn(stopGRPC))
197202

@@ -213,7 +218,15 @@ func startListenRPCAndSQL(
213218
}
214219
})
215220
_ = stopper.RunAsyncTask(workersCtx, "serve-loopback-grpc", func(context.Context) {
216-
netutil.FatalIfUnexpected(grpc.Serve(rpcLoopbackL))
221+
netutil.FatalIfUnexpected(grpc.Serve(grpcLoopbackL))
222+
})
223+
_ = stopper.RunAsyncTask(workersCtx, "serve-loopback-drpc", func(context.Context) {
224+
if cfg := drpc.tlsCfg; cfg != nil {
225+
drpcdrpcLoopbackTLSL := tls.NewListener(drpcLoopbackL, cfg)
226+
netutil.FatalIfUnexpected(drpc.Serve(ctx, drpcdrpcLoopbackTLSL))
227+
} else {
228+
netutil.FatalIfUnexpected(drpc.Serve(ctx, drpcLoopbackL))
229+
}
217230
})
218231

219232
_ = stopper.RunAsyncTask(ctx, "serve-mux", func(context.Context) {
@@ -223,5 +236,5 @@ func startListenRPCAndSQL(
223236
})
224237
}
225238

226-
return pgL, sqlLoopbackL, rpcLoopbackL.Connect, startRPCServer, nil
239+
return pgL, sqlLoopbackL, grpcLoopbackL.Connect, drpcLoopbackL.Connect, startRPCServer, nil
227240
}

pkg/server/tenant.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
604604
lf = s.sqlServer.cfg.RPCListenerFactory
605605
}
606606

607-
pgL, loopbackPgL, rpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
607+
pgL, loopbackPgL, grpcLoopbackDialFn, drpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
608608
ctx, workersCtx, *s.sqlServer.cfg, s.stopper,
609609
s.grpc, s.drpc, lf, enableSQLListener, s.cfg.AcceptProxyProtocolHeaders)
610610
if err != nil {
@@ -616,7 +616,8 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
616616
s.loopbackPgL = loopbackPgL
617617

618618
// Tell the RPC context how to connect in-memory.
619-
s.rpcContext.SetLoopbackDialer(rpcLoopbackDialFn)
619+
s.rpcContext.SetLoopbackDialer(grpcLoopbackDialFn)
620+
s.rpcContext.SetLoopbackDRPCDialer(drpcLoopbackDialFn)
620621

621622
// NB: This is where (*Server).PreStart() reports the listener readiness
622623
// via testing knobs: PauseAfterGettingRPCAddress, SignalAfterGettingRPCAddress.

0 commit comments

Comments
 (0)