Skip to content

Commit 27a800e

Browse files
authored
Fix context canceled logged as error when attempting to heartbeat tasks (#423)
* Fix context canceled logged as error when attempting to heartbeat tasks
1 parent 0220af1 commit 27a800e

File tree

2 files changed

+181
-2
lines changed

2 files changed

+181
-2
lines changed

internal/worker/worker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,9 @@ func (w *Worker[Task, TaskResult]) heartbeatTask(ctx context.Context, task *Task
211211
return
212212
case <-t.C:
213213
if err := w.tw.Extend(ctx, task); err != nil {
214-
w.logger.ErrorContext(ctx, "could not heartbeat task", "error", err)
214+
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
215+
w.logger.ErrorContext(ctx, "could not heartbeat task", "error", err)
216+
}
215217

216218
// We might not own the task anymore, abort processing
217219
if cancel != nil {

internal/worker/worker_test.go

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"strings"
89
"sync"
910
"sync/atomic"
1011
"testing"
@@ -18,6 +19,90 @@ import (
1819
"github.com/stretchr/testify/require"
1920
)
2021

22+
// testLogger captures log output for testing
23+
type testLogger struct {
24+
mu sync.RWMutex
25+
logLines []logEntry
26+
logger *slog.Logger
27+
}
28+
29+
type logEntry struct {
30+
level slog.Level
31+
message string
32+
attrs map[string]interface{}
33+
}
34+
35+
func newTestLogger() *testLogger {
36+
tl := &testLogger{
37+
logLines: make([]logEntry, 0),
38+
}
39+
40+
handler := &testHandler{tl: tl}
41+
tl.logger = slog.New(handler)
42+
43+
return tl
44+
}
45+
46+
func (tl *testLogger) hasErrorLog(message string) bool {
47+
tl.mu.RLock()
48+
defer tl.mu.RUnlock()
49+
50+
for _, entry := range tl.logLines {
51+
if entry.level == slog.LevelError && strings.Contains(entry.message, message) {
52+
return true
53+
}
54+
}
55+
return false
56+
}
57+
58+
func (tl *testLogger) errorLogCount() int {
59+
tl.mu.RLock()
60+
defer tl.mu.RUnlock()
61+
62+
count := 0
63+
for _, entry := range tl.logLines {
64+
if entry.level == slog.LevelError {
65+
count++
66+
}
67+
}
68+
return count
69+
}
70+
71+
type testHandler struct {
72+
tl *testLogger
73+
}
74+
75+
func (h *testHandler) Enabled(ctx context.Context, level slog.Level) bool {
76+
return true
77+
}
78+
79+
func (h *testHandler) Handle(ctx context.Context, record slog.Record) error {
80+
h.tl.mu.Lock()
81+
defer h.tl.mu.Unlock()
82+
83+
attrs := make(map[string]interface{})
84+
record.Attrs(func(attr slog.Attr) bool {
85+
attrs[attr.Key] = attr.Value.Any()
86+
return true
87+
})
88+
89+
h.tl.logLines = append(h.tl.logLines, logEntry{
90+
level: record.Level,
91+
message: record.Message,
92+
attrs: attrs,
93+
})
94+
95+
return nil
96+
}
97+
98+
func (h *testHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
99+
return h
100+
}
101+
102+
func (h *testHandler) WithGroup(name string) slog.Handler {
103+
return h
104+
}
105+
21106
// Test types
22107
type testResult struct {
23108
Output string
@@ -62,9 +147,14 @@ func (m *mockTaskWorker) Complete(ctx context.Context, result *testResult, task
62147

63148
// Helper function to create a mock backend with basic setup
64149
func createMockBackend() *backend.MockBackend {
150+
return createMockBackendWithLogger(slog.Default())
151+
}
152+
153+
// Helper function to create a mock backend with custom logger
154+
func createMockBackendWithLogger(logger *slog.Logger) *backend.MockBackend {
65155
mockBackend := &backend.MockBackend{}
66156
mockBackend.On("Options").Return(&backend.Options{
67-
Logger: slog.Default(),
157+
Logger: logger,
68158
})
69159
return mockBackend
70160
}
@@ -516,6 +606,93 @@ func TestWorker_HeartbeatTask(t *testing.T) {
516606
// Should not have called Extend
517607
mockTaskWorker.AssertNotCalled(t, "Extend")
518608
})
609+
610+
t.Run("context canceled error in extend should not be logged as error", func(t *testing.T) {
611+
testLogger := newTestLogger()
612+
mockBackend := createMockBackendWithLogger(testLogger.logger)
613+
mockTaskWorker := &mockTaskWorker{}
614+
615+
options := &WorkerOptions{
616+
Pollers: 1,
617+
MaxParallelTasks: 1,
618+
HeartbeatInterval: time.Millisecond * 10,
619+
}
620+
621+
worker := NewWorker(mockBackend, mockTaskWorker, options)
622+
623+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
624+
defer cancel()
625+
626+
task := &testTask{ID: 1, Data: "test"}
627+
628+
mockTaskWorker.On("Extend", ctx, task).Return(context.Canceled)
629+
630+
worker.heartbeatTask(ctx, task, nil)
631+
632+
assert.False(t, testLogger.hasErrorLog("could not heartbeat task"))
633+
assert.Equal(t, 0, testLogger.errorLogCount())
634+
635+
mockTaskWorker.AssertExpectations(t)
636+
})
637+
638+
t.Run("context deadline exceeded error in extend should not be logged as error", func(t *testing.T) {
639+
testLogger := newTestLogger()
640+
mockBackend := createMockBackendWithLogger(testLogger.logger)
641+
mockTaskWorker := &mockTaskWorker{}
642+
643+
options := &WorkerOptions{
644+
Pollers: 1,
645+
MaxParallelTasks: 1,
646+
HeartbeatInterval: time.Millisecond * 10,
647+
}
648+
649+
worker := NewWorker(mockBackend, mockTaskWorker, options)
650+
651+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
652+
defer cancel()
653+
654+
task := &testTask{ID: 1, Data: "test"}
655+
656+
mockTaskWorker.On("Extend", ctx, task).Return(context.DeadlineExceeded)
657+
658+
worker.heartbeatTask(ctx, task, nil)
659+
660+
assert.False(t, testLogger.hasErrorLog("could not heartbeat task"))
661+
assert.Equal(t, 0, testLogger.errorLogCount())
662+
663+
mockTaskWorker.AssertExpectations(t)
664+
})
665+
666+
t.Run("other errors in extend should be logged as error", func(t *testing.T) {
667+
testLogger := newTestLogger()
668+
mockBackend := createMockBackendWithLogger(testLogger.logger)
669+
mockTaskWorker := &mockTaskWorker{}
670+
671+
options := &WorkerOptions{
672+
Pollers: 1,
673+
MaxParallelTasks: 1,
674+
HeartbeatInterval: time.Millisecond * 10,
675+
}
676+
677+
worker := NewWorker(mockBackend, mockTaskWorker, options)
678+
679+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
680+
defer cancel()
681+
682+
task := &testTask{ID: 1, Data: "test"}
683+
684+
// Mock Extend to return a different error (not context related)
685+
otherError := errors.New("connection lost")
686+
mockTaskWorker.On("Extend", ctx, task).Return(otherError)
687+
688+
worker.heartbeatTask(ctx, task, nil)
689+
690+
// Verify that other errors DO generate ERROR logs
691+
assert.True(t, testLogger.hasErrorLog("could not heartbeat task"))
692+
assert.Equal(t, 1, testLogger.errorLogCount())
693+
694+
mockTaskWorker.AssertExpectations(t)
695+
})
519696
}
520697

521698
func TestWorker_FullWorkflow(t *testing.T) {

0 commit comments

Comments
 (0)