Skip to content

Commit e421912

Browse files
authored
2021w15 scale benchtool (#167)
* feat(benchtool): use buffer pool for remote-write runner Signed-off-by: Jacob Lisi <[email protected]> * fixup series chan Signed-off-by: Jacob Lisi <[email protected]>
1 parent 7c8b651 commit e421912

File tree

2 files changed

+110
-49
lines changed

2 files changed

+110
-49
lines changed

pkg/bench/workload.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ package bench
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"hash/adler32"
78
"math/rand"
89
"strings"
10+
"sync"
911
"text/template"
1012
"time"
1113

1214
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
1316
"github.com/prometheus/prometheus/prompb"
1417
)
1518

@@ -70,7 +73,11 @@ type writeWorkload struct {
7073
totalSeries int
7174
totalSeriesTypeMap map[SeriesType]int
7275

76+
missedIterations prometheus.Counter
77+
7378
options WriteDesc
79+
80+
seriesBufferChan chan []prompb.TimeSeries
7481
}
7582

7683
func newWriteWorkload(workloadDesc WorkloadDesc, reg prometheus.Registerer) *writeWorkload {
@@ -132,6 +139,14 @@ func newWriteWorkload(workloadDesc WorkloadDesc, reg prometheus.Registerer) *wri
132139
totalSeries: totalSeries,
133140
totalSeriesTypeMap: totalSeriesTypeMap,
134141
options: workloadDesc.Write,
142+
143+
missedIterations: promauto.With(reg).NewCounter(
144+
prometheus.CounterOpts{
145+
Namespace: "benchtool",
146+
Name: "write_iterations_late_total",
147+
Help: "Number of write intervals started late because the previous interval did not complete in time.",
148+
},
149+
),
135150
}
136151
}
137152

@@ -194,6 +209,91 @@ func (w *writeWorkload) generateTimeSeries(id string, t time.Time) []prompb.Time
194209
return timeseries
195210
}
196211

212+
// batchReq is one unit of work handed to a write worker: a full (or final
// partial) batch of generated series, plus the plumbing the worker needs to
// report completion and recycle the buffer.
type batchReq struct {
	// batch is the series to ship in a single remote-write request.
	batch []prompb.TimeSeries
	// wg is incremented by the producer before send; the worker calls
	// Done once the batch has been written (or the write failed).
	wg *sync.WaitGroup
	// putBack is the buffer pool channel; the worker returns batch's
	// backing slice here so the producer can reuse it.
	putBack chan []prompb.TimeSeries
}
217+
218+
func (w *writeWorkload) getSeriesBuffer(ctx context.Context) []prompb.TimeSeries {
219+
select {
220+
case <-ctx.Done():
221+
return nil
222+
case seriesBuffer := <-w.seriesBufferChan:
223+
return seriesBuffer[:0]
224+
}
225+
}
226+
227+
func (w *writeWorkload) generateWriteBatch(ctx context.Context, id string, numBuffers int, seriesChan chan batchReq) error {
228+
w.seriesBufferChan = make(chan []prompb.TimeSeries, numBuffers)
229+
for i := 0; i < numBuffers; i++ {
230+
w.seriesBufferChan <- make([]prompb.TimeSeries, 0, w.options.BatchSize)
231+
}
232+
233+
seriesBuffer := w.getSeriesBuffer(ctx)
234+
ticker := time.NewTicker(w.options.Interval)
235+
236+
defer close(seriesChan)
237+
238+
for {
239+
select {
240+
case <-ctx.Done():
241+
return nil
242+
case timeNow := <-ticker.C:
243+
now := timeNow.UnixNano() / int64(time.Millisecond)
244+
wg := &sync.WaitGroup{}
245+
for replicaNum := 0; replicaNum < w.replicas; replicaNum++ {
246+
replicaLabel := prompb.Label{Name: "bench_replica", Value: fmt.Sprintf("replica-%05d", replicaNum)}
247+
idLabel := prompb.Label{Name: "bench_id", Value: id}
248+
for _, series := range w.series {
249+
var value float64
250+
switch series.seriesType {
251+
case GaugeZero:
252+
value = 0
253+
case GaugeRandom:
254+
value = rand.Float64()
255+
case CounterOne:
256+
value = series.lastValue + 1
257+
case CounterRandom:
258+
value = series.lastValue + float64(rand.Int())
259+
default:
260+
return fmt.Errorf("unknown series type %v", series.seriesType)
261+
}
262+
series.lastValue = value
263+
for _, labelSet := range series.labelSets {
264+
if len(seriesBuffer) == w.options.BatchSize {
265+
wg.Add(1)
266+
seriesChan <- batchReq{seriesBuffer, wg, w.seriesBufferChan}
267+
seriesBuffer = w.getSeriesBuffer(ctx)
268+
}
269+
newLabelSet := make([]prompb.Label, len(labelSet)+2)
270+
copy(newLabelSet, labelSet)
271+
272+
newLabelSet[len(newLabelSet)-2] = replicaLabel
273+
newLabelSet[len(newLabelSet)-1] = idLabel
274+
seriesBuffer = append(seriesBuffer, prompb.TimeSeries{
275+
Labels: newLabelSet,
276+
Samples: []prompb.Sample{{
277+
Timestamp: now,
278+
Value: value,
279+
}},
280+
})
281+
}
282+
}
283+
}
284+
if len(seriesBuffer) > 0 {
285+
wg.Add(1)
286+
seriesChan <- batchReq{seriesBuffer, wg, w.seriesBufferChan}
287+
seriesBuffer = w.getSeriesBuffer(ctx)
288+
}
289+
wg.Wait()
290+
if time.Since(timeNow) > w.options.Interval {
291+
w.missedIterations.Inc()
292+
}
293+
}
294+
}
295+
}
296+
197297
type queryWorkload struct {
198298
queries []query
199299
}

