@@ -235,63 +235,72 @@ func (w *writeWorkload) generateWriteBatch(ctx context.Context, id string, numBu
235
235
236
236
defer close (seriesChan )
237
237
238
- for {
238
+ tick := func () {
239
239
select {
240
240
case <- ctx .Done ():
241
- return nil
242
- case timeNow := <- ticker .C :
243
- now := timeNow .UnixNano () / int64 (time .Millisecond )
244
- wg := & sync.WaitGroup {}
245
- for replicaNum := 0 ; replicaNum < w .replicas ; replicaNum ++ {
246
- replicaLabel := prompb.Label {Name : "bench_replica" , Value : fmt .Sprintf ("replica-%05d" , replicaNum )}
247
- idLabel := prompb.Label {Name : "bench_id" , Value : id }
248
- for _ , series := range w .series {
249
- var value float64
250
- switch series .seriesType {
251
- case GaugeZero :
252
- value = 0
253
- case GaugeRandom :
254
- value = rand .Float64 ()
255
- case CounterOne :
256
- value = series .lastValue + 1
257
- case CounterRandom :
258
- value = series .lastValue + float64 (rand .Int ())
259
- default :
260
- return fmt .Errorf ("unknown series type %v" , series .seriesType )
261
- }
262
- series .lastValue = value
263
- for _ , labelSet := range series .labelSets {
264
- if len (seriesBuffer ) == w .options .BatchSize {
265
- wg .Add (1 )
266
- seriesChan <- batchReq {seriesBuffer , wg , w .seriesBufferChan }
267
- seriesBuffer = w .getSeriesBuffer (ctx )
268
- }
269
- newLabelSet := make ([]prompb.Label , len (labelSet )+ 2 )
270
- copy (newLabelSet , labelSet )
271
-
272
- newLabelSet [len (newLabelSet )- 2 ] = replicaLabel
273
- newLabelSet [len (newLabelSet )- 1 ] = idLabel
274
- seriesBuffer = append (seriesBuffer , prompb.TimeSeries {
275
- Labels : newLabelSet ,
276
- Samples : []prompb.Sample {{
277
- Timestamp : now ,
278
- Value : value ,
279
- }},
280
- })
241
+ case <- ticker .C :
242
+ }
243
+ }
244
+
245
+ for ; true ; tick () {
246
+ if ctx .Err () != nil {
247
+ // cancelled
248
+ break
249
+ }
250
+ timeNow := time .Now ()
251
+ timeNowMillis := timeNow .UnixNano () / int64 (time .Millisecond )
252
+ wg := & sync.WaitGroup {}
253
+ for replicaNum := 0 ; replicaNum < w .replicas ; replicaNum ++ {
254
+ replicaLabel := prompb.Label {Name : "bench_replica" , Value : fmt .Sprintf ("replica-%05d" , replicaNum )}
255
+ idLabel := prompb.Label {Name : "bench_id" , Value : id }
256
+ for _ , series := range w .series {
257
+ var value float64
258
+ switch series .seriesType {
259
+ case GaugeZero :
260
+ value = 0
261
+ case GaugeRandom :
262
+ value = rand .Float64 ()
263
+ case CounterOne :
264
+ value = series .lastValue + 1
265
+ case CounterRandom :
266
+ value = series .lastValue + float64 (rand .Int ())
267
+ default :
268
+ return fmt .Errorf ("unknown series type %v" , series .seriesType )
269
+ }
270
+ series .lastValue = value
271
+ for _ , labelSet := range series .labelSets {
272
+ if len (seriesBuffer ) == w .options .BatchSize {
273
+ wg .Add (1 )
274
+ seriesChan <- batchReq {seriesBuffer , wg , w .seriesBufferChan }
275
+ seriesBuffer = w .getSeriesBuffer (ctx )
281
276
}
277
+ newLabelSet := make ([]prompb.Label , len (labelSet )+ 2 )
278
+ copy (newLabelSet , labelSet )
279
+
280
+ newLabelSet [len (newLabelSet )- 2 ] = replicaLabel
281
+ newLabelSet [len (newLabelSet )- 1 ] = idLabel
282
+ seriesBuffer = append (seriesBuffer , prompb.TimeSeries {
283
+ Labels : newLabelSet ,
284
+ Samples : []prompb.Sample {{
285
+ Timestamp : timeNowMillis ,
286
+ Value : value ,
287
+ }},
288
+ })
282
289
}
283
290
}
284
- if len ( seriesBuffer ) > 0 {
285
- wg . Add ( 1 )
286
- seriesChan <- batchReq { seriesBuffer , wg , w . seriesBufferChan }
287
- seriesBuffer = w .getSeriesBuffer ( ctx )
288
- }
289
- wg . Wait ()
290
- if time . Since ( timeNow ) > w . options . Interval {
291
- w . missedIterations . Inc ()
292
- }
291
+ }
292
+ if len ( seriesBuffer ) > 0 {
293
+ wg . Add ( 1 )
294
+ seriesChan <- batchReq { seriesBuffer , wg , w .seriesBufferChan }
295
+ seriesBuffer = w . getSeriesBuffer ( ctx )
296
+ }
297
+ wg . Wait ()
298
+ if time . Since ( timeNow ) > w . options . Interval {
299
+ w . missedIterations . Inc ()
293
300
}
294
301
}
302
+
303
+ return nil
295
304
}
296
305
297
306
type queryWorkload struct {
0 commit comments