Skip to content

Commit ac7b213

Browse files
committed
Merge branch 'main' of github.com:pitabwire/frame
2 parents 5c27a94 + a5428fb commit ac7b213

File tree

5 files changed

+201
-33
lines changed

5 files changed

+201
-33
lines changed

localization/interceptors/http/language.go renamed to localization/interceptors/httpi/language.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package http
1+
package httpi
22

33
import (
44
"net/http"

localization/localization_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/pitabwire/frame"
1515
"github.com/pitabwire/frame/localization"
1616
lgrpc "github.com/pitabwire/frame/localization/interceptors/grpc"
17-
lhttp "github.com/pitabwire/frame/localization/interceptors/http"
17+
lhttp "github.com/pitabwire/frame/localization/interceptors/httpi"
1818
"github.com/pitabwire/frame/tests"
1919
)
2020

@@ -255,7 +255,7 @@ func (s *LocalizationTestSuite) TestLanguageHTTPMiddleware() {
255255
middleware := lhttp.LanguageHTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
256256
lang := localization.FromContext(r.Context())
257257
w.WriteHeader(http.StatusOK)
258-
w.Write([]byte(strings.Join(lang, ",")))
258+
_, _ = w.Write([]byte(strings.Join(lang, ",")))
259259
}))
260260

261261
req := httptest.NewRequest(http.MethodGet, tc.requestPath, nil)

queue/subscriber.go

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ import (
1818
"github.com/pitabwire/frame/workerpool"
1919
)
2020

21+
// Backoff settings consumed by subscriberReceiveErrorBackoffDelay.
const (
	// subscriberReceiveErrorBackoffBaseDelay is the delay used for the first
	// consecutive error; it is doubled for every further consecutive error.
	subscriberReceiveErrorBackoffBaseDelay = 100 * time.Millisecond

	// subscriberReceiveErrorBackoffMaxDelay is the upper bound on the computed delay.
	subscriberReceiveErrorBackoffMaxDelay = 30 * time.Second

	// subscriberReceiveErrorBackoffMaxConsecutiveErrs caps the consecutive-error count
	// that feeds the exponent, which keeps the doubling shift bounded.
	subscriberReceiveErrorBackoffMaxConsecutiveErrs = 10
)
26+
2127
type subscriber struct {
2228
reference string
2329
url string
@@ -30,6 +36,38 @@ type subscriber struct {
3036
workManager workerpool.Manager
3137
}
3238

39+
func subscriberReceiveErrorBackoffDelay(consecutiveErrors int) time.Duration {
40+
if consecutiveErrors <= 0 {
41+
return 0
42+
}
43+
44+
attempt := consecutiveErrors
45+
if attempt > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
46+
attempt = subscriberReceiveErrorBackoffMaxConsecutiveErrs
47+
}
48+
49+
delay := subscriberReceiveErrorBackoffBaseDelay * time.Duration(1<<(attempt-1))
50+
if delay > subscriberReceiveErrorBackoffMaxDelay {
51+
return subscriberReceiveErrorBackoffMaxDelay
52+
}
53+
54+
return delay
55+
}
56+
57+
// waitForDelay blocks until delay has elapsed or ctx is done, whichever happens first.
// A zero or negative delay returns immediately. The function does not report which of
// the two events ended the wait.
func waitForDelay(ctx context.Context, delay time.Duration) {
	if delay > 0 {
		pause := time.NewTimer(delay)

		select {
		case <-pause.C:
		case <-ctx.Done():
		}

		// Release the timer in case the context ended the wait first.
		pause.Stop()
	}
}
70+
3371
func (s *subscriber) Ref() string {
3472
return s.reference
3573
}
@@ -227,41 +265,51 @@ func (s *subscriber) listen(ctx context.Context) {
227265
WithField("function", "subscription").
228266
WithField("url", s.url)
229267
logger.Debug("starting to listen for messages")
268+
269+
var consecutiveReceiveErrors int
230270
for {
231-
select {
232-
case <-ctx.Done():
271+
if ctx.Err() != nil {
233272
err := s.Stop(ctx)
234273
if err != nil {
235274
logger.WithError(err).Error("could not stop subscription")
236275
return
237276
}
238277
logger.Debug("exiting due to canceled context")
239278
return
279+
}
240280

241-
default:
242-
msg, err := s.Receive(ctx)
243-
if err != nil {
244-
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
245-
// Context cancelled or deadline exceeded, loop again to check ctx.Done()
246-
continue
247-
}
248-
// Other errors from Receive are critical for the listener.
249-
logger.WithError(err).Error("could not pull message")
250-
251-
// Recreate subscription
252-
s.recreateSubscription(ctx)
281+
msg, err := s.Receive(ctx)
282+
if err != nil {
283+
if ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
284+
// Context cancelled or deadline exceeded; loop again so the ctx.Err() check at the top of the loop stops the subscription.
253285
continue
254286
}
255287

256-
// Process the received message. Errors from processing (like job submission failure)
257-
// will be logged by processReceivedMessage. If it's a critical submission error,
258-
// it will be returned and will stop the whole application.
259-
if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
260-
// processReceivedMessage already logs details. This error is for critical failures.
261-
logger.WithError(procErr).Error("critical error processing message, stopping listener")
262-
s.SendStopError(ctx, procErr) // procErr
263-
return // Exit listen loop
288+
// Other errors from Receive are critical for the listener.
289+
consecutiveReceiveErrors++
290+
if consecutiveReceiveErrors > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
291+
consecutiveReceiveErrors = subscriberReceiveErrorBackoffMaxConsecutiveErrs
264292
}
293+
logger.WithError(err).Error("could not pull message")
294+
295+
// Recreate subscription
296+
s.recreateSubscription(ctx)
297+
298+
delay := subscriberReceiveErrorBackoffDelay(consecutiveReceiveErrors)
299+
waitForDelay(ctx, delay)
300+
continue
301+
}
302+
303+
consecutiveReceiveErrors = 0
304+
305+
// Process the received message. Errors from processing (like job submission failure)
306+
// will be logged by processReceivedMessage. If it's a critical submission error,
307+
// it will be returned and will stop the whole application.
308+
if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
309+
// processReceivedMessage already logs details. This error is for critical failures.
310+
logger.WithError(procErr).Error("critical error processing message, stopping listener")
311+
s.SendStopError(ctx, procErr) // procErr
312+
return // Exit listen loop
265313
}
266314
}
267315
}

