Skip to content

Commit e05939f

Browse files
committed
TUN-8621: Prevent QUIC connection from closing before grace period after unregistering
Whenever cloudflared receives a SIGTERM or SIGINT it goes into graceful shutdown mode, which unregisters the connection and closes the control stream. Unregistering makes it so we no longer receive any new requests and makes the edge close the connection, allowing in-flight requests to finish (within a 3-minute period). This was working fine for http2 connections, but the quic proxy was cancelling the context as soon as the control stream ended, forcing the process to stop immediately. This commit changes the behavior so that we wait the full grace period before cancelling the request.
1 parent ab0bce5 commit e05939f

File tree

8 files changed

+53
-16
lines changed

8 files changed

+53
-16
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 2024.9.1
2+
### Bug Fixes
3+
- We fixed a bug related to `--grace-period`. Tunnels that use QUIC as transport weren't abiding by this waiting period before forcefully closing the connections to the edge. From now on, both QUIC and HTTP2 tunnels will wait for either the grace period to end (defaults to 30 seconds) or until the last in-flight request is handled. Users who wish to maintain the previous behavior should set `--grace-period` to 0 if `--protocol` is set to `quic`. This will force `cloudflared` to shut down as soon as either SIGTERM or SIGINT is received.
4+
15
## 2024.2.1
26
### Notices
37
- Starting from this version, tunnel diagnostics will be enabled by default. This will allow the engineering team to remotely get diagnostics from cloudflared during debug activities. Users still have the capability to opt out of this feature by defining `--management-diagnostics=false` (or env `TUNNEL_MANAGEMENT_DIAGNOSTICS`).

