Skip to content

Commit b082b62

Browse files
committed
app: Bound memory under sustained session churn
Add three mechanisms that cap memory growth on app-access agents handling high session volumes with stalled upload streams. Reverse proxy buffer pool: add a `sync.Pool`-backed `httputil.BufferPool` to the reverse proxy `Forwarder`. Without a pool, every proxied request allocates a fresh 32 KiB buffer for `io.Copy` that becomes garbage immediately after the request completes. Under high concurrency this creates GC pressure. Session writer buffer cap: add a `MaxBufferSize` config field (default 4096) to `SessionWriter`. When the internal `[]PreparedSessionEvent` buffer reaches capacity, `processEvents` stops reading from `eventsCh`, creating backpressure through the unbuffered channel back to `RecordEvent` callers. Extract a `handleStreamDone()` helper to deduplicate stream recovery logic between the backpressure and main select blocks. Fix a pre-existing off-by-one in `updateStatus` where `lastIndex > 0` prevented trimming when only one event (index 0) was confirmed. Session chunk semaphore: add a `chunkSem` buffered channel to `ConnectionsHandler` that limits the number of concurrently active session chunks per agent to `MaxActiveSessionChunks` (default 256). A slot is acquired in `newSessionChunk` before opening the recording stream and released in `close()` after the stream shuts down. Use a `success` flag to release the slot on error paths. Log a warning when a chunk is rejected so the rejection is observable without correlating with generic request failure logs. Set `ReloadOnErr: true` on the `FnCache` so that `LimitExceeded` errors from a full semaphore are not cached for the full TTL.
1 parent a1c54f4 commit b082b62

File tree

4 files changed

+163
-13
lines changed

4 files changed

+163
-13
lines changed

lib/events/session_writer.go

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ import (
3737
"github.com/gravitational/teleport/lib/session"
3838
)
3939

