Skip to content

Commit 8c1f711

Browse files
committed
feature: introduce better timeout handling TCPQueueTimeoutServer, because ReadHeaderTimeoutServer and ReadTimeoutServer are not the right timeouts to use for a tcp queuerefactor: create metrics.NoMetrics to not have to check for nil accessing metrics.Metrics, this might be even an optimization because of omitting branches and compiler can likely figure out that we call an empty body function
test: add good coverage for StackListener and a benchmark benchmark: stackListner and queueListener Signed-off-by: Sandor Szücs <[email protected]>
1 parent c302468 commit 8c1f711

File tree

14 files changed

+1390
-7
lines changed

14 files changed

+1390
-7
lines changed

config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ type Config struct {
252252
BackendFlushInterval time.Duration `yaml:"backend-flush-interval"`
253253
ExperimentalUpgrade bool `yaml:"experimental-upgrade"`
254254
ExperimentalUpgradeAudit bool `yaml:"experimental-upgrade-audit"`
255+
TCPQueueTimeoutServer time.Duration `yaml:"tcp-queue-timeout-server"`
255256
ReadTimeoutServer time.Duration `yaml:"read-timeout-server"`
256257
ReadHeaderTimeoutServer time.Duration `yaml:"read-header-timeout-server"`
257258
WriteTimeoutServer time.Duration `yaml:"write-timeout-server"`
@@ -576,6 +577,7 @@ func NewConfig() *Config {
576577
flag.DurationVar(&cfg.BackendFlushInterval, "backend-flush-interval", 20*time.Millisecond, "flush interval for upgraded proxy connections")
577578
flag.BoolVar(&cfg.ExperimentalUpgrade, "experimental-upgrade", false, "enable experimental feature to handle upgrade protocol requests")
578579
flag.BoolVar(&cfg.ExperimentalUpgradeAudit, "experimental-upgrade-audit", false, "enable audit logging of the request line and the messages during the experimental web socket upgrades")
580+
flag.DurationVar(&cfg.TCPQueueTimeoutServer, "tcp-queue-timeout-server", time.Second, "set timeout for how long TCP connections can be queued in http server connections")
579581
flag.DurationVar(&cfg.ReadTimeoutServer, "read-timeout-server", 5*time.Minute, "set ReadTimeout for http server connections")
580582
flag.DurationVar(&cfg.ReadHeaderTimeoutServer, "read-header-timeout-server", 60*time.Second, "set ReadHeaderTimeout for http server connections")
581583
flag.DurationVar(&cfg.WriteTimeoutServer, "write-timeout-server", 60*time.Second, "set WriteTimeout for http server connections")
@@ -948,6 +950,7 @@ func (c *Config) ToOptions() skipper.Options {
948950
BackendFlushInterval: c.BackendFlushInterval,
949951
ExperimentalUpgrade: c.ExperimentalUpgrade,
950952
ExperimentalUpgradeAudit: c.ExperimentalUpgradeAudit,
953+
TCPQueueTimeoutServer: c.TCPQueueTimeoutServer,
951954
ReadTimeoutServer: c.ReadTimeoutServer,
952955
ReadHeaderTimeoutServer: c.ReadHeaderTimeoutServer,
953956
WriteTimeoutServer: c.WriteTimeoutServer,

config/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ func defaultConfig(with func(*Config)) *Config {
129129
IdleConnsPerHost: 64,
130130
CloseIdleConnsPeriod: 20 * time.Second,
131131
BackendFlushInterval: 20 * time.Millisecond,
132+
TCPQueueTimeoutServer: time.Second,
132133
ReadTimeoutServer: 5 * time.Minute,
133134
ReadHeaderTimeoutServer: 1 * time.Minute,
134135
WriteTimeoutServer: 1 * time.Minute,

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ require (
6969
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
7070
github.com/Microsoft/go-winio v0.6.2 // indirect
7171
github.com/agnivade/levenshtein v1.2.1 // indirect
72+
github.com/amirylm/go-options v0.0.2 // indirect
73+
github.com/amirylm/lockfree v0.0.4 // indirect
7274
github.com/armon/go-metrics v0.4.1 // indirect
7375
github.com/beorn7/perks v1.0.1 // indirect
7476
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
@@ -99,6 +101,7 @@ require (
99101
github.com/go-ole/go-ole v1.2.6 // indirect
100102
github.com/gobwas/glob v0.2.3 // indirect
101103
github.com/gogo/protobuf v1.3.2 // indirect
104+
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 // indirect
102105
github.com/golang/protobuf v1.5.4 // indirect
103106
github.com/google/btree v1.0.0 // indirect
104107
github.com/google/flatbuffers v25.2.10+incompatible // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
3434
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
3535
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
3636
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
37+
github.com/amirylm/go-options v0.0.2 h1:OvuFcKUg3+7jdKeY54XrRnAIxP9Dmultlg9dS7Q3TpA=
38+
github.com/amirylm/go-options v0.0.2/go.mod h1:OmhJW65Aeyb74akzydI9SVgCjuwKlPNcTZeXk7TETPk=
39+
github.com/amirylm/lockfree v0.0.4 h1:SAC96Droepe6HjDqymFY3E6UyJ6GR2crOGvbXFlk+kY=
40+
github.com/amirylm/lockfree v0.0.4/go.mod h1:92tGIqOCCQdd9SR5nGLYwK4GN9PTKlQmRwXKxqfVz/U=
3741
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q=
3842
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
3943
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
@@ -167,6 +171,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
167171
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
168172
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
169173
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
174+
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4=
175+
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU=
170176
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
171177
github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI=
172178
github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=

metrics/metrics.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,35 @@ type Metrics interface {
8585
Close()
8686
}
8787

88+
type NoMetric struct{}
89+
90+
func (NoMetric) MeasureSince(string, time.Time) {}
91+
func (NoMetric) IncCounter(string) {}
92+
func (NoMetric) IncCounterBy(string, int64) {}
93+
func (NoMetric) IncFloatCounterBy(string, float64) {}
94+
func (NoMetric) MeasureRouteLookup(time.Time) {}
95+
func (NoMetric) MeasureFilterCreate(string, time.Time) {}
96+
func (NoMetric) MeasureFilterRequest(string, time.Time) {}
97+
func (NoMetric) MeasureAllFiltersRequest(string, time.Time) {}
98+
func (NoMetric) MeasureBackend(string, time.Time) {}
99+
func (NoMetric) MeasureBackendHost(string, time.Time) {}
100+
func (NoMetric) MeasureFilterResponse(string, time.Time) {}
101+
func (NoMetric) MeasureAllFiltersResponse(string, time.Time) {}
102+
func (NoMetric) MeasureResponse(int, string, string, time.Time) {}
103+
func (NoMetric) MeasureProxy(time.Duration, time.Duration) {}
104+
func (NoMetric) MeasureServe(string, string, string, int, time.Time) {}
105+
func (NoMetric) IncRoutingFailures() {}
106+
func (NoMetric) IncErrorsBackend(string) {}
107+
func (NoMetric) MeasureBackend5xx(time.Time) {}
108+
func (NoMetric) IncErrorsStreaming(string) {}
109+
func (NoMetric) RegisterHandler(string, *http.ServeMux) {}
110+
func (NoMetric) UpdateGauge(string, float64) {}
111+
func (NoMetric) IncValidRoutes() {}
112+
func (NoMetric) IncInvalidRoutes(string) {}
113+
func (NoMetric) Close() {}
114+
115+
var _ Metrics = NoMetric{}
116+
88117
// Options for initializing metrics collection.
89118
type Options struct {
90119
// the metrics exposing format.

queuelistener/listener.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const (
2020
maxCalculatedQueueSize = 50_000
2121
acceptedConnectionsKey = "listener.accepted.connections"
2222
queuedConnectionsKey = "listener.queued.connections"
23+
queueTimeoutKey = "listener.queued.timeouts"
2324
acceptLatencyKey = "listener.accept.latency"
2425
)
2526

@@ -96,6 +97,7 @@ type listener struct {
9697
var (
9798
token struct{}
9899
errListenerClosed = errors.New("listener closed")
100+
errAcceptTimeout = errors.New("accept timeout")
99101
)
100102

101103
func (c *connection) Close() error {
@@ -132,10 +134,7 @@ func (o Options) maxQueueSize() int64 {
132134
return int64(o.MaxQueueSize)
133135
}
134136

135-
maxQueueSize := 10 * o.maxConcurrency()
136-
if maxQueueSize > maxCalculatedQueueSize {
137-
maxQueueSize = maxCalculatedQueueSize
138-
}
137+
maxQueueSize := min(10*o.maxConcurrency(), maxCalculatedQueueSize)
139138

140139
return maxQueueSize
141140
}

queuelistener/stack.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package queuelistener
2+
3+
import (
4+
"sync"
5+
)
6+
7+
const stackSize int = 10000
8+
9+
type naiveStack[T any] struct {
10+
mu sync.Mutex
11+
top int
12+
items [stackSize]*T
13+
}
14+
15+
func NewStack() *naiveStack[external] {
16+
return &naiveStack[external]{
17+
top: -1,
18+
}
19+
}
20+
21+
func (s *naiveStack[T]) Push(data *T) {
22+
s.mu.Lock()
23+
defer s.mu.Unlock()
24+
25+
if s.top == len(s.items)-1 {
26+
return
27+
}
28+
29+
s.top++
30+
s.items[s.top] = data
31+
}
32+
33+
func (s *naiveStack[T]) Pop() *T {
34+
s.mu.Lock()
35+
defer s.mu.Unlock()
36+
37+
if s.top == -1 {
38+
return nil
39+
} else {
40+
defer func() { s.top-- }()
41+
}
42+
43+
return s.items[s.top]
44+
}

queuelistener/stack_listener.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package queuelistener
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/zalando/skipper/logging"
12+
"github.com/zalando/skipper/metrics"
13+
)
14+
15+
type stackListener struct {
16+
log logging.Logger
17+
metrics metrics.Metrics
18+
maxConcurrency int64
19+
maxQueueSize int64
20+
memoryLimitBytes int64
21+
connectionBytes int
22+
queueTimeout time.Duration
23+
stack *naiveStack[external]
24+
externalListener net.Listener
25+
acceptInternal chan external
26+
quit chan struct{}
27+
once sync.Once
28+
}
29+
30+
func StackListener(o Options) (net.Listener, error) {
31+
nl, err := net.Listen(o.Network, o.Address)
32+
if err != nil {
33+
return nil, fmt.Errorf("StackListener failed net.Listen: %w", err)
34+
}
35+
36+
acceptCH := make(chan external)
37+
38+
if o.Log == nil {
39+
o.Log = &logging.DefaultLog{}
40+
}
41+
42+
if o.MemoryLimitBytes <= 0 {
43+
o.MemoryLimitBytes = defaultMemoryLimitBytes
44+
}
45+
46+
if o.ConnectionBytes <= 0 {
47+
o.ConnectionBytes = defaultConnectionBytes
48+
}
49+
50+
m := o.Metrics
51+
if m == nil {
52+
m = metrics.NoMetric{}
53+
}
54+
l := &stackListener{
55+
log: o.Log,
56+
metrics: m,
57+
externalListener: nl,
58+
maxConcurrency: o.maxConcurrency(),
59+
maxQueueSize: o.maxQueueSize(),
60+
memoryLimitBytes: o.MemoryLimitBytes,
61+
connectionBytes: o.ConnectionBytes,
62+
queueTimeout: o.QueueTimeout,
63+
stack: NewStack(),
64+
acceptInternal: acceptCH,
65+
quit: make(chan struct{}),
66+
once: sync.Once{},
67+
}
68+
l.log.Infof("TCP lifo listener config: %s", l)
69+
70+
go l.listenExternal()
71+
go l.listenInternal()
72+
return l, nil
73+
}
74+
75+
func (l *stackListener) String() string {
76+
return fmt.Sprintf("stackListener concurrency: %d, queue size: %d, memory limit: %d, bytes per connection: %d, queue timeout: %s", l.maxConcurrency, l.maxQueueSize, l.memoryLimitBytes, l.connectionBytes, l.queueTimeout)
77+
}
78+
79+
func (l *stackListener) Accept() (net.Conn, error) {
80+
select {
81+
case <-l.quit:
82+
return nil, errListenerClosed
83+
case c := <-l.acceptInternal:
84+
l.metrics.MeasureSince(acceptLatencyKey, c.accepted)
85+
d := time.Since(c.accepted)
86+
if d > l.queueTimeout {
87+
l.metrics.IncCounter(queueTimeoutKey)
88+
if c.Conn != nil {
89+
c.Conn.Close()
90+
}
91+
return nil, errAcceptTimeout
92+
}
93+
return c, nil
94+
}
95+
}
96+
97+
func (l *stackListener) Addr() net.Addr {
98+
return l.externalListener.Addr()
99+
}
100+
101+
func (l *stackListener) Close() error {
102+
l.once.Do(func() {
103+
close(l.quit)
104+
l.externalListener.Close()
105+
close(l.acceptInternal)
106+
})
107+
108+
return nil
109+
}
110+
111+
func (l *stackListener) listenExternal() {
112+
var (
113+
err error
114+
c net.Conn
115+
)
116+
for {
117+
select {
118+
case <-l.quit:
119+
return
120+
default:
121+
}
122+
123+
c, err = l.externalListener.Accept()
124+
if err != nil {
125+
if errors.Is(err, http.ErrServerClosed) {
126+
l.log.Infof("Server closed: %v", err)
127+
return
128+
}
129+
130+
// client closed for example
131+
//l.log.Infof("Failed to accept connection (%T): %v", err, err)
132+
if c != nil {
133+
l.log.Info("close connection")
134+
c.Close()
135+
}
136+
continue
137+
}
138+
cc := external{c, time.Now()}
139+
l.stack.Push(&cc)
140+
}
141+
}
142+
143+
func (l *stackListener) listenInternal() {
144+
for {
145+
select {
146+
case <-l.quit:
147+
return
148+
default:
149+
}
150+
cc := l.stack.Pop()
151+
if cc == nil {
152+
// reduce cpu usage caused by busywait
153+
time.Sleep(10 * time.Microsecond)
154+
continue
155+
}
156+
l.metrics.IncCounter(acceptedConnectionsKey)
157+
l.metrics.UpdateGauge(queuedConnectionsKey, float64(l.stack.top+1))
158+
159+
l.acceptInternal <- *cc
160+
}
161+
}

0 commit comments

Comments
 (0)