@@ -27,7 +27,6 @@ import (
2727
2828 "github.com/prometheus-community/avalanche/pkg/errors"
2929 "github.com/prometheus/client_golang/exp/api/remote"
30- writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2"
3130
3231 "github.com/prometheus/client_golang/prometheus"
3332 dto "github.com/prometheus/client_model/go"
@@ -180,113 +179,6 @@ func cloneRequest(r *http.Request) *http.Request {
180179 return r2
181180}
182181
183- func (c * Client ) writeV2 (ctx context.Context ) error {
184- select {
185- // Wait for update first as write and collector.Run runs simultaneously.
186- case <- c .config .UpdateNotify :
187- case <- ctx .Done ():
188- return ctx .Err ()
189- }
190-
191- tss , st , err := collectMetricsV2 (c .gatherer , c .config .OutOfOrder )
192- if err != nil {
193- return err
194- }
195-
196- var (
197- totalTime time.Duration
198- totalSamplesExp = len (tss ) * c .config .RequestCount
199- totalSamplesAct int
200- mtx sync.Mutex
201- wgMetrics sync.WaitGroup
202- merr = & errors.MultiError {}
203- )
204-
205- shouldRunForever := c .config .RequestCount == - 1
206- if shouldRunForever {
207- log .Printf ("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n " ,
208- len (tss ), c .config .BatchSize , c .config .RequestInterval )
209- } else {
210- log .Printf ("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n " ,
211- len (tss ), c .config .RequestCount , c .config .BatchSize , c .config .RequestInterval )
212- }
213-
214- ticker := time .NewTicker (c .config .RequestInterval )
215- defer ticker .Stop ()
216-
217- concurrencyLimitCh := make (chan struct {}, c .config .Concurrency )
218-
219- for i := 0 ; ; {
220- if ctx .Err () != nil {
221- return ctx .Err ()
222- }
223-
224- if ! shouldRunForever {
225- if i >= c .config .RequestCount {
226- break
227- }
228- i ++
229- }
230-
231- <- ticker .C
232- select {
233- case <- c .config .UpdateNotify :
234- log .Println ("updating remote write metrics" )
235- tss , st , err = collectMetricsV2 (c .gatherer , c .config .OutOfOrder )
236- if err != nil {
237- merr .Add (err )
238- }
239- default :
240- tss = updateTimestampsV2 (tss )
241- }
242-
243- start := time .Now ()
244- for i := 0 ; i < len (tss ); i += c .config .BatchSize {
245- wgMetrics .Add (1 )
246- concurrencyLimitCh <- struct {}{}
247- go func (i int ) {
248- defer func () {
249- <- concurrencyLimitCh
250- }()
251- defer wgMetrics .Done ()
252- end := i + c .config .BatchSize
253- if end > len (tss ) {
254- end = len (tss )
255- }
256- req := & writev2.Request {
257- Timeseries : tss [i :end ],
258- Symbols : st .Symbols (), // We pass full symbols table to each request for now
259- }
260-
261- if _ , err := c .remoteAPI .Write (ctx , req ); err != nil {
262- merr .Add (err )
263- c .logger .Error ("error writing metrics" , "error" , err )
264- return
265- }
266-
267- mtx .Lock ()
268- totalSamplesAct += len (tss [i :end ])
269- mtx .Unlock ()
270- }(i )
271- }
272- wgMetrics .Wait ()
273- totalTime += time .Since (start )
274- if merr .Count () > 20 {
275- merr .Add (fmt .Errorf ("too many errors" ))
276- return merr .Err ()
277- }
278- }
279- if c .config .RequestCount * len (tss ) != totalSamplesAct {
280- merr .Add (fmt .Errorf ("total samples mismatch, exp:%v , act:%v" , totalSamplesExp , totalSamplesAct ))
281- }
282- c .logger .Info ("metrics summary" ,
283- "total_time" , totalTime .Round (time .Second ),
284- "total_samples" , totalSamplesAct ,
285- "samples_per_sec" , int (float64 (totalSamplesAct )/ totalTime .Seconds ()),
286- "errors" , merr .Count ())
287- return merr .Err ()
288- }
289-
290182func (c * Client ) write (ctx context.Context ) error {
291183 select {
292184 // Wait for update first as write and collector.Run runs simultaneously.
@@ -393,14 +285,6 @@ func (c *Client) write(ctx context.Context) error {
393285 return merr .Err ()
394286}
395287
396- func updateTimestampsV2 (tss []* writev2.TimeSeries ) []* writev2.TimeSeries {
397- now := time .Now ().UnixMilli ()
398- for i := range tss {
399- tss [i ].Samples [0 ].Timestamp = now
400- }
401- return tss
402- }
403-
404288func updateTimetamps (tss []prompb.TimeSeries ) []prompb.TimeSeries {
405289 t := int64 (model .Now ())
406290 for i := range tss {
@@ -409,18 +293,6 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries {
409293 return tss
410294}
411295
412- func collectMetricsV2 (gatherer prometheus.Gatherer , outOfOrder bool ) ([]* writev2.TimeSeries , writev2.SymbolsTable , error ) {
413- metricFamilies , err := gatherer .Gather ()
414- if err != nil {
415- return nil , writev2.SymbolsTable {}, err
416- }
417- tss , st := ToTimeSeriesSliceV2 (metricFamilies )
418- if outOfOrder {
419- tss = shuffleTimestampsV2 (tss )
420- }
421- return tss , st , nil
422- }
423-
424296func collectMetrics (gatherer prometheus.Gatherer , outOfOrder bool ) ([]prompb.TimeSeries , error ) {
425297 metricFamilies , err := gatherer .Gather ()
426298 if err != nil {
@@ -433,16 +305,6 @@ func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.Tim
433305 return tss , nil
434306}
435307
436- func shuffleTimestampsV2 (tss []* writev2.TimeSeries ) []* writev2.TimeSeries {
437- now := time .Now ().UnixMilli ()
438- offsets := []int64 {0 , - 60 * 1000 , - 5 * 60 * 1000 }
439- for i := range tss {
440- offset := offsets [i % len (offsets )]
441- tss [i ].Samples [0 ].Timestamp = now + offset
442- }
443- return tss
444- }
445-
446308func shuffleTimestamps (tss []prompb.TimeSeries ) []prompb.TimeSeries {
447309 now := time .Now ().UnixMilli ()
448310 offsets := []int64 {0 , - 60 * 1000 , - 5 * 60 * 1000 }
@@ -490,47 +352,6 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries {
490352 return tss
491353}
492354
493- func ToTimeSeriesSliceV2 (metricFamilies []* dto.MetricFamily ) ([]* writev2.TimeSeries , writev2.SymbolsTable ) {
494- st := writev2 .NewSymbolTable ()
495- timestamp := int64 (model .Now ())
496- tss := make ([]* writev2.TimeSeries , 0 , len (metricFamilies )* 10 )
497-
498- skippedSamples := 0
499- for _ , metricFamily := range metricFamilies {
500- for _ , metric := range metricFamily .Metric {
501- labels := prompbLabels (* metricFamily .Name , metric .Label )
502- labelRefs := make ([]uint32 , 0 , len (labels ))
503- for _ , label := range labels {
504- labelRefs = append (labelRefs , st .Symbolize (label .Name ))
505- labelRefs = append (labelRefs , st .Symbolize (label .Value ))
506- }
507- ts := & writev2.TimeSeries {
508- LabelsRefs : labelRefs ,
509- }
510- switch * metricFamily .Type {
511- case dto .MetricType_COUNTER :
512- ts .Samples = []* writev2.Sample {{
513- Value : * metric .Counter .Value ,
514- Timestamp : timestamp ,
515- }}
516- tss = append (tss , ts )
517- case dto .MetricType_GAUGE :
518- ts .Samples = []* writev2.Sample {{
519- Value : * metric .Gauge .Value ,
520- Timestamp : timestamp ,
521- }}
522- tss = append (tss , ts )
523- default :
524- skippedSamples ++
525- }
526- }
527- }
528- if skippedSamples > 0 {
529- log .Printf ("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n " , skippedSamples , len (tss ))
530- }
531- return tss , st
532- }
533-
534355func prompbLabels (name string , label []* dto.LabelPair ) []prompb.Label {
535356 ret := make ([]prompb.Label , 0 , len (label )+ 1 )
536357 ret = append (ret , prompb.Label {
0 commit comments