Commit e2feb0c

[PLAT-105626] fix issues-7248 to run in parallel (#29)

2 parents: 995b2b5 + 77b6a80

File tree: 4 files changed, +141 −96 lines

cmd/thanos/receive.go
Lines changed: 2 additions & 0 deletions

@@ -255,6 +255,8 @@ func runReceive(
 		MaxBackoff: time.Duration(*conf.maxBackoff),
 		TSDBStats:  dbs,
 		Limiter:    limiter,
+
+		AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
 	})

 	grpcProbe := prober.NewGRPC()
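The new field above is set on the options passed to the receive handler. A minimal sketch of the consuming side, assuming the Options struct in pkg/receive (only the AsyncForwardWorkerCount field name and the zero-value fallback appear in this diff; everything else here is illustrative):

// Sketch of the consuming side in pkg/receive (illustrative, not from this diff):
type Options struct {
	// ...existing handler options...

	// AsyncForwardWorkerCount is the number of goroutines per peer used to
	// forward remote-write requests asynchronously.
	AsyncForwardWorkerCount uint
}

// NewHandler (per the hunk in pkg/receive/handler.go below) falls back to a
// single worker when the configured count is zero:
//
//	workers := o.AsyncForwardWorkerCount // assumed source of "workers"
//	if workers == 0 {
//		workers = 1
//	}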

pkg/pool/worker_pool.go
Lines changed: 72 additions & 0 deletions (new file)

// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
	"context"
	"sync"
)

// Work is a unit of item to be worked on, like Java Runnable.
type Work func()

// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool.
type WorkerPool interface {
	// Init initializes the worker pool.
	Init()

	// Go waits until the next worker becomes available and executes the given work.
	Go(work Work)

	// Close cancels all workers and waits for them to finish.
	Close()

	// Size returns the number of workers in the pool.
	Size() int
}

type workerPool struct {
	sync.Once
	workCh chan Work
	cancel context.CancelFunc
}

func NewWorkerPool(workers uint) WorkerPool {
	return &workerPool{
		workCh: make(chan Work, workers),
	}
}

func (p *workerPool) Init() {
	p.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		p.cancel = cancel

		for i := 0; i < cap(p.workCh); i++ {
			go func() {
				for {
					select {
					case <-ctx.Done():
						return
					case work := <-p.workCh:
						work()
					}
				}
			}()
		}
	})
}

func (p *workerPool) Go(work Work) {
	p.Init()
	p.workCh <- work
}

func (p *workerPool) Close() {
	p.cancel()
}

func (p *workerPool) Size() int {
	return cap(p.workCh)
}
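Two properties of this pool are worth noting. First, it is lazily initialized: Go calls Init itself, so an explicit Init is only needed to start the workers eagerly. Second, despite the interface comment, Close as written only cancels the workers' context; it does not wait for in-flight work or drain items still buffered in workCh, so callers should synchronize completion themselves. A minimal usage sketch under those assumptions (standalone program, not part of this diff):

package main

import (
	"fmt"
	"sync"

	"github.com/thanos-io/thanos/pkg/pool"
)

func main() {
	p := pool.NewWorkerPool(4) // four worker goroutines; work channel buffered to 4
	p.Init()                   // optional: Go would lazily Init on first use

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // capture the loop variable (needed before Go 1.22 semantics)
		wg.Add(1)
		p.Go(func() {
			defer wg.Done()
			fmt.Println("processed item", i)
		})
	}

	wg.Wait() // wait for all work first: Close cancels workers without draining the queue
	p.Close()
}

Go blocks only once all workers are busy and the buffered channel (capacity equal to the worker count) is full, which is what lets submitters enqueue work without serializing on a single in-flight item.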

pkg/pool/worker_pool_test.go
Lines changed: 35 additions & 0 deletions (new file)

// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestGo(t *testing.T) {
	var expectedWorksDone uint32
	var workerPoolSize uint
	var mu sync.Mutex
	workerPoolSize = 5
	p := NewWorkerPool(workerPoolSize)
	p.Init()
	defer p.Close()

	var wg sync.WaitGroup
	for i := 0; i < int(workerPoolSize*3); i++ {
		wg.Add(1)
		p.Go(func() {
			mu.Lock()
			defer mu.Unlock()
			expectedWorksDone++
			wg.Done()
		})
	}
	wg.Wait()
	require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
	require.Equal(t, int(workerPoolSize), p.Size())
}
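The test guards its counter with a mutex because the pool runs work on several goroutines concurrently; any shared state touched inside a Work needs synchronization. An equivalent formulation drops the mutex in favor of an atomic counter (a sketch, assuming Go 1.19+ for atomic.Uint32):

// Sketch: the same counting loop with "sync/atomic" instead of a mutex.
var worksDone atomic.Uint32
for i := 0; i < int(workerPoolSize*3); i++ {
	wg.Add(1)
	p.Go(func() {
		worksDone.Add(1)
		wg.Done()
	})
}
wg.Wait()
require.Equal(t, uint32(workerPoolSize*3), worksDone.Load())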

pkg/receive/handler.go
Lines changed: 32 additions & 96 deletions

@@ -43,6 +43,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/logging"

 	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/server/http/middleware"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"

@@ -141,6 +142,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 	if workers == 0 {
 		workers = 1
 	}
+	level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)

 	h := &Handler{
 		logger: logger,

@@ -1188,90 +1190,24 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
 	return errs
 }

-func (pw *peerWorker) initWorkers() {
-	pw.initWorkersOnce.Do(func() {
-		work := make(chan peerWorkItem)
-		pw.work = work
-
-		ctx, cancel := context.WithCancel(context.Background())
-		pw.turnOffGoroutines = cancel
-
-		for i := 0; i < int(pw.asyncWorkerCount); i++ {
-			go func() {
-				for {
-					select {
-					case <-ctx.Done():
-						return
-					case w := <-work:
-						pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())
-
-						tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
-							_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
-							w.workResult <- peerWorkResponse{
-								er:  w.er,
-								err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
-							}
-							if err != nil {
-								sp := trace.SpanFromContext(ctx)
-								sp.SetAttributes(attribute.Bool("error", true))
-								sp.SetAttributes(attribute.String("error.msg", err.Error()))
-							}
-							close(w.workResult)
-						}, opentracing.Tags{
-							"endpoint": w.er.endpoint,
-							"replica":  w.er.replica,
-						})
-					}
-				}
-			}()
-		}
-	})
-}
-
 func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
 	return &peerWorker{
-		cc:               cc,
-		asyncWorkerCount: asyncWorkerCount,
-		forwardDelay:     forwardDelay,
+		cc:           cc,
+		wp:           pool.NewWorkerPool(asyncWorkerCount),
+		forwardDelay: forwardDelay,
 	}
 }

-type peerWorkItem struct {
-	cc          *grpc.ClientConn
-	req         *storepb.WriteRequest
-	workItemCtx context.Context
-
-	workResult chan peerWorkResponse
-	er         endpointReplica
-	sendTime   time.Time
-}
-
 func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
-	pw.initWorkers()
-
-	w := peerWorkItem{
-		cc:          pw.cc,
-		req:         in,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		sendTime:    time.Now(),
-	}
-
-	pw.work <- w
-	return nil, (<-w.workResult).err
+	_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
+	return nil, err
 }

 type peerWorker struct {
 	cc *grpc.ClientConn
+	wp pool.WorkerPool

-	work              chan peerWorkItem
-	turnOffGoroutines func()
-
-	initWorkersOnce  sync.Once
-	asyncWorkerCount uint
-	forwardDelay     prometheus.Histogram
+	forwardDelay prometheus.Histogram
 }

 func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {

@@ -1295,29 +1231,29 @@ type peersContainer interface {
 	reset()
 }

-type peerWorkResponse struct {
-	er  endpointReplica
-	err error
-}
-
 func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
-	p.initWorkers()
-
-	w := peerWorkItem{
-		cc:          p.cc,
-		req:         req,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		er:          er,
-
-		sendTime: time.Now(),
-	}
-
-	p.work <- w
-	res := <-w.workResult
-
-	responseWriter <- newWriteResponse(seriesIDs, res.err, er)
-	cb(res.err)
+	now := time.Now()
+	p.wp.Go(func() {
+		p.forwardDelay.Observe(time.Since(now).Seconds())
+
+		tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
+			_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
+			responseWriter <- newWriteResponse(
+				seriesIDs,
+				errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
+				er,
+			)
+			if err != nil {
+				sp := trace.SpanFromContext(ctx)
+				sp.SetAttributes(attribute.Bool("error", true))
+				sp.SetAttributes(attribute.String("error.msg", err.Error()))
+			}
+			cb(err)
+		}, opentracing.Tags{
+			"endpoint": er.endpoint,
+			"replica":  er.replica,
+		})
+	})
 }

 type peerGroup struct {

@@ -1345,7 +1281,7 @@ func (p *peerGroup) close(addr string) error {
 		return nil
 	}

-	p.connections[addr].turnOffGoroutines()
+	p.connections[addr].wp.Close()
 	delete(p.connections, addr)
 	if err := c.cc.Close(); err != nil {
 		return fmt.Errorf("closing connection for %s", addr)
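Net effect of the handler changes: previously RemoteWriteAsync enqueued a peerWorkItem and then blocked on its workResult channel before writing the response, so forwards through a single peer effectively serialized; now the whole forward (the RPC, the response write, and the callback) runs inside a pooled goroutine, and RemoteWriteAsync returns as soon as the closure is enqueued. A caller-side sketch of the fan-out this enables (hypothetical code, not part of this diff; replicas and workerFor are assumed names):

// Hypothetical fan-out: each RemoteWriteAsync returns immediately after
// enqueueing, so forwards to all replicas proceed in parallel.
responses := make(chan writeResponse, len(replicas)) // replicas: assumed []endpointReplica
for _, er := range replicas {
	w := workerFor(er) // workerFor: assumed lookup returning a *peerWorker
	w.RemoteWriteAsync(ctx, req, er, seriesIDs, responses, func(err error) {
		// per-forward completion callback, e.g. quorum bookkeeping
	})
}
for range replicas {
	<-responses // collect one writeResponse per replica
}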
