Skip to content

Commit d35a00c

Browse files
authored
receiver: Add hot reload for relabel configs (#124)
2 parents d83dae6 + e6fcd04 commit d35a00c

File tree

10 files changed

+351
-47
lines changed

10 files changed

+351
-47
lines changed

cmd/thanos/receive.go

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,10 @@ import (
2727
"github.com/prometheus/client_golang/prometheus/promauto"
2828
"github.com/prometheus/common/model"
2929
"github.com/prometheus/prometheus/model/labels"
30-
"github.com/prometheus/prometheus/model/relabel"
3130
"github.com/prometheus/prometheus/tsdb"
3231
"github.com/prometheus/prometheus/tsdb/wlog"
3332
"github.com/thanos-io/thanos/pkg/store/storepb"
3433
"google.golang.org/grpc"
35-
"gopkg.in/yaml.v2"
3634

3735
"github.com/thanos-io/objstore"
3836
"github.com/thanos-io/objstore/client"
@@ -231,14 +229,11 @@ func runReceive(
231229
return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID)
232230
}
233231

234-
relabelContentYaml, err := conf.relabelConfigPath.Content()
232+
relabeller, err := receive.NewRelabeller(conf.relabelConfigPath, reg, logger, conf.relabelConfigReloadTimer)
233+
235234
if err != nil {
236235
return errors.Wrap(err, "get content of relabel configuration")
237236
}
238-
var relabelConfig []*relabel.Config
239-
if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
240-
return errors.Wrap(err, "parse relabel configuration")
241-
}
242237

243238
dbs := receive.NewMultiTSDB(
244239
conf.dataDir,
@@ -286,30 +281,47 @@ func runReceive(
286281
}
287282

288283
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
289-
Writer: writer,
290-
ListenAddress: conf.rwAddress,
291-
Registry: reg,
292-
Endpoint: conf.endpoint,
293-
TenantHeader: conf.tenantHeader,
294-
TenantField: conf.tenantField,
295-
DefaultTenantID: conf.defaultTenantID,
296-
ReplicaHeader: conf.replicaHeader,
297-
ReplicationFactor: conf.replicationFactor,
298-
RelabelConfigs: relabelConfig,
299-
ReceiverMode: receiveMode,
300-
Tracer: tracer,
301-
TLSConfig: rwTLSConfig,
302-
SplitTenantLabelName: conf.splitTenantLabelName,
303-
DialOpts: dialOpts,
304-
ForwardTimeout: time.Duration(*conf.forwardTimeout),
305-
MaxBackoff: time.Duration(*conf.maxBackoff),
306-
TSDBStats: dbs,
307-
Limiter: limiter,
308-
284+
Writer: writer,
285+
ListenAddress: conf.rwAddress,
286+
Registry: reg,
287+
Endpoint: conf.endpoint,
288+
TenantHeader: conf.tenantHeader,
289+
TenantField: conf.tenantField,
290+
DefaultTenantID: conf.defaultTenantID,
291+
ReplicaHeader: conf.replicaHeader,
292+
ReplicationFactor: conf.replicationFactor,
293+
Relabeller: relabeller,
294+
ReceiverMode: receiveMode,
295+
Tracer: tracer,
296+
TLSConfig: rwTLSConfig,
297+
SplitTenantLabelName: conf.splitTenantLabelName,
298+
DialOpts: dialOpts,
299+
ForwardTimeout: time.Duration(*conf.forwardTimeout),
300+
MaxBackoff: time.Duration(*conf.maxBackoff),
301+
TSDBStats: dbs,
302+
Limiter: limiter,
309303
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
310304
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
311305
})
312306

