Skip to content

Commit a5bd376

Browse files
authored
feat: implement retry mechanism for log processing (#136)
* feat: implement retry mechanism for log processing
* add tests for retry mechanism and logCache functionality
* simplify retry state initialization in logQueuer
* implement maxRetries for log processing
* apply review suggestions and fix maxRetries
* remove maxRetries configuration from CLI and Helm values, setting a default of 15 retries for log send failures
1 parent 13d885f commit a5bd376

File tree

3 files changed

+680
-69
lines changed

3 files changed

+680
-69
lines changed

logger.go

Lines changed: 199 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {
3434

3535
logger slog.Logger
3636
logDebounce time.Duration
37+
// maxRetries is the maximum number of retries for a log send failure.
38+
maxRetries int
3739

3840
// The following fields are optional!
3941
namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
5254
opts.clock = quartz.NewReal()
5355
}
5456

57+
if opts.maxRetries == 0 {
58+
opts.maxRetries = 10
59+
}
60+
5561
logCh := make(chan agentLog, 512)
5662
ctx, cancelFunc := context.WithCancel(ctx)
5763
reporter := &podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
7581
logCache: logCache{
7682
logs: map[string][]agentsdk.Log{},
7783
},
84+
maxRetries: opts.maxRetries,
7885
},
7986
}
8087

@@ -407,6 +414,11 @@ type logQueuer struct {
407414
loggerTTL time.Duration
408415
loggers map[string]agentLoggerLifecycle
409416
logCache logCache
417+
418+
// retries maps agent tokens to their retry state for exponential backoff
419+
retries map[string]*retryState
420+
// maxRetries is the maximum number of retries for a log send failure.
421+
maxRetries int
410422
}
411423

412424
func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
427439
}
428440
}
429441

442+
// newLogger dials the Coder agent API for the given agent token and
// assembles an agentLoggerLifecycle that can stream script logs to it.
// It returns an error when registering the log source or establishing
// the RPC connection fails; callers are expected to retry.
func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
	client := agentsdk.New(l.coderURL)
	client.SetSessionToken(log.agentToken)
	logger := l.logger.With(slog.F("resource_name", log.resourceName))
	client.SDK.SetLogger(logger)

	// Register the "Kubernetes" log source so the logs are attributed in
	// the agent UI.
	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
		ID:          sourceUUID,
		Icon:        "/icon/k8s.png",
		DisplayName: "Kubernetes",
	})
	if err != nil {
		// Posting the log source failed, which affects how logs appear.
		// We'll retry to ensure the log source is properly registered.
		logger.Error(ctx, "post log source", slog.Error(err))
		return agentLoggerLifecycle{}, err
	}

	ls := agentsdk.NewLogSender(logger)
	sl := ls.GetScriptLogger(sourceUUID)

	// Derive from context.Background() so the graceful flush/drain in
	// close() is not cut short when the caller's ctx is canceled.
	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

	// connect to Agent v2.0 API, since we don't need features added later.
	// This maximizes compatibility.
	arpc, err := client.ConnectRPC20(gracefulCtx)
	if err != nil {
		logger.Error(ctx, "drpc connect", slog.Error(err))
		gracefulCancel()
		return agentLoggerLifecycle{}, err
	}
	go func() {
		err := ls.SendLoop(gracefulCtx, arpc)
		// if the send loop exits on its own without the context
		// canceling, timeout the logger and force it to recreate.
		if err != nil && ctx.Err() == nil {
			l.loggerTimeout(log.agentToken)
		}
	}()

	// Tear the logger down after a TTL of inactivity; processLog resets
	// this timer every time the logger is used.
	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
		logger.Info(ctx, "logger timeout firing")
		l.loggerTimeout(log.agentToken)
	})
	lifecycle := agentLoggerLifecycle{
		scriptLogger: sl,
		close: func() {
			// Deferred so the connection is torn down only after the
			// graceful flush below has had its chance to run.
			defer arpc.DRPCConn().Close()
			defer client.SDK.HTTPClient.CloseIdleConnections()
			// We could be stopping for reasons other than the timeout. If
			// so, stop the timer.
			closeTimer.Stop()
			defer gracefulCancel()
			// Bound the graceful flush/drain below to five seconds.
			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
			defer timeout.Stop()
			logger.Info(ctx, "logger closing")

			if err := sl.Flush(gracefulCtx); err != nil {
				// ctx err
				logger.Warn(gracefulCtx, "timeout reached while flushing")
				return
			}

			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
				// ctx err
				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
			}
		},
	}
	lifecycle.closeTimer = closeTimer
	return lifecycle, nil
}
514+
430515
func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
431516
l.mu.Lock()
432517
defer l.mu.Unlock()
433-
queuedLogs := l.logCache.push(log)
518+
519+
queuedLogs := l.logCache.get(log.agentToken)
520+
if isAgentLogEmpty(log) {
521+
if queuedLogs == nil {
522+
return
523+
}
524+
} else {
525+
queuedLogs = l.logCache.push(log)
526+
}
527+
434528
lgr, ok := l.loggers[log.agentToken]
435529
if !ok {
436-
client := agentsdk.New(l.coderURL)
437-
client.SetSessionToken(log.agentToken)
438-
logger := l.logger.With(slog.F("resource_name", log.resourceName))
439-
client.SDK.SetLogger(logger)
440-
441-
_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
442-
ID: sourceUUID,
443-
Icon: "/icon/k8s.png",
444-
DisplayName: "Kubernetes",
445-
})
446-
if err != nil {
447-
// This shouldn't fail sending the log, as it only affects how they
448-
// appear.
449-
logger.Error(ctx, "post log source", slog.Error(err))
530+
// skip if we're in a retry cooldown window
531+
if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
532+
return
450533
}
451534

