Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/otel_allocator_distinct.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target-allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixed potential duplicate scrape targets caused by Prometheus relabeling.

# One or more tracking issues related to the change
issues: [3617]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion cmd/otel-allocator/internal/prehook/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (tf *relabelConfigTargetFilter) Apply(targets []*target.Item) []*target.Ite
}

if keepTarget {
targets[writeIndex] = tItem
targets[writeIndex] = target.NewItem(tItem.JobName, tItem.TargetURL, tItem.Labels, tItem.CollectorName, target.WithRelabeledLabels(lset))
writeIndex++
}
}
Expand Down
168 changes: 159 additions & 9 deletions cmd/otel-allocator/internal/prehook/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ package prehook

import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/assert"
Expand All @@ -25,6 +29,8 @@ var (
defaultNumCollectors = 3
defaultStartIndex = 0

checkDistinctConfigLabel = "check-distinct-label-key"

relabelConfigs = []relabelConfigObj{
{
cfg: []*relabel.Config{
Expand Down Expand Up @@ -153,6 +159,11 @@ var (
Regex: relabel.MustNewRegexp("(.*)"),
Action: "drop",
}

CheckDistinctConfig = relabel.Config{
Regex: relabel.MustNewRegexp(checkDistinctConfigLabel),
Action: "labeldrop",
}
)

type relabelConfigObj struct {
Expand All @@ -167,20 +178,40 @@ func colIndex(index, numCols int) int {
return index % numCols
}

func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startingIndex int) ([]*target.Item, int, []*target.Item, map[string][]*relabel.Config) {
func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startingIndex int) ([]*target.Item, int, []*target.Item, map[string][]*relabel.Config, error) {
toReturn := []*target.Item{}
expected := []*target.Item{}
numItemsRemaining := n
relabelConfig := make(map[string][]*relabel.Config)
for i := startingIndex; i < n+startingIndex; i++ {
collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors))
jobName := fmt.Sprintf("test-job-%d", i)
label := labels.Labels{
{Name: "collector", Value: collector},
{Name: "i", Value: strconv.Itoa(i)},
{Name: "total", Value: strconv.Itoa(n + startingIndex)},
{Name: model.MetaLabelPrefix + strconv.Itoa(i), Value: strconv.Itoa(i)},
{Name: model.AddressLabel, Value: "address_value"},
// These labels are typically required for correct scraping behavior and are expected to be retained after relabeling.:
// - job
// - __scrape_interval__
// - __scrape_timeout__
// - __scheme__
// - __metrics_path__
// Prometheus adds these labels by default. Removing them via relabel_configs is considered invalid and is therefore ignored.
// For details, see:
// https://github.com/prometheus/prometheus/blob/e6cfa720fbe6280153fab13090a483dbd40bece3/scrape/target.go#L429
{Name: model.JobLabel, Value: jobName},
{Name: model.ScrapeIntervalLabel, Value: "10s"},
{Name: model.ScrapeTimeoutLabel, Value: "10s"},
{Name: model.SchemeLabel, Value: "http"},
{Name: model.MetricsPathLabel, Value: "/metrics" + strconv.Itoa(i)},

// Prometheus will automatically add the "instance" label if it is not present.
{Name: model.InstanceLabel, Value: "address_value"},
}
jobName := fmt.Sprintf("test-job-%d", i)
newTarget := target.NewItem(jobName, "test-url", label, collector)
rawTarget := target.NewItem(jobName, "test-url", label, collector)

// add a single replace, drop, or keep action as relabel_config for targets
var index int
ind, _ := rand.Int(rand.Reader, big.NewInt(int64(len(relabelConfigs))))
Expand All @@ -192,18 +223,24 @@ func makeNNewTargets(rCfgs []relabelConfigObj, n int, numCollectors int, startin
if relabelConfigs[index].isDrop {
numItemsRemaining--
} else {
newTarget, err := MakeTargetFromProm(relabelConfig[jobName], rawTarget)
if err != nil || newTarget == nil {
return nil, 0, nil, nil, fmt.Errorf("failed to create target from relabel config: %w", err)
}
expected = append(expected, newTarget)
}
toReturn = append(toReturn, newTarget)
toReturn = append(toReturn, rawTarget)
}
return toReturn, numItemsRemaining, expected, relabelConfig
return toReturn, numItemsRemaining, expected, relabelConfig, nil
}

func TestApply(t *testing.T) {
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
targets, numRemaining, expectedTargetMap, relabelCfg, err := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
assert.NoError(t, err)

allocatorPrehook.SetConfig(relabelCfg)
remainingItems := allocatorPrehook.Apply(targets)
assert.Len(t, remainingItems, numRemaining)
Expand All @@ -227,7 +264,9 @@ func TestApplyHashmodAction(t *testing.T) {
assert.NotNil(t, allocatorPrehook)

hashRelabelConfigs := append(relabelConfigs, HashmodConfig)
targets, numRemaining, expectedTargetMap, relabelCfg := makeNNewTargets(hashRelabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
targets, numRemaining, expectedTargetMap, relabelCfg, err := makeNNewTargets(hashRelabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
assert.NoError(t, err)

allocatorPrehook.SetConfig(relabelCfg)
remainingItems := allocatorPrehook.Apply(targets)
assert.Len(t, remainingItems, numRemaining)
Expand All @@ -239,7 +278,8 @@ func TestApplyEmptyRelabelCfg(t *testing.T) {
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

targets, _, _, _ := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
targets, _, _, _, err := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
assert.NoError(t, err)

relabelCfg := map[string][]*relabel.Config{}
allocatorPrehook.SetConfig(relabelCfg)
Expand All @@ -253,7 +293,117 @@ func TestSetConfig(t *testing.T) {
allocatorPrehook := New("relabel-config", logger)
assert.NotNil(t, allocatorPrehook)

_, _, _, relabelCfg := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
_, _, _, relabelCfg, err := makeNNewTargets(relabelConfigs, defaultNumTargets, defaultNumCollectors, defaultStartIndex)
assert.NoError(t, err)

allocatorPrehook.SetConfig(relabelCfg)
assert.Equal(t, relabelCfg, allocatorPrehook.GetConfig())
}

func MakeTargetFromProm(rCfgs []*relabel.Config, rawTarget *target.Item) (*target.Item, error) {
lb := labels.NewBuilder(rawTarget.Labels)
cfg := &config.ScrapeConfig{
RelabelConfigs: rCfgs,
}
lset, _, err := PopulateLabels(lb, cfg)
if err != nil {
return nil, err
}
// If the lset is empty after relabeling, Prometheus drops the target.
if lset.IsEmpty() {
return nil, nil
}

newTarget := target.NewItem(rawTarget.JobName, rawTarget.TargetURL, rawTarget.Labels, rawTarget.CollectorName, target.WithRelabeledLabels(lset))
return newTarget, nil
}

// PopulateLabels is Copied from prometheus/scrape/target.go.
// Reason: "github.com/prometheus/common@0.65.0" and "github.com/prometheus/scrape@0.301.0" are incompatible (undefined: promslog.AllowedFormat).
func PopulateLabels(lb *labels.Builder, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
// Copy labels into the labelset for the target if they are not set already.
scrapeLabels := []labels.Label{
{Name: model.JobLabel, Value: cfg.JobName},
{Name: model.ScrapeIntervalLabel, Value: cfg.ScrapeInterval.String()},
{Name: model.ScrapeTimeoutLabel, Value: cfg.ScrapeTimeout.String()},
{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
{Name: model.SchemeLabel, Value: cfg.Scheme},
}

for _, l := range scrapeLabels {
if lb.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if name := model.ParamLabelPrefix + k; len(v) > 0 && lb.Get(name) == "" {
lb.Set(name, v[0])
}
}

preRelabelLabels := lb.Labels()
keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...)

// Check if the target was dropped.
if !keep {
return labels.EmptyLabels(), preRelabelLabels, nil
}
if v := lb.Get(model.AddressLabel); v == "" {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("no address")
}

addr := lb.Get(model.AddressLabel)

if err = config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), err
}

interval := lb.Get(model.ScrapeIntervalLabel)
intervalDuration, err := model.ParseDuration(interval)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape interval: %w", err)
}
if time.Duration(intervalDuration) == 0 {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape interval cannot be 0")
}

timeout := lb.Get(model.ScrapeTimeoutLabel)
timeoutDuration, err := model.ParseDuration(timeout)
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("error parsing scrape timeout: %w", err)
}
if time.Duration(timeoutDuration) == 0 {
return labels.EmptyLabels(), labels.EmptyLabels(), errors.New("scrape timeout cannot be 0")
}

if timeoutDuration > intervalDuration {
return labels.EmptyLabels(), labels.EmptyLabels(), fmt.Errorf("scrape timeout cannot be greater than scrape interval (%q > %q)", timeout, interval)
}

// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
lb.Range(func(l labels.Label) {
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
lb.Del(l.Name)
}
})

// Default the instance label to the target address.
if v := lb.Get(model.InstanceLabel); v == "" {
lb.Set(model.InstanceLabel, addr)
}

res = lb.Labels()
err = res.Validate(func(l labels.Label) error {
// Check label values are valid, drop the target if not.
if !model.LabelValue(l.Value).IsValid() {
return fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
}
return nil
})
if err != nil {
return labels.EmptyLabels(), labels.EmptyLabels(), err
}
return res, preRelabelLabels, nil
}
48 changes: 36 additions & 12 deletions cmd/otel-allocator/internal/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package target

import (
"strconv"
"strings"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)

Expand All @@ -33,16 +35,32 @@ func (h ItemHash) String() string {

// Item represents a target to be scraped.
type Item struct {
JobName string
TargetURL string
Labels labels.Labels
CollectorName string
hash ItemHash
JobName string
TargetURL string
Labels labels.Labels
// relabeledLabels contains the final labels after Prometheus relabeling processing.
relabeledLabels labels.Labels
CollectorName string
hash ItemHash
}

type ItemOption func(*Item)

func WithRelabeledLabels(lbs labels.Labels) ItemOption {
return func(i *Item) {
// In Prometheus, labels with the MetaLabelPrefix are discarded after relabeling, which means they are not used in hash calculation.
// For details, see https://github.com/prometheus/prometheus/blob/e6cfa720fbe6280153fab13090a483dbd40bece3/scrape/target.go#L534.
for _, l := range lbs {
if !strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
i.relabeledLabels = append(i.relabeledLabels, l)
}
}
}
}

func (t *Item) Hash() ItemHash {
if t.hash == 0 {
t.hash = ItemHash(LabelsHashWithJobName(t.Labels, t.JobName))
t.hash = ItemHash(LabelsHashWithJobName(t.relabeledLabels, t.JobName))
}
return t.hash
}
Expand Down Expand Up @@ -71,13 +89,19 @@ func (t *Item) GetEndpointSliceName() string {
// NewItem Creates a new target item.
// INVARIANTS:
// * Item fields must not be modified after creation.
func NewItem(jobName string, targetURL string, labels labels.Labels, collectorName string) *Item {
return &Item{
JobName: jobName,
TargetURL: targetURL,
Labels: labels,
CollectorName: collectorName,
func NewItem(jobName string, targetURL string, labels labels.Labels, collectorName string, opts ...ItemOption) *Item {
item := &Item{
JobName: jobName,
TargetURL: targetURL,
Labels: labels,
// relabeledLabels defaults to original labels if WithRelabeledLabels is not specified.
relabeledLabels: labels,
CollectorName: collectorName,
}
for _, opt := range opts {
opt(item)
}
return item
}

// LabelsHashWithJobName computes a hash of the labels and the job name.
Expand Down
Loading