Skip to content

Commit 3d13af1

Browse files
authored
Add RetryCallBack to remote_api.go (#1888)
* add Signed-off-by: pipiland2612 <[email protected]> * fix comment Signed-off-by: pipiland2612 <[email protected]> * passing error Signed-off-by: pipiland2612 <[email protected]> --------- Signed-off-by: pipiland2612 <[email protected]>
1 parent 8b3bc2d commit 3d13af1

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

exp/api/remote/remote_api.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ type API struct {
4848
// APIOption represents a remote API option.
4949
type APIOption func(o *apiOpts) error
5050

51+
// RetryCallback is called each time Write() retries a request.
52+
// err is the error that caused the retry.
53+
type RetryCallback func(err error)
54+
5155
// TODO(bwplotka): Add "too old sample" handling one day.
5256
type apiOpts struct {
5357
logger *slog.Logger
@@ -56,6 +60,7 @@ type apiOpts struct {
5660
compression Compression
5761
path string
5862
retryOnRateLimit bool
63+
retryCallback RetryCallback
5964
}
6065

6166
var defaultAPIOpts = &apiOpts{
@@ -111,6 +116,15 @@ func WithAPIBackoff(backoff backoff.Config) APIOption {
111116
}
112117
}
113118

119+
// WithAPIRetryCallback sets a callback to be invoked on each retry attempt.
120+
// This is useful for tracking retry metrics and debugging retry behavior.
121+
func WithAPIRetryCallback(callback RetryCallback) APIOption {
122+
return func(o *apiOpts) error {
123+
o.retryCallback = callback
124+
return nil
125+
}
126+
}
127+
114128
type nopSlogHandler struct{}
115129

116130
func (n nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false }
@@ -257,20 +271,23 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ W
257271

258272
var retryableErr retryableError
259273
if !errors.As(err, &retryableErr) {
260-
// TODO(bwplotka): More context in the error e.g. about retries.
261274
return accumulatedStats, err
262275
}
263276

264277
if !b.Ongoing() {
265-
// TODO(bwplotka): More context in the error e.g. about retries.
266278
return accumulatedStats, err
267279
}
268280

269281
backoffDelay := b.NextDelay() + retryableErr.RetryAfter()
282+
283+
// Invoke retry callback if provided (after NextDelay which increments the retry counter).
284+
if r.opts.retryCallback != nil {
285+
r.opts.retryCallback(retryableErr.error)
286+
}
287+
270288
r.opts.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay)
271289
select {
272290
case <-ctx.Done():
273-
// TODO(bwplotka): More context in the error e.g. about retries.
274291
return WriteResponseStats{}, ctx.Err()
275292
case <-time.After(backoffDelay):
276293
// Retry.

exp/api/remote/remote_api_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,89 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
208208
t.Fatalf("expected error to contain 'storage error', got %v", err)
209209
}
210210
})
211+
212+
t.Run("retry callback invoked on retries", func(t *testing.T) {
213+
tLogger := slog.Default()
214+
mockCode := http.StatusInternalServerError
215+
mStore := &mockStorage{
216+
mockErr: errors.New("storage error"),
217+
mockCode: &mockCode,
218+
}
219+
srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger)))
220+
t.Cleanup(srv.Close)
221+
222+
var retryCount int
223+
var retryErrors []error
224+
225+
client, err := NewAPI(srv.URL,
226+
WithAPIHTTPClient(srv.Client()),
227+
WithAPILogger(tLogger),
228+
WithAPIPath("api/v1/write"),
229+
WithAPIBackoff(backoff.Config{
230+
Min: 1 * time.Millisecond,
231+
Max: 1 * time.Millisecond,
232+
MaxRetries: 3,
233+
}),
234+
WithAPIRetryCallback(func(err error) {
235+
retryCount++
236+
retryErrors = append(retryErrors, err)
237+
}),
238+
)
239+
if err != nil {
240+
t.Fatal(err)
241+
}
242+
243+
req := testV2()
244+
_, err = client.Write(context.Background(), WriteV2MessageType, req)
245+
if err == nil {
246+
t.Fatal("expected error, got nil")
247+
}
248+
249+
// Verify callback was invoked for each retry.
250+
expectedRetries := 3
251+
if retryCount != expectedRetries {
252+
t.Fatalf("expected %d retry callback invocations, got %d", expectedRetries, retryCount)
253+
}
254+
255+
// Verify errors were passed correctly.
256+
for i, retryErr := range retryErrors {
257+
if retryErr == nil {
258+
t.Fatalf("expected non-nil error for retry %d", i)
259+
}
260+
if !strings.Contains(retryErr.Error(), "storage error") {
261+
t.Fatalf("expected error to contain 'storage error', got %v", retryErr)
262+
}
263+
}
264+
})
265+
266+
t.Run("retry callback not invoked on success", func(t *testing.T) {
267+
tLogger := slog.Default()
268+
mStore := &mockStorage{}
269+
srv := httptest.NewServer(NewWriteHandler(mStore, MessageTypes{WriteV2MessageType}, WithWriteHandlerLogger(tLogger)))
270+
t.Cleanup(srv.Close)
271+
272+
callbackInvoked := false
273+
client, err := NewAPI(srv.URL,
274+
WithAPIHTTPClient(srv.Client()),
275+
WithAPILogger(tLogger),
276+
WithAPIPath("api/v1/write"),
277+
WithAPIRetryCallback(func(err error) {
278+
callbackInvoked = true
279+
}),
280+
)
281+
if err != nil {
282+
t.Fatal(err)
283+
}
284+
285+
req := testV2()
286+
_, err = client.Write(context.Background(), WriteV2MessageType, req)
287+
if err != nil {
288+
t.Fatal(err)
289+
}
290+
291+
// Verify callback was not invoked for successful request.
292+
if callbackInvoked {
293+
t.Fatal("retry callback should not be invoked on successful request")
294+
}
295+
})
211296
}

0 commit comments

Comments
 (0)