Skip to content

Commit 760a77e

Browse files
craig[bot]cthumuluru-crdbstevendanna
committed
152717: rpc, server: add loopback dialer support for DRPC r=cthumuluru-crdb a=cthumuluru-crdb Observed a performance regression in the sysbench `oltp_read_only` microbenchmark when DRPC was enabled by default (ref: #151312). No such regression was seen in `oltp_read_write`. Root cause analysis traced it to missing loopback dialer support in DRPC. Epic: CRDB-51459 Fixes: #148493 Release note: None 152921: kvcoord: correctly initialize txnWriteBuffer r=yuzefovich a=stevendanna Previously we were initializing the pipelineEnabler on a copy of the txnWriteBuffer that would later be overwritten. As a result, we were never re-enabling pipelining after a mid-transaction flush. This is OK from a correctness perspective since this was only intended as a potential performance improvement for some workloads under buffered writes. There is a potentially strong argument that we shouldn't do this at all and that if someone sees a regression of their workload under buffered writes they should turn it off rather than relying on this fallback. Epic: None Release note: None 152923: kvpb: always use t.ExpectExclusionSince in EarliestActiveTimestamp r=miraradeva a=stevendanna Previously, some cases used ExpectExclusionSince while others used ExpectExclusionSince.Next(). I think this was OK in practice because any write we needed to observe would need to be at ExpectExclusionSince.Next() since we set that field based on the ReadTimestamp of a locking Get. Such Gets scan with FailOnMoreRecent set which, despite the name, also fails if the current value equals the read timestamp. Thus, any new write violating our exclusion would have to be at .Next() or higher. If the above analysis doesn't convince you or is wrong, also note that this is only required for weak isolation buffered writes which can only be enabled via a non-public cluster setting (SSI transactions refresh their reads). In case the argument above is wrong, I've chosen to normalize these to the earlier timestamp. 
I've forgone a release note given the above argument. Epic: none Release note: None Co-authored-by: Chandra Thumuluru <[email protected]> Co-authored-by: Steven Danna <[email protected]>
4 parents 1df2c30 + c7ebb90 + 7c4290d + e52e4df commit 760a77e

File tree

12 files changed

+216
-28
lines changed

12 files changed

+216
-28
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,6 @@ func newRootTxnCoordSender(
281281
timeSource: timeutil.DefaultTimeSource{},
282282
txn: &tcs.mu.txn,
283283
}
284-
tcs.interceptorAlloc.txnWriteBuffer.init(
285-
&tcs.interceptorAlloc.txnPipeliner,
286-
)
287284

288285
tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)
289286

@@ -363,6 +360,7 @@ func (tc *TxnCoordSender) initCommonInterceptors(
363360
allowConcurrentRequests: typ == kv.LeafTxn,
364361
}
365362
tc.interceptorAlloc.txnSeqNumAllocator.writeSeq = txn.Sequence
363+
tc.interceptorAlloc.txnWriteBuffer.init(&tc.interceptorAlloc.txnPipeliner)
366364
}
367365

368366
func (tc *TxnCoordSender) connectInterceptors() {

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_client_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,71 @@ func TestTxnCoordSenderWriteBufferingDisablesPipelining(t *testing.T) {
9292
kvpb.Scan, kvpb.Put, kvpb.EndTxn,
9393
}, calls)
9494
}
95+
96+
// TestTxnCoordSenderWriteBufferingReEnablesPipelining verifies that pipelining
97+
// is re-enabled after a mid-transaction flush.
98+
func TestTxnCoordSenderWriteBufferingReEnablesPipelining(t *testing.T) {
99+
defer leaktest.AfterTest(t)()
100+
defer log.Scope(t).Close(t)
101+
102+
ctx := context.Background()
103+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
104+
defer s.Stopper().Stop(ctx)
105+
106+
distSender := s.DistSenderI().(*DistSender)
107+
batchCount := 0
108+
var calls []kvpb.Method
109+
var senderFn kv.SenderFunc = func(
110+
ctx context.Context, ba *kvpb.BatchRequest,
111+
) (*kvpb.BatchResponse, *kvpb.Error) {
112+
batchCount++
113+
t.Logf("Batch: %#+v", ba.Methods())
114+
calls = append(calls, ba.Methods()...)
115+
if et, ok := ba.GetArg(kvpb.EndTxn); ok {
116+
// Ensure that no transactions enter a STAGING state.
117+
et.(*kvpb.EndTxnRequest).InFlightWrites = nil
118+
}
119+
return distSender.Send(ctx, ba)
120+
}
121+
122+
st := s.ClusterSettings()
123+
BufferedWritesMaxBufferSize.Override(ctx, &st.SV, defaultBufferSize)
124+
125+
tsf := NewTxnCoordSenderFactory(TxnCoordSenderFactoryConfig{
126+
AmbientCtx: s.AmbientCtx(),
127+
Settings: st,
128+
Clock: s.Clock(),
129+
Stopper: s.Stopper(),
130+
// Disable transaction heartbeats so that they don't disrupt our attempt to
131+
// track the requests issued by the transactions.
132+
HeartbeatInterval: -1,
133+
}, senderFn)
134+
db := kv.NewDB(s.AmbientCtx(), tsf, s.Clock(), s.Stopper())
135+
136+
require.NoError(t, db.Put(ctx, "test-key-a", "hello"))
137+
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
138+
txn.SetBufferedWritesEnabled(true)
139+
if err := txn.Put(ctx, "test-key-c", "hello"); err != nil {
140+
return err
141+
}
142+
if _, err := txn.DelRange(ctx, "test-key", "test-key-d", true); err != nil {
143+
return err
144+
}
145+
if err := txn.Put(ctx, "test-key-a", "hello"); err != nil {
146+
return err
147+
}
148+
return nil
149+
}))
150+
151+
require.Equal(t, 4, batchCount)
152+
require.Equal(t, []kvpb.Method{
153+
// The initial setup
154+
kvpb.Put,
155+
// The first (buffered) Put and the DeleteRange that flushes the buffer.
156+
kvpb.Put, kvpb.DeleteRange,
157+
// The second (pipelined) Put
158+
kvpb.Put,
159+
// EndTxn with the QueryIntent because pipelining was turned back on.
160+
kvpb.QueryIntent, kvpb.EndTxn,
161+
}, calls)
162+
}

