Skip to content

Commit 0caaf60

Browse files
authored
refactor: Scrape loop. (#5945)
Previously when the scrape suceeded but the function was unable to parse the result 'continue' was called which made it immediately try again since the ticker is only waited upon at the end of the loop. This can cause a huge rps spike if the endpoint is always returning invalid profiling data. This moves the processing of the response code to a function instead that returns an error, and if an error is encountered it handles it and drops into the second select in the loop to prevent rapid reties.
1 parent 66b690c commit 0caaf60

File tree

1 file changed

+98
-96
lines changed

1 file changed

+98
-96
lines changed

pkg/scrape/scrape.go

Lines changed: 98 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -461,106 +461,18 @@ mainLoop:
461461
cancel()
462462

463463
if scrapeErr == nil {
464-
b = buf.Bytes()
465-
// NOTE: There were issues with misbehaving clients in the past
466-
// that occasionally returned empty results. We don't want those
467-
// to falsely reset our buffer size.
468-
if len(b) > 0 {
469-
sl.lastScrapeSize = len(b)
470-
}
471-
472-
tl := labels.NewBuilder(sl.target.Labels())
473-
tl.Set("__name__", profileType)
474-
sl.externalLabels.Range(func(l labels.Label) {
475-
tl.Set(l.Name, l.Value)
476-
})
477-
478-
protolbls := &profilepb.LabelSet{
479-
Labels: []*profilepb.Label{},
480-
}
481-
tl.Range(func(l labels.Label) {
482-
protolbls.Labels = append(protolbls.Labels, &profilepb.Label{
483-
Name: l.Name,
484-
Value: l.Value,
485-
})
486-
})
487-
488-
byt := buf.Bytes()
489-
p, err := profile.ParseData(byt)
490-
if err != nil {
491-
level.Error(sl.l).Log("msg", "failed to parse profile data", "err", err)
492-
continue
493-
}
494-
495-
var executableInfo []*profilepb.ExecutableInfo
496-
for _, comment := range p.Comments {
497-
if strings.HasPrefix(comment, "executableInfo=") {
498-
ei, err := parseExecutableInfo(comment)
499-
if err != nil {
500-
level.Error(sl.l).Log("msg", "failed to parse executableInfo", "err", err)
501-
continue
502-
}
503-
504-
executableInfo = append(executableInfo, ei)
505-
}
506-
}
507-
508-
ks := sl.target.KeepSet()
509-
if len(ks) > 0 {
510-
keepIndexes := []int{}
511-
newTypes := []*profile.ValueType{}
512-
for i, st := range p.SampleType {
513-
if _, ok := ks[config.SampleType{Type: st.Type, Unit: st.Unit}]; ok {
514-
keepIndexes = append(keepIndexes, i)
515-
newTypes = append(newTypes, st)
516-
}
517-
}
518-
p.SampleType = newTypes
519-
for _, s := range p.Sample {
520-
newValues := []int64{}
521-
for _, i := range keepIndexes {
522-
newValues = append(newValues, s.Value[i])
523-
}
524-
s.Value = newValues
525-
}
526-
p = p.Compact()
527-
newB := sl.buffers.Get(sl.lastScrapeSize).([]byte)
528-
newBuf := bytes.NewBuffer(newB)
529-
if err := p.Write(newBuf); err != nil {
530-
level.Error(sl.l).Log("msg", "failed to write profile data", "err", err)
531-
continue
532-
}
533-
sl.buffers.Put(b)
534-
byt = newBuf.Bytes()
535-
b = newB // We want to make sure we return the new buffer to the pool further below.
536-
}
537-
538-
_, err = sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
539-
Normalized: sl.normalizedAddresses,
540-
Series: []*profilepb.RawProfileSeries{
541-
{
542-
Labels: protolbls,
543-
Samples: []*profilepb.RawSample{
544-
{
545-
RawProfile: byt,
546-
ExecutableInfo: executableInfo,
547-
},
548-
},
549-
},
550-
},
551-
})
464+
err := processScrapeResp(buf, sl, profileType)
552465
if err != nil {
553-
switch errc {
554-
case nil:
555-
level.Error(sl.l).Log("msg", "WriteRaw failed for scraped profile", "err", err)
556-
default:
466+
if errc != nil {
557467
errc <- err
558468
}
469+
sl.target.health = HealthBad
470+
sl.target.lastError = err
471+
} else {
472+
sl.target.health = HealthGood
559473
}
560474

561-
sl.target.health = HealthGood
562475
sl.target.lastScrapeDuration = time.Since(start)
563-
sl.target.lastError = nil
564476
} else {
565477
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
566478
if errc != nil {
@@ -572,9 +484,7 @@ mainLoop:
572484
sl.target.lastError = scrapeErr
573485
}
574486

575-
sl.buffers.Put(b)
576487
last = start
577-
578488
sl.target.lastScrape = last
579489

580490
select {
@@ -590,6 +500,98 @@ mainLoop:
590500
close(sl.stopped)
591501
}
592502

503+
func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) error {
504+
b := buf.Bytes()
505+
defer sl.buffers.Put(b)
506+
// NOTE: There were issues with misbehaving clients in the past
507+
// that occasionally returned empty results. We don't want those
508+
// to falsely reset our buffer size.
509+
if len(b) > 0 {
510+
sl.lastScrapeSize = len(b)
511+
}
512+
513+
tl := labels.NewBuilder(sl.target.Labels())
514+
tl.Set("__name__", profileType)
515+
sl.externalLabels.Range(func(l labels.Label) {
516+
tl.Set(l.Name, l.Value)
517+
})
518+
519+
protolbls := &profilepb.LabelSet{
520+
Labels: []*profilepb.Label{},
521+
}
522+
tl.Range(func(l labels.Label) {
523+
protolbls.Labels = append(protolbls.Labels, &profilepb.Label{
524+
Name: l.Name,
525+
Value: l.Value,
526+
})
527+
})
528+
529+
byt := buf.Bytes()
530+
p, err := profile.ParseData(byt)
531+
if err != nil {
532+
level.Error(sl.l).Log("msg", "failed to parse profile data", "err", err)
533+
return err
534+
}
535+
536+
var executableInfo []*profilepb.ExecutableInfo
537+
for _, comment := range p.Comments {
538+
if strings.HasPrefix(comment, "executableInfo=") {
539+
ei, err := parseExecutableInfo(comment)
540+
if err != nil {
541+
level.Error(sl.l).Log("msg", "failed to parse executableInfo", "err", err)
542+
continue
543+
}
544+
545+
executableInfo = append(executableInfo, ei)
546+
}
547+
}
548+
549+
ks := sl.target.KeepSet()
550+
if len(ks) > 0 {
551+
keepIndexes := []int{}
552+
newTypes := []*profile.ValueType{}
553+
for i, st := range p.SampleType {
554+
if _, ok := ks[config.SampleType{Type: st.Type, Unit: st.Unit}]; ok {
555+
keepIndexes = append(keepIndexes, i)
556+
newTypes = append(newTypes, st)
557+
}
558+
}
559+
p.SampleType = newTypes
560+
for _, s := range p.Sample {
561+
newValues := []int64{}
562+
for _, i := range keepIndexes {
563+
newValues = append(newValues, s.Value[i])
564+
}
565+
s.Value = newValues
566+
}
567+
p = p.Compact()
568+
sl.buffers.Put(b)
569+
b = sl.buffers.Get(sl.lastScrapeSize).([]byte)
570+
newBuf := bytes.NewBuffer(b)
571+
if err := p.Write(newBuf); err != nil {
572+
level.Error(sl.l).Log("msg", "failed to write profile data", "err", err)
573+
return err
574+
}
575+
byt = newBuf.Bytes()
576+
}
577+
578+
_, err = sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
579+
Normalized: sl.normalizedAddresses,
580+
Series: []*profilepb.RawProfileSeries{
581+
{
582+
Labels: protolbls,
583+
Samples: []*profilepb.RawSample{
584+
{
585+
RawProfile: byt,
586+
ExecutableInfo: executableInfo,
587+
},
588+
},
589+
},
590+
},
591+
})
592+
return err
593+
}
594+
593595
// parseExecutableInfo parses the executableInfo string from the comment. It is in the format of: "executableInfo=elfType;offset;vaddr".
594596
func parseExecutableInfo(comment string) (*profilepb.ExecutableInfo, error) {
595597
eiString := strings.TrimPrefix(comment, "executableInfo=")

0 commit comments

Comments
 (0)