Skip to content

Commit d3ed259

Browse files
authored
add matcher cache for proxy store and tsdb store API (#111)
2 parents 01f4c87 + 9b54f83 commit d3ed259

File tree

8 files changed

+325
-29
lines changed

8 files changed

+325
-29
lines changed

cmd/thanos/receive.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package main
55

66
import (
77
"context"
8+
"encoding/json"
89
"fmt"
910
"net"
1011
"net/http"
@@ -29,6 +30,7 @@ import (
2930
"github.com/prometheus/prometheus/model/relabel"
3031
"github.com/prometheus/prometheus/tsdb"
3132
"github.com/prometheus/prometheus/tsdb/wlog"
33+
"github.com/thanos-io/thanos/pkg/store/storepb"
3234
"google.golang.org/grpc"
3335
"gopkg.in/yaml.v2"
3436

@@ -151,6 +153,20 @@ func runReceive(
151153
}
152154
}
153155

156+
// Create a matcher converter if specified by command line to cache expensive regex matcher conversions.
157+
// Proxy store and TSDB stores of all tenants share a single cache.
158+
var matcherConverter *storepb.MatcherConverter
159+
if conf.matcherConverterCacheCapacity > 0 {
160+
var err error
161+
matcherConverter, err = storepb.NewMatcherConverter(conf.matcherConverterCacheCapacity, reg)
162+
if err != nil {
163+
level.Error(logger).Log("msg", "failed to create matcher converter", "err", err)
164+
}
165+
}
166+
if matcherConverter != nil {
167+
multiTSDBOptions = append(multiTSDBOptions, receive.WithMatcherConverter(matcherConverter))
168+
}
169+
154170
rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA, conf.rwServerTlsMinVersion)
155171
if err != nil {
156172
return err
@@ -337,6 +353,25 @@ func runReceive(
337353
w.WriteHeader(http.StatusOK)
338354
}
339355
}))
356+
srv.Handle("/-/matchers", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
357+
w.WriteHeader(http.StatusOK)
358+
if matcherConverter != nil {
359+
labelMatchers := matcherConverter.Keys()
360+
// Convert the slice to JSON
361+
jsonData, err := json.Marshal(labelMatchers)
362+
if err != nil {
363+
http.Error(w, "Failed to encode JSON", http.StatusInternalServerError)
364+
return
365+
}
366+
367+
// Set the Content-Type header and write the response
368+
w.Header().Set("Content-Type", "application/json")
369+
w.WriteHeader(http.StatusOK)
370+
if _, err := w.Write(jsonData); err != nil {
371+
level.Error(logger).Log("msg", "failed to write matchers json", "err", err)
372+
}
373+
}
374+
}))
340375
g.Add(func() error {
341376
statusProber.Healthy()
342377
return srv.ListenAndServe()
@@ -359,6 +394,9 @@ func runReceive(
359394
store.WithProxyStoreDebugLogging(debugLogging),
360395
store.WithoutDedup(),
361396
}
397+
if matcherConverter != nil {
398+
options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter))
399+
}
362400

363401
proxy := store.NewProxyStore(
364402
logger,
@@ -932,9 +970,10 @@ type receiveConfig struct {
932970

933971
asyncForwardWorkerCount uint
934972

935-
numTopMetricsPerTenant int
936-
topMetricsMinimumCardinality uint64
937-
topMetricsUpdateInterval time.Duration
973+
numTopMetricsPerTenant int
974+
topMetricsMinimumCardinality uint64
975+
topMetricsUpdateInterval time.Duration
976+
matcherConverterCacheCapacity int
938977

939978
featureList *[]string
940979
}
@@ -1097,6 +1136,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10971136
Default("10000").Uint64Var(&rc.topMetricsMinimumCardinality)
10981137
cmd.Flag("receive.top-metrics-update-interval", "The interval at which the top metrics are updated.").
10991138
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
1139+
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
1140+
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
11001141
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
11011142
}
11021143

pkg/receive/multitsdb.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type MultiTSDB struct {
6666
exemplarClients map[string]*exemplars.TSDB
6767

6868
metricNameFilterEnabled bool
69+
matcherConverter *storepb.MatcherConverter
6970
}
7071

7172
// MultiTSDBOption is a functional option for MultiTSDB.
@@ -78,6 +79,13 @@ func WithMetricNameFilterEnabled() MultiTSDBOption {
7879
}
7980
}
8081

