Skip to content

Commit 0d3c8c2

Browse files
committed
Add support for SO_REUSEPORT to vtgate (vitessio#19168)
Signed-off-by: Riley Laine <rlaine@slack-corp.com>
1 parent 946a513 commit 0d3c8c2

File tree

12 files changed

+370
-14
lines changed

12 files changed

+370
-14
lines changed

go/flags/endtoend/vtgate.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ Flags:
188188
--redact-debug-ui-queries redact full queries and bind variables from debug UI
189189
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
190190
--retry-count int retry count (default 2)
191+
--reuse-port Enable SO_REUSEPORT when binding sockets; available on Linux 3.9+ (default false)
191192
--schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
192193
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
193194
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice

go/mysql/server.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ func NewFromListener(
235235
handler Handler,
236236
connReadTimeout time.Duration,
237237
connWriteTimeout time.Duration,
238+
proxyProtocol bool,
238239
connBufferPooling bool,
239240
keepAlivePeriod time.Duration,
240241
flushDelay time.Duration,
@@ -252,6 +253,11 @@ func NewFromListener(
252253
FlushDelay: flushDelay,
253254
MultiQuery: multiQuery,
254255
}
256+
257+
if proxyProtocol {
258+
cfg.Listener = &proxyproto.Listener{Listener: l}
259+
}
260+
255261
return NewListenerWithConfig(cfg)
256262
}
257263

@@ -272,12 +278,8 @@ func NewListener(
272278
if err != nil {
273279
return nil, err
274280
}
275-
if proxyProtocol {
276-
proxyListener := &proxyproto.Listener{Listener: listener}
277-
return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
278-
}
279281

280-
return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
282+
return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, proxyProtocol, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
281283
}
282284

283285
// ListenerConfig should be used with NewListenerWithConfig to specify listener parameters.

go/mysql/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func TestConnectionFromListener(t *testing.T) {
300300
listener, err := net.Listen("tcp", "127.0.0.1:")
301301
require.NoError(t, err, "net.Listener failed")
302302

303-
l, err := NewFromListener(listener, authServer, th, 0, 0, false, 0, 0, false)
303+
l, err := NewFromListener(listener, authServer, th, 0, 0, false, false, 0, 0, false)
304304
require.NoError(t, err, "NewListener failed")
305305
host, port := getHostPort(t, l.Addr())
306306
fmt.Printf("host: %s, port: %d\n", host, port)

go/netutil/reuseport_unix.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//go:build !windows
2+
3+
/*
4+
Copyright 2026 The Vitess Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package netutil
20+
21+
import (
22+
"context"
23+
"fmt"
24+
"net"
25+
"syscall"
26+
27+
"golang.org/x/sys/unix"
28+
)
29+
30+
// ListenReusePort binds a host:port and sets SO_REUSEPORT on the listener.
31+
// SO_REUSEPORT allows multiple processes to bind to the same port, enabling
32+
// kernel-level load balancing of incoming connections.
33+
//
34+
// Requires Linux 3.9+ or equivalent kernel support.
35+
func ListenReusePort(network, address string) (net.Listener, error) {
36+
switch network {
37+
case "tcp", "tcp4", "tcp6":
38+
default:
39+
return nil, fmt.Errorf("SO_REUSEPORT: protocol not supported: %s", network)
40+
}
41+
42+
lc := net.ListenConfig{
43+
Control: func(network, address string, c syscall.RawConn) error {
44+
var opErr error
45+
err := c.Control(func(fd uintptr) {
46+
opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
47+
})
48+
if err != nil {
49+
return err
50+
}
51+
if opErr != nil {
52+
return fmt.Errorf("failed to set SO_REUSEPORT on %s %s: %w", network, address, opErr)
53+
}
54+
return nil
55+
},
56+
}
57+
58+
return lc.Listen(context.Background(), network, address)
59+
}

go/netutil/reuseport_unix_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//go:build !windows
2+
3+
/*
4+
Copyright 2026 The Vitess Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package netutil
20+
21+
import (
22+
"net"
23+
"testing"
24+
25+
"golang.org/x/sys/unix"
26+
)
27+
28+
func TestListenReusePort(t *testing.T) {
29+
l1, err := ListenReusePort("tcp", "127.0.0.1:0")
30+
if err != nil {
31+
t.Fatal(err)
32+
}
33+
defer l1.Close()
34+
35+
// Bind to the same address. This should be possible with SO_REUSEPORT.
36+
addr := l1.Addr().String()
37+
l2, err := ListenReusePort("tcp", addr)
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
defer l2.Close()
42+
43+
tcpListener := l1.(*net.TCPListener)
44+
file, err := tcpListener.File()
45+
if err != nil {
46+
t.Fatal(err)
47+
}
48+
defer file.Close()
49+
50+
val, err := unix.GetsockoptInt(int(file.Fd()), unix.SOL_SOCKET, unix.SO_REUSEPORT)
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
if val != 1 {
55+
t.Fatalf("SO_REUSEPORT not set: got %d, want 1", val)
56+
}
57+
}

go/netutil/reuseport_windows.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//go:build windows
2+
3+
/*
4+
Copyright 2026 The Vitess Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package netutil
20+
21+
import (
22+
"fmt"
23+
"net"
24+
"runtime"
25+
)
26+
27+
// ListenReusePort binds a host:port and sets SO_REUSEPORT on the listener.
28+
// SO_REUSEPORT allows multiple processes to bind to the same port, enabling
29+
// kernel-level load balancing of incoming connections.
30+
//
31+
// Requires Linux 3.9+ or equivalent kernel support.
32+
func ListenReusePort(network, address string) (net.Listener, error) {
33+
return nil, fmt.Errorf("SO_REUSEPORT: not supported on OS: %s", runtime.GOOS)
34+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//go:build !windows
2+
3+
/*
4+
Copyright 2024 The Vitess Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package reuseport
20+
21+
import (
22+
"context"
23+
_ "embed"
24+
"flag"
25+
"os"
26+
"testing"
27+
28+
"github.com/stretchr/testify/require"
29+
30+
"vitess.io/vitess/go/mysql"
31+
"vitess.io/vitess/go/test/endtoend/cluster"
32+
"vitess.io/vitess/go/test/endtoend/utils"
33+
"vitess.io/vitess/go/vt/log"
34+
)
35+
36+
var (
37+
keyspaceName = "ks"
38+
cell = "zone-1"
39+
40+
//go:embed schema.sql
41+
schemaSQL string
42+
)
43+
44+
func TestMain(m *testing.M) {
45+
flag.Parse()
46+
os.Exit(m.Run())
47+
}
48+
49+
func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) {
50+
clusterInstance := cluster.NewCluster(cell, "localhost")
51+
52+
// Start topo server
53+
err := clusterInstance.StartTopo()
54+
require.NoError(t, err)
55+
56+
// Start keyspace
57+
keyspace := &cluster.Keyspace{
58+
Name: keyspaceName,
59+
SchemaSQL: schemaSQL,
60+
}
61+
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false)
62+
require.NoError(t, err)
63+
64+
// Start vtgate
65+
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--reuse-port")
66+
err = clusterInstance.StartVtgate()
67+
require.NoError(t, err)
68+
69+
vtParams := clusterInstance.GetVTParams(keyspaceName)
70+
return clusterInstance, vtParams
71+
}
72+
73+
func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) {
74+
vtConn, err := mysql.Connect(context.Background(), &vtParams)
75+
require.NoError(t, err)
76+
77+
deleteAll := func() {
78+
_, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp")
79+
80+
tables := []string{"t1"}
81+
for _, table := range tables {
82+
_, _ = utils.ExecAllowError(t, vtConn, "delete from "+table)
83+
}
84+
}
85+
86+
deleteAll()
87+
88+
return vtConn, func() {
89+
deleteAll()
90+
vtConn.Close()
91+
}
92+
}
93+
94+
func TestReusePort(t *testing.T) {
95+
clusterInstance, vtParams := setupCluster(t)
96+
defer clusterInstance.Teardown()
97+
98+
// Create a connection to the first vtgate
99+
vtConn, closer := start(t, vtParams)
100+
defer closer()
101+
102+
// Create a second vtgate with the same configuration
103+
duplicateVtGate := cluster.VtgateProcessInstance(
104+
clusterInstance.GetAndReservePort(),
105+
clusterInstance.VtgateGrpcPort,
106+
clusterInstance.VtgateMySQLPort,
107+
clusterInstance.Cell,
108+
clusterInstance.Cell,
109+
clusterInstance.Hostname,
110+
"PRIMARY",
111+
clusterInstance.TopoProcess.Port,
112+
clusterInstance.TmpDirectory,
113+
clusterInstance.VtGateExtraArgs,
114+
clusterInstance.VtGatePlannerVersion)
115+
// Unix domain sockets do not support multiplexing
116+
duplicateVtGate.MySQLServerSocketPath = ""
117+
err := duplicateVtGate.Setup()
118+
require.NoError(t, err)
119+
defer func() {
120+
if err := duplicateVtGate.TearDown(); err != nil {
121+
log.Errorf("Error in vtgate teardown: %v", err)
122+
}
123+
}()
124+
125+
// Should be able to connect to the first vtgate
126+
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
127+
require.NoError(t, err)
128+
129+
// Tear down the first vtgate
130+
err = clusterInstance.VtgateProcess.TearDown()
131+
require.NoError(t, err)
132+
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
133+
134+
// Should fail since the vtgate has stopped
135+
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
136+
require.Error(t, err, "first vtgate should be stopped and should not serve requests")
137+
138+
// Create a second connection with the same parameters, which will
139+
// now go to the duplicate vtgate
140+
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
141+
require.NoError(t, err, "second vtgate should handle the connection")
142+
defer vtConn2.Close()
143+
144+
// Should be able to fetch from the same host:port on the duplicate vtgate
145+
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
146+
require.NoError(t, err, "second vtgate should serve requests")
147+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
create table t1(
2+
id1 bigint,
3+
id2 bigint,
4+
primary key(id1)
5+
) Engine=InnoDB;

go/vt/servenv/grpc_server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ func serveGRPC() {
324324

325325
// listen on the port
326326
log.Infof("Listening for gRPC calls on port %v", gRPCPort)
327-
listener, err := net.Listen("tcp", net.JoinHostPort(gRPCBindAddress, strconv.Itoa(gRPCPort)))
327+
328+
listener, err := Listen("tcp", net.JoinHostPort(gRPCBindAddress, strconv.Itoa(gRPCPort)))
328329
if err != nil {
329330
log.Exitf("Cannot listen on port %v for gRPC: %v", gRPCPort, err)
330331
}

0 commit comments

Comments
 (0)