Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 66 additions & 9 deletions lib/events/session_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ import (
"github.com/gravitational/teleport/lib/session"
)

// DefaultMaxBufferSize caps the SessionWriter's in-memory event buffer
// while events await upload confirmation. Once the buffer holds this
// many events, the writer stops draining its internal channel, which
// pushes backpressure all the way back to RecordEvent callers.
const DefaultMaxBufferSize = 4096
// NewSessionWriter returns a new instance of session writer
func NewSessionWriter(cfg SessionWriterConfig) (*SessionWriter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
Expand Down Expand Up @@ -100,6 +106,14 @@ type SessionWriterConfig struct {

// BackoffDuration is a duration of the backoff before the next try
BackoffDuration time.Duration

// MaxBufferSize is the maximum number of events to retain in the
// in-memory buffer while waiting for upload confirmation. When the
// buffer reaches this size, processEvents stops reading new events
// from the channel, which creates backpressure through the
// unbuffered eventsCh all the way back to RecordEvent callers.
// Zero or negative means use DefaultMaxBufferSize.
MaxBufferSize int
}

// CheckAndSetDefaults checks and sets defaults
Expand Down Expand Up @@ -128,6 +142,9 @@ func (cfg *SessionWriterConfig) CheckAndSetDefaults() error {
if cfg.MakeEvents == nil {
cfg.MakeEvents = bytesToSessionPrintEvents
}
if cfg.MaxBufferSize <= 0 {
cfg.MaxBufferSize = DefaultMaxBufferSize
}
return nil
}

Expand Down Expand Up @@ -174,6 +191,10 @@ type SessionWriter struct {
lostEvents atomic.Int64
acceptedEvents atomic.Int64
slowWrites atomic.Int64
// bufferFull tracks whether the buffer cap warning has been
// logged. It resets when the buffer drops below capacity so the
// warning fires again if the buffer refills.
bufferFull bool
}

// SessionWriterStats provides stats about lost events and slow writes
Expand Down Expand Up @@ -428,6 +449,29 @@ func (a *SessionWriter) processEvents() {
a.updateStatus(status)
default:
}
// When the buffer is at capacity, stop reading new events and
// only wait for status updates (which trim the buffer) or
// shutdown signals. This creates backpressure through the
// unbuffered eventsCh back to RecordEvent callers.
if len(a.buffer) >= a.cfg.MaxBufferSize {
if !a.bufferFull {
a.bufferFull = true
a.log.WarnContext(a.cfg.Context, "Session writer buffer at capacity, applying backpressure.",
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
}
select {
case status := <-a.stream.Status():
a.updateStatus(status)
case <-a.stream.Done():
if !a.handleStreamDone() {
return
}
case <-a.closeCtx.Done():
a.completeStream(a.stream)
return
}
continue
}
select {
case status := <-a.stream.Status():
a.updateStatus(status)
Expand All @@ -454,14 +498,7 @@ func (a *SessionWriter) processEvents() {
a.log.DebugContext(a.cfg.Context, "Recovered stream", "duration", time.Since(start))
}
case <-a.stream.Done():
if a.closeCtx.Err() != nil {
// don't attempt recovery if we're closing
return
}
a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
if err := a.recoverStream(); err != nil {
a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)

if !a.handleStreamDone() {
return
}
case <-a.closeCtx.Done():
Expand All @@ -471,6 +508,21 @@ func (a *SessionWriter) processEvents() {
}
}

// handleStreamDone handles the stream's Done signal by trying to
// re-establish a server-closed stream. The boolean result tells the
// event loop whether to keep running: true means the stream was
// recovered, false means the loop should exit.
func (a *SessionWriter) handleStreamDone() bool {
	// A non-nil close error means the Done signal coincides with our
	// own shutdown; don't attempt recovery in that case.
	if a.closeCtx.Err() != nil {
		return false
	}
	a.log.DebugContext(a.cfg.Context, "Stream was closed by the server, attempting to recover.")
	err := a.recoverStream()
	if err == nil {
		return true
	}
	a.log.WarnContext(a.cfg.Context, "Failed to recover stream.", "error", err)
	return false
}

// IsPermanentEmitError checks if the error contains either a sole
// [trace.BadParameter] error in its chain, or a [trace.Aggregate] error
// composed entirely of BadParameters.
Expand Down Expand Up @@ -612,10 +664,15 @@ func (a *SessionWriter) updateStatus(status apievents.StreamStatus) {
}
lastIndex = i
}
if lastIndex > 0 {
if lastIndex >= 0 {
before := len(a.buffer)
a.buffer = a.buffer[lastIndex+1:]
a.log.DebugContext(a.closeCtx, "Removed saved events", "removed", before-len(a.buffer), "remaining", len(a.buffer))
if a.bufferFull && len(a.buffer) < a.cfg.MaxBufferSize {
a.bufferFull = false
a.log.InfoContext(a.closeCtx, "Session writer buffer below capacity, backpressure released.",
"buffer_size", len(a.buffer), "max_buffer_size", a.cfg.MaxBufferSize)
}
}
}

Expand Down
108 changes: 108 additions & 0 deletions lib/events/session_writer_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Teleport
* Copyright (C) 2026 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package events

import (
"context"
"log/slog"
"testing"

"github.com/stretchr/testify/require"

apievents "github.com/gravitational/teleport/api/types/events"
)

// noOpPreparer implements SessionEventPreparer without modifying
// events: PrepareSessionEvent hands back the input wrapped as-is.
// Tests use it when they only need SessionWriterConfig validation to
// pass and never actually prepare events.
type noOpPreparer struct{}

// PrepareSessionEvent wraps evt in a preparedEvent without changes.
func (noOpPreparer) PrepareSessionEvent(evt apievents.AuditEvent) (apievents.PreparedSessionEvent, error) {
	return preparedEvent{event: evt}, nil
}

// preparedEvent is a minimal PreparedSessionEvent wrapper around a raw
// audit event, used by tests together with noOpPreparer.
type preparedEvent struct {
	event apievents.AuditEvent
}

// GetAuditEvent returns the wrapped audit event.
func (e preparedEvent) GetAuditEvent() apievents.AuditEvent {
	return e.event
}

// TestSessionWriterConfigMaxBufferSize verifies that
// SessionWriterConfig.CheckAndSetDefaults fills in MaxBufferSize with
// DefaultMaxBufferSize when it is unset and leaves an explicitly
// configured value untouched. Without the default, the buffer cap
// would be zero and all event processing would block.
//
// Background: the SessionWriter buffers PreparedSessionEvents until a
// status update from the upload stream confirms they were persisted,
// then trims the confirmed prefix. If the upload stream stalls (e.g.,
// disk IOPS exhausted on a Kubernetes emptyDir volume), status updates
// stop and the buffer grows without limit; at sustained throughput
// (hundreds of small ~200-byte events per second across many sessions)
// this ends in OOM. MaxBufferSize caps the buffer: once full,
// processEvents stops reading eventsCh, so RecordEvent blocks for
// BackoffTimeout (5s by default), then drops the event and enters
// backoff — trading event loss for process survival.
func TestSessionWriterConfigMaxBufferSize(t *testing.T) {
	newConfig := func(maxBufferSize int) SessionWriterConfig {
		return SessionWriterConfig{
			SessionID:     "test",
			Streamer:      NewDiscardStreamer(),
			Preparer:      noOpPreparer{},
			Context:       t.Context(),
			MaxBufferSize: maxBufferSize,
		}
	}

	t.Run("defaults to DefaultMaxBufferSize when zero", func(t *testing.T) {
		cfg := newConfig(0)
		require.NoError(t, cfg.CheckAndSetDefaults())
		require.Equal(t, DefaultMaxBufferSize, cfg.MaxBufferSize, "MaxBufferSize must default to DefaultMaxBufferSize so the buffer cap is active even when callers do not set it explicitly")
	})

	t.Run("preserves explicit value", func(t *testing.T) {
		cfg := newConfig(500)
		require.NoError(t, cfg.CheckAndSetDefaults())
		require.Equal(t, 500, cfg.MaxBufferSize)
	})
}

// TestUpdateStatusTrimsAtIndexZero verifies that updateStatus trims the
// buffer when the only confirmed event sits at buffer index 0. The
// pre-fix condition (lastIndex > 0) skipped trimming whenever exactly
// one event was confirmed, so the buffer could grow without bound even
// while the upload stream was healthy.
func TestUpdateStatusTrimsAtIndexZero(t *testing.T) {
	printEvent := &apievents.SessionPrint{}
	printEvent.SetIndex(0)

	writer := &SessionWriter{
		log:      slog.Default(),
		buffer:   []apievents.PreparedSessionEvent{preparedEvent{event: printEvent}},
		cfg:      SessionWriterConfig{MaxBufferSize: DefaultMaxBufferSize},
		closeCtx: context.Background(),
	}

	writer.updateStatus(apievents.StreamStatus{LastEventIndex: 0})
	require.Empty(t, writer.buffer, "updateStatus must trim the buffer when the confirmed event is at index 0")
}
23 changes: 23 additions & 0 deletions lib/httplib/reverseproxy/reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"sync"
"time"

"github.com/gravitational/trace"
Expand All @@ -33,6 +34,27 @@ import (
logutils "github.com/gravitational/teleport/lib/utils/log"
)

// copyBufferSize is the size of the io.Copy buffers handed out by
// bufferPool. It matches the 32 KiB buffer httputil.ReverseProxy would
// otherwise allocate per proxied request.
const copyBufferSize = 32 * 1024

// bufferPool is a sync.Pool-backed implementation of httputil.BufferPool.
// It reuses copyBufferSize buffers across proxied requests to reduce GC
// pressure under high concurrency.
//
// Buffers are stored as *[]byte rather than []byte so that Put does not
// allocate a fresh interface box for the slice header on every call
// (staticcheck SA6002); only the small pointer is boxed, and it is
// reused by the pool.
type bufferPool struct {
	pool sync.Pool
}

// Get returns a buffer from the pool, allocating a fresh one when the
// pool is empty.
func (b *bufferPool) Get() []byte {
	if v := b.pool.Get(); v != nil {
		return *v.(*[]byte)
	}
	return make([]byte, copyBufferSize)
}

// Put returns a buffer to the pool for reuse by a later Get.
func (b *bufferPool) Put(buf []byte) {
	b.pool.Put(&buf)
}

// defaultBufferPool is shared across all Forwarder instances.
var defaultBufferPool = &bufferPool{}

// X-* Header names.
const (
XForwardedProto = "X-Forwarded-Proto"
Expand Down Expand Up @@ -77,6 +99,7 @@ func New(opts ...Option) (*Forwarder, error) {
ReverseProxy: &httputil.ReverseProxy{
ErrorHandler: DefaultHandler.ServeHTTP,
ErrorLog: log.Default(),
BufferPool: defaultBufferPool,
},
logger: slog.Default(),
}
Expand Down
12 changes: 12 additions & 0 deletions lib/httplib/reverseproxy/reverse_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ func TestRequestCancelWithoutPanic(t *testing.T) {

}

// TestForwarderUsesBufferPool verifies that New configures the
// Forwarder's ReverseProxy with a BufferPool. Without one, each proxied
// request allocates a fresh 32 KiB io.Copy buffer that becomes garbage
// the moment the request completes; under high concurrency (e.g.,
// hundreds of concurrent app sessions) that adds significant GC
// pressure and memory growth.
func TestForwarderUsesBufferPool(t *testing.T) {
	forwarder, err := New()
	require.NoError(t, err)
	require.NotNil(t, forwarder.ReverseProxy.BufferPool, "Forwarder must set a BufferPool to reuse io.Copy buffers and reduce GC pressure under high concurrency")
}

func newSingleHostReverseProxy(target *url.URL) *Forwarder {
return &Forwarder{
ReverseProxy: httputil.NewSingleHostReverseProxy(target),
Expand Down
16 changes: 16 additions & 0 deletions lib/srv/app/connections_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,22 @@ type ConnectionsHandlerConfig struct {

// InsecureMode defines whether insecure connections are allowed.
InsecureMode bool

// MaxActiveSessionChunks limits the number of concurrently active
// session chunks on this agent. Each active chunk holds an open
// recording stream. Zero or negative means use
// DefaultMaxActiveSessionChunks.
MaxActiveSessionChunks int
}

// CheckAndSetDefaults validates the config values and sets defaults.
func (c *ConnectionsHandlerConfig) CheckAndSetDefaults() error {
if c.Clock == nil {
c.Clock = clockwork.NewRealClock()
}
if c.MaxActiveSessionChunks <= 0 {
c.MaxActiveSessionChunks = DefaultMaxActiveSessionChunks
}
if c.DataDir == "" {
return trace.BadParameter("data dir missing")
}
Expand Down Expand Up @@ -211,6 +220,9 @@ type ConnectionsHandler struct {

// getAppByPublicAddress returns a types.Application using the public address as matcher.
getAppByPublicAddress func(context.Context, string) (types.Application, error)

// chunkSem limits concurrent active session chunks.
chunkSem chan struct{}
}

// NewConnectionsHandler returns a new ConnectionsHandler.
Expand Down Expand Up @@ -244,6 +256,8 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
return nil, trace.Wrap(err)
}

chunkSem := make(chan struct{}, cfg.MaxActiveSessionChunks)

c := &ConnectionsHandler{
cfg: cfg,
closeContext: closeContext,
Expand All @@ -252,6 +266,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
gcpHandler: gcpHandler,
connAuth: make(map[net.Conn]error),
log: slog.With(teleport.ComponentKey, cfg.ServiceComponent),
chunkSem: chunkSem,
getAppByPublicAddress: func(ctx context.Context, s string) (types.Application, error) {
return nil, trace.NotFound("no applications are being proxied")
},
Expand All @@ -265,6 +280,7 @@ func NewConnectionsHandler(closeContext context.Context, cfg *ConnectionsHandler
Clock: c.cfg.Clock,
CleanupInterval: time.Second,
OnExpiry: c.onSessionExpired,
ReloadOnErr: true,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
Loading
Loading