workerpool/manager.go

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,81 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/pitabwire/util"
910

1011
"github.com/pitabwire/frame/config"
1112
)
1213

14+
// Backoff settings consumed by jobRetryBackoffDelay.
const (
	// jobRetryBackoffBaseDelay is the delay used for run number 1; it is doubled for
	// every further run.
	jobRetryBackoffBaseDelay = 100 * time.Millisecond

	// jobRetryBackoffMaxDelay is the upper bound on the computed retry delay.
	jobRetryBackoffMaxDelay = 30 * time.Second

	// jobRetryBackoffMaxRunNumber caps the run number that feeds the exponent, which
	// keeps the doubling shift bounded.
	jobRetryBackoffMaxRunNumber = 10
)
19+
20+
func shouldCloseJob(executionErr error) bool {
21+
return executionErr == nil || errors.Is(executionErr, context.Canceled) ||
22+
errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed)
23+
}
24+
25+
func jobRetryBackoffDelay(run int) time.Duration {
26+
if run < 1 {
27+
run = 1
28+
}
29+
30+
if run > jobRetryBackoffMaxRunNumber {
31+
run = jobRetryBackoffMaxRunNumber
32+
}
33+
34+
delay := jobRetryBackoffBaseDelay * time.Duration(1<<(run-1))
35+
if delay > jobRetryBackoffMaxDelay {
36+
return jobRetryBackoffMaxDelay
37+
}
38+
39+
return delay
40+
}
41+
42+
func handleResubmitError[T any](
43+
ctx context.Context,
44+
job Job[T],
45+
log *util.LogEntry,
46+
executionErr error,
47+
resubmitErr error,
48+
) {
49+
if resubmitErr == nil {
50+
return
51+
}
52+
53+
log.WithError(resubmitErr).Error("Failed to resubmit job")
54+
_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
55+
job.Close()
56+
}
57+
58+
func scheduleRetryResubmission[T any](
59+
ctx context.Context,
60+
s Manager,
61+
job Job[T],
62+
delay time.Duration,
63+
log *util.LogEntry,
64+
executionErr error,
65+
) {
66+
go func() {
67+
timer := time.NewTimer(delay)
68+
defer timer.Stop()
69+
70+
select {
71+
case <-ctx.Done():
72+
job.Close()
73+
return
74+
case <-timer.C:
75+
}
76+
77+
resubmitErr := SubmitJob(ctx, s, job)
78+
handleResubmitError(ctx, job, log, executionErr, resubmitErr)
79+
}()
80+
}
81+
1382
type manager struct {
1483
pool WorkerPool
1584
stopErr func(ctx context.Context, err error)
@@ -88,8 +157,7 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
88157
executionErr := job.F()(ctx, job)
89158

90159
// Handle successful execution first and return early
91-
if executionErr == nil || errors.Is(executionErr, context.Canceled) ||
92-
errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed) {
160+
if shouldCloseJob(executionErr) {
93161
job.Close()
94162
return
95163
}
@@ -105,11 +173,8 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
105173

106174
// Job can be retried to resolve error
107175
log.Warn("Job failed, attempting to retry it")
108-
resubmitErr := SubmitJob(ctx, s, job) // Recursive call to SubmitJob for retry
109-
if resubmitErr != nil {
110-
log.WithError(resubmitErr).Error("Failed to resubmit job")
111-
_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
112-
job.Close()
113-
}
176+
177+
delay := jobRetryBackoffDelay(job.Runs())
178+
scheduleRetryResubmission(ctx, s, job, delay, log, executionErr)
114179
}
115180
}