pkg/bench/write_runner.go

Lines changed: 10 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ type WriteBenchmarkRunner struct {
5555
reg prometheus.Registerer
5656
logger log.Logger
5757

58-
requestDuration *prometheus.HistogramVec
59-
missedIterations prometheus.Counter
58+
requestDuration *prometheus.HistogramVec
6059
}
6160

6261
func NewWriteBenchmarkRunner(id string, tenantName string, cfg WriteBenchConfig, workload *writeWorkload, logger log.Logger, reg prometheus.Registerer) (*WriteBenchmarkRunner, error) {
@@ -82,13 +81,6 @@ func NewWriteBenchmarkRunner(id string, tenantName string, cfg WriteBenchConfig,
8281
},
8382
[]string{"code"},
8483
),
85-
missedIterations: promauto.With(reg).NewCounter(
86-
prometheus.CounterOpts{
87-
Namespace: "benchtool",
88-
Name: "write_iterations_late_total",
89-
Help: "Number of write intervals started late because the previous interval did not complete in time.",
90-
},
91-
),
9284
}
9385

9486
// Resolve an initial set of distributor addresses
@@ -143,50 +135,16 @@ func (w *WriteBenchmarkRunner) Run(ctx context.Context) error {
143135
// Start a loop to re-resolve addresses every 5 minutes
144136
go w.resolveAddrsLoop(ctx)
145137

138+
// Run replicas * 10 write client workers.
139+
// This number will also be used for the number of series buffers to store at once.
140+
numWorkers := w.workload.replicas * 10
141+
146142
batchChan := make(chan batchReq, 10)
147-
for i := 0; i < w.workload.replicas*10; i++ {
143+
for i := 0; i < numWorkers; i++ {
148144
go w.writeWorker(batchChan)
149145
}
150146

151-
ticker := time.NewTicker(w.workload.options.Interval)
152-
for {
153-
select {
154-
case <-ctx.Done():
155-
close(batchChan)
156-
return nil
157-
case now := <-ticker.C:
158-
timeseries := w.workload.generateTimeSeries(w.id, now)
159-
batchSize := w.workload.options.BatchSize
160-
var batches [][]prompb.TimeSeries
161-
if batchSize < len(timeseries) {
162-
batches = make([][]prompb.TimeSeries, 0, (len(timeseries)+batchSize-1)/batchSize)
163-
164-
level.Info(w.logger).Log("msg", "sending timeseries", "num_series", strconv.Itoa(len(timeseries)))
165-
for batchSize < len(timeseries) {
166-
timeseries, batches = timeseries[batchSize:], append(batches, timeseries[0:batchSize:batchSize])
167-
}
168-
} else {
169-
batches = [][]prompb.TimeSeries{timeseries}
170-
}
171-
172-
wg := &sync.WaitGroup{}
173-
for _, batch := range batches {
174-
reqBatch := batch
175-
wg.Add(1)
176-
batchChan <- batchReq{reqBatch, wg}
177-
}
178-
179-
wg.Wait()
180-
if time.Since(now) > w.workload.options.Interval {
181-
w.missedIterations.Inc()
182-
}
183-
}
184-
}
185-
}
186-
187-
type batchReq struct {
188-
batch []prompb.TimeSeries
189-
wg *sync.WaitGroup
147+
return w.workload.generateWriteBatch(ctx, w.id, numWorkers+10, batchChan)
190148
}
191149

192150
func (w *WriteBenchmarkRunner) writeWorker(batchChan chan batchReq) {
@@ -195,6 +153,9 @@ func (w *WriteBenchmarkRunner) writeWorker(batchChan chan batchReq) {
195153
if err != nil {
196154
level.Warn(w.logger).Log("msg", "unable to send batch", "err", err)
197155
}
156+
157+
// put back the series buffer
158+
batchReq.putBack <- batchReq.batch
198159
batchReq.wg.Done()
199160
}
200161
}

0 commit comments

Comments (0)