40+
// DefaultMaxBufferSize caps the number of events the SessionWriter keeps
// in memory while it waits for the upload stream to confirm them. Once
// the cap is reached the writer stops draining its internal channel, so
// RecordEvent callers block (backpressure) instead of the buffer growing
// without bound.
const DefaultMaxBufferSize = 4096
45+
4046
// NewSessionWriter returns a new instance of session writer
4147
func NewSessionWriter(cfg SessionWriterConfig) (*SessionWriter, error) {
4248
if err := cfg.CheckAndSetDefaults(); err != nil {
@@ -100,6 +106,14 @@ type SessionWriterConfig struct {
100106

101107
// BackoffDuration is a duration of the backoff before the next try
102108
BackoffDuration time.Duration
109+
110+
// MaxBufferSize is the maximum number of events to retain in the
111+
// in-memory buffer while waiting for upload confirmation. When the
112+
// buffer reaches this size, processEvents stops reading new events
113+
// from the channel, which creates backpressure through the
114+
// unbuffered eventsCh all the way back to RecordEvent callers.
115+
// Zero or negative means use DefaultMaxBufferSize.
116+
MaxBufferSize int
103117
}
104118

105119
// CheckAndSetDefaults checks and sets defaults
@@ -128,6 +142,9 @@ func (cfg *SessionWriterConfig) CheckAndSetDefaults() error {
128142
if cfg.MakeEvents == nil {
129143
cfg.MakeEvents = bytesToSessionPrintEvents
130144
}
145+
if cfg.MaxBufferSize <= 0 {
146+
cfg.MaxBufferSize = DefaultMaxBufferSize
147+
}
131148
return nil
132149
}
133150

@@ -174,6 +191,10 @@ type SessionWriter struct {
174191
lostEvents atomic.Int64
175192
acceptedEvents atomic.Int64
176193
slowWrites atomic.Int64
194+
// bufferFull tracks whether the buffer cap warning has been
195+
// logged. It resets when the buffer drops below capacity so the
196+
// warning fires again if the buffer refills.
197+
bufferFull bool
177198
}
178199

179200
// SessionWriterStats provides stats about lost events and slow writes
@@ -428,6 +449,29 @@ func (a *SessionWriter) processEvents() {
428449
a.updateStatus(status)
429450
default:
430451
}
452+
// When the buffer is at capacity, stop reading new events and
453+
// only wait for status updates (which trim the buffer) or
454+
// shutdown signals. This creates backpressure through the
455+
// unbuffered eventsCh back to RecordEvent callers.
456+
if len(a.buffer) >= a.cfg.MaxBufferSize {
457+
if !a.bufferFull {
458+
a.bufferFull = true
459+
a.log.WarnContext(a.cfg.Context, "Session writer buffer at capacity, applying backpressure.",
460+
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
461+
}
462+
select {
463+
case status := <-a.stream.Status():
464+
a.updateStatus(status)
465+
case <-a.stream.Done():
466+
if !a.handleStreamDone() {
467+
return
468+
}
469+
case <-a.closeCtx.Done():
470+
a.completeStream(a.stream)
471+
return
472+
}
473+
continue
474+
}
431475
select {
432476
case status := <-a.stream.Status():
433477
a.updateStatus(status)
@@ -454,14 +498,7 @@ func (a *SessionWriter) processEvents() {
454498
a.log.DebugContext(a.cfg.Context, "Recovered stream", "duration", time.Since(start))
455499
}
456500
case <-a.stream.Done():
457-
if a.closeCtx.Err() != nil {
458-
// don't attempt recovery if we're closing
459-
return
460-
}
461-
a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
462-
if err := a.recoverStream(); err != nil {
463-
a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)
464-
501+
if !a.handleStreamDone() {
465502
return
466503
}
467504
case <-a.closeCtx.Done():
@@ -471,6 +508,21 @@ func (a *SessionWriter) processEvents() {
471508
}
472509
}
473510

511+
// handleStreamDone attempts to recover a server-closed stream. It
512+
// returns true if recovery succeeded and the caller should continue
513+
// the event loop, or false if the caller should return.
514+
func (a *SessionWriter) handleStreamDone() bool {
515+
if a.closeCtx.Err() != nil {
516+
return false
517+
}
518+
a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
519+
if err := a.recoverStream(); err != nil {
520+
a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)
521+
return false
522+
}
523+
return true
524+
}
525+
474526
// IsPermanentEmitError checks if the error contains either a sole
475527
// [trace.BadParameter] error in its chain, or a [trace.Aggregate] error
476528
// composed entirely of BadParameters.
@@ -612,10 +664,15 @@ func (a *SessionWriter) updateStatus(status apievents.StreamStatus) {
612664
}
613665
lastIndex = i
614666
}
615-
if lastIndex > 0 {
667+
if lastIndex >= 0 {
616668
before := len(a.buffer)
617669
a.buffer = a.buffer[lastIndex+1:]
618670
a.log.DebugContext(a.closeCtx, "Removed saved events", "removed", before-len(a.buffer), "remaining", len(a.buffer))
671+
if a.bufferFull && len(a.buffer) < a.cfg.MaxBufferSize {
672+
a.bufferFull = false
673+
a.log.InfoContext(a.closeCtx, "Session writer buffer below capacity, backpressure released.",
674+
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
675+
}
619676
}
620677
}
621678

lib/httplib/reverseproxy/reverse_proxy.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net/http"
2626
"net/http/httputil"
2727
"net/url"
28+
"sync"
2829
"time"
2930

3031
"github.com/gravitational/trace"
@@ -33,6 +34,27 @@ import (
3334
logutils "github.com/gravitational/teleport/lib/utils/log"
3435
)
3536

37+
// proxyBufferSize is the size of each reusable copy buffer handed to
// httputil.ReverseProxy. It matches the 32 KiB buffer that io.Copy would
// otherwise allocate fresh for every proxied request.
const proxyBufferSize = 32 * 1024

// bufferPool is a sync.Pool-backed implementation of httputil.BufferPool.
// It reuses proxyBufferSize buffers across proxied requests to reduce GC
// pressure under high concurrency.
type bufferPool struct {
	pool sync.Pool
}

// Get returns a pooled buffer, allocating a new one when the pool is empty.
func (b *bufferPool) Get() []byte {
	if v := b.pool.Get(); v != nil {
		return v.([]byte)
	}
	return make([]byte, proxyBufferSize)
}

// Put returns a buffer to the pool for reuse by a later request.
func (b *bufferPool) Put(buf []byte) {
	b.pool.Put(buf) //nolint:staticcheck // SA6002: []byte is already a reference type; boxing cost is negligible
}

// defaultBufferPool is shared across all Forwarder instances.
var defaultBufferPool = &bufferPool{}
57+
3658
// X-* Header names.
3759
const (
3860
XForwardedProto = "X-Forwarded-Proto"
@@ -77,6 +99,7 @@ func New(opts ...Option) (*Forwarder, error) {
7799
ReverseProxy: &httputil.ReverseProxy{
78100
ErrorHandler: DefaultHandler.ServeHTTP,
79101
ErrorLog: log.Default(),
102+
BufferPool: defaultBufferPool,
80103
},
81104
logger: slog.Default(),
82105
}

lib/srv/app/connections_handler.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,22 @@ type ConnectionsHandlerConfig struct {
124124

125125
// InsecureMode defines whether insecure connections are allowed.
126126
InsecureMode bool
127+
128+
// MaxActiveSessionChunks limits the number of concurrently active
129+
// session chunks on this agent. Each active chunk holds an open
130+
// recording stream. Zero or negative means use
131+
// DefaultMaxActiveSessionChunks.
132+
MaxActiveSessionChunks int
127133
}
128134

129135
// CheckAndSetDefaults validates the config values and sets defaults.
130136
func (c *ConnectionsHandlerConfig) CheckAndSetDefaults() error {
131137
if c.Clock == nil {
132138
c.Clock = clockwork.NewRealClock()
133139
}
140+
if c.MaxActiveSessionChunks <= 0 {
141+
c.MaxActiveSessionChunks = DefaultMaxActiveSessionChunks
142+
}
134143
if c.DataDir == "" {
135144
return trace.BadParameter("data dir missing")
136145
}
@@ -211,6 +220,9 @@ type ConnectionsHandler struct {
211220

212221
// getAppByPublicAddress returns a types.Application using the public address as matcher.
213222
getAppByPublicAddress func(context.Context, string) (types.Application, error)
223+
224+
// chunkSem limits concurrent active session chunks.
225+
chunkSem chan struct{}
214226
}
215227

216228
// NewConnectionsHandler returns a new ConnectionsHandler.
@@ -244,6 +256,8 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
244256
return nil, trace.Wrap(err)
245257
}
246258

259+
chunkSem := make(chan struct{}, cfg.MaxActiveSessionChunks)
260+
247261
c := &ConnectionsHandler{
248262
cfg: cfg,
249263
closeContext: closeContext,
@@ -252,6 +266,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
252266
gcpHandler: gcpHandler,
253267
connAuth: make(map[net.Conn]error),
254268
log: slog.With(teleport.ComponentKey, cfg.ServiceComponent),
269+
chunkSem: chunkSem,
255270
getAppByPublicAddress: func(ctx context.Context, s string) (types.Application, error) {
256271
return nil, trace.NotFound("no applications are being proxied")
257272
},
@@ -265,6 +280,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
265280
Clock: c.cfg.Clock,
266281
CleanupInterval: time.Second,
267282
OnExpiry: c.onSessionExpired,
283+
ReloadOnErr: true,
268284
})
269285
if err != nil {
270286
return nil, trace.Wrap(err)

lib/srv/app/session.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ import (
4545
// sessionChunkCloseTimeout is the default timeout used for sessionChunk.closeTimeout
4646
const sessionChunkCloseTimeout = 1 * time.Hour
4747

48+
// DefaultMaxActiveSessionChunks bounds how many session chunks may be
// active at once on a single agent. Every active chunk keeps a recording
// stream open, and each stream's in-memory footprint depends on upload
// throughput (typically 3-8 MiB under IOPS pressure); at 8 MiB per
// stream, 256 chunks come to roughly 2 GiB, capping memory without
// restricting normal workloads.
//
// During an IOPS stall a slot can be held for up to
// sessionChunkCloseTimeout (1 hour) because the recording stream cannot
// flush. With a 5-minute chunk TTL, all 256 slots fill in roughly
// 5 minutes and new sessions are rejected with LimitExceeded for the
// duration of the incident — the intended tradeoff: bounded memory at
// the cost of availability.
const DefaultMaxActiveSessionChunks = 256
61+
4862
var errSessionChunkAlreadyClosed = errors.New("session chunk already closed")
4963

5064
// sessionChunk holds an open request handler and stream closer for an app session.
@@ -81,6 +95,14 @@ type sessionChunk struct {
8195
// for ~7 minutes at most.
8296
closeTimeout time.Duration
8397

98+
// chunkSem is an optional semaphore shared across all session
99+
// chunks on the same agent. A slot is acquired in newSessionChunk
100+
// before opening the recording stream (returning LimitExceeded if
101+
// full) and released in close after the stream shuts down. This
102+
// bounds the number of concurrently open recording streams to
103+
// prevent unbounded memory growth.
104+
chunkSem chan struct{}
105+
84106
log *slog.Logger
85107
}
86108

@@ -92,11 +114,36 @@ type sessionOpt func(context.Context, *sessionChunk, *tlsca.Identity, types.Appl
92114
// and as such expects `release()` to eventually be called
93115
// by the caller of this function.
94116
func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, startTime time.Time, opts ...sessionOpt) (*sessionChunk, error) {
117+
// Acquire a semaphore slot before opening the recording stream.
118+
// Each chunk holds an open stream that consumes memory; bounding
119+
// the number of concurrent chunks prevents OOM under load. The
120+
// slot is released in close() when the chunk shuts down.
121+
if c.chunkSem != nil {
122+
select {
123+
case c.chunkSem <- struct{}{}:
124+
default:
125+
c.log.WarnContext(ctx, "Rejecting session chunk, semaphore full.",
126+
"max_active_chunks", cap(c.chunkSem),
127+
)
128+
return nil, trace.LimitExceeded("too many active session chunks")
129+
}
130+
}
131+
132+
// If chunk creation fails after acquiring the semaphore slot,
133+
// release it so we do not leak slots on error paths.
134+
success := false
135+
defer func() {
136+
if !success && c.chunkSem != nil {
137+
<-c.chunkSem
138+
}
139+
}()
140+
95141
sess := &sessionChunk{
96142
id: uuid.New().String(),
97143
closeC: make(chan struct{}),
98144
inflightCond: sync.NewCond(&sync.Mutex{}),
99145
closeTimeout: sessionChunkCloseTimeout,
146+
chunkSem: c.chunkSem,
100147
log: c.log,
101148
}
102149

@@ -137,6 +184,7 @@ func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsc
137184
return nil, trace.Wrap(err)
138185
}
139186

187+
success = true
140188
sess.log.DebugContext(ctx, "Created app session chunk", "session_id", sess.id)
141189
return sess, nil
142190
}
@@ -197,9 +245,9 @@ func (c *ConnectionsHandler) withGCPHandler(ctx context.Context, sess *sessionCh
197245
return nil
198246
}
199247

200-
// acquire() increments in-flight request count by 1.
201-
// It is supposed to be paired with a `release()` call,
202-
// after the chunk is done with for the individual request
248+
// acquire increments in-flight request count by 1. It is supposed to
249+
// be paired with a release call once the individual request finishes
250+
// using the chunk.
203251
func (s *sessionChunk) acquire() error {
204252
s.inflightCond.L.Lock()
205253
defer s.inflightCond.L.Unlock()
@@ -251,7 +299,13 @@ func (s *sessionChunk) close(ctx context.Context) error {
251299
s.inflightCond.L.Unlock()
252300
close(s.closeC)
253301
s.log.DebugContext(ctx, "Closed session chunk", "session_id", s.id)
254-
return trace.Wrap(s.streamCloser.Close(ctx))
302+
err := s.streamCloser.Close(ctx)
303+
// Release the semaphore slot after the stream is closed, not
304+
// before, so the slot accurately reflects an open stream.
305+
if s.chunkSem != nil {
306+
<-s.chunkSem
307+
}
308+
return trace.Wrap(err)
255309
}
256310

257311
func (c *ConnectionsHandler) onSessionExpired(ctx context.Context, key, expired any) {

0 commit comments

Comments
 (0)