component-tests/test_termination.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ def test_graceful_shutdown(self, tmp_path, component_tests_config, signal, proto
4545
with connected:
4646
connected.wait(self.timeout)
4747
# Send signal after the SSE connection is established
48-
self.terminate_by_signal(cloudflared, signal)
49-
self.wait_eyeball_thread(
50-
in_flight_req, self.grace_period + self.timeout)
48+
with self.within_grace_period():
49+
self.terminate_by_signal(cloudflared, signal)
50+
self.wait_eyeball_thread(
51+
in_flight_req, self.grace_period + self.timeout)
5152

5253
# test cloudflared terminates before grace period expires when all eyeball
5354
# connections are drained
@@ -66,7 +67,7 @@ def test_shutdown_once_no_connection(self, tmp_path, component_tests_config, sig
6667

6768
with connected:
6869
connected.wait(self.timeout)
69-
with self.within_grace_period():
70+
with self.within_grace_period(has_connection=False):
7071
# Send signal after the SSE connection is established
7172
self.terminate_by_signal(cloudflared, signal)
7273
self.wait_eyeball_thread(in_flight_req, self.grace_period)
@@ -78,7 +79,7 @@ def test_no_connection_shutdown(self, tmp_path, component_tests_config, signal,
7879
with start_cloudflared(
7980
tmp_path, config, cfd_pre_args=["tunnel", "--ha-connections", "1"], new_process=True, capture_output=False) as cloudflared:
8081
wait_tunnel_ready(tunnel_url=config.get_url())
81-
with self.within_grace_period():
82+
with self.within_grace_period(has_connection=False):
8283
self.terminate_by_signal(cloudflared, signal)
8384

8485
def terminate_by_signal(self, cloudflared, sig):
@@ -92,13 +93,21 @@ def wait_eyeball_thread(self, thread, timeout):
9293

9394
# Using this context asserts logic within the context is executed within grace period
9495
@contextmanager
95-
def within_grace_period(self):
96+
def within_grace_period(self, has_connection=True):
9697
try:
9798
start = time.time()
9899
yield
99100
finally:
101+
102+
# If the request takes longer than the grace period then we need to wait at most the grace period.
103+
# If the request fell within the grace period cloudflared can close earlier, but to ensure that it doesn't
104+
# close immediately we add a minimum boundary. If cloudflared shut down in less than 1s it's likely that
105+
# it shut down as soon as it received SIGINT. The only way cloudflared can close immediately is if it has no
106+
# in-flight requests
107+
minimum = 1 if has_connection else 0
100108
duration = time.time() - start
101-
assert duration < self.grace_period
109+
# Here we truncate to ensure that we don't fail on minute differences like 10.1 instead of 10
110+
assert minimum <= int(duration) <= self.grace_period
102111

103112
def stream_request(self, config, connected, early_terminate):
104113
expected_terminate_message = "502 Bad Gateway"

connection/control.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"net"
77
"time"
88

9+
"github.com/pkg/errors"
10+
911
"github.com/cloudflare/cloudflared/management"
1012
"github.com/cloudflare/cloudflared/tunnelrpc"
1113
tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@@ -116,27 +118,32 @@ func (c *controlStream) ServeControlStream(
116118
}
117119
}
118120

119-
c.waitForUnregister(ctx, registrationClient)
120-
return nil
121+
return c.waitForUnregister(ctx, registrationClient)
121122
}
122123

123-
func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) {
124+
func (c *controlStream) waitForUnregister(ctx context.Context, registrationClient tunnelrpc.RegistrationClient) error {
124125
// wait for connection termination or start of graceful shutdown
125126
defer registrationClient.Close()
127+
var shutdownError error
126128
select {
127129
case <-ctx.Done():
130+
shutdownError = ctx.Err()
128131
break
129132
case <-c.gracefulShutdownC:
130133
c.stoppedGracefully = true
131134
}
132135

133136
c.observer.sendUnregisteringEvent(c.connIndex)
134-
registrationClient.GracefulShutdown(ctx, c.gracePeriod)
137+
err := registrationClient.GracefulShutdown(ctx, c.gracePeriod)
138+
if err != nil {
139+
return errors.Wrap(err, "Error shutting down control stream")
140+
}
135141
c.observer.log.Info().
136142
Int(management.EventTypeKey, int(management.Cloudflared)).
137143
Uint8(LogFieldConnIndex, c.connIndex).
138144
IPAddr(LogFieldIPAddress, c.edgeAddress).
139145
Msg("Unregistered tunnel connection")
146+
return shutdownError
140147
}
141148

142149
func (c *controlStream) IsStopped() bool {

connection/http2_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,9 @@ func (mc mockNamedTunnelRPCClient) RegisterConnection(
192192
}, nil
193193
}
194194

195-
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
195+
func (mc mockNamedTunnelRPCClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error {
196196
close(mc.unregistered)
197+
return nil
197198
}
198199

199200
func (mockNamedTunnelRPCClient) Close() {}

connection/quic.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type QUICConnection struct {
8383

8484
rpcTimeout time.Duration
8585
streamWriteTimeout time.Duration
86+
gracePeriod time.Duration
8687
}
8788

8889
// NewQUICConnection returns a new instance of QUICConnection.
@@ -100,6 +101,7 @@ func NewQUICConnection(
100101
packetRouterConfig *ingress.GlobalRouterConfig,
101102
rpcTimeout time.Duration,
102103
streamWriteTimeout time.Duration,
104+
gracePeriod time.Duration,
103105
) (*QUICConnection, error) {
104106
udpConn, err := createUDPConnForConnIndex(connIndex, localAddr, logger)
105107
if err != nil {
@@ -136,6 +138,7 @@ func NewQUICConnection(
136138
connIndex: connIndex,
137139
rpcTimeout: rpcTimeout,
138140
streamWriteTimeout: streamWriteTimeout,
141+
gracePeriod: gracePeriod,
139142
}, nil
140143
}
141144

@@ -158,8 +161,17 @@ func (q *QUICConnection) Serve(ctx context.Context) error {
158161
// In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
159162
// stream is already fully registered before the other goroutines can proceed.
160163
errGroup.Go(func() error {
161-
defer cancel()
162-
return q.serveControlStream(ctx, controlStream)
164+
// err is equal to nil if we exit due to unregistration. If that happens we want to wait the full
165+
// amount of the grace period, allowing requests to finish before we cancel the context, which will
166+
// make cloudflared exit.
167+
if err := q.serveControlStream(ctx, controlStream); err == nil {
168+
select {
169+
case <-ctx.Done():
170+
case <-time.Tick(q.gracePeriod):
171+
}
172+
}
173+
cancel()
174+
return err
163175
})
164176
errGroup.Go(func() error {
165177
defer cancel()

connection/quic_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,7 @@ func testQUICConnection(udpListenerAddr net.Addr, t *testing.T, index uint8) *QU
855855
nil,
856856
15*time.Second,
857857
0*time.Second,
858+
0*time.Second,
858859
)
859860
require.NoError(t, err)
860861
return qc

supervisor/tunnel.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,7 @@ func (e *EdgeTunnelServer) serveQUIC(
604604
e.config.PacketConfig,
605605
e.config.RPCTimeout,
606606
e.config.WriteStreamTimeout,
607+
e.config.GracePeriod,
607608
)
608609
if err != nil {
609610
connLogger.ConnAwareLogger().Err(err).Msgf("Failed to create new quic connection")

tunnelrpc/registration_client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type RegistrationClient interface {
2323
edgeAddress net.IP,
2424
) (*pogs.ConnectionDetails, error)
2525
SendLocalConfiguration(ctx context.Context, config []byte) error
26-
GracefulShutdown(ctx context.Context, gracePeriod time.Duration)
26+
GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error
2727
Close()
2828
}
2929

@@ -79,7 +79,7 @@ func (r *registrationClient) SendLocalConfiguration(ctx context.Context, config
7979
return err
8080
}
8181

82-
func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) {
82+
func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod time.Duration) error {
8383
ctx, cancel := context.WithTimeout(ctx, gracePeriod)
8484
defer cancel()
8585
defer metrics.CapnpMetrics.ClientOperations.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
@@ -88,7 +88,9 @@ func (r *registrationClient) GracefulShutdown(ctx context.Context, gracePeriod t
8888
err := r.client.UnregisterConnection(ctx)
8989
if err != nil {
9090
metrics.CapnpMetrics.ClientFailures.WithLabelValues(metrics.Registration, metrics.OperationUnregisterConnection).Inc()
91+
return err
9192
}
93+
return nil
9294
}
9395

9496
func (r *registrationClient) Close() {

0 commit comments

Comments
 (0)