diff --git a/logger.go b/logger.go
index 2aa4ed3..8d0775a 100644
--- a/logger.go
+++ b/logger.go
@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {
 
 	logger      slog.Logger
 	logDebounce time.Duration
+	// maxRetries is the maximum number of retries for a log send failure.
+	maxRetries int
 
 	// The following fields are optional!
 	namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 		opts.clock = quartz.NewReal()
 	}
 
+	if opts.maxRetries == 0 {
+		opts.maxRetries = 10
+	}
+
 	logCh := make(chan agentLog, 512)
 	ctx, cancelFunc := context.WithCancel(ctx)
 	reporter := &podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
 			logCache: logCache{
 				logs: map[string][]agentsdk.Log{},
 			},
+			maxRetries: opts.maxRetries,
 		},
 	}
 
@@ -407,6 +414,11 @@ type logQueuer struct {
 	loggerTTL time.Duration
 	loggers   map[string]agentLoggerLifecycle
 	logCache  logCache
+
+	// retries maps agent tokens to their retry state for exponential backoff
+	retries map[string]*retryState
+	// maxRetries is the maximum number of retries for a log send failure.
+	maxRetries int
 }
 
 func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
 	}
 }
 
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
+	client := agentsdk.New(l.coderURL)
+	client.SetSessionToken(log.agentToken)
+	logger := l.logger.With(slog.F("resource_name", log.resourceName))
+	client.SDK.SetLogger(logger)
+
+	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
+		ID:          sourceUUID,
+		Icon:        "/icon/k8s.png",
+		DisplayName: "Kubernetes",
+	})
+	if err != nil {
+		// Posting the log source failed, which affects how logs appear.
+		// We'll retry to ensure the log source is properly registered.
+		logger.Error(ctx, "post log source", slog.Error(err))
+		return agentLoggerLifecycle{}, err
+	}
+
+	ls := agentsdk.NewLogSender(logger)
+	sl := ls.GetScriptLogger(sourceUUID)
+
+	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
+
+	// connect to Agent v2.0 API, since we don't need features added later.
+	// This maximizes compatibility.
+	arpc, err := client.ConnectRPC20(gracefulCtx)
+	if err != nil {
+		logger.Error(ctx, "drpc connect", slog.Error(err))
+		gracefulCancel()
+		return agentLoggerLifecycle{}, err
+	}
+	go func() {
+		err := ls.SendLoop(gracefulCtx, arpc)
+		// if the send loop exits on its own without the context
+		// canceling, timeout the logger and force it to recreate.
+		if err != nil && ctx.Err() == nil {
+			l.loggerTimeout(log.agentToken)
+		}
+	}()
+
+	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
+		logger.Info(ctx, "logger timeout firing")
+		l.loggerTimeout(log.agentToken)
+	})
+	lifecycle := agentLoggerLifecycle{
+		scriptLogger: sl,
+		close: func() {
+			defer arpc.DRPCConn().Close()
+			defer client.SDK.HTTPClient.CloseIdleConnections()
+			// We could be stopping for reasons other than the timeout. If
+			// so, stop the timer.
+			closeTimer.Stop()
+			defer gracefulCancel()
+			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
+			defer timeout.Stop()
+			logger.Info(ctx, "logger closing")
+
+			if err := sl.Flush(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while flushing")
+				return
+			}
+
+			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
+				// ctx err
+				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
+			}
+		},
+	}
+	lifecycle.closeTimer = closeTimer
+	return lifecycle, nil
+}
+
 func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
-	queuedLogs := l.logCache.push(log)
+
+	queuedLogs := l.logCache.get(log.agentToken)
+	if isAgentLogEmpty(log) {
+		if queuedLogs == nil {
+			return
+		}
+	} else {
+		queuedLogs = l.logCache.push(log)
+	}
+
 	lgr, ok := l.loggers[log.agentToken]
 	if !ok {
-		client := agentsdk.New(l.coderURL)
-		client.SetSessionToken(log.agentToken)
-		logger := l.logger.With(slog.F("resource_name", log.resourceName))
-		client.SDK.SetLogger(logger)
-
-		_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
-			ID:          sourceUUID,
-			Icon:        "/icon/k8s.png",
-			DisplayName: "Kubernetes",
-		})
-		if err != nil {
-			// This shouldn't fail sending the log, as it only affects how they
-			// appear.
-			logger.Error(ctx, "post log source", slog.Error(err))
+		// skip if we're in a retry cooldown window
+		if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
+			return
 		}
 
