Skip to content

Commit 9d2edb8

Browse files
Move concurrent connection dial limit out of healthcheck.
There's a concurrent connection dial limit implemented in the healthcheck code that doesn't semantically or logically belong here. First, the healthcheck code does neither know nor care about the network protocol used to execute healthchecks. Arguably, there's no other protocol used for this apart from `grpc`, but it seems wrong to set up a `grpc` connection specific options in the healthcheck code. Additionally, the dial concurrency limit modifies the global `grpc` connection options the first time a healthcheck is started. That seems unexpected, and I believe we want the concurrency limit set for all `grpc`` dial operations, irrespective of whether those connections are used for healtchecking or anything else. This change moves the concurrency limit into the `grpcclient` package, and sets it on any `grpc` connection opened via that package. Signed-off-by: Arthur Schreiber <[email protected]>
1 parent 3d36adb commit 9d2edb8

File tree

3 files changed

+61
-40
lines changed

3 files changed

+61
-40
lines changed

go/vt/discovery/healthcheck.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import (
4646
"github.com/google/safehtml/template"
4747
"github.com/google/safehtml/template/uncheckedconversions"
4848
"github.com/spf13/pflag"
49-
"golang.org/x/sync/semaphore"
5049

5150
"vitess.io/vitess/go/flagutil"
5251
"vitess.io/vitess/go/netutil"
@@ -92,9 +91,6 @@ var (
9291
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
9392
refreshKnownTablets = true
9493

95-
// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
96-
healthCheckDialConcurrency int64 = 1024
97-
9894
// How much to sleep between each check.
9995
waitAvailableTabletInterval = 100 * time.Millisecond
10096

@@ -177,7 +173,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
177173
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
178174
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
179175
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
180-
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
181176
ParseTabletURLTemplateFromFlag()
182177
}
183178

@@ -297,8 +292,6 @@ type HealthCheckImpl struct {
297292
subscribers map[chan *TabletHealth]struct{}
298293
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
299294
loadTabletsTrigger chan struct{}
300-
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
301-
healthCheckDialSem *semaphore.Weighted
302295
}
303296

304297
// NewHealthCheck creates a new HealthCheck object.
@@ -333,7 +326,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
333326
cell: localCell,
334327
retryDelay: retryDelay,
335328
healthCheckTimeout: healthCheckTimeout,
336-
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
337329
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
338330
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
339331
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
@@ -844,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata
844836
// TODO: test that throws this error
845837
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
846838
}
847-
return thc.Connection(ctx, hc), nil
839+
return thc.Connection(ctx), nil
848840
}
849841

850842
// getAliasByCell should only be called while holding hc.mu

go/vt/discovery/tablet_health_check.go

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package discovery
1919
import (
2020
"context"
2121
"fmt"
22-
"net"
2322
"strings"
2423
"sync"
2524
"sync/atomic"
@@ -34,16 +33,12 @@ import (
3433
"vitess.io/vitess/go/vt/vttablet/queryservice"
3534
"vitess.io/vitess/go/vt/vttablet/tabletconn"
3635

37-
"google.golang.org/grpc"
3836
"google.golang.org/protobuf/proto"
3937

4038
"vitess.io/vitess/go/vt/proto/query"
4139
"vitess.io/vitess/go/vt/proto/topodata"
4240
)
4341

44-
// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
45-
var withDialerContextOnce sync.Once
46-
4742
// tabletHealthCheck maintains the health status of a tablet. A map of this
4843
// structure is maintained in HealthCheck.
4944
type tabletHealthCheck struct {
@@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
127122
}
128123

129124
// stream streams healthcheck responses to callback.
130-
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
131-
conn := thc.Connection(ctx, hc)
125+
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
126+
conn := thc.Connection(ctx)
132127
if conn == nil {
133128
// This signals the caller to retry
134129
return nil
@@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c
141136
return err
142137
}
143138

144-
func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
139+
func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService {
145140
thc.connMu.Lock()
146141
defer thc.connMu.Unlock()
147-
return thc.connectionLocked(ctx, hc)
148-
}
149-
150-
func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
151-
return func(ctx context.Context, addr string) (net.Conn, error) {
152-
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
153-
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
154-
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
155-
// the panic: 'runtime: program exceeds 10000-thread limit'.
156-
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
157-
return nil, err
158-
}
159-
defer hc.healthCheckDialSem.Release(1)
160-
var dialer net.Dialer
161-
return dialer.DialContext(ctx, "tcp", addr)
162-
}
142+
return thc.connectionLocked(ctx)
163143
}
164144

