@@ -60,7 +60,6 @@ type apiOpts struct {
60
60
compression Compression
61
61
path string
62
62
retryOnRateLimit bool
63
- retryCallback RetryCallback
64
63
}
65
64
66
65
var defaultAPIOpts = & apiOpts {
@@ -116,15 +115,6 @@ func WithAPIBackoff(backoff backoff.Config) APIOption {
116
115
}
117
116
}
118
117
119
- // WithAPIRetryCallback sets a callback to be invoked on each retry attempt.
120
- // This is useful for tracking retry metrics and debugging retry behavior.
121
- func WithAPIRetryCallback (callback RetryCallback ) APIOption {
122
- return func (o * apiOpts ) error {
123
- o .retryCallback = callback
124
- return nil
125
- }
126
- }
127
-
128
118
type nopSlogHandler struct {}
129
119
130
120
func (n nopSlogHandler ) Enabled (context.Context , slog.Level ) bool { return false }
@@ -174,6 +164,21 @@ func (r retryableError) RetryAfter() time.Duration {
174
164
return r .retryAfter
175
165
}
176
166
167
+ // WriteOption represents an option for Write method.
168
+ type WriteOption func (o * writeOpts )
169
+
170
+ type writeOpts struct {
171
+ retryCallback RetryCallback
172
+ }
173
+
174
+ // WithWriteRetryCallback sets a retry callback for this Write request.
175
+ // The callback is invoked each time the request is retried.
176
+ func WithWriteRetryCallback (callback RetryCallback ) WriteOption {
177
+ return func (o * writeOpts ) {
178
+ o .retryCallback = callback
179
+ }
180
+ }
181
+
177
182
type vtProtoEnabled interface {
178
183
SizeVT () int
179
184
MarshalToSizedBufferVT (dAtA []byte ) (int , error )
@@ -193,7 +198,13 @@ type gogoProtoEnabled interface {
193
198
// will be used
194
199
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
195
200
// error out on unknown scheme.
196
- func (r * API ) Write (ctx context.Context , msgType WriteMessageType , msg any ) (_ WriteResponseStats , err error ) {
201
+ func (r * API ) Write (ctx context.Context , msgType WriteMessageType , msg any , opts ... WriteOption ) (_ WriteResponseStats , err error ) {
202
+ // Parse write options.
203
+ var writeOpts writeOpts
204
+ for _ , opt := range opts {
205
+ opt (& writeOpts )
206
+ }
207
+
197
208
buf := r .bufPool .Get ().(* []byte )
198
209
199
210
if err := msgType .Validate (); err != nil {
@@ -280,9 +291,9 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ W
280
291
281
292
backoffDelay := b .NextDelay () + retryableErr .RetryAfter ()
282
293
283
- // Invoke retry callback if provided (after NextDelay which increments the retry counter) .
284
- if r . opts .retryCallback != nil {
285
- r . opts .retryCallback (retryableErr .error )
294
+ // Invoke retry callback if provided.
295
+ if writeOpts .retryCallback != nil {
296
+ writeOpts .retryCallback (retryableErr .error )
286
297
}
287
298
288
299
r .opts .logger .Error ("failed to send remote write request; retrying after backoff" , "err" , err , "backoff" , backoffDelay )
0 commit comments