-		ls := agentsdk.NewLogSender(logger)
-		sl := ls.GetScriptLogger(sourceUUID)
-
-		gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
-
-		// connect to Agent v2.0 API, since we don't need features added later.
-		// This maximizes compatibility.
-		arpc, err := client.ConnectRPC20(gracefulCtx)
+		var err error
+		lgr, err = l.newLogger(ctx, log)
 		if err != nil {
-			logger.Error(ctx, "drpc connect", slog.Error(err))
-			gracefulCancel()
+			l.scheduleRetry(ctx, log.agentToken)
 			return
 		}
-		go func() {
-			err := ls.SendLoop(gracefulCtx, arpc)
-			// if the send loop exits on its own without the context
-			// canceling, timeout the logger and force it to recreate.
-			if err != nil && ctx.Err() == nil {
-				l.loggerTimeout(log.agentToken)
-			}
-		}()
-
-		closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
-			logger.Info(ctx, "logger timeout firing")
-			l.loggerTimeout(log.agentToken)
-		})
-		lifecycle := agentLoggerLifecycle{
-			scriptLogger: sl,
-			close: func() {
-				// We could be stopping for reasons other than the timeout. If
-				// so, stop the timer.
-				closeTimer.Stop()
-				defer gracefulCancel()
-				timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
-				defer timeout.Stop()
-				logger.Info(ctx, "logger closing")
-
-				if err := sl.Flush(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while flushing")
-					return
-				}
-
-				if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
-					// ctx err
-					logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
-				}
-
-				_ = arpc.DRPCConn().Close()
-				client.SDK.HTTPClient.CloseIdleConnections()
-			},
-		}
-		lifecycle.closeTimer = closeTimer
-		l.loggers[log.agentToken] = lifecycle
-		lgr = lifecycle
+		l.loggers[log.agentToken] = lgr
 	}
 
 	lgr.resetCloseTimer(l.loggerTTL)
-	_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
+	if len(queuedLogs) == 0 {
+		return
+	}
+	if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
+		l.scheduleRetry(ctx, log.agentToken)
+		return
+	}
+	l.clearRetryLocked(log.agentToken)
 	l.logCache.delete(log.agentToken)
 }
 
@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
 	lgr, ok := l.loggers[log.agentToken]
 	if ok {
 		delete(l.loggers, log.agentToken)
-
 	}
+	l.clearRetryLocked(log.agentToken)
+	l.logCache.delete(log.agentToken)
 	l.mu.Unlock()
 
 	if ok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
 	}
 }
 
+// retryState tracks exponential backoff for an agent token.
+type retryState struct {
+	delay      time.Duration
+	timer      *quartz.Timer
+	retryCount int
+	exhausted  bool // prevent retry state recreation after max retries
+}
+
+func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
+	if l.retries == nil {
+		l.retries = make(map[string]*retryState)
+	}
+
+	rs := l.retries[token]
+
+	if rs != nil && rs.exhausted {
+		return
+	}
+
+	if rs == nil {
+		rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
+		l.retries[token] = rs
+	}
+
+	rs.retryCount++
+
+	// If we've reached the max retries, mark the retry state exhausted and drop the cached logs.
+	if rs.retryCount >= l.maxRetries {
+		l.logger.Error(ctx, "max retries exceeded",
+			slog.F("retryCount", rs.retryCount),
+			slog.F("maxRetries", l.maxRetries))
+		rs.exhausted = true
+		if rs.timer != nil {
+			rs.timer.Stop()
+			rs.timer = nil
+		}
+		l.logCache.delete(token)
+		return
+	}
+
+	if rs.timer != nil {
+		return
+	}
+
+	l.logger.Info(ctx, "scheduling retry",
+		slog.F("delay", rs.delay.String()),
+		slog.F("retryCount", rs.retryCount))
+
+	rs.timer = l.clock.AfterFunc(rs.delay, func() {
+		l.mu.Lock()
+		defer l.mu.Unlock()
+
+		if cur := l.retries[token]; cur != nil && !cur.exhausted {
+			cur.timer = nil
+			l.q <- agentLog{op: opLog, agentToken: token}
+		}
+	})
+
+	rs.delay *= 2
+	if rs.delay > 30*time.Second {
+		rs.delay = 30 * time.Second
+	}
+}
+
+// clearRetryLocked clears the retry state for the given token.
+// The caller must hold the mutex lock.
+func (l *logQueuer) clearRetryLocked(token string) {
+	if rs := l.retries[token]; rs != nil {
+		if rs.timer != nil {
+			rs.timer.Stop()
+		}
+		delete(l.retries, token)
+	}
+}
+
 func newColor(value ...color.Attribute) *color.Color {
 	c := color.New(value...)
 	c.EnableColor()
@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
 func (l *logCache) delete(token string) {
 	delete(l.logs, token)
 }
+
+func (l *logCache) get(token string) []agentsdk.Log {
+	logs, ok := l.logs[token]
+	if !ok {
+		return nil
+	}
+	return logs
+}
+
+func isAgentLogEmpty(log agentLog) bool {
+	return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
+}
diff --git a/logger_test.go b/logger_test.go
index 3ab1c0b..259be40 100644
--- a/logger_test.go
+++ b/logger_test.go
@@ -486,6 +486,471 @@ func Test_logQueuer(t *testing.T) {
 		// wait for the client to disconnect
 		_ = testutil.RequireRecvCtx(ctx, t, api.disconnect)
 	})
