diff --git a/lib/events/session_writer.go b/lib/events/session_writer.go index bff1bb1bc103d..445b66d10e092 100644 --- a/lib/events/session_writer.go +++ b/lib/events/session_writer.go @@ -37,6 +37,12 @@ import ( "github.com/gravitational/teleport/lib/session" ) +// DefaultMaxBufferSize is the default maximum number of events retained +// in the SessionWriter's in-memory buffer while waiting for upload +// confirmation. When reached, the writer stops consuming from its +// internal channel, creating backpressure through to RecordEvent callers. +const DefaultMaxBufferSize = 4096 + // NewSessionWriter returns a new instance of session writer func NewSessionWriter(cfg SessionWriterConfig) (*SessionWriter, error) { if err := cfg.CheckAndSetDefaults(); err != nil { @@ -100,6 +106,14 @@ type SessionWriterConfig struct { // BackoffDuration is a duration of the backoff before the next try BackoffDuration time.Duration + + // MaxBufferSize is the maximum number of events to retain in the + // in-memory buffer while waiting for upload confirmation. When the + // buffer reaches this size, processEvents stops reading new events + // from the channel, which creates backpressure through the + // unbuffered eventsCh all the way back to RecordEvent callers. + // Zero or negative means use DefaultMaxBufferSize. + MaxBufferSize int } // CheckAndSetDefaults checks and sets defaults @@ -128,6 +142,9 @@ func (cfg *SessionWriterConfig) CheckAndSetDefaults() error { if cfg.MakeEvents == nil { cfg.MakeEvents = bytesToSessionPrintEvents } + if cfg.MaxBufferSize <= 0 { + cfg.MaxBufferSize = DefaultMaxBufferSize + } return nil } @@ -174,6 +191,10 @@ type SessionWriter struct { lostEvents atomic.Int64 acceptedEvents atomic.Int64 slowWrites atomic.Int64 + // bufferFull tracks whether the buffer cap warning has been + // logged. It resets when the buffer drops below capacity so the + // warning fires again if the buffer refills. 
+ bufferFull bool } // SessionWriterStats provides stats about lost events and slow writes @@ -428,6 +449,29 @@ func (a *SessionWriter) processEvents() { a.updateStatus(status) default: } + // When the buffer is at capacity, stop reading new events and + // only wait for status updates (which trim the buffer) or + // shutdown signals. This creates backpressure through the + // unbuffered eventsCh back to RecordEvent callers. + if len(a.buffer) >= a.cfg.MaxBufferSize { + if !a.bufferFull { + a.bufferFull = true + a.log.WarnContext(a.cfg.Context, "Session writer buffer at capacity, applying backpressure.", + "buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize) + } + select { + case status := <-a.stream.Status(): + a.updateStatus(status) + case <-a.stream.Done(): + if !a.handleStreamDone() { + return + } + case <-a.closeCtx.Done(): + a.completeStream(a.stream) + return + } + continue + } select { case status := <-a.stream.Status(): a.updateStatus(status) @@ -454,14 +498,7 @@ func (a *SessionWriter) processEvents() { a.log.DebugContext(a.cfg.Context, "Recovered stream", "duration", time.Since(start)) } case <-a.stream.Done(): - if a.closeCtx.Err() != nil { - // don't attempt recovery if we're closing - return - } - a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.") - if err := a.recoverStream(); err != nil { - a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err) - + if !a.handleStreamDone() { return } case <-a.closeCtx.Done(): @@ -471,6 +508,21 @@ func (a *SessionWriter) processEvents() { } } +// handleStreamDone attempts to recover a server-closed stream. It +// returns true if recovery succeeded and the caller should continue +// the event loop, or false if the caller should return. 
+func (a *SessionWriter) handleStreamDone() bool { + if a.closeCtx.Err() != nil { + return false + } + a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.") + if err := a.recoverStream(); err != nil { + a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err) + return false + } + return true +} + // IsPermanentEmitError checks if the error contains either a sole // [trace.BadParameter] error in its chain, or a [trace.Aggregate] error // composed entirely of BadParameters. @@ -612,10 +664,15 @@ func (a *SessionWriter) updateStatus(status apievents.StreamStatus) { } lastIndex = i } - if lastIndex > 0 { + if lastIndex >= 0 { before := len(a.buffer) a.buffer = a.buffer[lastIndex+1:] a.log.DebugContext(a.closeCtx, "Removed saved events", "removed", before-len(a.buffer), "remaining", len(a.buffer)) + if a.bufferFull && len(a.buffer) < a.cfg.MaxBufferSize { + a.bufferFull = false + a.log.InfoContext(a.closeCtx, "Session writer buffer below capacity, backpressure released.", + "buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize) + } } } diff --git a/lib/events/session_writer_buffer_test.go b/lib/events/session_writer_buffer_test.go new file mode 100644 index 0000000000000..c73950891b40f --- /dev/null +++ b/lib/events/session_writer_buffer_test.go @@ -0,0 +1,108 @@ +/* + * Teleport + * Copyright (C) 2026 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package events + +import ( + "context" + "log/slog" + "testing" + + "github.com/stretchr/testify/require" + + apievents "github.com/gravitational/teleport/api/types/events" +) + +// noOpPreparer is a SessionEventPreparer that returns events unchanged. +// It is used in tests that only need the SessionWriterConfig to pass +// validation without actually preparing events. +type noOpPreparer struct{} + +func (noOpPreparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.PreparedSessionEvent, error) { + return preparedEvent{event: event}, nil +} + +type preparedEvent struct { + event apievents.AuditEvent +} + +func (p preparedEvent) GetAuditEvent() apievents.AuditEvent { return p.event } + +// TestSessionWriterConfigMaxBufferSize verifies that the MaxBufferSize +// config field defaults to DefaultMaxBufferSize when unset and +// preserves an explicit value when set. Without a default, the buffer +// cap would be zero, which would block all event processing. +// +// The SessionWriter's internal buffer accumulates PreparedSessionEvents +// until a status update from the upload stream confirms they have been +// persisted, at which point the confirmed prefix is trimmed. When the +// upload stream stalls (e.g., disk IOPS exhausted on a Kubernetes +// emptyDir volume), status updates stop arriving and the buffer grows +// without limit. Each event is small (~200 bytes), but at sustained +// throughput (hundreds of events per second across many sessions) this +// leads to OOM. +// +// The MaxBufferSize config caps the buffer. When full, processEvents +// stops reading from eventsCh, which creates backpressure through the +// unbuffered channel: RecordEvent blocks for BackoffTimeout (5s by +// default) then drops the event and enters backoff. This trades event +// loss for process survival.
+func TestSessionWriterConfigMaxBufferSize(t *testing.T) { + t.Run("defaults to DefaultMaxBufferSize when zero", func(t *testing.T) { + cfg := SessionWriterConfig{ + SessionID: "test", + Streamer: NewDiscardStreamer(), + Preparer: noOpPreparer{}, + Context: t.Context(), + } + require.NoError(t, cfg.CheckAndSetDefaults()) + require.Equal(t, DefaultMaxBufferSize, cfg.MaxBufferSize, "MaxBufferSize must default to DefaultMaxBufferSize so the buffer cap is active even when callers do not set it explicitly") + }) + + t.Run("preserves explicit value", func(t *testing.T) { + cfg := SessionWriterConfig{ + SessionID: "test", + Streamer: NewDiscardStreamer(), + Preparer: noOpPreparer{}, + Context: t.Context(), + MaxBufferSize: 500, + } + require.NoError(t, cfg.CheckAndSetDefaults()) + require.Equal(t, 500, cfg.MaxBufferSize) + }) +} + +// TestUpdateStatusTrimsAtIndexZero verifies that updateStatus trims the +// buffer when the only confirmed event is at buffer index 0. Before the +// fix, the condition was lastIndex > 0 which skipped trimming when only +// one event was confirmed, causing the buffer to grow without bound +// even when the upload stream was healthy. 
+func TestUpdateStatusTrimsAtIndexZero(t *testing.T) { + evt := &apievents.SessionPrint{} + evt.SetIndex(0) + + w := &SessionWriter{ + log: slog.Default(), + buffer: []apievents.PreparedSessionEvent{preparedEvent{event: evt}}, + cfg: SessionWriterConfig{MaxBufferSize: DefaultMaxBufferSize}, + closeCtx: context.Background(), + } + + w.updateStatus(apievents.StreamStatus{LastEventIndex: 0}) + require.Empty(t, w.buffer, "updateStatus must trim the buffer when the confirmed event is at index 0") +} diff --git a/lib/httplib/reverseproxy/reverse_proxy.go b/lib/httplib/reverseproxy/reverse_proxy.go index ccc108707d37e..306a2926ffaec 100644 --- a/lib/httplib/reverseproxy/reverse_proxy.go +++ b/lib/httplib/reverseproxy/reverse_proxy.go @@ -25,6 +25,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "sync" "time" "github.com/gravitational/trace" @@ -33,6 +34,27 @@ import ( logutils "github.com/gravitational/teleport/lib/utils/log" ) +// bufferPool is a sync.Pool-backed implementation of httputil.BufferPool. +// It reuses 32 KiB buffers across proxied requests to reduce GC pressure +// under high concurrency. +type bufferPool struct { + pool sync.Pool +} + +func (b *bufferPool) Get() []byte { + if v := b.pool.Get(); v != nil { + return v.([]byte) + } + return make([]byte, 32*1024) +} + +func (b *bufferPool) Put(buf []byte) { + b.pool.Put(buf) //nolint:staticcheck // SA6002: []byte is already a reference type; boxing cost is negligible +} + +// defaultBufferPool is shared across all Forwarder instances. +var defaultBufferPool = &bufferPool{} + // X-* Header names. 
const ( XForwardedProto = "X-Forwarded-Proto" @@ -77,6 +99,7 @@ func New(opts ...Option) (*Forwarder, error) { ReverseProxy: &httputil.ReverseProxy{ ErrorHandler: DefaultHandler.ServeHTTP, ErrorLog: log.Default(), + BufferPool: defaultBufferPool, }, logger: slog.Default(), } diff --git a/lib/httplib/reverseproxy/reverse_proxy_test.go b/lib/httplib/reverseproxy/reverse_proxy_test.go index 7bf218e8f1646..989452a26b370 100644 --- a/lib/httplib/reverseproxy/reverse_proxy_test.go +++ b/lib/httplib/reverseproxy/reverse_proxy_test.go @@ -116,6 +116,18 @@ func TestRequestCancelWithoutPanic(t *testing.T) { } +// TestForwarderUsesBufferPool verifies that the reverse proxy Forwarder +// is configured with a BufferPool. Without a pool, every proxied request +// allocates a fresh 32 KiB buffer for io.Copy that becomes garbage +// immediately after the request completes. Under high concurrency (e.g., +// hundreds of concurrent app sessions), this creates significant GC +// pressure and contributes to memory growth. +func TestForwarderUsesBufferPool(t *testing.T) { + fwd, err := New() + require.NoError(t, err) + require.NotNil(t, fwd.ReverseProxy.BufferPool, "Forwarder must set a BufferPool to reuse io.Copy buffers and reduce GC pressure under high concurrency") +} + func newSingleHostReverseProxy(target *url.URL) *Forwarder { return &Forwarder{ ReverseProxy: httputil.NewSingleHostReverseProxy(target), diff --git a/lib/srv/app/connections_handler.go b/lib/srv/app/connections_handler.go index aa9b8b1efd747..e39440cacbc50 100644 --- a/lib/srv/app/connections_handler.go +++ b/lib/srv/app/connections_handler.go @@ -124,6 +124,12 @@ type ConnectionsHandlerConfig struct { // InsecureMode defines whether insecure connections are allowed. InsecureMode bool + + // MaxActiveSessionChunks limits the number of concurrently active + // session chunks on this agent. Each active chunk holds an open + // recording stream. Zero or negative means use + // DefaultMaxActiveSessionChunks. 
+ MaxActiveSessionChunks int } // CheckAndSetDefaults validates the config values and sets defaults. @@ -131,6 +137,9 @@ func (c *ConnectionsHandlerConfig) CheckAndSetDefaults() error { if c.Clock == nil { c.Clock = clockwork.NewRealClock() } + if c.MaxActiveSessionChunks <= 0 { + c.MaxActiveSessionChunks = DefaultMaxActiveSessionChunks + } if c.DataDir == "" { return trace.BadParameter("data dir missing") } @@ -211,6 +220,9 @@ type ConnectionsHandler struct { // getAppByPublicAddress returns a types.Application using the public address as matcher. getAppByPublicAddress func(context.Context, string) (types.Application, error) + + // chunkSem limits concurrent active session chunks. + chunkSem chan struct{} } // NewConnectionsHandler returns a new ConnectionsHandler. @@ -244,6 +256,8 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler return nil, trace.Wrap(err) } + chunkSem := make(chan struct{}, cfg.MaxActiveSessionChunks) + c := &ConnectionsHandler{ cfg: cfg, closeContext: closeContext, @@ -252,6 +266,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler gcpHandler: gcpHandler, connAuth: make(map[net.Conn]error), log: slog.With(teleport.ComponentKey, cfg.ServiceComponent), + chunkSem: chunkSem, getAppByPublicAddress: func(ctx context.Context, s string) (types.Application, error) { return nil, trace.NotFound("no applications are being proxied") }, @@ -265,6 +280,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler Clock: c.cfg.Clock, CleanupInterval: time.Second, OnExpiry: c.onSessionExpired, + ReloadOnErr: true, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/srv/app/session.go b/lib/srv/app/session.go index 395ddfeba81c8..4978bc8a3e1ef 100644 --- a/lib/srv/app/session.go +++ b/lib/srv/app/session.go @@ -45,6 +45,21 @@ import ( // sessionChunkCloseTimeout is the default timeout used for sessionChunk.closeTimeout const sessionChunkCloseTimeout = 1 * 
time.Hour +// DefaultMaxActiveSessionChunks is the default limit on concurrently +// active session chunks per agent. Each chunk holds an open recording +// stream whose in-memory footprint depends on upload throughput. +// The dominant cost is the ProtoStreamer upload buffer (~7 MiB per +// stream). 256 chunks at 7 MiB is ~1.8 GiB, which caps memory +// without restricting normal workloads. +// +// Under an IOPS stall, each slot is held for up to +// sessionChunkCloseTimeout (1 hour) because the recording stream +// cannot flush. With a 5-minute chunk TTL, all 256 slots fill in +// roughly 5 minutes, and new sessions are rejected with +// LimitExceeded for the duration of the incident. This is the +// intended tradeoff: cap memory at the cost of availability. +const DefaultMaxActiveSessionChunks = 256 + var errSessionChunkAlreadyClosed = errors.New("session chunk already closed") // sessionChunk holds an open request handler and stream closer for an app session. @@ -81,6 +96,14 @@ type sessionChunk struct { // for ~7 minutes at most. closeTimeout time.Duration + // chunkSem is an optional semaphore shared across all session + // chunks on the same agent. A slot is acquired in newSessionChunk + // before opening the recording stream (returning LimitExceeded if + // full) and released in close after the stream shuts down. This + // bounds the number of concurrently open recording streams to + // prevent unbounded memory growth. + chunkSem chan struct{} + log *slog.Logger } @@ -92,11 +115,36 @@ type sessionOpt func(context.Context, *sessionChunk, *tlsca.Identity, types.Appl // and as such expects `release()` to eventually be called // by the caller of this function. func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, startTime time.Time, opts ...sessionOpt) (*sessionChunk, error) { + // Acquire a semaphore slot before opening the recording stream. 
+ // Each chunk holds an open stream that consumes memory; bounding + // the number of concurrent chunks prevents OOM under load. The + // slot is released in close() when the chunk shuts down. + if c.chunkSem != nil { + select { + case c.chunkSem <- struct{}{}: + default: + c.log.WarnContext(ctx, "Rejecting session chunk, semaphore full.", + "max_active_chunks", cap(c.chunkSem), + ) + return nil, trace.LimitExceeded("too many active session chunks") + } + } + + // If chunk creation fails after acquiring the semaphore slot, + // release it so we do not leak slots on error paths. + success := false + defer func() { + if !success && c.chunkSem != nil { + <-c.chunkSem + } + }() + sess := &sessionChunk{ id: uuid.New().String(), closeC: make(chan struct{}), inflightCond: sync.NewCond(&sync.Mutex{}), closeTimeout: sessionChunkCloseTimeout, + chunkSem: c.chunkSem, log: c.log, } @@ -137,6 +185,7 @@ func (c *ConnectionsHandler) newSessionChunk(ctx context.Context, identity *tlsc return nil, trace.Wrap(err) } + success = true sess.log.DebugContext(ctx, "Created app session chunk", "session_id", sess.id) return sess, nil } @@ -197,9 +246,9 @@ func (c *ConnectionsHandler) withGCPHandler(ctx context.Context, sess *sessionCh return nil } -// acquire() increments in-flight request count by 1. -// It is supposed to be paired with a `release()` call, -// after the chunk is done with for the individual request +// acquire increments in-flight request count by 1. It is supposed to +// be paired with a release call once the individual request finishes +// using the chunk. 
func (s *sessionChunk) acquire() error { s.inflightCond.L.Lock() defer s.inflightCond.L.Unlock() @@ -251,7 +300,13 @@ func (s *sessionChunk) close(ctx context.Context) error { s.inflightCond.L.Unlock() close(s.closeC) s.log.DebugContext(ctx, "Closed session chunk", "session_id", s.id) - return trace.Wrap(s.streamCloser.Close(ctx)) + err := s.streamCloser.Close(ctx) + // Release the semaphore slot after the stream is closed, not + // before, so the slot accurately reflects an open stream. + if s.chunkSem != nil { + <-s.chunkSem + } + return trace.Wrap(err) } func (c *ConnectionsHandler) onSessionExpired(ctx context.Context, key, expired any) { diff --git a/lib/srv/app/session_test.go b/lib/srv/app/session_test.go index 1835b405a98ad..4a253b1acccdc 100644 --- a/lib/srv/app/session_test.go +++ b/lib/srv/app/session_test.go @@ -87,3 +87,62 @@ func TestSessionChunkCannotAcquireAfterClosing(t *testing.T) { sess.close(context.Background()) require.Error(t, sess.acquire()) } + +// TestMaxActiveSessionChunksDefault verifies that CheckAndSetDefaults +// sets MaxActiveSessionChunks to DefaultMaxActiveSessionChunks when +// the caller leaves it at zero. Without this default the semaphore +// would have zero capacity and reject every session chunk. +func TestMaxActiveSessionChunksDefault(t *testing.T) { + cfg := &ConnectionsHandlerConfig{MaxActiveSessionChunks: 0} + // CheckAndSetDefaults will fail on other missing fields, but + // MaxActiveSessionChunks is set before any error return. + _ = cfg.CheckAndSetDefaults() + require.Equal(t, DefaultMaxActiveSessionChunks, cfg.MaxActiveSessionChunks) +} + +// TestSessionChunkSemaphore verifies that the session chunk semaphore +// limits the number of concurrently open recording streams per agent. +// Each chunk holds an open stream whose memory footprint is dominated +// by the ProtoStreamer upload buffer (~7 MiB). 
Without a bound, an +// agent handling many concurrent app sessions can accumulate hundreds +// of open streams, exhausting memory. +// +// The chunkSem is a buffered channel shared across all chunks on one +// agent. A slot is acquired in newSessionChunk before the recording +// stream is opened (returning LimitExceeded when full) and released +// in close after the stream shuts down. +func TestSessionChunkSemaphore(t *testing.T) { + t.Run("close releases semaphore slot", func(t *testing.T) { + sem := make(chan struct{}, 2) + sess := newSessionChunk(time.Second) + sess.chunkSem = sem + + // Simulate what newSessionChunk does: acquire a slot. + sem <- struct{}{} + require.Len(t, sem, 1, "slot should be occupied after creation") + + sess.close(context.Background()) + require.Empty(t, sem, "close should release the semaphore slot") + }) + + t.Run("close releases semaphore even with inflight requests", func(t *testing.T) { + // Simulate: a request acquired the chunk (inflight=1), then + // close() force-closed it (setting inflight to -1) because + // the closeTimeout elapsed. close() must still drain the + // semaphore slot so the agent does not leak slots. + sem := make(chan struct{}, 2) + sess := newSessionChunk(100 * time.Millisecond) + sess.chunkSem = sem + + // Simulate what newSessionChunk does. + sem <- struct{}{} + + require.NoError(t, sess.acquire()) + require.Len(t, sem, 1, "semaphore should still have one slot from creation") + + // Force-close: the chunk's closeTimeout is 100ms and we + // never release, so close() will force inflight to -1. + sess.close(context.Background()) + require.Empty(t, sem, "close must drain the semaphore even when force-closing with in-flight requests, otherwise the slot leaks") + }) +}