Skip to content

Commit 30bc2b7

Browse files
committed
app: Bound memory under sustained session churn
Add three mechanisms that cap memory growth on app-access agents handling high session volumes with stalled upload streams. Reverse proxy buffer pool: add a `sync.Pool`-backed `httputil.BufferPool` to the reverse proxy `Forwarder`. Without a pool, every proxied request allocates a fresh 32 KiB buffer for `io.Copy` that becomes garbage immediately after the request completes. Under high concurrency this creates GC pressure. Session writer buffer cap: add a `MaxBufferSize` config field (default 4096) to `SessionWriter`. When the internal `[]PreparedSessionEvent` buffer reaches capacity, `processEvents` stops reading from `eventsCh`, creating backpressure through the unbuffered channel back to `RecordEvent` callers. Extract a `handleStreamDone()` helper to deduplicate stream recovery logic between the backpressure and main select blocks. Fix a pre-existing off-by-one in `updateStatus` where `lastIndex > 0` prevented trimming when only one event (index 0) was confirmed. Session chunk semaphore: add a `chunkSem` buffered channel to `ConnectionsHandler` that limits the number of concurrently active session chunks per agent to `MaxActiveSessionChunks` (default 256). A slot is acquired in `newSessionChunk` before opening the recording stream and released in `close()` after the stream shuts down. Use a `success` flag to release the slot on error paths. Set `ReloadOnErr: true` on the `FnCache` so that `LimitExceeded` errors from a full semaphore are not cached for the full TTL.
1 parent 055a62f commit 30bc2b7

File tree

4 files changed

+153
-13
lines changed

4 files changed

+153
-13
lines changed

lib/events/session_writer.go

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ import (
3737
"github.com/gravitational/teleport/lib/session"
3838
)
3939