307+
{
308+
if relabeller.CanReload() {
309+
ctx, cancel := context.WithCancel(context.Background())
310+
g.Add(func() error {
311+
level.Debug(logger).Log("msg", "relabel config initialized with file watcher.")
312+
if err := relabeller.StartConfigReloader(ctx); err != nil {
313+
level.Error(logger).Log("msg", "initializing relabel config reloading.", "err", err)
314+
return err
315+
}
316+
level.Info(logger).Log("msg", "relabel config reloading initialized.")
317+
<-ctx.Done()
318+
return nil
319+
}, func(error) {
320+
cancel()
321+
})
322+
}
323+
}
324+
313325
grpcProbe := prober.NewGRPC()
314326
httpProbe := prober.NewHTTP()
315327
statusProber := prober.Combine(
@@ -974,8 +986,9 @@ type receiveConfig struct {
974986
ignoreBlockSize bool
975987
allowOutOfOrderUpload bool
976988

977-
reqLogConfig *extflag.PathOrContent
978-
relabelConfigPath *extflag.PathOrContent
989+
reqLogConfig *extflag.PathOrContent
990+
relabelConfigPath *extflag.PathOrContent
991+
relabelConfigReloadTimer time.Duration
979992

980993
writeLimitsConfig *extflag.PathOrContent
981994
storeRateLimits store.SeriesSelectLimits
@@ -1073,6 +1086,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10731086
rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
10741087

10751088
rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
1089+
cmd.Flag("receive.relabel-config-reload-timer", "Minimum amount of time to pass for the relabel configuration to be reloaded. Helps to avoid excessive reloads.").
1090+
Default("0s").Hidden().DurationVar(&rc.relabelConfigReloadTimer)
10761091

10771092
rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
10781093

pkg/receive/handler.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ type Options struct {
112112
DialOpts []grpc.DialOption
113113
ForwardTimeout time.Duration
114114
MaxBackoff time.Duration
115-
RelabelConfigs []*relabel.Config
115+
Relabeller *Relabeller
116116
TSDBStats TSDBStats
117117
Limiter *Limiter
118118
AsyncForwardWorkerCount uint
@@ -1104,13 +1104,14 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
11041104

11051105
// relabel relabels the time series labels in the remote write request.
11061106
func (h *Handler) relabel(wreq *prompb.WriteRequest) {
1107-
if len(h.options.RelabelConfigs) == 0 {
1107+
relabelConfigs := h.options.Relabeller.RelabelConfig()
1108+
if len(relabelConfigs) == 0 {
11081109
return
11091110
}
11101111
timeSeries := make([]prompb.TimeSeries, 0, len(wreq.Timeseries))
11111112
for _, ts := range wreq.Timeseries {
11121113
var keep bool
1113-
lbls, keep := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), h.options.RelabelConfigs...)
1114+
lbls, keep := relabel.Process(labelpb.ZLabelsToPromLabels(ts.Labels), relabelConfigs...)
11141115
if !keep {
11151116
continue
11161117
}

pkg/receive/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1733,7 +1733,7 @@ func TestRelabel(t *testing.T) {
17331733
} {
17341734
t.Run(tcase.name, func(t *testing.T) {
17351735
h := NewHandler(nil, &Options{
1736-
RelabelConfigs: tcase.relabel,
1736+
Relabeller: newRelabelerWithConstantConfig(tcase.relabel, nil),
17371737
})
17381738

17391739
h.relabel(&tcase.writeRequest)

pkg/receive/interfaces.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package receive
5+
6+
// fileContent abstracts a configuration source so that this package does not
// need a direct dependency on kingpin or extkingpin.
type fileContent interface {
	// Path identifies the backing file. Relabeller.CanReload treats an empty
	// path as "not reloadable".
	Path() string
	// Content returns the raw configuration bytes, or an error if they cannot
	// be obtained.
	Content() ([]byte, error)
}

pkg/receive/limiter.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,6 @@ type requestLimiter interface {
6464
AllowSamples(tenant string, amount int64) bool
6565
}
6666

67-
// fileContent is an interface to avoid a direct dependency on kingpin or extkingpin.
68-
type fileContent interface {
69-
Content() ([]byte, error)
70-
Path() string
71-
}
72-
7367
func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
7468
l.headSeriesLimiterMtx.Lock()
7569
defer l.headSeriesLimiterMtx.Unlock()
@@ -143,14 +137,6 @@ func NewLimiterWithOptions(
143137
Help: "How many times the limit configuration failed to reload.",
144138
},
145139
)
146-
limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter(
147-
prometheus.CounterOpts{
148-
Namespace: "thanos",
149-
Subsystem: "receive",
150-
Name: "limits_config_reload_err_total",
151-
Help: "How many times the limit configuration failed to reload.",
152-
},
153-
)
154140
limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
155141
prometheus.CounterOpts{
156142
Name: "thanos_receive_max_pending_write_request_limit_hit_total",

pkg/receive/relabeller.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package receive
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"time"
10+
11+
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
13+
"github.com/pkg/errors"
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
16+
"github.com/prometheus/prometheus/model/relabel"
17+
"github.com/thanos-io/thanos/pkg/extkingpin"
18+
"go.uber.org/atomic"
19+
"gopkg.in/yaml.v2"
20+
)
21+
22+
// Relabeller is responsible for managing the configuration and initialization of
23+
// different types that apply relabel configurations to the Receive instance.
24+
// The new config is atomically swapped in, so we don't need to use locks.
25+
type Relabeller struct {
26+
configPathOrContent fileContent
27+
relabelConfigs *atomic.Pointer[RelabelConfig]
28+
logger log.Logger
29+
configReloadCounter prometheus.Counter
30+
configReloadFailedCounter prometheus.Counter
31+
configReloadTimer time.Duration
32+
}
33+
34+
// RelabelConfig is a collection of relabel configurations.
35+
type RelabelConfig []*relabel.Config
36+
37+
// NewRelabeller creates a new relabeller and loads the configuration to make sure loading is possible.
38+
func NewRelabeller(configFile fileContent, reg prometheus.Registerer, logger log.Logger, configReloadTimer time.Duration) (*Relabeller, error) {
39+
var relabelConfigs atomic.Pointer[RelabelConfig]
40+
relabelConfigs.Store(&RelabelConfig{})
41+
relabeller := &Relabeller{
42+
configPathOrContent: configFile,
43+
relabelConfigs: &relabelConfigs,
44+
logger: logger,
45+
configReloadTimer: configReloadTimer,
46+
}
47+
48+
if reg != nil {
49+
relabeller.configReloadCounter = promauto.With(reg).NewCounter(
50+
prometheus.CounterOpts{
51+
Namespace: "thanos",
52+
Subsystem: "receive",
53+
Name: "relabel_config_reload_total",
54+
Help: "How many times the relabel configuration was reloaded",
55+
},
56+
)
57+
relabeller.configReloadFailedCounter = promauto.With(reg).NewCounter(
58+
prometheus.CounterOpts{
59+
Namespace: "thanos",
60+
Subsystem: "receive",
61+
Name: "relabel_config_reload_err_total",
62+
Help: "How many times the relabel configuration failed to reload.",
63+
},
64+
)
65+
}
66+
if configFile == nil {
67+
return relabeller, nil
68+
}
69+
relabeller.configPathOrContent = configFile
70+
if err := relabeller.loadConfig(); err != nil {
71+
return nil, errors.Wrap(err, "load relabel config")
72+
}
73+
return relabeller, nil
74+
}
75+
76+
// simply returns the provided config. This is useful for testing.
77+
func newRelabelerWithConstantConfig(config RelabelConfig, logger log.Logger) *Relabeller {
78+
var relabelConfigs atomic.Pointer[RelabelConfig]
79+
relabelConfigs.Store(&config)
80+
return &Relabeller{nil, &relabelConfigs, logger, nil, nil, 0}
81+
}
82+
83+
// RelabelConfig returns the current relabel config.
84+
// This is concurrent safe.
85+
func (r *Relabeller) RelabelConfig() RelabelConfig {
86+
if r == nil {
87+
var relabelConfig RelabelConfig
88+
return relabelConfig
89+
}
90+
return *r.relabelConfigs.Load()
91+
}
92+
93+
// setRelabelConfig sets the relabel config to the provided array.
94+
// This is concurrent safe.
95+
func (r *Relabeller) setRelabelConfig(configs RelabelConfig) {
96+
r.relabelConfigs.Store(&configs)
97+
}
98+
99+
func (r *Relabeller) loadConfig() error {
100+
relabelContentYaml, err := r.configPathOrContent.Content()
101+
if err != nil {
102+
return errors.Wrap(err, "getting content of relabel config")
103+
}
104+
var relabelConfig RelabelConfig
105+
if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil {
106+
return errors.Wrap(err, "parsing relabel config")
107+
}
108+
r.setRelabelConfig(relabelConfig)
109+
return nil
110+
}
111+
112+
// StartConfigReloader starts the automatic configuration reloader based off of
113+
// the file indicated by pathOrContent.
114+
func (r *Relabeller) StartConfigReloader(ctx context.Context) error {
115+
if !r.CanReload() {
116+
return nil
117+
}
118+
119+
return extkingpin.PathContentReloader(ctx, r.configPathOrContent, r.logger, func() {
120+
level.Info(r.logger).Log("msg", "reloading relabel config.")
121+
122+
if err := r.loadConfig(); err != nil {
123+
if failedReload := r.configReloadFailedCounter; failedReload != nil {
124+
failedReload.Inc()
125+
}
126+
errMsg := fmt.Sprintf("error reloading relabel config from %s", r.configPathOrContent.Path())
127+
level.Error(r.logger).Log("msg", errMsg, "err", err)
128+
}
129+
if reloadCounter := r.configReloadCounter; reloadCounter != nil {
130+
reloadCounter.Inc()
131+
}
132+
133+
}, r.configReloadTimer)
134+
}
135+
136+
func (r *Relabeller) CanReload() bool {
137+
if r.configReloadTimer == 0 {
138+
return false
139+
}
140+
if r.configPathOrContent == nil {
141+
return false
142+
}
143+
if r.configPathOrContent.Path() == "" {
144+
return false
145+
}
146+
return true
147+
}

0 commit comments

Comments
 (0)