From 834bc2b2464bcc9989f0150854a68a58fa67e715 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Thu, 18 Sep 2025 16:56:50 +0200 Subject: [PATCH 1/6] feat: implement retry mechanism for log processing --- logger.go | 243 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 175 insertions(+), 68 deletions(-) diff --git a/logger.go b/logger.go index 2aa4ed3..4cf8264 100644 --- a/logger.go +++ b/logger.go @@ -407,6 +407,8 @@ type logQueuer struct { loggerTTL time.Duration loggers map[string]agentLoggerLifecycle logCache logCache + + retries map[string]*retryState } func (l *logQueuer) work(ctx context.Context) { @@ -427,87 +429,120 @@ func (l *logQueuer) work(ctx context.Context) { } } +func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) { + client := agentsdk.New(l.coderURL) + client.SetSessionToken(log.agentToken) + logger := l.logger.With(slog.F("resource_name", log.resourceName)) + client.SDK.SetLogger(logger) + + _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ + ID: sourceUUID, + Icon: "/icon/k8s.png", + DisplayName: "Kubernetes", + }) + if err != nil { + // This shouldn't fail sending the log, as it only affects how they + // appear. + logger.Error(ctx, "post log source", slog.Error(err)) + l.scheduleRetry(ctx, log.agentToken) + return agentLoggerLifecycle{}, err + } + + ls := agentsdk.NewLogSender(logger) + sl := ls.GetScriptLogger(sourceUUID) + + gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + + // connect to Agent v2.0 API, since we don't need features added later. + // This maximizes compatibility. + arpc, err := client.ConnectRPC20(gracefulCtx) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + l.scheduleRetry(ctx, log.agentToken) + return agentLoggerLifecycle{}, err + } + go func() { + err := ls.SendLoop(gracefulCtx, arpc) + // if the send loop exits on its own without the context + // canceling, timeout the logger and force it to recreate. + if err != nil && ctx.Err() == nil { + l.loggerTimeout(log.agentToken) + } + }() + + closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { + logger.Info(ctx, "logger timeout firing") + l.loggerTimeout(log.agentToken) + }) + lifecycle := agentLoggerLifecycle{ + scriptLogger: sl, + close: func() { + // We could be stopping for reasons other than the timeout. If + // so, stop the timer. 
+ closeTimer.Stop() + defer gracefulCancel() + timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) + defer timeout.Stop() + logger.Info(ctx, "logger closing") + + if err := sl.Flush(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while flushing") + return + } + + if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { + // ctx err + logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") + } + + _ = arpc.DRPCConn().Close() + client.SDK.HTTPClient.CloseIdleConnections() + }, + } + lifecycle.closeTimer = closeTimer + return lifecycle, nil +} + func (l *logQueuer) processLog(ctx context.Context, log agentLog) { l.mu.Lock() defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) + + queuedLogs := l.logCache.get(log.agentToken) + if isAgentLogEmpty(log) { + if queuedLogs == nil { + return + } + } else { + queuedLogs = l.logCache.push(log) + } + lgr, ok := l.loggers[log.agentToken] if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) + // skip if we're in a retry cooldown window + if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil { + return } - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) - - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) - - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) + var err error + lgr, err = l.newLogger(ctx, log, queuedLogs) if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() + l.scheduleRetry(ctx, log.agentToken) return } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. - if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) - } - }() - - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) - }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, - } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle + l.loggers[log.agentToken] = lgr } lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) 
+ if len(queuedLogs) == 0 { + return + } + if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { + l.scheduleRetry(ctx, log.agentToken) + return + } + l.clearRetry(log.agentToken) l.logCache.delete(log.agentToken) } @@ -518,6 +553,8 @@ func (l *logQueuer) processDelete(log agentLog) { delete(l.loggers, log.agentToken) } + l.clearRetry(log.agentToken) + l.logCache.delete(log.agentToken) l.mu.Unlock() if ok { @@ -549,6 +586,64 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { } } +// retryState tracks exponential backoff for an agent token. +type retryState struct { + delay time.Duration + timer *quartz.Timer +} + +func (l *logQueuer) ensureRetryMap() { + if l.retries == nil { + l.retries = make(map[string]*retryState) + } +} + +func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { + l.ensureRetryMap() + + rs := l.retries[token] + if rs == nil { + rs = &retryState{delay: time.Second} + l.retries[token] = rs + } + + if rs.timer != nil { + return + } + + if rs.delay < time.Second { + rs.delay = time.Second + } else if rs.delay > 30*time.Second { + rs.delay = 30 * time.Second + } + + l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String())) + + rs.timer = l.clock.AfterFunc(rs.delay, func() { + l.mu.Lock() + if cur := l.retries[token]; cur != nil { + cur.timer = nil + } + l.mu.Unlock() + + l.q <- agentLog{op: opLog, agentToken: token} + }) + + rs.delay *= 2 + if rs.delay > 30*time.Second { + rs.delay = 30 * time.Second + } +} + +func (l *logQueuer) clearRetry(token string) { + if rs := l.retries[token]; rs != nil { + if rs.timer != nil { + rs.timer.Stop() + } + delete(l.retries, token) + } +} + func newColor(value ...color.Attribute) *color.Color { c := color.New(value...) c.EnableColor() @@ -572,3 +667,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log { func (l *logCache) delete(token string) { delete(l.logs, token) } + +func (l *logCache) get(token string) []agentsdk.Log { + logs, ok := l.logs[token] + if !ok { + return nil + } + return logs +} + +func isAgentLogEmpty(log agentLog) bool { + return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero() +} From 232e983af64c6954d91b2c2e9664fdcbd03506ae Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Fri, 19 Sep 2025 15:48:09 +0200 Subject: [PATCH 2/6] add tests for retry mechanism and logCache functionality --- logger_test.go | 397 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 397 insertions(+) diff --git a/logger_test.go b/logger_test.go index 3ab1c0b..aa35584 100644 --- a/logger_test.go +++ b/logger_test.go @@ -486,6 +486,388 @@ func Test_logQueuer(t *testing.T) { // wait for the client to disconnect _ = testutil.RequireRecvCtx(ctx, t, api.disconnect) }) + + t.Run("RetryMechanism", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go lq.work(ctx) + + token := "retry-token" + ch <- agentLog{ + op: opLog, + 
resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + // Wait for the initial failure to be processed and retry state to be created + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.timer != nil && rs.delay == 2*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Verify retry state exists and has correct doubled delay (it gets doubled after scheduling) + lq.mu.Lock() + rs := lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 2*time.Second, rs.delay) // Delay gets doubled after scheduling + require.NotNil(t, rs.timer) + lq.mu.Unlock() + + // Advance clock to trigger first retry + clock.Advance(time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 4*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again for next retry + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 4*time.Second, rs.delay) + lq.mu.Unlock() + + // Advance clock to trigger second retry + clock.Advance(2 * time.Second) + + // Wait for retry to be processed and delay to double again + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.delay == 8*time.Second + }, testutil.WaitShort, testutil.IntervalFast) + + // Check that delay doubled again + lq.mu.Lock() + rs = lq.retries[token] + require.NotNil(t, rs) + require.Equal(t, 8*time.Second, rs.delay) + lq.mu.Unlock() + }) + + t.Run("RetryMaxDelay", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx := context.Background() + token := "test-token" + + // Set up a retry state with a large delay + lq.ensureRetryMap() + lq.retries[token] = &retryState{delay: 20 * time.Second} + + // Schedule a retry - should cap at 30 seconds + lq.scheduleRetry(ctx, token) + + rs := lq.retries[token] + require.Equal(t, 30*time.Second, rs.delay) + + // Schedule another retry - should stay at 30 seconds + lq.scheduleRetry(ctx, token) + rs = lq.retries[token] + require.Equal(t, 30*time.Second, rs.delay) + }) + + t.Run("ClearRetry", func(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx := context.Background() + token := "test-token" + + // Schedule a retry + lq.scheduleRetry(ctx, token) + require.NotNil(t, lq.retries[token]) + + // Clear the retry + lq.clearRetry(token) + require.Nil(t, lq.retries[token]) + }) +} + +func Test_logCache(t *testing.T) { + t.Parallel() + + t.Run("PushAndGet", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Initially should return nil + logs := lc.get(token) + require.Nil(t, logs) + + // Push first log + log1 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "First log", + Level: codersdk.LogLevelInfo, + }, + } + returnedLogs := lc.push(log1) + 
require.Len(t, returnedLogs, 1) + require.Equal(t, "First log", returnedLogs[0].Output) + + // Get should return the cached logs + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 1) + require.Equal(t, "First log", cachedLogs[0].Output) + + // Push second log to same token + log2 := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Second log", + Level: codersdk.LogLevelWarn, + }, + } + returnedLogs = lc.push(log2) + require.Len(t, returnedLogs, 2) + require.Equal(t, "First log", returnedLogs[0].Output) + require.Equal(t, "Second log", returnedLogs[1].Output) + + // Get should return both logs + cachedLogs = lc.get(token) + require.Len(t, cachedLogs, 2) + require.Equal(t, "First log", cachedLogs[0].Output) + require.Equal(t, "Second log", cachedLogs[1].Output) + }) + + t.Run("Delete", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token := "test-token" + + // Push a log + log := agentLog{ + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Test log", + Level: codersdk.LogLevelInfo, + }, + } + lc.push(log) + + // Verify it exists + cachedLogs := lc.get(token) + require.Len(t, cachedLogs, 1) + + // Delete it + lc.delete(token) + + // Should return nil now + cachedLogs = lc.get(token) + require.Nil(t, cachedLogs) + }) + + t.Run("MultipleTokens", func(t *testing.T) { + t.Parallel() + + lc := logCache{ + logs: map[string][]agentsdk.Log{}, + } + + token1 := "token1" + token2 := "token2" + + // Push logs for different tokens + log1 := agentLog{ + agentToken: token1, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token1", + Level: codersdk.LogLevelInfo, + }, + } + log2 := agentLog{ + agentToken: token2, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Log for token2", + Level: codersdk.LogLevelError, + }, + } + + lc.push(log1) + lc.push(log2) + + // Each token should have its own logs + logs1 := lc.get(token1) + require.Len(t, logs1, 1) + require.Equal(t, "Log for token1", logs1[0].Output) + + logs2 := lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + + // Delete one token shouldn't affect the other + lc.delete(token1) + require.Nil(t, lc.get(token1)) + + logs2 = lc.get(token2) + require.Len(t, logs2, 1) + require.Equal(t, "Log for token2", logs2[0].Output) + }) + + t.Run("EmptyLogHandling", func(t *testing.T) { + t.Parallel() + + api := newFakeAgentAPI(t) + agentURL, err := url.Parse(api.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + lq := &logQueuer{ + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "test-token" + + // Send an empty log first - should be ignored since no cached logs exist + emptyLog := agentLog{ + op: opLog, + resourceName: "", + agentToken: token, + log: agentsdk.Log{ + Output: "", + CreatedAt: time.Time{}, + }, + } + ch <- emptyLog + + // Wait to ensure processing completes - no logger should be created for empty log with no cache + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return !exists + }, testutil.WaitShort, testutil.IntervalFast) + + // No logger should 
be created for empty log with no cache + lq.mu.Lock() + _, exists := lq.loggers[token] + require.False(t, exists) + lq.mu.Unlock() + + // Now send a real log to establish the logger + realLog := agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "Real log", + Level: codersdk.LogLevelInfo, + }, + } + ch <- realLog + + // Should create logger and send log + _ = testutil.RequireRecvCtx(ctx, t, api.logSource) + logs := testutil.RequireRecvCtx(ctx, t, api.logs) + require.Len(t, logs, 1) + require.Contains(t, logs[0].Output, "Real log") + + // Now send empty log - should trigger flush of any cached logs + ch <- emptyLog + + // Wait for processing - logger should still exist after empty log + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + _, exists := lq.loggers[token] + return exists + }, testutil.WaitShort, testutil.IntervalFast) + + // Logger should still exist + lq.mu.Lock() + _, exists = lq.loggers[token] + require.True(t, exists) + lq.mu.Unlock() + }) } func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { @@ -547,6 +929,21 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI { return fakeAPI } +func newFailingAgentAPI(t *testing.T) *fakeAgentAPI { + fakeAPI := &fakeAgentAPI{ + disconnect: make(chan struct{}), + logs: make(chan []*proto.Log), + logSource: make(chan agentsdk.PostLogSourceRequest), + } + + // Create a server that always returns 401 Unauthorized errors + fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + })) + + return fakeAPI +} + type fakeAgentAPI struct { disconnect chan struct{} logs chan []*proto.Log From dc8bde6fe344d942ee42738079180bfd1d5c3366 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 22 Sep 2025 10:46:46 +0200 Subject: [PATCH 3/6] simplify retry state initialization in logQueuer --- logger.go | 6 +----- logger_test.go | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/logger.go b/logger.go index 4cf8264..9415ce3 100644 --- a/logger.go +++ b/logger.go @@ -592,14 +592,10 @@ type retryState struct { timer *quartz.Timer } -func (l *logQueuer) ensureRetryMap() { +func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { if l.retries == nil { l.retries = make(map[string]*retryState) } -} - -func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { - l.ensureRetryMap() rs := l.retries[token] if rs == nil { diff --git a/logger_test.go b/logger_test.go index aa35584..116fae5 100644 --- a/logger_test.go +++ b/logger_test.go @@ -600,7 +600,7 @@ func Test_logQueuer(t *testing.T) { token := "test-token" // Set up a retry state with a large delay - lq.ensureRetryMap() + lq.retries = make(map[string]*retryState) lq.retries[token] = &retryState{delay: 20 * time.Second} // Schedule a retry - should cap at 30 seconds From c99d83c6352dd6c8daf74a5f19fc92861523e59b Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 23 Sep 2025 12:23:19 +0200 Subject: [PATCH 4/6] implement maxRetries for log processing --- helm/templates/service.yaml | 4 +++ helm/values.yaml | 3 ++ logger.go | 30 ++++++++++++++--- logger_test.go | 67 ++++++++++++++++++++++++++++++++++++- main.go | 9 +++++ 5 files changed, 108 insertions(+), 5 deletions(-) diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index c89a98a..fa7fc40 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -120,6 +120,10 @@ spec: - name: 
SSL_CERT_DIR value: {{ .Values.image.sslCertDir }} {{- end }} + {{- if .Values.maxRetries }} + - name: CODER_MAX_RETRIES + value: {{ .Values.maxRetries }} + {{- end }} {{- with .Values.securityContext }} securityContext: {{- toYaml . | nindent 12 }} diff --git a/helm/values.yaml b/helm/values.yaml index 5a6d1b6..17c5941 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -5,6 +5,9 @@ url: "" # If unspecified or empty it will watch all namespaces. namespaces: [] +# maxRetries -- Maximum retry attempts for failed log sends (logs are discarded after this limit) +maxRetries: 10 + # volumes -- A list of extra volumes to add to the coder-logstream pod. volumes: # emptyDir: {} diff --git a/logger.go b/logger.go index 9415ce3..52a474e 100644 --- a/logger.go +++ b/logger.go @@ -34,6 +34,7 @@ type podEventLoggerOptions struct { logger slog.Logger logDebounce time.Duration + maxRetries int // The following fields are optional! namespaces []string @@ -52,6 +53,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve opts.clock = quartz.NewReal() } + if opts.maxRetries == 0 { + opts.maxRetries = 10 + } + logCh := make(chan agentLog, 512) ctx, cancelFunc := context.WithCancel(ctx) reporter := &podEventLogger{ @@ -75,6 +80,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: opts.maxRetries, }, } @@ -408,7 +414,8 @@ type logQueuer struct { loggers map[string]agentLoggerLifecycle logCache logCache - retries map[string]*retryState + retries map[string]*retryState + maxRetries int } func (l *logQueuer) work(ctx context.Context) { @@ -588,8 +595,9 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { // retryState tracks exponential backoff for an agent token. type retryState struct { - delay time.Duration - timer *quartz.Timer + delay time.Duration + timer *quartz.Timer + retryCount int } func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { @@ -603,6 +611,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { l.retries[token] = rs } + rs.retryCount++ + + // If we've reached the max retries, clear the retry state and delete the log cache. 
+ if rs.retryCount >= l.maxRetries { + l.logger.Error(ctx, "max retries exceeded", + slog.F("retryCount", rs.retryCount), + slog.F("maxRetries", l.maxRetries)) + l.clearRetry(token) + l.logCache.delete(token) + return + } + if rs.timer != nil { return } @@ -613,7 +633,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { rs.delay = 30 * time.Second } - l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String())) + l.logger.Info(ctx, "scheduling retry", + slog.F("delay", rs.delay.String()), + slog.F("retryCount", rs.retryCount)) rs.timer = l.clock.AfterFunc(rs.delay, func() { l.mu.Lock() diff --git a/logger_test.go b/logger_test.go index 116fae5..1944e92 100644 --- a/logger_test.go +++ b/logger_test.go @@ -594,6 +594,7 @@ func Test_logQueuer(t *testing.T) { logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: 10, } ctx := context.Background() @@ -601,17 +602,22 @@ func Test_logQueuer(t *testing.T) { // Set up a retry state with a large delay lq.retries = make(map[string]*retryState) - lq.retries[token] = &retryState{delay: 20 * time.Second} + lq.retries[token] = &retryState{ + delay: 20 * time.Second, + retryCount: 0, + } // Schedule a retry - should cap at 30 seconds lq.scheduleRetry(ctx, token) rs := lq.retries[token] + require.NotNil(t, rs) require.Equal(t, 30*time.Second, rs.delay) // Schedule another retry - should stay at 30 seconds lq.scheduleRetry(ctx, token) rs = lq.retries[token] + require.NotNil(t, rs) require.Equal(t, 30*time.Second, rs.delay) }) @@ -627,6 +633,7 @@ func Test_logQueuer(t *testing.T) { logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: 2, } ctx := context.Background() @@ -640,6 +647,64 @@ func Test_logQueuer(t *testing.T) { lq.clearRetry(token) require.Nil(t, lq.retries[token]) }) + + t.Run("MaxRetries", func(t *testing.T) { + t.Parallel() + + // Create a failing API that will reject connections + failingAPI := newFailingAgentAPI(t) + agentURL, err := url.Parse(failingAPI.server.URL) + require.NoError(t, err) + clock := quartz.NewMock(t) + ttl := time.Second + + ch := make(chan agentLog, 10) + logger := slogtest.Make(t, &slogtest.Options{ + IgnoreErrors: true, + }) + lq := &logQueuer{ + logger: logger, + clock: clock, + q: ch, + coderURL: agentURL, + loggerTTL: ttl, + loggers: map[string]agentLoggerLifecycle{}, + logCache: logCache{ + logs: map[string][]agentsdk.Log{}, + }, + maxRetries: 2, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go lq.work(ctx) + + token := "max-retry-token" + ch <- agentLog{ + op: opLog, + resourceName: "hello", + agentToken: token, + log: agentsdk.Log{ + CreatedAt: time.Now(), + Output: "This is a log.", + Level: codersdk.LogLevelInfo, + }, + } + + // Wait for retry state to be cleared after exceeding maxRetries + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs == nil + }, testutil.WaitShort, testutil.IntervalFast) + + // Verify cache is also cleared + lq.mu.Lock() + cachedLogs := lq.logCache.get(token) + lq.mu.Unlock() + require.Nil(t, cachedLogs) + }) } func Test_logCache(t *testing.T) { diff --git a/main.go b/main.go index 5e5d2bd..f53d28d 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "os" + "strconv" "strings" "cdr.dev/slog" @@ -30,6 +31,7 @@ func root() *cobra.Command { kubeConfig string namespacesStr string labelSelector string + maxRetriesStr string ) cmd := &cobra.Command{ Use: "coder-logstream-kube", @@ -72,6 
+74,11 @@ func root() *cobra.Command { } } + maxRetries, err := strconv.Atoi(maxRetriesStr) + if err != nil { + return fmt.Errorf("parse max retries: %w", err) + } + reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, @@ -79,6 +86,7 @@ func root() *cobra.Command { fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), + maxRetries: maxRetries, }) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) @@ -97,6 +105,7 @@ func root() *cobra.Command { cmd.Flags().StringVarP(&namespacesStr, "namespaces", "n", os.Getenv("CODER_NAMESPACES"), "List of namespaces to use when listing pods") cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods") + cmd.Flags().StringVarP(&maxRetriesStr, "max-retries", "m", os.Getenv("CODER_MAX_RETRIES"), "Maximum retry attempts for failed log sends (logs are discarded after this limit)") return cmd } From f1e6de30f2dc50ba7dac291edc828fe6aabc9189 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 23 Sep 2025 14:14:30 +0200 Subject: [PATCH 5/6] Apply review suggestions and fix maxRetries --- helm/templates/service.yaml | 2 +- logger.go | 59 ++++++++++++++++++++----------------- logger_test.go | 26 +++++++++++++--- 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index fa7fc40..8c26472 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -122,7 +122,7 @@ spec: {{- end }} {{- if .Values.maxRetries }} - name: CODER_MAX_RETRIES - value: {{ .Values.maxRetries }} + value: "{{ .Values.maxRetries }}" {{- end }} {{- with .Values.securityContext }} securityContext: diff --git a/logger.go b/logger.go index 52a474e..8d0775a 100644 --- a/logger.go +++ b/logger.go @@ -34,7 +34,8 @@ type podEventLoggerOptions struct { logger slog.Logger logDebounce time.Duration - maxRetries int + // maxRetries is the maximum number of retries for a log send failure. + maxRetries int // The following fields are optional! namespaces []string @@ -414,7 +415,9 @@ type logQueuer struct { loggers map[string]agentLoggerLifecycle logCache logCache - retries map[string]*retryState + // retries maps agent tokens to their retry state for exponential backoff + retries map[string]*retryState + // maxRetries is the maximum number of retries for a log send failure. maxRetries int } @@ -436,7 +439,7 @@ func (l *logQueuer) work(ctx context.Context) { } } -func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) { +func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { client := agentsdk.New(l.coderURL) client.SetSessionToken(log.agentToken) logger := l.logger.With(slog.F("resource_name", log.resourceName)) @@ -448,10 +451,9 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []ag DisplayName: "Kubernetes", }) if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. + // Posting the log source failed, which affects how logs appear. + // We'll retry to ensure the log source is properly registered. 
logger.Error(ctx, "post log source", slog.Error(err)) - l.scheduleRetry(ctx, log.agentToken) return agentLoggerLifecycle{}, err } @@ -466,7 +468,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []ag if err != nil { logger.Error(ctx, "drpc connect", slog.Error(err)) gracefulCancel() - l.scheduleRetry(ctx, log.agentToken) return agentLoggerLifecycle{}, err } go func() { @@ -485,6 +486,8 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []ag lifecycle := agentLoggerLifecycle{ scriptLogger: sl, close: func() { + defer arpc.DRPCConn().Close() + defer client.SDK.HTTPClient.CloseIdleConnections() // We could be stopping for reasons other than the timeout. If // so, stop the timer. closeTimer.Stop() @@ -503,9 +506,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []ag // ctx err logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") } - - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() }, } lifecycle.closeTimer = closeTimer @@ -533,7 +533,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { } var err error - lgr, err = l.newLogger(ctx, log, queuedLogs) + lgr, err = l.newLogger(ctx, log) if err != nil { l.scheduleRetry(ctx, log.agentToken) return @@ -549,7 +549,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { l.scheduleRetry(ctx, log.agentToken) return } - l.clearRetry(log.agentToken) + l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) } @@ -558,9 +558,8 @@ func (l *logQueuer) processDelete(log agentLog) { lgr, ok := l.loggers[log.agentToken] if ok { delete(l.loggers, log.agentToken) - } - l.clearRetry(log.agentToken) + l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) l.mu.Unlock() @@ -598,6 +597,7 @@ type retryState struct { delay time.Duration timer *quartz.Timer retryCount int + exhausted bool // prevent retry state recreation after max retries } func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { @@ -606,8 +606,13 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { } rs := l.retries[token] + + if rs != nil && rs.exhausted { + return + } + if rs == nil { - rs = &retryState{delay: time.Second} + rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false} l.retries[token] = rs } @@ -618,7 +623,11 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { l.logger.Error(ctx, "max retries exceeded", slog.F("retryCount", rs.retryCount), slog.F("maxRetries", l.maxRetries)) - l.clearRetry(token) + rs.exhausted = true + if rs.timer != nil { + rs.timer.Stop() + rs.timer = nil + } l.logCache.delete(token) return } @@ -627,24 +636,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { return } - if rs.delay < time.Second { - rs.delay = time.Second - } else if rs.delay > 30*time.Second { - rs.delay = 30 * time.Second - } - l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()), slog.F("retryCount", rs.retryCount)) rs.timer = l.clock.AfterFunc(rs.delay, func() { l.mu.Lock() - if cur := l.retries[token]; cur != nil { + defer l.mu.Unlock() + + if cur := l.retries[token]; cur != nil && !cur.exhausted { cur.timer = nil + l.q <- agentLog{op: opLog, agentToken: token} } - l.mu.Unlock() - - l.q <- agentLog{op: opLog, agentToken: token} }) rs.delay *= 2 @@ -653,7 +656,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) { } } -func (l *logQueuer) clearRetry(token 
string) { +// clearRetryLocked clears the retry state for the given token. +// The caller must hold the mutex lock. +func (l *logQueuer) clearRetryLocked(token string) { if rs := l.retries[token]; rs != nil { if rs.timer != nil { rs.timer.Stop() diff --git a/logger_test.go b/logger_test.go index 1944e92..259be40 100644 --- a/logger_test.go +++ b/logger_test.go @@ -511,6 +511,7 @@ func Test_logQueuer(t *testing.T) { logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + maxRetries: 10, } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -644,7 +645,7 @@ func Test_logQueuer(t *testing.T) { require.NotNil(t, lq.retries[token]) // Clear the retry - lq.clearRetry(token) + lq.clearRetryLocked(token) require.Nil(t, lq.retries[token]) }) @@ -672,6 +673,7 @@ func Test_logQueuer(t *testing.T) { logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, + retries: make(map[string]*retryState), maxRetries: 2, } @@ -691,15 +693,31 @@ func Test_logQueuer(t *testing.T) { }, } - // Wait for retry state to be cleared after exceeding maxRetries require.Eventually(t, func() bool { lq.mu.Lock() defer lq.mu.Unlock() rs := lq.retries[token] - return rs == nil + return rs != nil && rs.retryCount == 1 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs != nil && rs.retryCount == 2 + }, testutil.WaitShort, testutil.IntervalFast) + + clock.Advance(2 * time.Second) + + require.Eventually(t, func() bool { + lq.mu.Lock() + defer lq.mu.Unlock() + rs := lq.retries[token] + return rs == nil || rs.exhausted }, testutil.WaitShort, testutil.IntervalFast) - // Verify cache is also cleared lq.mu.Lock() cachedLogs := lq.logCache.get(token) lq.mu.Unlock() From 6447dd5cda899f0f4a82e2d9099fb3e84c1b62bd Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Wed, 24 Sep 2025 16:03:47 +0200 Subject: [PATCH 6/6] Remove maxRetries configuration from CLI and Helm values, setting a default of 15 retries for log send failures. --- helm/templates/service.yaml | 4 ---- helm/values.yaml | 3 --- main.go | 10 +--------- 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index 8c26472..c89a98a 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -120,10 +120,6 @@ spec: - name: SSL_CERT_DIR value: {{ .Values.image.sslCertDir }} {{- end }} - {{- if .Values.maxRetries }} - - name: CODER_MAX_RETRIES - value: "{{ .Values.maxRetries }}" - {{- end }} {{- with .Values.securityContext }} securityContext: {{- toYaml . | nindent 12 }} diff --git a/helm/values.yaml b/helm/values.yaml index 17c5941..5a6d1b6 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -5,9 +5,6 @@ url: "" # If unspecified or empty it will watch all namespaces. namespaces: [] -# maxRetries -- Maximum retry attempts for failed log sends (logs are discarded after this limit) -maxRetries: 10 - # volumes -- A list of extra volumes to add to the coder-logstream pod. 
volumes: # emptyDir: {} diff --git a/main.go b/main.go index f53d28d..4c65be5 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ import ( "fmt" "net/url" "os" - "strconv" "strings" "cdr.dev/slog" @@ -31,7 +30,6 @@ func root() *cobra.Command { kubeConfig string namespacesStr string labelSelector string - maxRetriesStr string ) cmd := &cobra.Command{ Use: "coder-logstream-kube", @@ -74,11 +72,6 @@ func root() *cobra.Command { } } - maxRetries, err := strconv.Atoi(maxRetriesStr) - if err != nil { - return fmt.Errorf("parse max retries: %w", err) - } - reporter, err := newPodEventLogger(cmd.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, @@ -86,7 +79,7 @@ func root() *cobra.Command { fieldSelector: fieldSelector, labelSelector: labelSelector, logger: slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug), - maxRetries: maxRetries, + maxRetries: 15, // 15 retries is the default max retries for a log send failure. }) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) @@ -105,7 +98,6 @@ func root() *cobra.Command { cmd.Flags().StringVarP(&namespacesStr, "namespaces", "n", os.Getenv("CODER_NAMESPACES"), "List of namespaces to use when listing pods") cmd.Flags().StringVarP(&fieldSelector, "field-selector", "f", "", "Field selector to use when listing pods") cmd.Flags().StringVarP(&labelSelector, "label-selector", "l", "", "Label selector to use when listing pods") - cmd.Flags().StringVarP(&maxRetriesStr, "max-retries", "m", os.Getenv("CODER_MAX_RETRIES"), "Maximum retry attempts for failed log sends (logs are discarded after this limit)") return cmd }
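
For reference, the retry policy introduced across these patches (an initial one-second delay, doubled after each failed attempt, capped at 30 seconds, and abandoned after maxRetries) can be reduced to a small standalone sketch. The code below is illustrative only and is not the logger.go implementation: it swaps the project's quartz clock for stdlib time.AfterFunc so it runs without the project's dependencies, and it omits the pending-timer bookkeeping and log-cache interaction that the real logQueuer performs under its mutex.

// Illustrative sketch of the per-token exponential backoff used for log sends.
// Assumptions: stdlib time.AfterFunc stands in for the quartz clock, and the
// retrier type below is hypothetical, not part of the real codebase.
package main

import (
	"fmt"
	"sync"
	"time"
)

type retrier struct {
	mu         sync.Mutex
	maxRetries int
	delays     map[string]time.Duration
	counts     map[string]int
}

func newRetrier(maxRetries int) *retrier {
	return &retrier{
		maxRetries: maxRetries,
		delays:     make(map[string]time.Duration),
		counts:     make(map[string]int),
	}
}

// schedule arranges for retry(token) to run after the current backoff delay,
// then doubles the delay for the next failure (1s, 2s, 4s, ... capped at 30s).
// It reports false once the token has exhausted its retries, mirroring the
// "max retries exceeded" branch that drops the cached logs.
func (r *retrier) schedule(token string, retry func(token string)) bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.counts[token]++
	if r.counts[token] >= r.maxRetries {
		return false // caller should discard the cached logs for this token
	}

	delay, ok := r.delays[token]
	if !ok {
		delay = time.Second
	}
	time.AfterFunc(delay, func() { retry(token) })

	delay *= 2
	if delay > 30*time.Second {
		delay = 30 * time.Second
	}
	r.delays[token] = delay
	return true
}

// clear forgets a token after a successful send. The real implementation also
// stops any pending retry timer before deleting the state.
func (r *retrier) clear(token string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.delays, token)
	delete(r.counts, token)
}

func main() {
	r := newRetrier(3)
	for i := 0; i < 4; i++ {
		ok := r.schedule("agent-token", func(token string) {
			fmt.Println("retrying", token)
		})
		fmt.Println("attempt", i+1, "scheduled:", ok)
	}
	r.clear("agent-token")
	time.Sleep(2 * time.Second) // let the first scheduled retry fire
}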
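
The cache-and-flush contract that the retry path leans on can also be shown in isolation. Again a sketch under stated assumptions: the log, agentLog, and logCache types below are trimmed stand-ins for the agentsdk and logger types, send stands in for scriptLogger.Send, and only the branch structure at the top of processLog is reproduced.

// Illustrative sketch of the log-cache contract. An "empty" agentLog (no
// resource name, no output, zero timestamp) is the signal the retry timer
// uses to re-drive delivery of whatever is already cached for a token,
// without appending a new entry.
package main

import (
	"fmt"
	"time"
)

type log struct {
	createdAt time.Time
	output    string
}

type agentLog struct {
	agentToken   string
	resourceName string
	log          log
}

type logCache struct{ logs map[string][]log }

func (c *logCache) push(a agentLog) []log {
	c.logs[a.agentToken] = append(c.logs[a.agentToken], a.log)
	return c.logs[a.agentToken]
}

func (c *logCache) get(token string) []log { return c.logs[token] }
func (c *logCache) delete(token string)    { delete(c.logs, token) }

func isEmpty(a agentLog) bool {
	return a.resourceName == "" && a.log.output == "" && a.log.createdAt.IsZero()
}

// process mirrors the branch at the top of processLog: empty events only make
// sense when something is already queued; real events are appended first. On
// failure the cache is kept so a later retry can resend it; on success it is
// dropped.
func process(c *logCache, a agentLog, send func([]log) error) {
	var queued []log
	if isEmpty(a) {
		queued = c.get(a.agentToken)
		if queued == nil {
			return // nothing cached, nothing to retry
		}
	} else {
		queued = c.push(a)
	}
	if err := send(queued); err != nil {
		return // keep the cache; a retry would be scheduled here
	}
	c.delete(a.agentToken)
}

func main() {
	c := &logCache{logs: map[string][]log{}}
	failing := func([]log) error { return fmt.Errorf("coder unreachable") }
	working := func(ls []log) error { fmt.Println("sent", len(ls), "logs"); return nil }

	process(c, agentLog{agentToken: "tok", resourceName: "pod", log: log{time.Now(), "hello"}}, failing)
	// Later, the retry timer enqueues an empty event for the same token:
	process(c, agentLog{agentToken: "tok"}, working) // prints "sent 1 logs"
}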