Skip to content

Commit 10e3b11

Browse files
craig[bot]shghasemitbg
committed
155150: pgwire: limit the number of connection error logs r=shghasemi a=shghasemi Rejecting new connections while server is draining printed too many logs, making it difficult to spot other errors during drain. Added a EveryN log to limit the log frequency to every 10 seconds. Fixes #102339 Release note: None 155154: rpc: disable loopbackTransport via NoLoopbackDialer knob r=tbg a=tbg Prior to this commit, the NoLoopbackDialer knob would replace the loopback dialer implementation with the regular TCP dialer. However, the loopbackTransport (which is what causes a call to the loopbackDialerFn in the first place) has its own call path and in particular, it sets different gRPC options that might be cheaper than what is commonly used (for example, no snappy compression). It is desirable to remove this discrepancy. This commit disables use of the loopbackTransport entirely if the knob is set, the desired effect being that when talking to a TestCluster under this knob, it doesn't matter for performance whether the SQL gateway matches the location of the lease; local communication should be exactly as expensive as "remote" communication (all within the same process, but communicating through identical TCP network connections). Unblocks #153865. Epic: CRDB-55052 Co-authored-by: Shadi Ghasemitaheri <[email protected]> Co-authored-by: Tobias Grieger <[email protected]>
3 parents 13c99d3 + 2acab5d + dcd00a7 commit 10e3b11

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

pkg/rpc/context.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ import (
5555
"google.golang.org/grpc/stats"
5656
"storj.io/drpc"
5757
"storj.io/drpc/drpcclient"
58-
"storj.io/drpc/drpcmigrate"
5958
)
6059

6160
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
@@ -608,12 +607,12 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context {
608607
if opts.Knobs.NoLoopbackDialer {
609608
// The test has decided it doesn't need/want a loopback dialer.
610609
// Ensure we still have a working dial function in that case.
610+
var errAttemptToLoopbackDial = errors.AssertionFailedf("loopback dialer called but NoLoopbackDialer was set")
611611
rpcCtx.loopbackDialFn = func(ctx context.Context) (net.Conn, error) {
612-
d := onlyOnceDialer{}
613-
return d.dial(ctx, opts.AdvertiseAddr)
612+
return nil, errAttemptToLoopbackDial
614613
}
615614
rpcCtx.loopbackDRPCDialFn = func(ctx context.Context) (net.Conn, error) {
616-
return drpcmigrate.DialWithHeader(ctx, "tcp", opts.AdvertiseAddr, drpcmigrate.DRPCHeader)
615+
return nil, errAttemptToLoopbackDial
617616
}
618617
}
619618

@@ -1403,13 +1402,17 @@ const (
14031402
tcpTransport transportType = true
14041403
)
14051404

1405+
func (rpcCtx *Context) canLoopbackDial() bool {
1406+
return !rpcCtx.ClientOnly && !rpcCtx.Knobs.NoLoopbackDialer
1407+
}
1408+
14061409
// GRPCDialOptions returns the minimal `grpc.DialOption`s necessary to connect
14071410
// to a server.
14081411
func (rpcCtx *Context) GRPCDialOptions(
14091412
ctx context.Context, target string, class rpcbase.ConnectionClass,
14101413
) ([]grpc.DialOption, error) {
14111414
transport := tcpTransport
1412-
if rpcCtx.ContextOptions.AdvertiseAddr == target && !rpcCtx.ClientOnly {
1415+
if rpcCtx.ContextOptions.AdvertiseAddr == target && rpcCtx.canLoopbackDial() {
14131416
// See the explanation on loopbackDialFn for an explanation about this.
14141417
transport = loopbackTransport
14151418
}
@@ -2042,7 +2045,7 @@ func (rpcCtx *Context) grpcDialRaw(
20422045
additionalOpts ...grpc.DialOption,
20432046
) (*grpc.ClientConn, error) {
20442047
transport := tcpTransport
2045-
if rpcCtx.ContextOptions.AdvertiseAddr == target && !rpcCtx.ClientOnly {
2048+
if rpcCtx.ContextOptions.AdvertiseAddr == target && rpcCtx.canLoopbackDial() {
20462049
// See the explanation on loopbackDialFn for an explanation about this.
20472050
transport = loopbackTransport
20482051
}

pkg/rpc/context_testutils.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,16 @@ type ContextTestingKnobs struct {
5656
// this value if non-nil at construction time.
5757
StorageClusterID *uuid.UUID
5858

59-
// NoLoopbackDialer, when set, indicates that a test does not care
60-
// about the special loopback dial semantics.
61-
// If this is left unset, the test is responsible for ensuring
62-
// SetLoopbackDialer() has been called on the rpc.Context.
63-
// (This is done automatically by server.Server/server.SQLServerWrapper.)
59+
// NoLoopbackDialer, when true, indicates that a test does not care about the
60+
// special loopback dial semantics. In this case, an error results from use of
61+
// the loopbackTransport or loopbackDialerFn (which would indicate a
62+
// programming error in RPCContext, since NoLoopbackDialer is consulted when
63+
// determining whether to use those).
64+
//
65+
// When false, SetLoopbackDialer() must have been called on the rpc.Context In
66+
// productino code, this is done automatically by
67+
// server.Server/server.SQLServerWrapper, but some lower-level tests may have
68+
// to do it manually.
6469
NoLoopbackDialer bool
6570
}
6671

pkg/sql/pgwire/server.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ type Server struct {
309309
// destinations tracks the metrics for each destination.
310310
destinations map[string]*destinationMetrics
311311
}
312+
// limit the number of rejectNewConnection errors
313+
connectionErrorLogEveryN log.EveryN
312314

313315
auth struct {
314316
syncutil.RWMutex
@@ -490,6 +492,7 @@ func MakeServer(
490492
server.mu.drainCh = make(chan struct{})
491493
server.mu.destinations = make(map[string]*destinationMetrics)
492494
server.mu.Unlock()
495+
server.connectionErrorLogEveryN = log.Every(10 * time.Second)
493496
executorConfig.CidrLookup.SetOnChange(server.onCidrChange)
494497

495498
connAuthConf.SetOnChange(&st.SV, func(ctx context.Context) {
@@ -931,7 +934,9 @@ func (s *Server) ServeConn(
931934
st := s.execCfg.Settings
932935
// If the server is shutting down, terminate the connection early.
933936
if rejectNewConnections {
934-
log.Ops.Info(ctx, "rejecting new connection while server is draining")
937+
if s.connectionErrorLogEveryN.ShouldLog() {
938+
log.Ops.Info(ctx, "rejecting new connection while server is draining")
939+
}
935940
return s.sendErr(ctx, st, conn, newAdminShutdownErr(ErrDrainingNewConn))
936941
}
937942

0 commit comments

Comments
 (0)