82+
// WithMatcherConverter enables caching matcher converter consumed by children TSDB Stores.
83+
func WithMatcherConverter(mc *storepb.MatcherConverter) MultiTSDBOption {
84+
return func(s *MultiTSDB) {
85+
s.matcherConverter = mc
86+
}
87+
}
88+
8189
// NewMultiTSDB creates new MultiTSDB.
8290
// NOTE: Passed labels must be sorted lexicographically (alphabetically).
8391
func NewMultiTSDB(
@@ -742,6 +750,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
742750
if t.metricNameFilterEnabled {
743751
options = append(options, store.WithCuckooMetricNameStoreFilter())
744752
}
753+
// Pass matcher converter to children TSDB Stores.
754+
if t.matcherConverter != nil {
755+
options = append(options, store.WithTSDBStoreMatcherConverter(t.matcherConverter))
756+
}
745757
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset))
746758
t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil
747759
level.Info(logger).Log("msg", "TSDB is now ready")

pkg/store/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke
130130
// Series returns all series for a requested time range and label matcher. The returned data may
131131
// exceed the requested time bounds.
132132
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
133-
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels)
133+
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil)
134134
if err != nil {
135135
return status.Error(codes.InvalidArgument, err.Error())
136136
}

pkg/store/prometheus.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
125125

126126
extLset := p.externalLabelsFn()
127127

128-
match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
128+
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
129129
if err != nil {
130130
return status.Error(codes.InvalidArgument, err.Error())
131131
}
@@ -488,8 +488,14 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que
488488

489489
// matchesExternalLabels returns false if given matchers are not matching external labels.
490490
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
491-
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) {
492-
tms, err := storepb.MatchersToPromMatchers(ms...)
491+
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, mc *storepb.MatcherConverter) (bool, []*labels.Matcher, error) {
492+
var tms []*labels.Matcher
493+
var err error
494+
if mc != nil {
495+
tms, err = mc.MatchersToPromMatchers(ms...)
496+
} else {
497+
tms, err = storepb.MatchersToPromMatchers(ms...)
498+
}
493499
if err != nil {
494500
return false, nil, err
495501
}
@@ -537,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
537543
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
538544
extLset := p.externalLabelsFn()
539545

540-
match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
546+
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
541547
if err != nil {
542548
return nil, status.Error(codes.InvalidArgument, err.Error())
543549
}
@@ -600,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
600606

601607
extLset := p.externalLabelsFn()
602608

603-
match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
609+
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
604610
if err != nil {
605611
return nil, status.Error(codes.InvalidArgument, err.Error())
606612
}

pkg/store/proxy.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ type ProxyStore struct {
9999
tsdbSelector *TSDBSelector
100100
quorumChunkDedup bool
101101
enableDedup bool
102+
matcherConverter *storepb.MatcherConverter
102103
}
103104

104105
type proxyStoreMetrics struct {
@@ -162,6 +163,13 @@ func WithoutDedup() ProxyStoreOption {
162163
}
163164
}
164165

166+
// WithProxyStoreMatcherConverter returns a ProxyStoreOption that enables caching matcher converter for ProxyStore.
167+
func WithProxyStoreMatcherConverter(mc *storepb.MatcherConverter) ProxyStoreOption {
168+
return func(s *ProxyStore) {
169+
s.matcherConverter = mc
170+
}
171+
}
172+
165173
// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
166174
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
167175
func NewProxyStore(
@@ -273,7 +281,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
273281
reqLogger = log.With(reqLogger, "request", originalRequest.String())
274282
}
275283

276-
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
284+
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherConverter)
277285
if err != nil {
278286
return status.Error(codes.InvalidArgument, err.Error())
279287
}
@@ -472,7 +480,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La
472480
if s.debugLogging {
473481
reqLogger = log.With(reqLogger, "request", originalRequest.String())
474482
}
475-
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
483+
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherConverter)
476484
if err != nil {
477485
return nil, status.Error(codes.InvalidArgument, err.Error())
478486
}
@@ -575,7 +583,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L
575583
return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty")
576584
}
577585

578-
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
586+
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherConverter)
579587
if err != nil {
580588
return nil, status.Error(codes.InvalidArgument, err.Error())
581589
}

pkg/store/storepb/custom.go

Lines changed: 103 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import (
1212
"strings"
1313

1414
"github.com/gogo/protobuf/types"
15+
cache "github.com/hashicorp/golang-lru/v2"
1516
"github.com/pkg/errors"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promauto"
1619
"github.com/prometheus/prometheus/model/labels"
1720
"google.golang.org/grpc/codes"
1821

@@ -381,34 +384,120 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
381384
return res, nil
382385
}
383386