workerpool/worker_pool_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@ type WorkerPoolTestSuite struct {
1818
tests.BaseTestSuite
1919
}
2020

21+
// testWorkerPool is a stateless pool stand-in for tests: it runs every submitted task
// on the caller's goroutine.
type testWorkerPool struct{}

// Submit invokes task synchronously before returning and always reports success.
// The context argument is ignored.
func (*testWorkerPool) Submit(_ context.Context, task func()) error {
	task()

	return nil
}

// Shutdown does nothing; the pool holds no resources.
func (*testWorkerPool) Shutdown() {
}
29+
30+
type testManager struct {
31+
pool workerpool.WorkerPool
32+
}
33+
34+
func (m *testManager) GetPool() (workerpool.WorkerPool, error) { return m.pool, nil }
35+
func (m *testManager) StopError(_ context.Context, _ error) {}
36+
2137
// TestWorkerPoolSuite runs the worker pool test suite.
2238
func TestWorkerPoolSuite(t *testing.T) {
2339
suite.Run(t, &WorkerPoolTestSuite{})
@@ -72,6 +88,45 @@ func (s *WorkerPoolTestSuite) TestJobImplChannelOperations() {
7288
}
7389
}
7490

91+
func (s *WorkerPoolTestSuite) TestJobRetryUsesBackoff() {
92+
ctx, cancel := context.WithTimeout(s.T().Context(), 2*time.Second)
93+
defer cancel()
94+
95+
var mu sync.Mutex
96+
var executionTimes []time.Time
97+
98+
job := workerpool.NewJobWithRetry[any](func(_ context.Context, _ workerpool.JobResultPipe[any]) error {
99+
mu.Lock()
100+
executionTimes = append(executionTimes, time.Now())
101+
mu.Unlock()
102+
return errors.New("fail")
103+
}, 1)
104+
105+
mgr := &testManager{pool: &testWorkerPool{}}
106+
err := workerpool.SubmitJob(ctx, mgr, job)
107+
s.Require().NoError(err)
108+
109+
mu.Lock()
110+
s.Require().Len(executionTimes, 1)
111+
mu.Unlock()
112+
113+
time.Sleep(20 * time.Millisecond)
114+
mu.Lock()
115+
s.Require().Len(executionTimes, 1)
116+
mu.Unlock()
117+
118+
s.Require().Eventually(func() bool {
119+
mu.Lock()
120+
defer mu.Unlock()
121+
return len(executionTimes) == 2
122+
}, 500*time.Millisecond, 10*time.Millisecond)
123+
124+
mu.Lock()
125+
delta := executionTimes[1].Sub(executionTimes[0])
126+
mu.Unlock()
127+
s.Require().GreaterOrEqual(delta, 80*time.Millisecond)
128+
}
129+
75130
func (s *WorkerPoolTestSuite) writeIntRangeAsResult(
76131
ctx context.Context,
77132
t *testing.T,

0 commit comments

Comments
 (0)