40+
// DefaultMaxBufferSize is the default maximum number of events retained
41+
// in the SessionWriter's in-memory buffer while waiting for upload
42+
// confirmation. When reached, the writer stops consuming from its
43+
// internal channel, creating backpressure through to RecordEvent callers.
44+
const DefaultMaxBufferSize = 4096
45+
4046
// NewSessionWriter returns a new instance of session writer
4147
func NewSessionWriter(cfg SessionWriterConfig) (*SessionWriter, error) {
4248
if err := cfg.CheckAndSetDefaults(); err != nil {
@@ -100,6 +106,14 @@ type SessionWriterConfig struct {
100106

101107
// BackoffDuration is a duration of the backoff before the next try
102108
BackoffDuration time.Duration
109+
110+
// MaxBufferSize is the maximum number of events to retain in the
111+
// in-memory buffer while waiting for upload confirmation. When the
112+
// buffer reaches this size, processEvents stops reading new events
113+
// from the channel, which creates backpressure through the
114+
// unbuffered eventsCh all the way back to RecordEvent callers.
115+
// Zero or negative means use DefaultMaxBufferSize.
116+
MaxBufferSize int
103117
}
104118

105119
// CheckAndSetDefaults checks and sets defaults
@@ -128,6 +142,9 @@ func (cfg *SessionWriterConfig) CheckAndSetDefaults() error {
128142
if cfg.MakeEvents == nil {
129143
cfg.MakeEvents = bytesToSessionPrintEvents
130144
}
145+
if cfg.MaxBufferSize <= 0 {
146+
cfg.MaxBufferSize = DefaultMaxBufferSize
147+
}
131148
return nil
132149
}
133150

@@ -174,6 +191,10 @@ type SessionWriter struct {
174191
lostEvents atomic.Int64
175192
acceptedEvents atomic.Int64
176193
slowWrites atomic.Int64
194+
// bufferFull tracks whether the buffer cap warning has been
195+
// logged. It resets when the buffer drops below capacity so the
196+
// warning fires again if the buffer refills.
197+
bufferFull bool
177198
}
178199

179200
// SessionWriterStats provides stats about lost events and slow writes
@@ -428,6 +449,29 @@ func (a *SessionWriter) processEvents() {
428449
a.updateStatus(status)
429450
default:
430451
}
452+
// When the buffer is at capacity, stop reading new events and
453+
// only wait for status updates (which trim the buffer) or
454+
// shutdown signals. This creates backpressure through the
455+
// unbuffered eventsCh back to RecordEvent callers.
456+
if len(a.buffer) >= a.cfg.MaxBufferSize {
457+
if !a.bufferFull {
458+
a.bufferFull = true
459+
a.log.WarnContext(a.cfg.Context, "Session writer buffer at capacity, applying backpressure.",
460+
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
461+
}
462+
select {
463+
case status := <-a.stream.Status():
464+
a.updateStatus(status)
465+
case <-a.stream.Done():
466+
if !a.handleStreamDone() {
467+
return
468+
}
469+
case <-a.closeCtx.Done():
470+
a.completeStream(a.stream)
471+
return
472+
}
473+
continue
474+
}
431475
select {
432476
case status := <-a.stream.Status():
433477
a.updateStatus(status)
@@ -454,14 +498,7 @@ func (a *SessionWriter) processEvents() {
454498
a.log.DebugContext(a.cfg.Context, "Recovered stream", "duration", time.Since(start))
455499
}
456500
case <-a.stream.Done():
457-
if a.closeCtx.Err() != nil {
458-
// don't attempt recovery if we're closing
459-
return
460-
}
461-
a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
462-
if err := a.recoverStream(); err != nil {
463-
a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)
464-
501+
if !a.handleStreamDone() {
465502
return
466503
}
467504
case <-a.closeCtx.Done():
@@ -471,6 +508,21 @@ func (a *SessionWriter) processEvents() {
471508
}
472509
}
473510

511+
// handleStreamDone attempts to recover a server-closed stream. It
512+
// returns true if recovery succeeded and the caller should continue
513+
// the event loop, or false if the caller should return.
514+
func (a *SessionWriter) handleStreamDone() bool {
515+
if a.closeCtx.Err() != nil {
516+
return false
517+
}
518+
a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
519+
if err := a.recoverStream(); err != nil {
520+
a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)
521+
return false
522+
}
523+
return true
524+
}
525+
474526
// IsPermanentEmitError checks if the error contains either a sole
475527
// [trace.BadParameter] error in its chain, or a [trace.Aggregate] error
476528
// composed entirely of BadParameters.
@@ -612,10 +664,15 @@ func (a *SessionWriter) updateStatus(status apievents.StreamStatus) {
612664
}
613665
lastIndex = i
614666
}
615-
if lastIndex > 0 {
667+
if lastIndex >= 0 {
616668
before := len(a.buffer)
617669
a.buffer = a.buffer[lastIndex+1:]
618670
a.log.DebugContext(a.closeCtx, "Removed saved events", "removed", before-len(a.buffer), "remaining", len(a.buffer))
671+
if a.bufferFull && len(a.buffer) < a.cfg.MaxBufferSize {
672+
a.bufferFull = false
673+
a.log.InfoContext(a.closeCtx, "Session writer buffer below capacity, backpressure released.",
674+
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
675+
}
619676
}
620677
}
621678

lib/httplib/reverseproxy/reverse_proxy.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net/http"
2626
"net/http/httputil"
2727
"net/url"
28+
"sync"
2829
"time"
2930

3031
"github.com/gravitational/trace"
@@ -33,6 +34,27 @@ import (
3334
logutils "github.com/gravitational/teleport/lib/utils/log"
3435
)
3536

37+
// copyBufferSize matches the 32 KiB buffer that io.Copy (and therefore
// httputil.ReverseProxy) allocates per request when no pool is set.
const copyBufferSize = 32 * 1024

// bufferPool is a sync.Pool-backed implementation of httputil.BufferPool.
// It reuses 32 KiB buffers across proxied requests to reduce GC pressure
// under high concurrency.
type bufferPool struct {
	pool sync.Pool
}

// Get returns a pooled copy buffer, allocating a fresh full-size buffer
// when the pool has none available.
func (b *bufferPool) Get() []byte {
	if v := b.pool.Get(); v != nil {
		return v.([]byte)
	}
	return make([]byte, copyBufferSize)
}

// Put returns buf to the pool for reuse by a later Get. Buffers of an
// unexpected capacity are discarded so the pool only ever hands out
// full-size copy buffers (a smaller buffer slipped into the pool would
// silently shrink io.CopyBuffer chunks for some later request).
func (b *bufferPool) Put(buf []byte) {
	if cap(buf) != copyBufferSize {
		return
	}
	// Restore full length in case the caller resliced the buffer.
	b.pool.Put(buf[:copyBufferSize]) //nolint:staticcheck // ReverseProxy does not retain buf after Put
}

// defaultBufferPool is shared across all Forwarder instances.
var defaultBufferPool = &bufferPool{}
57+
3658
// X-* Header names.
3759
const (
3860
XForwardedProto = "X-Forwarded-Proto"
@@ -77,6 +99,7 @@ func New(opts ...Option) (*Forwarder, error) {
7799
ReverseProxy: &httputil.ReverseProxy{
78100
ErrorHandler: DefaultHandler.ServeHTTP,
79101
ErrorLog: log.Default(),
102+
BufferPool: defaultBufferPool,
80103
},
81104
logger: slog.Default(),
82105
}

lib/srv/app/connections_handler.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ type ConnectionsHandlerConfig struct {
124124

125125
// InsecureMode defines whether insecure connections are allowed.
126126
InsecureMode bool
127+
128+
// MaxActiveSessionChunks limits the number of concurrently active
129+
// session chunks on this agent. Each active chunk holds an open
130+
// recording stream. Zero or negative means use
131+
// DefaultMaxActiveSessionChunks.
132+
MaxActiveSessionChunks int
127133
}
128134

129135
// CheckAndSetDefaults validates the config values and sets defaults.
@@ -172,6 +178,9 @@ func (c *ConnectionsHandlerConfig) CheckAndSetDefaults() error {
172178
if c.ServiceComponent == "" {
173179
return trace.BadParameter("service component missing")
174180
}
181+
if c.MaxActiveSessionChunks <= 0 {
182+
c.MaxActiveSessionChunks = DefaultMaxActiveSessionChunks
183+
}
175184
return nil
176185
}
177186

@@ -211,6 +220,9 @@ type ConnectionsHandler struct {
211220

212221
// getAppByPublicAddress returns a types.Application using the public address as matcher.
213222
getAppByPublicAddress func(context.Context, string) (types.Application, error)
223+
224+
// chunkSem limits concurrent active session chunks.
225+
chunkSem chan struct{}
214226
}
215227

216228
// NewConnectionsHandler returns a new ConnectionsHandler.
@@ -244,6 +256,8 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
244256
return nil, trace.Wrap(err)
245257
}
246258

259+
chunkSem := make(chan struct{}, cfg.MaxActiveSessionChunks)
260+
247261
c := &ConnectionsHandler{
248262
cfg: cfg,
249263
closeContext: closeContext,
@@ -252,6 +266,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
252266
gcpHandler: gcpHandler,
253267
connAuth: make(map[net.Conn]error),
254268
log: slog.With(teleport.ComponentKey, cfg.ServiceComponent),
269+
chunkSem: chunkSem,
255270
getAppByPublicAddress: func(ctx context.Context, s string) (types.Application, error) {
256271
return nil, trace.NotFound("no applications are being proxied")
257272
},
@@ -265,6 +280,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
265280
Clock: c.cfg.Clock,
266281
CleanupInterval: time.Second,
267282
OnExpiry: c.onSessionExpired,
283+
ReloadOnErr: true,
268284
})
269285
if err != nil {
270286
return nil, trace.Wrap(err)

lib/srv/app/session.go

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ import (
4545
// sessionChunkCloseTimeout is the default timeout used for sessionChunk.closeTimeout
4646
const sessionChunkCloseTimeout = 1 * time.Hour
4747

48+
// DefaultMaxActiveSessionChunks is the default limit on concurrently
49+
// active session chunks per agent. Each chunk holds an open recording
50+
// stream whose in-memory footprint depends on upload throughput
51+
// (typically 3-8 MiB under IOPS pressure). 256 chunks at 8 MiB is
52+
// ~2 GiB, which caps memory without restricting normal workloads.
53+
const DefaultMaxActiveSessionChunks = 256
54+
4855
var errSessionChunkAlreadyClosed = errors.New("session chunk already closed")
4956

5057
// sessionChunk holds an open request handler and stream closer for an app session.
@@ -81,6 +88,14 @@ type sessionChunk struct {
8188
// for ~7 minutes at most.
8289
closeTimeout time.Duration
8390

91+
// chunkSem is an optional semaphore shared across all session
92+
// chunks on the same agent. A slot is acquired in newSessionChunk
93+
// before opening the recording stream (returning LimitExceeded if
94+
// full) and released in close after the stream shuts down. This
95+
// bounds the number of concurrently open recording streams to
96+
// prevent unbounded memory growth.
97+
chunkSem chan struct{}
98+
8499
log *slog.Logger
85100
}
86101

@@ -92,11 +107,33 @@ type sessionOpt func(context.Context, *sessionChunk, *tlsca.Identity, types.Appl
92107
// and as such expects `release()` to eventually be called
93108
// by the caller of this function.
94109
func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, startTime time.Time, opts ...sessionOpt) (*sessionChunk, error) {
110+
// Acquire a semaphore slot before opening the recording stream.
111+
// Each chunk holds an open stream that consumes memory; bounding
112+
// the number of concurrent chunks prevents OOM under load. The
113+
// slot is released in close() when the chunk shuts down.
114+
if c.chunkSem != nil {
115+
select {
116+
case c.chunkSem <- struct{}{}:
117+
default:
118+
return nil, trace.LimitExceeded("too many active session chunks")
119+
}
120+
}
121+
122+
// If chunk creation fails after acquiring the semaphore slot,
123+
// release it so we do not leak slots on error paths.
124+
success := false
125+
defer func() {
126+
if !success && c.chunkSem != nil {
127+
<-c.chunkSem
128+
}
129+
}()
130+
95131
sess := &sessionChunk{
96132
id: uuid.New().String(),
97133
closeC: make(chan struct{}),
98134
inflightCond: sync.NewCond(&sync.Mutex{}),
99135
closeTimeout: sessionChunkCloseTimeout,
136+
chunkSem: c.chunkSem,
100137
log: c.log,
101138
}
102139

@@ -137,6 +174,7 @@ func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsc
137174
return nil, trace.Wrap(err)
138175
}
139176

177+
success = true
140178
sess.log.DebugContext(ctx, "Created app session chunk", "session_id", sess.id)
141179
return sess, nil
142180
}
@@ -197,9 +235,9 @@ func (c *ConnectionsHandler) withGCPHandler(ctx context.Context, sess *sessionCh
197235
return nil
198236
}
199237

200-
// acquire() increments in-flight request count by 1.
201-
// It is supposed to be paired with a `release()` call,
202-
// after the chunk is done with for the individual request
238+
// acquire increments in-flight request count by 1. It is supposed to
239+
// be paired with a release call once the individual request finishes
240+
// using the chunk.
203241
func (s *sessionChunk) acquire() error {
204242
s.inflightCond.L.Lock()
205243
defer s.inflightCond.L.Unlock()
@@ -251,7 +289,13 @@ func (s *sessionChunk) close(ctx context.Context) error {
251289
s.inflightCond.L.Unlock()
252290
close(s.closeC)
253291
s.log.DebugContext(ctx, "Closed session chunk", "session_id", s.id)
254-
return trace.Wrap(s.streamCloser.Close(ctx))
292+
err := s.streamCloser.Close(ctx)
293+
// Release the semaphore slot after the stream is closed, not
294+
// before, so the slot accurately reflects an open stream.
295+
if s.chunkSem != nil {
296+
<-s.chunkSem
297+
}
298+
return trace.Wrap(err)
255299
}
256300

257301
func (c *ConnectionsHandler) onSessionExpired(ctx context.Context, key, expired any) {

0 commit comments

Comments
 (0)