Skip to content

Commit da7ccc1

Browse files
authored
[PLAT-105856] fix grpc retry strategy (#31)
2 parents aca2382 + f349363 commit da7ccc1

File tree

4 files changed

+14
-5
lines changed

4 files changed

+14
-5
lines changed

cmd/thanos/receive.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ func runReceive(
157157
if conf.compression != compressionNone {
158158
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression)))
159159
}
160+
if receiveMode == receive.RouterOnly {
161+
dialOpts = append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)
162+
}
160163

161164
var bkt objstore.Bucket
162165
confContentYaml, err := conf.objStoreConfig.Content()

pkg/pool/worker_pool.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,28 @@ type WorkerPool interface {
2828

2929
type workerPool struct {
3030
sync.Once
31+
ctx context.Context
3132
workCh chan Work
3233
cancel context.CancelFunc
3334
}
3435

3536
func NewWorkerPool(workers uint) WorkerPool {
37+
ctx, cancel := context.WithCancel(context.Background())
3638
return &workerPool{
39+
ctx: ctx,
40+
cancel: cancel,
3741
workCh: make(chan Work, workers),
3842
}
3943
}
4044

4145
func (p *workerPool) Init() {
4246
p.Do(func() {
43-
ctx, cancel := context.WithCancel(context.Background())
44-
p.cancel = cancel
45-
4647
for i := 0; i < cap(p.workCh); i++ {
4748
go func() {
4849
for {
4950
select {
50-
case <-ctx.Done():
51+
case <-p.ctx.Done():
52+
// TODO: exhaust workCh before exit
5153
return
5254
case work := <-p.workCh:
5355
work()

pkg/receive/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,8 @@ func (h *Handler) sendLocalWrite(
829829
defer span.Finish()
830830
span.SetTag("endpoint", writeDestination.endpoint)
831831
span.SetTag("replica", writeDestination.replica)
832+
span.SetTag("tenant", tenant)
833+
span.SetTag("samples", len(trackedSeries.timeSeries))
832834
err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{
833835
Timeseries: trackedSeries.timeSeries,
834836
})

pkg/receive/writer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/thanos-io/thanos/pkg/store/labelpb"
2121
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
22+
"github.com/thanos-io/thanos/pkg/tracing"
2223
)
2324

2425
// Appendable returns an Appender.
@@ -277,7 +278,8 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
277278
level.Info(tLogger).Log("msg", "Error on ingesting exemplars with label length exceeding maximum limit", "numDropped", numExemplarsLabelLength)
278279
errs.Add(errors.Wrapf(storage.ErrExemplarLabelLength, "add %d exemplars", numExemplarsLabelLength))
279280
}
280-
281+
span, _ := tracing.StartSpan(ctx, "receive_commit")
282+
defer span.Finish()
281283
if err := app.Commit(); err != nil {
282284
errs.Add(errors.Wrap(err, "commit samples"))
283285
}

0 commit comments

Comments
 (0)