+
+	t.Run("RetryMechanism", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a failing API that will reject connections
+		failingAPI := newFailingAgentAPI(t)
+		agentURL, err := url.Parse(failingAPI.server.URL)
+		require.NoError(t, err)
+		clock := quartz.NewMock(t)
+		ttl := time.Second
+
+		ch := make(chan agentLog, 10)
+		logger := slogtest.Make(t, &slogtest.Options{
+			IgnoreErrors: true,
+		})
+		lq := &logQueuer{
+			logger:    logger,
+			clock:     clock,
+			q:         ch,
+			coderURL:  agentURL,
+			loggerTTL: ttl,
+			loggers:   map[string]agentLoggerLifecycle{},
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+			maxRetries: 10,
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		go lq.work(ctx)
+
+		token := "retry-token"
+		ch <- agentLog{
+			op:           opLog,
+			resourceName: "hello",
+			agentToken:   token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "This is a log.",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+
+		// Wait for the initial failure to be processed and retry state to be created
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.timer != nil && rs.delay == 2*time.Second
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// Verify retry state exists and has correct doubled delay (it gets doubled after scheduling)
+		lq.mu.Lock()
+		rs := lq.retries[token]
+		require.NotNil(t, rs)
+		require.Equal(t, 2*time.Second, rs.delay) // Delay gets doubled after scheduling
+		require.NotNil(t, rs.timer)
+		lq.mu.Unlock()
+
+		// Advance clock to trigger first retry
+		clock.Advance(time.Second)
+
+		// Wait for retry to be processed and delay to double again
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.delay == 4*time.Second
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// Check that delay doubled again for next retry
+		lq.mu.Lock()
+		rs = lq.retries[token]
+		require.NotNil(t, rs)
+		require.Equal(t, 4*time.Second, rs.delay)
+		lq.mu.Unlock()
+
+		// Advance clock to trigger second retry
+		clock.Advance(2 * time.Second)
+
+		// Wait for retry to be processed and delay to double again
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.delay == 8*time.Second
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// Check that delay doubled again
+		lq.mu.Lock()
+		rs = lq.retries[token]
+		require.NotNil(t, rs)
+		require.Equal(t, 8*time.Second, rs.delay)
+		lq.mu.Unlock()
+	})
+
+	t.Run("RetryMaxDelay", func(t *testing.T) {
+		t.Parallel()
+
+		clock := quartz.NewMock(t)
+		ch := make(chan agentLog, 10)
+		lq := &logQueuer{
+			logger: slogtest.Make(t, nil),
+			clock:  clock,
+			q:      ch,
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+			maxRetries: 10,
+		}
+
+		ctx := context.Background()
+		token := "test-token"
+
+		// Set up a retry state with a large delay
+		lq.retries = make(map[string]*retryState)
+		lq.retries[token] = &retryState{
+			delay:      20 * time.Second,
+			retryCount: 0,
+		}
+
+		// Schedule a retry - should cap at 30 seconds
+		lq.scheduleRetry(ctx, token)
+
+		rs := lq.retries[token]
+		require.NotNil(t, rs)
+		require.Equal(t, 30*time.Second, rs.delay)
+
+		// Schedule another retry - should stay at 30 seconds
+		lq.scheduleRetry(ctx, token)
+		rs = lq.retries[token]
+		require.NotNil(t, rs)
+		require.Equal(t, 30*time.Second, rs.delay)
+	})
+
+	t.Run("ClearRetry", func(t *testing.T) {
+		t.Parallel()
+
+		clock := quartz.NewMock(t)
+		ch := make(chan agentLog, 10)
+		lq := &logQueuer{
+			logger: slogtest.Make(t, nil),
+			clock:  clock,
+			q:      ch,
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+			maxRetries: 2,
+		}
+
+		ctx := context.Background()
+		token := "test-token"
+
+		// Schedule a retry
+		lq.scheduleRetry(ctx, token)
+		require.NotNil(t, lq.retries[token])
+
+		// Clear the retry
+		lq.clearRetryLocked(token)
+		require.Nil(t, lq.retries[token])
+	})
+
+	t.Run("MaxRetries", func(t *testing.T) {
+		t.Parallel()
+
+		// Create a failing API that will reject connections
+		failingAPI := newFailingAgentAPI(t)
+		agentURL, err := url.Parse(failingAPI.server.URL)
+		require.NoError(t, err)
+		clock := quartz.NewMock(t)
+		ttl := time.Second
+
+		ch := make(chan agentLog, 10)
+		logger := slogtest.Make(t, &slogtest.Options{
+			IgnoreErrors: true,
+		})
+		lq := &logQueuer{
+			logger:    logger,
+			clock:     clock,
+			q:         ch,
+			coderURL:  agentURL,
+			loggerTTL: ttl,
+			loggers:   map[string]agentLoggerLifecycle{},
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+			retries:    make(map[string]*retryState),
+			maxRetries: 2,
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		go lq.work(ctx)
+
+		token := "max-retry-token"
+		ch <- agentLog{
+			op:           opLog,
+			resourceName: "hello",
+			agentToken:   token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "This is a log.",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.retryCount == 1
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		clock.Advance(time.Second)
+
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs != nil && rs.retryCount == 2
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		clock.Advance(2 * time.Second)
+
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			rs := lq.retries[token]
+			return rs == nil || rs.exhausted
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		lq.mu.Lock()
+		cachedLogs := lq.logCache.get(token)
+		lq.mu.Unlock()
+		require.Nil(t, cachedLogs)
+	})
+}
+
+func Test_logCache(t *testing.T) {
+	t.Parallel()
+
+	t.Run("PushAndGet", func(t *testing.T) {
+		t.Parallel()
+
+		lc := logCache{
+			logs: map[string][]agentsdk.Log{},
+		}
+
+		token := "test-token"
+
+		// Initially should return nil
+		logs := lc.get(token)
+		require.Nil(t, logs)
+
+		// Push first log
+		log1 := agentLog{
+			agentToken: token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "First log",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+		returnedLogs := lc.push(log1)
+		require.Len(t, returnedLogs, 1)
+		require.Equal(t, "First log", returnedLogs[0].Output)
+
+		// Get should return the cached logs
+		cachedLogs := lc.get(token)
+		require.Len(t, cachedLogs, 1)
+		require.Equal(t, "First log", cachedLogs[0].Output)
+
+		// Push second log to same token
+		log2 := agentLog{
+			agentToken: token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "Second log",
+				Level:     codersdk.LogLevelWarn,
+			},
+		}
+		returnedLogs = lc.push(log2)
+		require.Len(t, returnedLogs, 2)
+		require.Equal(t, "First log", returnedLogs[0].Output)
+		require.Equal(t, "Second log", returnedLogs[1].Output)
+
+		// Get should return both logs
+		cachedLogs = lc.get(token)
+		require.Len(t, cachedLogs, 2)
+		require.Equal(t, "First log", cachedLogs[0].Output)
+		require.Equal(t, "Second log", cachedLogs[1].Output)
+	})
+
+	t.Run("Delete", func(t *testing.T) {
+		t.Parallel()
+
+		lc := logCache{
+			logs: map[string][]agentsdk.Log{},
+		}
+
+		token := "test-token"
+
+		// Push a log
+		log := agentLog{
+			agentToken: token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "Test log",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+		lc.push(log)
+
+		// Verify it exists
+		cachedLogs := lc.get(token)
+		require.Len(t, cachedLogs, 1)
+
+		// Delete it
+		lc.delete(token)
+
+		// Should return nil now
+		cachedLogs = lc.get(token)
+		require.Nil(t, cachedLogs)
+	})
+
+	t.Run("MultipleTokens", func(t *testing.T) {
+		t.Parallel()
+
+		lc := logCache{
+			logs: map[string][]agentsdk.Log{},
+		}
+
+		token1 := "token1"
+		token2 := "token2"
+
+		// Push logs for different tokens
+		log1 := agentLog{
+			agentToken: token1,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "Log for token1",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+		log2 := agentLog{
+			agentToken: token2,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "Log for token2",
+				Level:     codersdk.LogLevelError,
+			},
+		}
+
+		lc.push(log1)
+		lc.push(log2)
+
+		// Each token should have its own logs
+		logs1 := lc.get(token1)
+		require.Len(t, logs1, 1)
+		require.Equal(t, "Log for token1", logs1[0].Output)
+
+		logs2 := lc.get(token2)
+		require.Len(t, logs2, 1)
+		require.Equal(t, "Log for token2", logs2[0].Output)
+
+		// Delete one token shouldn't affect the other
+		lc.delete(token1)
+		require.Nil(t, lc.get(token1))
+
+		logs2 = lc.get(token2)
+		require.Len(t, logs2, 1)
+		require.Equal(t, "Log for token2", logs2[0].Output)
+	})
+
+	t.Run("EmptyLogHandling", func(t *testing.T) {
+		t.Parallel()
+
+		api := newFakeAgentAPI(t)
+		agentURL, err := url.Parse(api.server.URL)
+		require.NoError(t, err)
+		clock := quartz.NewMock(t)
+		ttl := time.Second
+
+		ch := make(chan agentLog, 10)
+		lq := &logQueuer{
+			logger:    slogtest.Make(t, nil),
+			clock:     clock,
+			q:         ch,
+			coderURL:  agentURL,
+			loggerTTL: ttl,
+			loggers:   map[string]agentLoggerLifecycle{},
+			logCache: logCache{
+				logs: map[string][]agentsdk.Log{},
+			},
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		go lq.work(ctx)
+
+		token := "test-token"
+
+		// Send an empty log first - should be ignored since no cached logs exist
+		emptyLog := agentLog{
+			op:           opLog,
+			resourceName: "",
+			agentToken:   token,
+			log: agentsdk.Log{
+				Output:    "",
+				CreatedAt: time.Time{},
+			},
+		}
+		ch <- emptyLog
+
+		// Wait to ensure processing completes - no logger should be created for empty log with no cache
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			_, exists := lq.loggers[token]
+			return !exists
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// No logger should be created for empty log with no cache
+		lq.mu.Lock()
+		_, exists := lq.loggers[token]
+		require.False(t, exists)
+		lq.mu.Unlock()
+
+		// Now send a real log to establish the logger
+		realLog := agentLog{
+			op:           opLog,
+			resourceName: "hello",
+			agentToken:   token,
+			log: agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    "Real log",
+				Level:     codersdk.LogLevelInfo,
+			},
+		}
+		ch <- realLog
+
+		// Should create logger and send log
+		_ = testutil.RequireRecvCtx(ctx, t, api.logSource)
+		logs := testutil.RequireRecvCtx(ctx, t, api.logs)
+		require.Len(t, logs, 1)
+		require.Contains(t, logs[0].Output, "Real log")
+
+		// Now send empty log - should trigger flush of any cached logs
+		ch <- emptyLog
+
+		// Wait for processing - logger should still exist after empty log
+		require.Eventually(t, func() bool {
+			lq.mu.Lock()
+			defer lq.mu.Unlock()
+			_, exists := lq.loggers[token]
+			return exists
+		}, testutil.WaitShort, testutil.IntervalFast)
+
+		// Logger should still exist
+		lq.mu.Lock()
+		_, exists = lq.loggers[token]
+		require.True(t, exists)
+		lq.mu.Unlock()
+	})
 }
 
 func newFakeAgentAPI(t *testing.T) *fakeAgentAPI {
@@ -547,6 +1012,21 @@ func newFakeAgentAPI(t *testing.T) *fakeAgentAPI {
 	return fakeAPI
 }
 
+func newFailingAgentAPI(t *testing.T) *fakeAgentAPI {
+	fakeAPI := &fakeAgentAPI{
+		disconnect: make(chan struct{}),
+		logs:       make(chan []*proto.Log),
+		logSource:  make(chan agentsdk.PostLogSourceRequest),
+	}
+
+	// Create a server that always returns 401 Unauthorized errors
+	fakeAPI.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		http.Error(w, "Unauthorized", http.StatusUnauthorized)
+	}))
+
+	return fakeAPI
+}
+
 type fakeAgentAPI struct {
 	disconnect chan struct{}
 	logs       chan []*proto.Log
diff --git a/main.go b/main.go
index 5e5d2bd..4c65be5 100644
--- a/main.go
+++ b/main.go
@@ -79,6 +79,7 @@ func root() *cobra.Command {
 				fieldSelector: fieldSelector,
 				labelSelector: labelSelector,
 				logger:        slog.Make(sloghuman.Sink(cmd.ErrOrStderr())).Leveled(slog.LevelDebug),
+				maxRetries:    15, // Allow up to 15 retries for a log send failure before dropping the logs.
 			})
 			if err != nil {
 				return fmt.Errorf("create pod event reporter: %w", err)