Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ type ProtoStream struct {

eventsCh chan protoEvent

// cancelCtx is used to signal closure
// cancelCtx is canceled when the stream's goroutines should exit.
// This happens on all exit paths: successful completion, timeout,
// error, or explicit close.
cancelCtx context.Context
cancel context.CancelFunc
cancelErr error
Expand Down Expand Up @@ -499,12 +501,15 @@ func (s *ProtoStream) RecordEvent(ctx context.Context, pe apievents.PreparedSess
}
}

// Complete completes the upload, waits for completion and returns all allocated resources.
// Complete completes the upload, waits for completion and returns
// all allocated resources. The stream's internal context is always
// canceled when Complete returns (via defer), regardless of whether
// the upload finished successfully or the caller's context timed out.
func (s *ProtoStream) Complete(ctx context.Context) error {
defer s.cancel()
s.complete()
select {
case <-s.uploadLoopDoneCh:
s.cancel()
return s.getCompleteResult()
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context has canceled before complete could succeed")
Expand All @@ -517,9 +522,13 @@ func (s *ProtoStream) Status() <-chan apievents.StreamStatus {
return s.statusCh
}

// Close flushes non-uploaded flight stream data without marking
// the stream completed and closes the stream instance
// Close flushes non-uploaded in-flight stream data without marking
// the stream completed and closes the stream instance. The stream's
// internal context is always canceled when Close returns (via defer),
// regardless of whether the flush finished or the caller's context
// timed out.
func (s *ProtoStream) Close(ctx context.Context) error {
defer s.cancel()
s.completeType.Store(completeTypeFlush)
s.complete()
select {
Expand Down
207 changes: 207 additions & 0 deletions lib/events/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"io"
"os"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -700,3 +702,208 @@ func (m *MockSummarizer) SummarizeWithoutEndEvent(ctx context.Context, sessionID
args := m.Called(ctx, sessionID)
return args.Error(0)
}

// blockingUploader and the tests below simulate what happens when a
// Teleport agent tries to close a session recording stream but the
// underlying disk writes are stuck (e.g., emptyDir volume with
// exhausted IOPS).
//
// ProtoStream uploads recording data in background goroutines. When
// Complete() or Close() time out waiting for those uploads, the stream
// should cancel its internal context so the stuck goroutines exit
// promptly. Without that cancellation, each orphaned goroutine holds
// a ~5 MB slice buffer and keeps running until the slow write finishes.
// Under sustained session churn, these goroutines accumulate and the
// agent eventually OOMs.
//
// The blockingUploader simulates a permanently stuck write by blocking
// UploadPart on ctx.Done(). The tests call Complete/Close with a short
// timeout and then check whether the blocked goroutine was unblocked
// by context cancellation.
//
// blockingUploader wraps MemoryUploader and blocks UploadPart until
// the context is canceled. It tracks how many UploadPart calls have
// started and finished so tests can observe goroutine lifecycle.
type blockingUploader struct {
	// MemoryUploader supplies every uploader method except UploadPart,
	// which is overridden below to block until context cancellation.
	*eventstest.MemoryUploader
	// partStarted counts UploadPart calls that have begun executing.
	partStarted atomic.Int32
	// partFinished counts UploadPart calls that have returned.
	partFinished atomic.Int32
}

// UploadPart overrides MemoryUploader.UploadPart to block until the
// context is canceled, simulating a permanently stuck disk write.
// This reproduces the conditions seen when a Kubernetes emptyDir
// volume runs out of IOPS and disk writes stall indefinitely.
// UploadPart simulates a permanently stalled disk write (for example, a
// Kubernetes emptyDir volume whose IOPS budget is exhausted): it records
// that an upload attempt started, parks until the caller's context is
// canceled, records that the attempt finished, and reports the context
// error.
func (b *blockingUploader) UploadPart(ctx context.Context, _ events.StreamUpload, _ int64, _ io.ReadSeeker) (*events.StreamPart, error) {
	b.partStarted.Add(1)
	<-ctx.Done()
	b.partFinished.Add(1)
	return nil, ctx.Err()
}

// blockedStream bundles a stream with a blockingUploader so tests can
// observe the upload goroutine lifecycle.
type blockedStream struct {
	// sid is the session ID the stream was created with.
	sid session.ID
	// stream is the ProtoStream under test.
	stream apievents.Stream
	// uploader is the blocking uploader backing the stream; its counters
	// expose the upload goroutine lifecycle to tests.
	uploader *blockingUploader
	// forceFlush triggers a slice upload without filling the 5 MB
	// MinUploadPartSizeBytes threshold.
	forceFlush chan struct{}
}

// newBlockedStream creates a ProtoStream backed by a blockingUploader.
// The ForceFlush channel lets tests trigger a slice upload without
// needing to fill the 5 MB MinUploadPartSizeBytes threshold.
// newBlockedStream creates a ProtoStream backed by a blockingUploader.
// The bundled forceFlush channel lets tests trigger a slice upload
// without having to accumulate 5 MB (MinUploadPartSizeBytes) of data.
func newBlockedStream(t *testing.T, sid session.ID) *blockedStream {
	t.Helper()

	bs := &blockedStream{
		sid:        sid,
		uploader:   &blockingUploader{MemoryUploader: eventstest.NewMemoryUploader()},
		forceFlush: make(chan struct{}, 1),
	}

	streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
		Uploader:   bs.uploader,
		ForceFlush: bs.forceFlush,
	})
	require.NoError(t, err)

	bs.stream, err = streamer.CreateAuditStream(t.Context(), sid)
	require.NoError(t, err)

	return bs
}

// emitAndFlush writes a few session events and forces a flush to
// trigger an upload goroutine.
// emitAndFlush records a small generated session on the stream and then
// signals ForceFlush so that an upload goroutine is spawned.
func (s *blockedStream) emitAndFlush(t *testing.T) {
	t.Helper()

	preparer, err := events.NewPreparer(events.PreparerConfig{
		SessionID:   s.sid,
		Namespace:   apidefaults.Namespace,
		ClusterName: "cluster",
	})
	require.NoError(t, err)

	ctx := t.Context()
	for _, raw := range eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: 1}) {
		prepared, err := preparer.PrepareSessionEvent(raw)
		require.NoError(t, err)
		require.NoError(t, s.stream.RecordEvent(ctx, prepared))
	}

	s.forceFlush <- struct{}{}
}