387+
func matcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
388+
var t labels.MatchType
389+
390+
switch m.Type {
391+
case LabelMatcher_EQ:
392+
t = labels.MatchEqual
393+
case LabelMatcher_NEQ:
394+
t = labels.MatchNotEqual
395+
case LabelMatcher_RE:
396+
t = labels.MatchRegexp
397+
case LabelMatcher_NRE:
398+
t = labels.MatchNotRegexp
399+
default:
400+
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
401+
}
402+
pm, err := labels.NewMatcher(t, m.Name, m.Value)
403+
if err != nil {
404+
return nil, err
405+
}
406+
return pm, nil
407+
}
408+
384409
// MatchersToPromMatchers returns Prometheus matchers from proto matchers.
385410
// NOTE: It allocates memory.
386411
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
387412
res := make([]*labels.Matcher, 0, len(ms))
388413
for _, m := range ms {
389-
var t labels.MatchType
414+
m, err := matcherToPromMatcher(m)
415+
if err != nil {
416+
return nil, err
417+
}
418+
res = append(res, m)
419+
}
420+
return res, nil
421+
}
390422

391-
switch m.Type {
392-
case LabelMatcher_EQ:
393-
t = labels.MatchEqual
394-
case LabelMatcher_NEQ:
395-
t = labels.MatchNotEqual
396-
case LabelMatcher_RE:
397-
t = labels.MatchRegexp
398-
case LabelMatcher_NRE:
399-
t = labels.MatchNotRegexp
400-
default:
401-
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
423+
type MatcherConverter struct {
424+
cache *cache.TwoQueueCache[LabelMatcher, *labels.Matcher]
425+
cacheCapacity int
426+
metrics *matcherConverterMetrics
427+
}
428+
429+
type matcherConverterMetrics struct {
430+
cacheTotalCount prometheus.Counter
431+
cacheHitCount prometheus.Counter
432+
cacheSizeGauge prometheus.Gauge
433+
}
434+
435+
func newMatcherConverterMetrics(reg prometheus.Registerer) *matcherConverterMetrics {
436+
var m matcherConverterMetrics
437+
438+
m.cacheTotalCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
439+
Name: "thanos_store_matcher_converter_cache_total",
440+
Help: "Total number of cache access.",
441+
})
442+
m.cacheHitCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
443+
Name: "thanos_store_matcher_converter_cache_hit_total",
444+
Help: "Total number of cache hits.",
445+
})
446+
m.cacheSizeGauge = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
447+
Name: "thanos_store_matcher_converter_cache_size",
448+
Help: "Current size of the cache.",
449+
})
450+
451+
return &m
452+
}
453+
454+
// NewMatcherConverter creates a new MatcherConverter with given capacity.
455+
func NewMatcherConverter(cacheCapacity int, reg prometheus.Registerer) (*MatcherConverter, error) {
456+
c, err := cache.New2Q[LabelMatcher, *labels.Matcher](cacheCapacity)
457+
if err != nil {
458+
return nil, err
459+
}
460+
metrics := newMatcherConverterMetrics(reg)
461+
return &MatcherConverter{cache: c, cacheCapacity: cacheCapacity, metrics: metrics}, nil
462+
}
463+
464+
// MatchersToPromMatchers converts proto label matchers to Prometheus label matchers. It caches regex conversions.
465+
func (c *MatcherConverter) MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
466+
res := make([]*labels.Matcher, 0, len(ms))
467+
for _, m := range ms {
468+
if m.Type != LabelMatcher_RE && m.Type != LabelMatcher_NRE {
469+
// EQ and NEQ are very cheap, so we don't cache them.
470+
pm, err := matcherToPromMatcher(m)
471+
if err != nil {
472+
return nil, err
473+
}
474+
res = append(res, pm)
475+
continue
402476
}
403-
m, err := labels.NewMatcher(t, m.Name, m.Value)
477+
c.metrics.cacheTotalCount.Inc()
478+
if pm, ok := c.cache.Get(m); ok {
479+
// cache hit
480+
c.metrics.cacheHitCount.Inc()
481+
res = append(res, pm)
482+
continue
483+
}
484+
// cache miss
485+
pm, err := matcherToPromMatcher(m)
404486
if err != nil {
405487
return nil, err
406488
}
407-
res = append(res, m)
489+
c.cache.Add(m, pm)
490+
res = append(res, pm)
408491
}
492+
c.metrics.cacheSizeGauge.Set(float64(c.cache.Len()))
409493
return res, nil
410494
}
411495

496+
// Get all keys from the cache for debugging.
497+
func (c *MatcherConverter) Keys() []LabelMatcher {
498+
return c.cache.Keys()
499+
}
500+
412501
// MatchersToString converts label matchers to string format.
413502
// String should be parsable as a valid PromQL query metric selector.
414503
func MatchersToString(ms ...LabelMatcher) string {

0 commit comments

Comments
 (0)