452-
ls := agentsdk.NewLogSender(logger)
453-
sl := ls.GetScriptLogger(sourceUUID)
454-
455-
gracefulCtx, gracefulCancel := context.WithCancel(context.Background())
456-
457-
// connect to Agent v2.0 API, since we don't need features added later.
458-
// This maximizes compatibility.
459-
arpc, err := client.ConnectRPC20(gracefulCtx)
535+
var err error
536+
lgr, err = l.newLogger(ctx, log)
460537
if err != nil {
461-
logger.Error(ctx, "drpc connect", slog.Error(err))
462-
gracefulCancel()
538+
l.scheduleRetry(ctx, log.agentToken)
463539
return
464540
}
465-
go func() {
466-
err := ls.SendLoop(gracefulCtx, arpc)
467-
// if the send loop exits on its own without the context
468-
// canceling, timeout the logger and force it to recreate.
469-
if err != nil && ctx.Err() == nil {
470-
l.loggerTimeout(log.agentToken)
471-
}
472-
}()
473-
474-
closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
475-
logger.Info(ctx, "logger timeout firing")
476-
l.loggerTimeout(log.agentToken)
477-
})
478-
lifecycle := agentLoggerLifecycle{
479-
scriptLogger: sl,
480-
close: func() {
481-
// We could be stopping for reasons other than the timeout. If
482-
// so, stop the timer.
483-
closeTimer.Stop()
484-
defer gracefulCancel()
485-
timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
486-
defer timeout.Stop()
487-
logger.Info(ctx, "logger closing")
488-
489-
if err := sl.Flush(gracefulCtx); err != nil {
490-
// ctx err
491-
logger.Warn(gracefulCtx, "timeout reached while flushing")
492-
return
493-
}
494-
495-
if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
496-
// ctx err
497-
logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
498-
}
499-
500-
_ = arpc.DRPCConn().Close()
501-
client.SDK.HTTPClient.CloseIdleConnections()
502-
},
503-
}
504-
lifecycle.closeTimer = closeTimer
505-
l.loggers[log.agentToken] = lifecycle
506-
lgr = lifecycle
541+
l.loggers[log.agentToken] = lgr
507542
}
508543

509544
lgr.resetCloseTimer(l.loggerTTL)
510-
_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
545+
if len(queuedLogs) == 0 {
546+
return
547+
}
548+
if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
549+
l.scheduleRetry(ctx, log.agentToken)
550+
return
551+
}
552+
l.clearRetryLocked(log.agentToken)
511553
l.logCache.delete(log.agentToken)
512554
}
513555

@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
516558
lgr, ok := l.loggers[log.agentToken]
517559
if ok {
518560
delete(l.loggers, log.agentToken)
519-
520561
}
562+
l.clearRetryLocked(log.agentToken)
563+
l.logCache.delete(log.agentToken)
521564
l.mu.Unlock()
522565

523566
if ok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
549592
}
550593
}
551594

595+
// retryState tracks exponential backoff for an agent token.
596+
type retryState struct {
597+
delay time.Duration
598+
timer *quartz.Timer
599+
retryCount int
600+
exhausted bool // prevent retry state recreation after max retries
601+
}
602+
603+
func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
604+
if l.retries == nil {
605+
l.retries = make(map[string]*retryState)
606+
}
607+
608+
rs := l.retries[token]
609+
610+
if rs != nil && rs.exhausted {
611+
return
612+
}
613+
614+
if rs == nil {
615+
rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
616+
l.retries[token] = rs
617+
}
618+
619+
rs.retryCount++
620+
621+
// If we've reached the max retries, clear the retry state and delete the log cache.
622+
if rs.retryCount >= l.maxRetries {
623+
l.logger.Error(ctx, "max retries exceeded",
624+
slog.F("retryCount", rs.retryCount),
625+
slog.F("maxRetries", l.maxRetries))
626+
rs.exhausted = true
627+
if rs.timer != nil {
628+
rs.timer.Stop()
629+
rs.timer = nil
630+
}
631+
l.logCache.delete(token)
632+
return
633+
}
634+
635+
if rs.timer != nil {
636+
return
637+
}
638+
639+
l.logger.Info(ctx, "scheduling retry",
640+
slog.F("delay", rs.delay.String()),
641+
slog.F("retryCount", rs.retryCount))
642+
643+
rs.timer = l.clock.AfterFunc(rs.delay, func() {
644+
l.mu.Lock()
645+
defer l.mu.Unlock()
646+
647+
if cur := l.retries[token]; cur != nil && !cur.exhausted {
648+
cur.timer = nil
649+
l.q <- agentLog{op: opLog, agentToken: token}
650+
}
651+
})
652+
653+
rs.delay *= 2
654+
if rs.delay > 30*time.Second {
655+
rs.delay = 30 * time.Second
656+
}
657+
}
658+
659+
// clearRetryLocked clears the retry state for the given token.
660+
// The caller must hold the mutex lock.
661+
func (l *logQueuer) clearRetryLocked(token string) {
662+
if rs := l.retries[token]; rs != nil {
663+
if rs.timer != nil {
664+
rs.timer.Stop()
665+
}
666+
delete(l.retries, token)
667+
}
668+
}
669+
552670
func newColor(value ...color.Attribute) *color.Color {
553671
c := color.New(value...)
554672
c.EnableColor()
@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
572690
func (l *logCache) delete(token string) {
573691
delete(l.logs, token)
574692
}
693+
694+
func (l *logCache) get(token string) []agentsdk.Log {
695+
logs, ok := l.logs[token]
696+
if !ok {
697+
return nil
698+
}
699+
return logs
700+
}
701+
702+
func isAgentLogEmpty(log agentLog) bool {
703+
return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
704+
}

0 commit comments

Comments
 (0)