pkg/kv/kvpb/batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (ba *BatchRequest) EarliestActiveTimestamp() hlc.Timestamp {
101101
//
102102
// See the example in RefreshRequest for more details.
103103
if !t.ExpectExclusionSince.IsEmpty() {
104-
ts.Backward(t.ExpectExclusionSince.Next())
104+
ts.Backward(t.ExpectExclusionSince)
105105
}
106106
case *ExportRequest:
107107
if !t.StartTime.IsEmpty() {
@@ -115,7 +115,7 @@ func (ba *BatchRequest) EarliestActiveTimestamp() hlc.Timestamp {
115115
//
116116
// See the example in RefreshRequest for more details.
117117
if !t.ExpectExclusionSince.IsEmpty() {
118-
ts.Backward(t.ExpectExclusionSince.Next())
118+
ts.Backward(t.ExpectExclusionSince)
119119
}
120120
case *PutRequest:
121121
// A PutRequest with ExpectExclusionSince set needs to be able to observe MVCC

pkg/rpc/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ go_test(
183183
"@com_github_stretchr_testify//require",
184184
"@io_storj_drpc//drpcctx",
185185
"@io_storj_drpc//drpcmetadata",
186+
"@io_storj_drpc//drpcmux",
187+
"@io_storj_drpc//drpcserver",
186188
"@org_golang_google_grpc//:grpc",
187189
"@org_golang_google_grpc//codes",
188190
"@org_golang_google_grpc//credentials",

pkg/rpc/context.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"google.golang.org/grpc/stats"
5555
"storj.io/drpc"
5656
"storj.io/drpc/drpcclient"
57+
"storj.io/drpc/drpcmigrate"
5758
)
5859

5960
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -287,6 +288,9 @@ type Context struct {
287288
// the gRPC protocol over an in-memory pipe.
288289
loopbackDialFn func(context.Context) (net.Conn, error)
289290

291+
// This is similar to the loopbackDialFn above, but for DRPC connections.
292+
loopbackDRPCDialFn func(context.Context) (net.Conn, error)
293+
290294
// clientCreds is used to pass additional headers to called RPCs.
291295
clientCreds credentials.PerRPCCredentials
292296

@@ -295,7 +299,7 @@ type Context struct {
295299
windowSizeSettings
296300
}
297301

298-
// SetLoopbackDialer configures the loopback dialer function.
302+
// SetLoopbackDialer configures the loopback dialer function to dial gRPC connections.
299303
func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Conn, error)) {
300304
if c.ContextOptions.Knobs.NoLoopbackDialer {
301305
// A test has decided it is opting out of the special loopback
@@ -306,6 +310,17 @@ func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Co
306310
c.loopbackDialFn = loopbackDialFn
307311
}
308312

313+
// SetLoopbackDRPCDialer configures the loopback dialer function to dial DRPC connections.
314+
func (c *Context) SetLoopbackDRPCDialer(loopbackDialFn func(context.Context) (net.Conn, error)) {
315+
if c.ContextOptions.Knobs.NoLoopbackDialer {
316+
// A test has decided it is opting out of the special loopback
317+
// dialing mechanism. Obey it. We already have defined
318+
// loopbackDRPCDialFn in that case in NewContext().
319+
return
320+
}
321+
c.loopbackDRPCDialFn = loopbackDialFn
322+
}
323+
309324
// StoreLivenessWithdrawalGracePeriod computes the grace period after a store restarts before which it will
310325
// not withdraw support from other stores.
311326
func (c *Context) StoreLivenessWithdrawalGracePeriod() time.Duration {
@@ -596,6 +611,9 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
596611
d := onlyOnceDialer{}
597612
return d.dial(ctx, opts.AdvertiseAddr)
598613
}
614+
rpcCtx.loopbackDRPCDialFn = func(ctx context.Context) (net.Conn, error) {
615+
return drpcmigrate.DialWithHeader(ctx, "tcp", opts.AdvertiseAddr, drpcmigrate.DRPCHeader)
616+
}
599617
}
600618

601619
// We only monitor remote clocks in server-to-server connections.

pkg/rpc/context_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package rpc
88
import (
99
"context"
1010
"crypto/rand"
11+
"crypto/tls"
1112
"fmt"
1213
"io"
1314
"net"
@@ -50,6 +51,8 @@ import (
5051
"google.golang.org/grpc/credentials"
5152
"google.golang.org/grpc/metadata"
5253
"google.golang.org/grpc/status"
54+
"storj.io/drpc/drpcmux"
55+
"storj.io/drpc/drpcserver"
5356
)
5457

5558
// TestingConnHealth returns nil if we have an open connection to the given
@@ -807,6 +810,86 @@ func TestConnectLoopback(t *testing.T) {
807810
require.Equal(t, codes.DataLoss, gogostatus.Code(errors.UnwrapAll(err)), "%+v", err)
808811
}
809812

813+
// TestDRPCConnectLoopback verifies that we correctly go through the internal
814+
// server when dialing the local address.
815+
func TestDRPCConnectLoopback(t *testing.T) {
816+
defer leaktest.AfterTest(t)()
817+
818+
stopper := stop.NewStopper()
819+
defer stopper.Stop(context.Background())
820+
821+
clock := timeutil.NewManualTime(timeutil.Unix(0, 1))
822+
maxOffset := time.Duration(0)
823+
824+
clusterID := uuid.MakeV4()
825+
const nodeID = 1
826+
827+
ctx := context.Background()
828+
clientCtx := newTestContext(clusterID, clock, maxOffset, stopper)
829+
clientCtx.NodeID.Set(context.Background(), nodeID)
830+
drpcLoopbackL := netutil.NewLoopbackListener(ctx, stopper)
831+
clientCtx.loopbackDRPCDialFn = func(ctx context.Context) (net.Conn, error) {
832+
return drpcLoopbackL.Connect(ctx)
833+
}
834+
// Ensure that there's no error connecting to the local node when an internal
835+
// server has been registered.
836+
clientCtx.SetLocalInternalServer(
837+
&internalServer{},
838+
ServerInterceptorInfo{}, ClientInterceptorInfo{})
839+
840+
// Create a DRPC server to listen on loopback.
841+
newTestServer := func() *drpcServer {
842+
d := &drpcServer{}
843+
mux := drpcmux.New()
844+
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{})
845+
d.Mux = mux
846+
847+
return d
848+
}
849+
drpcS := newTestServer()
850+
851+
// Register a heartbeat service that always returns an error.
852+
errLoopback := gogostatus.Newf(codes.Unknown, "loopback!").Err() // TODO(server): replace code with DRPC error codes
853+
require.NoError(t, DRPCRegisterHeartbeat(drpcS, &ManualHeartbeatService{readyFn: func() error {
854+
return errLoopback
855+
}}))
856+
857+
tlsConfig, err := clientCtx.GetServerTLSConfig()
858+
require.NoError(t, err)
859+
860+
// Start the DRPC server on loopback.
861+
require.NoError(t, stopper.RunAsyncTask(ctx, "listen-server-loopback", func(ctx context.Context) {
862+
drpcLoopbackTLSL := tls.NewListener(drpcLoopbackL, tlsConfig)
863+
netutil.FatalIfUnexpected(drpcS.Serve(ctx, drpcLoopbackTLSL))
864+
}))
865+
866+
serveDRPC := func(stopper *stop.Stopper, drpcS *drpcServer, drpcTLSL net.Listener) error {
867+
waitQuiesce := func(context.Context) {
868+
<-stopper.ShouldQuiesce()
869+
netutil.FatalIfUnexpected(drpcTLSL.Close())
870+
}
871+
if err := stopper.RunAsyncTask(ctx, "listen-quiesce", waitQuiesce); err != nil {
872+
waitQuiesce(ctx)
873+
return err
874+
}
875+
876+
return stopper.RunAsyncTask(ctx, "listen-serve", func(context.Context) {
877+
netutil.FatalIfUnexpected(drpcS.Serve(ctx, drpcTLSL))
878+
})
879+
}
880+
drpcL, err := net.Listen("tcp", util.TestAddr.String())
881+
require.NoError(t, err)
882+
drpcTLSL := tls.NewListener(drpcL, tlsConfig)
883+
require.NoError(t, serveDRPC(stopper, drpcS, drpcTLSL))
884+
885+
addr := drpcL.Addr().String()
886+
clientCtx.AdvertiseAddr = addr
887+
888+
// Connect and get the error that comes from drpcLoopbackL, proving that we were routed there.
889+
_, err = clientCtx.DRPCDialNode(addr, nodeID, roachpb.Locality{}, rpcbase.DefaultClass).Connect(ctx)
890+
require.Equal(t, codes.Unknown, gogostatus.Code(errors.UnwrapAll(err)), "%+v", err)
891+
}
892+
810893
func TestOffsetMeasurement(t *testing.T) {
811894
defer leaktest.AfterTest(t)()
812895

pkg/rpc/drpc.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ func DialDRPC(
5353
pooledConn := pool.Get(ctx /* unused */, struct{}{}, func(ctx context.Context,
5454
_ struct{}) (drpcpool.Conn, error) {
5555

56-
netConn, err := drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
56+
netConn, err := func(ctx context.Context) (net.Conn, error) {
57+
if rpcCtx.ContextOptions.AdvertiseAddr == target && !rpcCtx.ClientOnly {
58+
return rpcCtx.loopbackDRPCDialFn(ctx)
59+
}
60+
return drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
61+
}(ctx)
5762
if err != nil {
5863
return nil, err
5964
}

pkg/server/drpc_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,16 +220,13 @@ func TestDialDRPC_InterceptorsAreSet(t *testing.T) {
220220
args := base.TestClusterArgs{
221221
ReplicationMode: base.ReplicationManual,
222222
ServerArgs: base.TestServerArgs{
223-
Settings: cluster.MakeClusterSettings(),
224-
Insecure: true,
223+
Settings: cluster.MakeClusterSettings(),
224+
DefaultDRPCOption: base.TestDRPCEnabled,
225225
},
226226
}
227227

228-
rpc.ExperimentalDRPCEnabled.Override(ctx, &args.ServerArgs.Settings.SV, true)
229228
c := testcluster.StartTestCluster(t, numNodes, args)
230229
defer c.Stopper().Stop(ctx)
231-
require.True(t, c.Server(0).RPCContext().Insecure)
232-
233230
rpcAddr := c.Server(0).RPCAddr()
234231

235232
// Client setup
@@ -238,9 +235,10 @@ func TestDialDRPC_InterceptorsAreSet(t *testing.T) {
238235
rpcContextOptions.Stopper = c.Stopper()
239236
rpcContextOptions.Settings = c.Server(0).ClusterSettings()
240237
rpcCtx := rpc.NewContext(ctx, rpcContextOptions)
241-
rpcCtx.ContextOptions = rpc.ContextOptions{Insecure: true}
238+
242239
// Adding test interceptors
243240
rpcCtx.Knobs = rpc.ContextTestingKnobs{
241+
NoLoopbackDialer: true,
244242
UnaryClientInterceptorDRPC: func(target string, class rpcbase.ConnectionClass) drpcclient.UnaryClientInterceptor {
245243
return mockUnaryInterceptor
246244
},

pkg/server/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,7 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
16591659
// and dispatches the server worker for the RPC.
16601660
// The SQL listener is returned, to start the SQL server later
16611661
// below when the server has initialized.
1662-
pgL, loopbackPgL, rpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
1662+
pgL, loopbackPgL, grpcLoopbackDialFn, drpcLoopbackDialFn, startRPCServer, err := startListenRPCAndSQL(
16631663
ctx, workersCtx, s.cfg.BaseConfig,
16641664
s.stopper, s.grpc, s.drpc, ListenAndUpdateAddrs, true /* enableSQLListener */, s.cfg.AcceptProxyProtocolHeaders)
16651665
if err != nil {
@@ -1669,7 +1669,8 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
16691669
s.loopbackPgL = loopbackPgL
16701670

16711671
// Tell the RPC context how to connect in-memory.
1672-
s.rpcContext.SetLoopbackDialer(rpcLoopbackDialFn)
1672+
s.rpcContext.SetLoopbackDialer(grpcLoopbackDialFn)
1673+
s.rpcContext.SetLoopbackDRPCDialer(drpcLoopbackDialFn)
16731674

16741675
if s.cfg.TestingKnobs.Server != nil {
16751676
knobs := s.cfg.TestingKnobs.Server.(*TestingKnobs)

0 commit comments

Comments
 (0)