Skip to content

Commit a0efc6d

Browse files
authored
perf: limit goroutine usage while unmarshaling prometheus and influx points/values (#909)
Signed-off-by: Chris Randles <randles.chris@gmail.com>
1 parent 70005e9 commit a0efc6d

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

pkg/backends/influxdb/influxql/unmarshal.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"io"
24+
"runtime"
2425
"sort"
25-
"sync"
2626
"sync/atomic"
2727
"time"
2828

2929
"github.com/trickstercache/trickster/v2/pkg/timeseries"
3030
"github.com/trickstercache/trickster/v2/pkg/timeseries/dataset"
3131
"github.com/trickstercache/trickster/v2/pkg/timeseries/epoch"
32+
"golang.org/x/sync/errgroup"
3233
)
3334

3435
// Unmarshal performs a standard unmarshal of the bytes into the InfluxDB Wire Format Document,
@@ -97,17 +98,18 @@ func UnmarshalTimeseriesReader(reader io.Reader, trq *timeseries.TimeRangeQuery)
9798
sh.CalculateSize()
9899
pts := make(dataset.Points, len(wfd.Results[i].SeriesList[j].Values))
99100
var sz int64
100-
var wg sync.WaitGroup
101+
var eg errgroup.Group
102+
eg.SetLimit(runtime.GOMAXPROCS(0))
101103
errs := make([]error, len(wfd.Results[i].SeriesList[j].Values))
102104
for vi, v := range wfd.Results[i].SeriesList[j].Values {
103-
wg.Go(func() {
105+
eg.Go(func() error {
104106
pt, cols, err := pointFromValues(v, sh.TimestampField.OutputPosition)
105107
if err != nil {
106108
errs[vi] = err
107-
return
109+
return err
108110
}
109111
if pt.Epoch == 0 {
110-
return
112+
return nil
111113
}
112114
if vi == 0 {
113115
for x := range cols {
@@ -117,9 +119,10 @@ func UnmarshalTimeseriesReader(reader io.Reader, trq *timeseries.TimeRangeQuery)
117119
pts[vi] = pt
118120
atomic.AddInt64(&sz, int64(pt.Size))
119121
wfd.Results[i].SeriesList[j].Values[vi] = nil
122+
return nil
120123
})
121124
}
122-
wg.Wait()
125+
eg.Wait()
123126
if err := errors.Join(errs...); err != nil {
124127
return nil, err
125128
}

pkg/backends/prometheus/model/timeseries.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"io"
24+
"runtime"
2425
"sort"
2526
"strconv"
26-
"sync"
2727
"sync/atomic"
2828
"time"
2929

3030
"github.com/trickstercache/trickster/v2/pkg/errors"
3131
"github.com/trickstercache/trickster/v2/pkg/timeseries"
3232
"github.com/trickstercache/trickster/v2/pkg/timeseries/dataset"
3333
"github.com/trickstercache/trickster/v2/pkg/timeseries/epoch"
34+
"golang.org/x/sync/errgroup"
3435
)
3536

3637
// WFMatrixDocument is the Wire Format Document for prometheus range / timeseries
@@ -239,17 +240,19 @@ func populateSeries(ds *dataset.DataSet, result []*WFResult,
239240
var ps int64 = 16
240241
if !isVector && l > 0 {
241242
pts = make(dataset.Points, l)
242-
var wg sync.WaitGroup
243+
var eg errgroup.Group
244+
eg.SetLimit(runtime.GOMAXPROCS(0))
243245
for i, v := range pr.Values {
244-
wg.Go(func() {
246+
eg.Go(func() error {
245247
pt, _ := pointFromValues(v)
246248
if pt.Epoch > 0 {
247249
atomic.AddInt64(&ps, int64(pt.Size))
248250
pts[i] = pt
249251
}
252+
return nil
250253
})
251254
}
252-
wg.Wait()
255+
eg.Wait()
253256
} else if isVector && len(pr.Value) == 2 {
254257
pts = make(dataset.Points, 1)
255258
pt, _ := pointFromValues(pr.Value)

0 commit comments

Comments
 (0)