// waitForUpload waits until at least one upload goroutine has started.
// waitForUpload blocks until at least one upload goroutine has entered
// UploadPart, polling the started counter.
func (s *blockedStream) waitForUpload(t *testing.T) {
	t.Helper()

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		started := s.uploader.partStarted.Load()
		assert.Greater(c, started, int32(0))
	}, 5*time.Second, 10*time.Millisecond)
}

// requireGoroutineExit asserts that all started upload goroutines have
// finished within 5 seconds.
// requireGoroutineExit asserts that every upload goroutine that entered
// UploadPart has also returned from it within 5 seconds, i.e. that no
// upload goroutine is still parked.
func (s *blockedStream) requireGoroutineExit(t *testing.T, msg string) {
	t.Helper()

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		pending := s.uploader.partStarted.Load() - s.uploader.partFinished.Load()
		assert.Zero(c, pending, "expected all started uploads to have finished")
	}, 5*time.Second, 50*time.Millisecond, msg)
}

// TestCompleteTimeoutCancelsStream verifies that when Complete() times
// out, the stream's internal context is canceled so that orphaned
// upload goroutines exit promptly via context cancellation rather than
// running until their slow writes complete.
// TestCompleteTimeoutCancelsStream verifies that when Complete() times
// out, the stream's internal context is canceled so that orphaned
// upload goroutines exit promptly via context cancellation rather than
// running until their slow writes complete.
func TestCompleteTimeoutCancelsStream(t *testing.T) {
	bs := newBlockedStream(t, session.ID("complete-timeout-test"))

	// Get an upload goroutine stuck inside the blocking uploader.
	bs.emitAndFlush(t)
	bs.waitForUpload(t)

	// The upload goroutine blocks forever (simulating, for example, a
	// stuck write on an emptyDir volume with exhausted IOPS), so a
	// Complete call with a short deadline must time out.
	ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
	defer cancel()

	require.Error(t, bs.stream.Complete(ctx), "Complete should return an error on timeout")

	// Complete() canceled the stream's internal context on its way out,
	// so the blocked goroutine should now unwind promptly.
	bs.requireGoroutineExit(t, "upload goroutine should exit after Complete timeout cancels the stream context")
}

// TestCloseCancelsStream verifies that Close() cancels the stream's
// internal context on both the timeout and success paths.
// TestCloseCancelsStream verifies that Close() cancels the stream's
// internal context on both the timeout and success paths.
func TestCloseCancelsStream(t *testing.T) {
	t.Run("timeout", func(t *testing.T) {
		bs := newBlockedStream(t, session.ID("close-timeout-test"))

		// Park an upload goroutine inside the blocking uploader.
		bs.emitAndFlush(t)
		bs.waitForUpload(t)

		ctx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond)
		defer cancel()

		require.Error(t, bs.stream.Close(ctx), "Close should return an error on timeout")

		bs.requireGoroutineExit(t, "upload goroutine should exit after Close timeout cancels the stream context")
	})

	t.Run("success", func(t *testing.T) {
		// Verify that Close() on the success path (fast uploader) also
		// cancels the stream's internal context via the defer.
		streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
			Uploader: eventstest.NewMemoryUploader(),
		})
		require.NoError(t, err)

		baseCtx := t.Context()
		sid := session.ID("close-success-test")

		stream, err := streamer.CreateAuditStream(baseCtx, sid)
		require.NoError(t, err)

		preparer, err := events.NewPreparer(events.PreparerConfig{
			SessionID:   sid,
			Namespace:   apidefaults.Namespace,
			ClusterName: "cluster",
		})
		require.NoError(t, err)

		// Record a short session so there is in-flight data to flush.
		for _, raw := range eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: 1}) {
			prepared, err := preparer.PrepareSessionEvent(raw)
			require.NoError(t, err)
			require.NoError(t, stream.RecordEvent(baseCtx, prepared))
		}

		closeCtx, cancel := context.WithTimeout(baseCtx, 10*time.Second)
		defer cancel()

		require.NoError(t, stream.Close(closeCtx), "Close should succeed with fast uploader")

		// The stream's internal context should be canceled even on the
		// success path, because Close() defers s.cancel().
		select {
		case <-stream.Done():
			// Expected: the stream's cancelCtx was canceled.
		case <-time.After(5 * time.Second):
			t.Fatal("stream.Done() not closed after successful Close()")
		}
	})
}
Loading