165-
func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
145+
func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService {
166146
if thc.Conn == nil {
167-
withDialerContextOnce.Do(func() {
168-
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
169-
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
170-
})
171-
})
172147
conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true))
173148
if err != nil {
174149
thc.LastError = err
@@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
297272
}()
298273

299274
// Read stream health responses.
300-
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
275+
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
301276
// We received a message. Reset the back-off.
302277
retryDelay = hc.retryDelay
303278
// Don't block on send to avoid deadlocks.

go/vt/grpcclient/client.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ package grpcclient
2121
import (
2222
"context"
2323
"crypto/tls"
24+
"net"
2425
"sync"
2526
"time"
2627

2728
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
2829
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
2930
"github.com/spf13/pflag"
31+
"golang.org/x/sync/semaphore"
3032
"google.golang.org/grpc"
3133
"google.golang.org/grpc/credentials"
3234
"google.golang.org/grpc/credentials/insecure"
@@ -46,6 +48,10 @@ var (
4648
initialConnWindowSize int
4749
initialWindowSize int
4850

51+
// `dialConcurrencyLimit` tells us how many tablet grpc connections can be dialed concurrently.
52+
// This should be less than the golang max thread limit of 10000.
53+
dialConcurrencyLimit int64 = 1024
54+
4955
// every vitess binary that makes grpc client-side calls.
5056
grpcclientBinaries = []string{
5157
"mysqlctld",
@@ -74,9 +80,24 @@ func RegisterFlags(fs *pflag.FlagSet) {
7480
fs.StringVar(&credsFile, "grpc_auth_static_client_creds", credsFile, "When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.")
7581
}
7682

83+
func RegisterDialConcurrencyFlagsHealthcheck(fs *pflag.FlagSet) {
84+
// TODO: Deprecate this and rename it to `grpc-dial-concurrency-limit`
85+
fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
86+
}
87+
88+
func RegisterDialConcurrencyFlags(fs *pflag.FlagSet) {
89+
fs.Int64Var(&dialConcurrencyLimit, "grpc-dial-concurrency-limit", 1024, "Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000.")
90+
}
91+
7792
func init() {
7893
for _, cmd := range grpcclientBinaries {
7994
servenv.OnParseFor(cmd, RegisterFlags)
95+
96+
if cmd == "vtgate" || cmd == "vtcombo" || cmd == "vtctld" {
97+
servenv.OnParseFor(cmd, RegisterDialConcurrencyFlagsHealthcheck)
98+
} else {
99+
servenv.OnParseFor(cmd, RegisterDialConcurrencyFlags)
100+
}
80101
}
81102
}
82103

@@ -129,6 +150,10 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...
129150
newopts = append(newopts, grpc.WithInitialWindowSize(int32(initialWindowSize)))
130151
}
131152

153+
if dialConcurrencyLimit > 0 {
154+
newopts = append(newopts, dialConcurrencyLimitOption())
155+
}
156+
132157
newopts = append(newopts, opts...)
133158
var err error
134159
grpcDialOptionsMu.Lock()
@@ -175,6 +200,35 @@ func SecureDialOption(cert, key, ca, crl, name string) (grpc.DialOption, error)
175200
return grpc.WithTransportCredentials(creds), nil
176201
}
177202

203+
var dialConcurrencyLimitOpt grpc.DialOption
204+
205+
// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
206+
var dialConcurrencyLimitOnce sync.Once
207+
208+
func dialConcurrencyLimitOption() grpc.DialOption {
209+
dialConcurrencyLimitOnce.Do(func() {
210+
// This semaphore is used to limit how many grpc connections can be dialed to tablets simultanously.
211+
// This does not limit how many tablet connections can be open at the same time.
212+
sem := semaphore.NewWeighted(dialConcurrencyLimit)
213+
214+
dialConcurrencyLimitOpt = grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
215+
// Limit the number of grpc connections opened in parallel to avoid high OS-thread
216+
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
217+
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
218+
// the panic: 'runtime: program exceeds 10000-thread limit'.
219+
if err := sem.Acquire(ctx, 1); err != nil {
220+
return nil, err
221+
}
222+
defer sem.Release(1)
223+
224+
var dialer net.Dialer
225+
return dialer.DialContext(ctx, "tcp", addr)
226+
})
227+
})
228+
229+
return dialConcurrencyLimitOpt
230+
}
231+
178232
// Allows for building a chain of interceptors without knowing the total size up front
179233
type clientInterceptorBuilder struct {
180234
unaryInterceptors []grpc.UnaryClientInterceptor

0 commit comments

Comments
 (0)