Commit 4eb0f8c

TUN-8861: Rename Session Limiter to Flow Limiter
## Summary

Session is the concept used for UDP flows. To make the limiter apply to both TCP and UDP rather than only UDP sessions, this commit renames the session limiter to the flow limiter.

Closes TUN-8861
1 parent 8c2eda1 · commit 4eb0f8c

23 files changed: +295 −295 lines
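Across the diff, the limiter's contract is unchanged by the rename: a call site acquires a slot per flow, checks for the ErrTooManyActiveFlows sentinel, and releases the slot when the flow ends. A minimal sketch of that usage, assuming only the flow package API added in this commit:

package main

import (
    "errors"
    "log"

    cfdflow "github.com/cloudflare/cloudflared/flow"
)

func main() {
    // A limit of 0 means unlimited; any positive value caps concurrent flows across the tunnel.
    limiter := cfdflow.NewLimiter(100)

    // Each TCP stream or UDP session acquires a slot before it is proxied...
    if err := limiter.Acquire("tcp"); err != nil {
        if errors.Is(err, cfdflow.ErrTooManyActiveFlows) {
            log.Println("flow rejected: rate limited")
        }
        return
    }
    // ...and releases it once the flow finishes.
    defer limiter.Release()

    // The limit can also be hot swapped at runtime.
    limiter.SetLimit(200)
}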

connection/connection_test.go

2 additions, 2 deletions

@@ -12,7 +12,7 @@ import (
     pkgerrors "github.com/pkg/errors"
     "github.com/rs/zerolog"
 
-    cfdsession "github.com/cloudflare/cloudflared/session"
+    cfdflow "github.com/cloudflare/cloudflared/flow"
 
     "github.com/cloudflare/cloudflared/stream"
     "github.com/cloudflare/cloudflared/tracing"
@@ -107,7 +107,7 @@ func (moc *mockOriginProxy) ProxyTCP(
     r *TCPRequest,
 ) error {
     if r.CfTraceID == "flow-rate-limited" {
-        return pkgerrors.Wrap(cfdsession.ErrTooManyActiveSessions, "tcp flow rate limited")
+        return pkgerrors.Wrap(cfdflow.ErrTooManyActiveFlows, "tcp flow rate limited")
     }
 
     return nil

connection/http2.go

2 additions, 2 deletions

@@ -16,7 +16,7 @@ import (
     "github.com/rs/zerolog"
     "golang.org/x/net/http2"
 
-    cfdsession "github.com/cloudflare/cloudflared/session"
+    cfdflow "github.com/cloudflare/cloudflared/flow"
 
     "github.com/cloudflare/cloudflared/tracing"
     tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
@@ -336,7 +336,7 @@ func (rp *http2RespWriter) WriteErrorResponse(err error) bool {
         return false
     }
 
-    if errors.Is(err, cfdsession.ErrTooManyActiveSessions) {
+    if errors.Is(err, cfdflow.ErrTooManyActiveFlows) {
         rp.setResponseMetaHeader(responseMetaHeaderCfdFlowRateLimited)
     } else {
         rp.setResponseMetaHeader(responseMetaHeaderCfd)
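WriteErrorResponse depends on errors.Is recognizing the sentinel even though the origin proxies return it wrapped with pkgerrors.Wrap. A small self-contained sketch of why that works, with a local errTooManyActiveFlows standing in for cfdflow.ErrTooManyActiveFlows:

package main

import (
    "errors"
    "fmt"

    pkgerrors "github.com/pkg/errors"
)

var errTooManyActiveFlows = errors.New("too many active flows")

func main() {
    // pkgerrors.Wrap adds context and a stack trace but keeps the cause reachable
    // via Unwrap, so errors.Is still matches the wrapped sentinel.
    err := pkgerrors.Wrap(errTooManyActiveFlows, "tcp flow rate limited")
    fmt.Println(errors.Is(err, errTooManyActiveFlows)) // true
}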

connection/quic_connection.go

2 additions, 2 deletions

@@ -17,7 +17,7 @@ import (
     "github.com/rs/zerolog"
     "golang.org/x/sync/errgroup"
 
-    cfdsession "github.com/cloudflare/cloudflared/session"
+    cfdflow "github.com/cloudflare/cloudflared/flow"
 
     cfdquic "github.com/cloudflare/cloudflared/quic"
     "github.com/cloudflare/cloudflared/tracing"
@@ -185,7 +185,7 @@ func (q *quicConnection) handleDataStream(ctx context.Context, stream *rpcquic.R
 
     var metadata []pogs.Metadata
     // Check the type of error that was throw and add metadata that will help identify it on OTD.
-    if errors.Is(err, cfdsession.ErrTooManyActiveSessions) {
+    if errors.Is(err, cfdflow.ErrTooManyActiveFlows) {
         metadata = append(metadata, pogs.ErrorFlowConnectRateLimitedKey)
     }

connection/quic_connection_test.go

3 additions, 3 deletions

@@ -29,7 +29,7 @@ import (
     "github.com/stretchr/testify/require"
     "golang.org/x/net/nettest"
 
-    cfdsession "github.com/cloudflare/cloudflared/session"
+    cfdflow "github.com/cloudflare/cloudflared/flow"
 
     "github.com/cloudflare/cloudflared/datagramsession"
     "github.com/cloudflare/cloudflared/ingress"
@@ -508,7 +508,7 @@ func TestBuildHTTPRequest(t *testing.T) {
 
 func (moc *mockOriginProxyWithRequest) ProxyTCP(ctx context.Context, rwa ReadWriteAcker, tcpRequest *TCPRequest) error {
     if tcpRequest.Dest == "rate-limit-me" {
-        return pkgerrors.Wrap(cfdsession.ErrTooManyActiveSessions, "failed tcp stream")
+        return pkgerrors.Wrap(cfdflow.ErrTooManyActiveFlows, "failed tcp stream")
     }
 
     _ = rwa.AckConnection("")
@@ -828,7 +828,7 @@ func testTunnelConnection(t *testing.T, serverAddr netip.AddrPort, index uint8)
         conn,
         index,
         sessionManager,
-        cfdsession.NewLimiter(0),
+        cfdflow.NewLimiter(0),
         datagramMuxer,
         packetRouter,
         15 * time.Second,

connection/quic_datagram_v2.go

9 additions, 9 deletions

@@ -14,7 +14,7 @@ import (
     "go.opentelemetry.io/otel/trace"
     "golang.org/x/sync/errgroup"
 
-    cfdsession "github.com/cloudflare/cloudflared/session"
+    cfdflow "github.com/cloudflare/cloudflared/flow"
 
     "github.com/cloudflare/cloudflared/datagramsession"
     "github.com/cloudflare/cloudflared/ingress"
@@ -46,8 +46,8 @@ type datagramV2Connection struct {
 
     // sessionManager tracks active sessions. It receives datagrams from quic connection via datagramMuxer
     sessionManager datagramsession.Manager
-    // sessionLimiter tracks active sessions across the tunnel and limits new sessions if they are above the limit.
-    sessionLimiter cfdsession.Limiter
+    // flowLimiter tracks active sessions across the tunnel and limits new sessions if they are above the limit.
+    flowLimiter cfdflow.Limiter
 
     // datagramMuxer mux/demux datagrams from quic connection
     datagramMuxer *cfdquic.DatagramMuxerV2
@@ -65,7 +65,7 @@ func NewDatagramV2Connection(ctx context.Context,
     index uint8,
     rpcTimeout time.Duration,
     streamWriteTimeout time.Duration,
-    sessionLimiter cfdsession.Limiter,
+    flowLimiter cfdflow.Limiter,
     logger *zerolog.Logger,
 ) DatagramSessionHandler {
     sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
@@ -77,7 +77,7 @@ func NewDatagramV2Connection(ctx context.Context,
         conn: conn,
         index: index,
         sessionManager: sessionManager,
-        sessionLimiter: sessionLimiter,
+        flowLimiter: flowLimiter,
         datagramMuxer: datagramMuxer,
         packetRouter: packetRouter,
         rpcTimeout: rpcTimeout,
@@ -121,7 +121,7 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID
     log := q.logger.With().Int(management.EventTypeKey, int(management.UDP)).Logger()
 
     // Try to start a new session
-    if err := q.sessionLimiter.Acquire(management.UDP.String()); err != nil {
+    if err := q.flowLimiter.Acquire(management.UDP.String()); err != nil {
         log.Warn().Msgf("Too many concurrent sessions being handled, rejecting udp proxy to %s:%d", dstIP, dstPort)
 
         err := pkgerrors.Wrap(err, "failed to start udp session due to rate limiting")
@@ -135,7 +135,7 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID
     if err != nil {
         log.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
         tracing.EndWithErrorStatus(registerSpan, err)
-        q.sessionLimiter.Release()
+        q.flowLimiter.Release()
         return nil, err
     }
     registerSpan.SetAttributes(
@@ -148,12 +148,12 @@ func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID
         originProxy.Close()
         log.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).Msgf("Failed to register udp session")
         tracing.EndWithErrorStatus(registerSpan, err)
-        q.sessionLimiter.Release()
+        q.flowLimiter.Release()
         return nil, err
     }
 
     go func() {
-        defer q.sessionLimiter.Release() // we do the release here, instead of inside the `serveUDPSession` just to keep all acquire/release calls in the same method.
+        defer q.flowLimiter.Release() // we do the release here, instead of inside the `serveUDPSession` just to keep all acquire/release calls in the same method.
         q.serveUDPSession(session, closeAfterIdleHint)
     }()
 
connection/quic_datagram_v2_test.go

6 additions, 6 deletions

@@ -12,8 +12,8 @@ import (
     "github.com/stretchr/testify/require"
     "go.uber.org/mock/gomock"
 
+    cfdflow "github.com/cloudflare/cloudflared/flow"
     "github.com/cloudflare/cloudflared/mocks"
-    cfdsession "github.com/cloudflare/cloudflared/session"
 )
 
 type mockQuicConnection struct {
@@ -75,7 +75,7 @@ func TestRateLimitOnNewDatagramV2UDPSession(t *testing.T) {
     log := zerolog.Nop()
     conn := &mockQuicConnection{}
     ctrl := gomock.NewController(t)
-    sessionLimiterMock := mocks.NewMockLimiter(ctrl)
+    flowLimiterMock := mocks.NewMockLimiter(ctrl)
 
     datagramConn := NewDatagramV2Connection(
         context.Background(),
@@ -84,13 +84,13 @@ func TestRateLimitOnNewDatagramV2UDPSession(t *testing.T) {
         0,
         0*time.Second,
         0*time.Second,
-        sessionLimiterMock,
+        flowLimiterMock,
         &log,
     )
 
-    sessionLimiterMock.EXPECT().Acquire("udp").Return(cfdsession.ErrTooManyActiveSessions)
-    sessionLimiterMock.EXPECT().Release().Times(0)
+    flowLimiterMock.EXPECT().Acquire("udp").Return(cfdflow.ErrTooManyActiveFlows)
+    flowLimiterMock.EXPECT().Release().Times(0)
 
     _, err := datagramConn.RegisterUdpSession(context.Background(), uuid.New(), net.IPv4(0, 0, 0, 0), 1000, 1*time.Second, "")
-    require.ErrorIs(t, err, cfdsession.ErrTooManyActiveSessions)
+    require.ErrorIs(t, err, cfdflow.ErrTooManyActiveFlows)
 }
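The test injects mocks.NewMockLimiter, a gomock mock of the flow.Limiter interface. How that mock is generated is not part of this diff; with go.uber.org/mock it would typically come from a mockgen directive along these lines, where the destination path and package are assumptions:

package flow

// Hypothetical generation directive; only the flow.Limiter interface name comes from this commit.
//go:generate mockgen -destination ../mocks/mock_flow_limiter.go -package mocks github.com/cloudflare/cloudflared/flow Limiter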

flow/limiter.go

77 additions, 0 deletions (new file)

package flow

import (
    "errors"
    "sync"
)

const (
    unlimitedActiveFlows = 0
)

var (
    ErrTooManyActiveFlows = errors.New("too many active flows")
)

type Limiter interface {
    // Acquire tries to acquire a free slot for a flow, if the value of flows is already above
    // the maximum it returns ErrTooManyActiveFlows.
    Acquire(flowType string) error
    // Release releases a slot for a flow.
    Release()
    // SetLimit allows to hot swap the limit value of the limiter.
    SetLimit(uint64)
}

type flowLimiter struct {
    limiterLock        sync.Mutex
    activeFlowsCounter uint64
    maxActiveFlows     uint64
    unlimited          bool
}

func NewLimiter(maxActiveFlows uint64) Limiter {
    flowLimiter := &flowLimiter{
        maxActiveFlows: maxActiveFlows,
        unlimited:      isUnlimited(maxActiveFlows),
    }

    return flowLimiter
}

func (s *flowLimiter) Acquire(flowType string) error {
    s.limiterLock.Lock()
    defer s.limiterLock.Unlock()

    if !s.unlimited && s.activeFlowsCounter >= s.maxActiveFlows {
        flowRegistrationsDropped.WithLabelValues(flowType).Inc()
        return ErrTooManyActiveFlows
    }

    s.activeFlowsCounter++
    return nil
}

func (s *flowLimiter) Release() {
    s.limiterLock.Lock()
    defer s.limiterLock.Unlock()

    if s.activeFlowsCounter <= 0 {
        return
    }

    s.activeFlowsCounter--
}

func (s *flowLimiter) SetLimit(newMaxActiveFlows uint64) {
    s.limiterLock.Lock()
    defer s.limiterLock.Unlock()

    s.maxActiveFlows = newMaxActiveFlows
    s.unlimited = isUnlimited(newMaxActiveFlows)
}

// isUnlimited checks if the value received matches the configuration for the unlimited flow limiter.
func isUnlimited(value uint64) bool {
    return value == unlimitedActiveFlows
}
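Acquire increments flowRegistrationsDropped when it rejects a flow; that Prometheus counter is defined elsewhere in the flow package and is not part of this excerpt. A plausible sketch of such a definition using prometheus/client_golang, where the namespace, subsystem, metric name, and label are assumptions and only the variable name and the WithLabelValues(flowType).Inc() usage come from limiter.go above:

package flow

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// flowRegistrationsDropped counts flows rejected by the limiter, labelled by flow type
// ("tcp" or "udp"). The metric name below is a guess, not the repository's actual name.
var flowRegistrationsDropped = promauto.NewCounterVec(
    prometheus.CounterOpts{
        Namespace: "cloudflared",
        Subsystem: "flow",
        Name:      "registrations_rate_limited_total",
        Help:      "Count of flows dropped because the flow limit was reached.",
    },
    []string{"flow_type"},
)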

flow/limiter_test.go

119 additions, 0 deletions (new file)

package flow_test

import (
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/cloudflare/cloudflared/flow"
)

func TestFlowLimiter_Unlimited(t *testing.T) {
    unlimitedLimiter := flow.NewLimiter(0)

    for i := 0; i < 1000; i++ {
        err := unlimitedLimiter.Acquire("test")
        require.NoError(t, err)
    }
}

func TestFlowLimiter_Limited(t *testing.T) {
    maxFlows := uint64(5)
    limiter := flow.NewLimiter(maxFlows)

    for i := uint64(0); i < maxFlows; i++ {
        err := limiter.Acquire("test")
        require.NoError(t, err)
    }

    err := limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)
}

func TestFlowLimiter_AcquireAndReleaseFlow(t *testing.T) {
    maxFlows := uint64(5)
    limiter := flow.NewLimiter(maxFlows)

    // Acquire the maximum number of flows
    for i := uint64(0); i < maxFlows; i++ {
        err := limiter.Acquire("test")
        require.NoError(t, err)
    }

    // Validate acquire 1 more flows fails
    err := limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)

    // Release the maximum number of flows
    for i := uint64(0); i < maxFlows; i++ {
        limiter.Release()
    }

    // Validate acquire 1 more flows works
    err = limiter.Acquire("shouldn't fail")
    require.NoError(t, err)

    // Release a 10x the number of max flows
    for i := uint64(0); i < 10*maxFlows; i++ {
        limiter.Release()
    }

    // Validate it still can only acquire a value = number max flows.
    for i := uint64(0); i < maxFlows; i++ {
        err := limiter.Acquire("test")
        require.NoError(t, err)
    }
    err = limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)
}

func TestFlowLimiter_SetLimit(t *testing.T) {
    maxFlows := uint64(5)
    limiter := flow.NewLimiter(maxFlows)

    // Acquire the maximum number of flows
    for i := uint64(0); i < maxFlows; i++ {
        err := limiter.Acquire("test")
        require.NoError(t, err)
    }

    // Validate acquire 1 more flows fails
    err := limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)

    // Set the flow limiter to support one more request
    limiter.SetLimit(maxFlows + 1)

    // Validate acquire 1 more flows now works
    err = limiter.Acquire("shouldn't fail")
    require.NoError(t, err)

    // Validate acquire 1 more flows doesn't work because we already reached the limit
    err = limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)

    // Release all flows
    for i := uint64(0); i < maxFlows+1; i++ {
        limiter.Release()
    }

    // Validate 1 flow works again
    err = limiter.Acquire("shouldn't fail")
    require.NoError(t, err)

    // Set the flow limit to 1
    limiter.SetLimit(1)

    // Validate acquire 1 more flows doesn't work
    err = limiter.Acquire("should fail")
    require.ErrorIs(t, err, flow.ErrTooManyActiveFlows)

    // Set the flow limit to unlimited
    limiter.SetLimit(0)

    // Validate it can acquire a lot of flows because it is now unlimited.
    for i := uint64(0); i < 10*maxFlows; i++ {
        err := limiter.Acquire("shouldn't fail")
        require.NoError(t, err)
    }
}
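Because the limiter guards its counter with a sync.Mutex, a single instance can be shared safely by every connection of a tunnel. A short sketch exercising it from many goroutines; the goroutine count and limit are arbitrary:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    cfdflow "github.com/cloudflare/cloudflared/flow"
)

func main() {
    limiter := cfdflow.NewLimiter(10)

    var wg sync.WaitGroup
    var rejected atomic.Int64

    // 100 goroutines race for 10 slots; the mutex inside the limiter keeps the counter consistent.
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := limiter.Acquire("udp"); err != nil {
                rejected.Add(1)
                return
            }
            defer limiter.Release()
            // serve the flow here
        }()
    }
    wg.Wait()

    // How many acquisitions fail depends on scheduling: slots are released as soon as each
    // goroutine finishes, so the rejected count can be anywhere from 0 to 90.
    fmt.Println("rejected:", rejected.Load())
}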
