
Commit f1e6de3

Apply review suggestions and fix maxRetries
1 parent c99d83c commit f1e6de3

3 files changed (+55, -32 lines)

helm/templates/service.yaml

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ spec:
         {{- end }}
         {{- if .Values.maxRetries }}
         - name: CODER_MAX_RETRIES
-          value: {{ .Values.maxRetries }}
+          value: "{{ .Values.maxRetries }}"
         {{- end }}
         {{- with .Values.securityContext }}
       securityContext:
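
The added quotes are the substance of this fix: Kubernetes env var values must be strings, so an unquoted {{ .Values.maxRetries }} renders as a YAML integer and the manifest is typically rejected with a type error when the chart is applied. On the consuming side the value arrives as a string either way. The sketch below is a minimal, hypothetical illustration of that round trip, assuming the variable is parsed with strconv; it is not coder-logstream-kube's actual flag parsing.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// maxRetriesFromEnv reads CODER_MAX_RETRIES, which the chart now renders as a
// quoted string; Kubernetes env values are always strings, so the consumer
// converts back to an int. Hypothetical helper, not project code.
func maxRetriesFromEnv(def int) (int, error) {
	raw := os.Getenv("CODER_MAX_RETRIES")
	if raw == "" {
		return def, nil // unset: keep the default
	}
	return strconv.Atoi(raw)
}

func main() {
	n, err := maxRetriesFromEnv(10)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid CODER_MAX_RETRIES:", err)
		os.Exit(1)
	}
	fmt.Println("max retries:", n)
}
```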

logger.go

Lines changed: 32 additions & 27 deletions
@@ -34,7 +34,8 @@ type podEventLoggerOptions struct {
 
     logger slog.Logger
     logDebounce time.Duration
-    maxRetries int
+    // maxRetries is the maximum number of retries for a log send failure.
+    maxRetries int
 
     // The following fields are optional!
     namespaces []string

@@ -414,7 +415,9 @@ type logQueuer struct {
     loggers map[string]agentLoggerLifecycle
     logCache logCache
 
-    retries map[string]*retryState
+    // retries maps agent tokens to their retry state for exponential backoff
+    retries map[string]*retryState
+    // maxRetries is the maximum number of retries for a log send failure.
     maxRetries int
 }
 

@@ -436,7 +439,7 @@ func (l *logQueuer) work(ctx context.Context) {
     }
 }
 
-func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
+func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
     client := agentsdk.New(l.coderURL)
     client.SetSessionToken(log.agentToken)
     logger := l.logger.With(slog.F("resource_name", log.resourceName))
@@ -448,10 +451,9 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
         DisplayName: "Kubernetes",
     })
     if err != nil {
-        // This shouldn't fail sending the log, as it only affects how they
-        // appear.
+        // Posting the log source failed, which affects how logs appear.
+        // We'll retry to ensure the log source is properly registered.
         logger.Error(ctx, "post log source", slog.Error(err))
-        l.scheduleRetry(ctx, log.agentToken)
         return agentLoggerLifecycle{}, err
     }
 

@@ -466,7 +468,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
     if err != nil {
         logger.Error(ctx, "drpc connect", slog.Error(err))
         gracefulCancel()
-        l.scheduleRetry(ctx, log.agentToken)
         return agentLoggerLifecycle{}, err
     }
     go func() {
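
With these two hunks, newLogger no longer schedules retries on its own error paths; it just returns the error. The caller already schedules a retry when newLogger fails (visible as unchanged context in the @@ -533 hunk below), so dropping the inner calls avoids double-scheduling the same failure. The snippet below is a small, self-contained illustration of that shape with made-up names; it is not project code.

```go
package main

import (
	"errors"
	"fmt"
)

// newThing stands in for a constructor whose failure paths simply return the
// error instead of triggering their own retries.
func newThing() (string, error) {
	return "", errors.New("connect failed")
}

// process is the single caller, and therefore the single place that decides
// whether a retry gets scheduled.
func process(scheduleRetry func(reason error)) {
	thing, err := newThing()
	if err != nil {
		scheduleRetry(err) // one retry path for all constructor failures
		return
	}
	fmt.Println("created", thing)
}

func main() {
	process(func(reason error) { fmt.Println("retry scheduled:", reason) })
}
```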
@@ -485,6 +486,8 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
     lifecycle := agentLoggerLifecycle{
         scriptLogger: sl,
         close: func() {
+            defer arpc.DRPCConn().Close()
+            defer client.SDK.HTTPClient.CloseIdleConnections()
             // We could be stopping for reasons other than the timeout. If
             // so, stop the timer.
             closeTimer.Stop()

@@ -503,9 +506,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
                 // ctx err
                 logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
             }
-
-            _ = arpc.DRPCConn().Close()
-            client.SDK.HTTPClient.CloseIdleConnections()
         },
     }
     lifecycle.closeTimer = closeTimer
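
These two hunks move the connection cleanup from the tail of close() into defers at its top. Deferred calls run on every exit path of the closure, so the cleanup no longer depends on control flow reaching the final lines, and since defers run last-in-first-out, CloseIdleConnections still executes before the DRPC connection is closed. A standalone sketch of the pattern with illustrative names:

```go
package main

import "fmt"

// closeLogger is illustrative only: cleanup registered with defer runs on
// every return path, in last-in-first-out order.
func closeLogger(timedOut bool) {
	defer fmt.Println("closed DRPC connection")       // deferred first, runs last
	defer fmt.Println("closed idle HTTP connections") // deferred second, runs first

	if timedOut {
		fmt.Println("timeout reached while waiting for log queue to empty")
		return // both deferred cleanups still run
	}
	fmt.Println("log queue drained")
}

func main() {
	closeLogger(true)
	closeLogger(false)
}
```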
@@ -533,7 +533,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
     }
 
     var err error
-    lgr, err = l.newLogger(ctx, log, queuedLogs)
+    lgr, err = l.newLogger(ctx, log)
     if err != nil {
         l.scheduleRetry(ctx, log.agentToken)
         return

@@ -549,7 +549,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
         l.scheduleRetry(ctx, log.agentToken)
         return
     }
-    l.clearRetry(log.agentToken)
+    l.clearRetryLocked(log.agentToken)
     l.logCache.delete(log.agentToken)
 }
 

@@ -558,9 +558,8 @@ func (l *logQueuer) processDelete(log agentLog) {
     lgr, ok := l.loggers[log.agentToken]
     if ok {
         delete(l.loggers, log.agentToken)
-
     }
-    l.clearRetry(log.agentToken)
+    l.clearRetryLocked(log.agentToken)
     l.logCache.delete(log.agentToken)
     l.mu.Unlock()
 

@@ -598,6 +597,7 @@ type retryState struct {
     delay time.Duration
     timer *quartz.Timer
     retryCount int
+    exhausted bool // prevent retry state recreation after max retries
 }
 
 func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
@@ -606,8 +606,13 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
     }
 
     rs := l.retries[token]
+
+    if rs != nil && rs.exhausted {
+        return
+    }
+
     if rs == nil {
-        rs = &retryState{delay: time.Second}
+        rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
         l.retries[token] = rs
     }
 

@@ -618,7 +623,11 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
         l.logger.Error(ctx, "max retries exceeded",
             slog.F("retryCount", rs.retryCount),
             slog.F("maxRetries", l.maxRetries))
-        l.clearRetry(token)
+        rs.exhausted = true
+        if rs.timer != nil {
+            rs.timer.Stop()
+            rs.timer = nil
+        }
         l.logCache.delete(token)
         return
     }
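
Marking the state exhausted and stopping its timer, instead of deleting the entry as clearRetry used to, leaves a tombstone in the retries map: the early return added a few lines above then turns any later scheduleRetry call for that token into a no-op rather than recreating fresh state and restarting the backoff. A compact, illustrative sketch of that guard follows; the names and the exact threshold check are made up, not the project's.

```go
package main

import "fmt"

// retryEntry mimics the tombstone idea: once exhausted, the entry stays in
// the map so further schedule calls become no-ops.
type retryEntry struct {
	retryCount int
	exhausted  bool
}

type retrier struct {
	maxRetries int
	entries    map[string]*retryEntry
}

func (r *retrier) schedule(token string) {
	e := r.entries[token]
	if e != nil && e.exhausted {
		return // already exhausted: do not recreate state
	}
	if e == nil {
		e = &retryEntry{}
		r.entries[token] = e
	}
	e.retryCount++
	if e.retryCount > r.maxRetries {
		e.exhausted = true
		fmt.Printf("%s: max retries exceeded (count=%d)\n", token, e.retryCount)
		return
	}
	fmt.Printf("%s: retry %d scheduled\n", token, e.retryCount)
}

func main() {
	r := &retrier{maxRetries: 2, entries: map[string]*retryEntry{}}
	for i := 0; i < 4; i++ {
		r.schedule("agent-token")
	}
}
```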
@@ -627,24 +636,18 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
         return
     }
 
-    if rs.delay < time.Second {
-        rs.delay = time.Second
-    } else if rs.delay > 30*time.Second {
-        rs.delay = 30 * time.Second
-    }
-
     l.logger.Info(ctx, "scheduling retry",
         slog.F("delay", rs.delay.String()),
         slog.F("retryCount", rs.retryCount))
 
     rs.timer = l.clock.AfterFunc(rs.delay, func() {
         l.mu.Lock()
-        if cur := l.retries[token]; cur != nil {
+        defer l.mu.Unlock()
+
+        if cur := l.retries[token]; cur != nil && !cur.exhausted {
             cur.timer = nil
+            l.q <- agentLog{op: opLog, agentToken: token}
         }
-        l.mu.Unlock()
-
-        l.q <- agentLog{op: opLog, agentToken: token}
     })
 
     rs.delay *= 2
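
The reworked timer callback now holds the mutex for its whole body and re-checks the state before re-enqueueing, so a timer that fires after the token was cleared or exhausted simply does nothing; the delay then doubles for the next attempt. Purely as an illustration of the resulting wait sequence, the snippet below starts at one second and doubles with an assumed 30-second ceiling (the clamp removed above used 30s; whether a cap is still applied after the doubling is outside the lines shown here).

```go
package main

import (
	"fmt"
	"time"
)

// Prints an illustrative backoff sequence: 1s, 2s, 4s, ... with an assumed
// 30s ceiling. Not project code.
func main() {
	delay := time.Second
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, delay)
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
}
```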
@@ -653,7 +656,9 @@ func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
     }
 }
 
-func (l *logQueuer) clearRetry(token string) {
+// clearRetryLocked clears the retry state for the given token.
+// The caller must hold the mutex lock.
+func (l *logQueuer) clearRetryLocked(token string) {
     if rs := l.retries[token]; rs != nil {
         if rs.timer != nil {
             rs.timer.Stop()
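
The rename from clearRetry to clearRetryLocked, together with the new doc comment, follows a common Go convention: a Locked suffix marks a helper that expects the caller to already hold the mutex, so the helper never locks it itself (doing so would deadlock a plain sync.Mutex). A minimal illustration of the convention with made-up names:

```go
package main

import (
	"fmt"
	"sync"
)

type table struct {
	mu      sync.Mutex
	entries map[string]int
}

// clearLocked must only be called with t.mu held; it never locks itself.
func (t *table) clearLocked(key string) {
	delete(t.entries, key)
}

// Delete is the public entry point: it takes the lock, then uses the helper.
func (t *table) Delete(key string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.clearLocked(key)
}

func main() {
	t := &table{entries: map[string]int{"agent-1": 1}}
	t.Delete("agent-1")
	fmt.Println(len(t.entries)) // 0
}
```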

logger_test.go

Lines changed: 22 additions & 4 deletions
@@ -511,6 +511,7 @@ func Test_logQueuer(t *testing.T) {
             logCache: logCache{
                 logs: map[string][]agentsdk.Log{},
             },
+            maxRetries: 10,
         }
 
         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

@@ -644,7 +645,7 @@ func Test_logQueuer(t *testing.T) {
         require.NotNil(t, lq.retries[token])
 
         // Clear the retry
-        lq.clearRetry(token)
+        lq.clearRetryLocked(token)
         require.Nil(t, lq.retries[token])
     })
 

@@ -672,6 +673,7 @@ func Test_logQueuer(t *testing.T) {
             logCache: logCache{
                 logs: map[string][]agentsdk.Log{},
             },
+            retries: make(map[string]*retryState),
             maxRetries: 2,
         }
 
@@ -691,15 +693,31 @@ func Test_logQueuer(t *testing.T) {
             },
         }
 
-        // Wait for retry state to be cleared after exceeding maxRetries
         require.Eventually(t, func() bool {
             lq.mu.Lock()
             defer lq.mu.Unlock()
             rs := lq.retries[token]
-            return rs == nil
+            return rs != nil && rs.retryCount == 1
+        }, testutil.WaitShort, testutil.IntervalFast)
+
+        clock.Advance(time.Second)
+
+        require.Eventually(t, func() bool {
+            lq.mu.Lock()
+            defer lq.mu.Unlock()
+            rs := lq.retries[token]
+            return rs != nil && rs.retryCount == 2
+        }, testutil.WaitShort, testutil.IntervalFast)
+
+        clock.Advance(2 * time.Second)
+
+        require.Eventually(t, func() bool {
+            lq.mu.Lock()
+            defer lq.mu.Unlock()
+            rs := lq.retries[token]
+            return rs == nil || rs.exhausted
         }, testutil.WaitShort, testutil.IntervalFast)
 
-        // Verify cache is also cleared
         lq.mu.Lock()
         cachedLogs := lq.logCache.get(token)
         lq.mu.Unlock()
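
Instead of waiting for the retry state to disappear, the test now walks the mock clock through the backoff explicitly: the first failed send leaves retryCount at 1, advancing one second fires the 1s retry and pushes it to 2, and advancing two more seconds (the doubled delay) drives the token past maxRetries: 2, after which the state is either gone or marked exhausted. The toy clock below only illustrates why each Advance has to cover the current delay before a pending AfterFunc fires; it is not the quartz API the test uses.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock is a toy stand-in for a mock clock: timers fire only once the
// total advanced time reaches their deadline.
type fakeClock struct {
	now    time.Duration
	timers []fakeTimer
}

type fakeTimer struct {
	at time.Duration
	fn func()
}

func (c *fakeClock) AfterFunc(d time.Duration, fn func()) {
	c.timers = append(c.timers, fakeTimer{at: c.now + d, fn: fn})
}

func (c *fakeClock) Advance(d time.Duration) {
	c.now += d
	remaining := c.timers[:0]
	for _, t := range c.timers {
		if t.at <= c.now {
			t.fn() // deadline reached: fire and drop the timer
		} else {
			remaining = append(remaining, t)
		}
	}
	c.timers = remaining
}

func main() {
	c := &fakeClock{}
	c.AfterFunc(time.Second, func() { fmt.Println("1s timer fired") })
	c.Advance(time.Second) // covers the initial 1s backoff delay

	c.AfterFunc(2*time.Second, func() { fmt.Println("2s timer fired") })
	c.Advance(2 * time.Second) // covers the doubled 2s delay
}
```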
