
Commit 4200ada

Merge all db_main commits to release (#122)
2 parents: 149364c + 85c58c8

20 files changed: +906 −141 lines


CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call.
 - [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0.
 - [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers.
+- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer
+- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892).
 
 ### Added
 - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.

cmd/thanos/compact.go

Lines changed: 19 additions & 0 deletions
@@ -436,6 +436,12 @@ func runCompact(
 		level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
 	}
 
+	retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants)
+	if err != nil {
+		level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err)
+		return err
+	}
+
 	var cleanMtx sync.Mutex
 	// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
 	cleanPartialMarked := func() error {
@@ -456,6 +462,17 @@ func runCompact(
 	}
 
 	compactMainFn := func() error {
+		// this should happen before any compaction to remove unnecessary process on backlogs beyond retention.
+		if len(retentionByTenant) != 0 && len(sy.Metas()) == 0 {
+			level.Info(logger).Log("msg", "sync before tenant retention due to no blocks")
+			if err := sy.SyncMetas(ctx); err != nil {
+				return errors.Wrap(err, "sync before tenant retention")
+			}
+		}
+		if err := compact.ApplyRetentionPolicyByTenant(ctx, logger, insBkt, sy.Metas(), retentionByTenant, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired)); err != nil {
+			return errors.Wrap(err, "retention by tenant failed")
+		}
+
 		if err := compactor.Compact(ctx); err != nil {
 			return errors.Wrap(err, "whole compaction error")
 		}
@@ -726,6 +743,7 @@ type compactConfig struct {
 	objStore extflag.PathOrContent
 	consistencyDelay time.Duration
 	retentionRaw, retentionFiveMin, retentionOneHr model.Duration
+	retentionTenants *[]string
 	wait bool
 	waitInterval time.Duration
 	disableDownsampling bool
@@ -781,6 +799,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("0d").SetValue(&cc.retentionFiveMin)
 	cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. Setting this to 0d will retain samples of this resolution forever").
 		Default("0d").SetValue(&cc.retentionOneHr)
+	cc.retentionTenants = cmd.Flag("retention.tenant", "How long to retain samples in bucket per tenant. Setting this to 0d will retain samples of this resolution forever").Strings()
 
 	// TODO(kakkoyun, pgough): https://github.com/thanos-io/thanos/issues/2266.
 	cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").

cmd/thanos/receive.go

Lines changed: 61 additions & 4 deletions
@@ -5,6 +5,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net"
 	"net/http"
@@ -29,6 +30,7 @@ import (
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/wlog"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"google.golang.org/grpc"
 	"gopkg.in/yaml.v2"
 
@@ -151,6 +153,20 @@ func runReceive(
 		}
 	}
 
+	// Create a matcher converter if specified by command line to cache expensive regex matcher conversions.
+	// Proxy store and TSDB stores of all tenants share a single cache.
+	var matcherConverter *storepb.MatcherConverter
+	if conf.matcherConverterCacheCapacity > 0 {
+		var err error
+		matcherConverter, err = storepb.NewMatcherConverter(conf.matcherConverterCacheCapacity, reg)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed to create matcher converter", "err", err)
+		}
+	}
+	if matcherConverter != nil {
+		multiTSDBOptions = append(multiTSDBOptions, receive.WithMatcherConverter(matcherConverter))
+	}
+
 	rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA, conf.rwServerTlsMinVersion)
 	if err != nil {
 		return err
@@ -252,7 +268,19 @@ func runReceive(
 			return errors.Wrap(err, "parse limit configuration")
 		}
 	}
-	limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
+	if conf.maxPendingGrpcWriteRequests > 0 {
+		level.Info(logger).Log("msg", "set max pending gRPC write request in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
+	}
+	limiter, err := receive.NewLimiterWithOptions(
+		conf.writeLimitsConfig,
+		reg,
+		receiveMode,
+		log.With(logger, "component", "receive-limiter"),
+		conf.limitsConfigReloadTimer,
+		receive.LimiterOptions{
+			MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
+		},
+	)
 	if err != nil {
 		return errors.Wrap(err, "creating limiter")
 	}
@@ -337,6 +365,25 @@ func runReceive(
 			w.WriteHeader(http.StatusOK)
 		}
 	}))
+	srv.Handle("/-/matchers", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		if matcherConverter != nil {
+			labelMatchers := matcherConverter.Keys()
+			// Convert the slice to JSON
+			jsonData, err := json.Marshal(labelMatchers)
+			if err != nil {
+				http.Error(w, "Failed to encode JSON", http.StatusInternalServerError)
+				return
+			}
+
+			// Set the Content-Type header and write the response
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(http.StatusOK)
+			if _, err := w.Write(jsonData); err != nil {
+				level.Error(logger).Log("msg", "failed to write matchers json", "err", err)
+			}
+		}
+	}))
 	g.Add(func() error {
 		statusProber.Healthy()
 		return srv.ListenAndServe()
@@ -359,6 +406,9 @@ func runReceive(
 		store.WithProxyStoreDebugLogging(debugLogging),
 		store.WithoutDedup(),
 	}
+	if matcherConverter != nil {
+		options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter))
+	}
 
 	proxy := store.NewProxyStore(
 		logger,
@@ -370,6 +420,7 @@ func runReceive(
 		store.LazyRetrieval,
 		options...,
 	)
+
 	mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
 	rw := store.ReadWriteTSDBStore{
 		StoreServer: mts,
@@ -932,9 +983,11 @@ type receiveConfig struct {
 
 	asyncForwardWorkerCount uint
 
-	numTopMetricsPerTenant       int
-	topMetricsMinimumCardinality uint64
-	topMetricsUpdateInterval     time.Duration
+	numTopMetricsPerTenant        int
+	topMetricsMinimumCardinality  uint64
+	topMetricsUpdateInterval      time.Duration
+	matcherConverterCacheCapacity int
+	maxPendingGrpcWriteRequests   int
 
 	featureList *[]string
 }
@@ -1097,6 +1150,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		Default("10000").Uint64Var(&rc.topMetricsMinimumCardinality)
 	cmd.Flag("receive.top-metrics-update-interval", "The interval at which the top metrics are updated.").
 		Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
+	cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
+		Default("0").IntVar(&rc.matcherConverterCacheCapacity)
+	cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
+		Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
 	rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
 }
 
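
The new /-/matchers handler exposes the matcher converter's cached keys as JSON. Below is a minimal client-side sketch for reading it; the address is an assumption (use whatever --http-address the Receive instance listens on), and note the handler writes an empty 200 body when the cache is disabled.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Address is an assumption; substitute the Receive instance's --http-address.
	resp, err := http.Get("http://localhost:10902/-/matchers")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler JSON-encodes matcherConverter.Keys(); when the cache is
	// disabled it writes only the 200 status, so decoding fails with EOF.
	var keys any
	if err := json.NewDecoder(resp.Body).Decode(&keys); err != nil {
		fmt.Println("no cached matchers returned:", err)
		return
	}
	fmt.Println(keys)
}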

pkg/block/metadata/markers.go

Lines changed: 2 additions & 0 deletions
@@ -81,6 +81,8 @@ const (
 	OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
 	// DownsampleVerticalCompactionNoCompactReason is a reason to not compact overlapping downsampled blocks as it does not make sense e.g. how to vertically compact the average.
 	DownsampleVerticalCompactionNoCompactReason = "downsample-vertical-compaction"
+	// TenantRetentionExpired is a reason to delete block as it's per tenant retention is expired.
+	TenantRetentionExpired = "tenant-retention-expired"
 )
 
 // NoCompactMark marker stores reason of block being excluded from compaction if needed.
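
The new TenantRetentionExpired constant is only a label value; cmd/thanos/compact.go above passes it, together with metadata.DeletionMarkFilename, to the compactor's blocks-marked counter. Below is a minimal sketch of a counter wired the same way; the metric name and label names are assumptions for illustration, only the label values come from the diff, and it assumes this fork's metadata package is importable.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	// Metric and label names are assumptions, not taken from the diff.
	blocksMarked := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "blocks_marked_total",
		Help: "Blocks marked, by marker file and reason.",
	}, []string{"marker", "reason"})
	prometheus.MustRegister(blocksMarked)

	// Mirrors the WithLabelValues call chain used in cmd/thanos/compact.go above.
	blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, metadata.TenantRetentionExpired).Inc()
}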

pkg/compact/retention.go

Lines changed: 92 additions & 0 deletions
@@ -6,19 +6,27 @@ package compact
 import (
 	"context"
 	"fmt"
+	"regexp"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 )
 
+const (
+	// tenantRetentionRegex is the regex pattern for parsing tenant retention.
+	// valid format is `<tenant>:(<yyyy-mm-dd>|<duration>d)` where <duration> > 0.
+	tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))$`
+)
+
 // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime.
 // A value of 0 disables the retention for its resolution.
 func ApplyRetentionPolicyByResolution(
@@ -47,3 +55,87 @@ func ApplyRetentionPolicyByResolution(
 	level.Info(logger).Log("msg", "optional retention apply done")
 	return nil
 }
+
+type RetentionPolicy struct {
+	CutoffDate        time.Time
+	RetentionDuration time.Duration
+}
+
+func (r RetentionPolicy) isExpired(blockMaxTime time.Time) bool {
+	if r.CutoffDate.IsZero() {
+		return time.Now().After(blockMaxTime.Add(r.RetentionDuration))
+	}
+	return r.CutoffDate.After(blockMaxTime)
+}
+
+func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) (map[string]RetentionPolicy, error) {
+	pattern := regexp.MustCompile(tenantRetentionRegex)
+	retentionByTenant := make(map[string]RetentionPolicy, len(retentionTenants))
+	for _, tenantRetention := range retentionTenants {
+		matches := pattern.FindStringSubmatch(tenantRetention)
+		invalidFormat := errors.Errorf("invalid retention format for tenant: %s, must be `<tenant>:(<yyyy-mm-dd>|<duration>d)`", tenantRetention)
+		if len(matches) != 5 {
+			return nil, errors.Wrapf(invalidFormat, "matched size %d", len(matches))
+		}
+		tenant := matches[1]
+		var policy RetentionPolicy
+		if _, ok := retentionByTenant[tenant]; ok {
+			return nil, errors.Errorf("duplicate retention policy for tenant: %s", tenant)
+		}
+		if cutoffDate, err := time.Parse(time.DateOnly, matches[3]); matches[3] != "" {
+			if err != nil {
+				return nil, errors.Wrapf(invalidFormat, "error parsing cutoff date: %v", err)
+			}
+			policy.CutoffDate = cutoffDate
+		}
+		if duration, err := model.ParseDuration(matches[4]); matches[4] != "" {
+			if err != nil {
+				return nil, errors.Wrapf(invalidFormat, "error parsing duration: %v", err)
+			} else if duration == 0 {
+				return nil, errors.Wrapf(invalidFormat, "duration must be greater than 0")
+			}
+			policy.RetentionDuration = time.Duration(duration)
+		}
+		level.Info(logger).Log("msg", "retention policy for tenant is enabled", "tenant", tenant, "retention policy", fmt.Sprintf("%v", policy))
+		retentionByTenant[tenant] = policy
+	}
+	return retentionByTenant, nil
+}
+
+// ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime.
+func ApplyRetentionPolicyByTenant(
+	ctx context.Context,
+	logger log.Logger,
+	bkt objstore.Bucket,
+	metas map[ulid.ULID]*metadata.Meta,
+	retentionByTenant map[string]RetentionPolicy,
+	blocksMarkedForDeletion prometheus.Counter) error {
+	if len(retentionByTenant) == 0 {
+		level.Info(logger).Log("msg", "tenant retention is disabled due to no policy")
+		return nil
+	}
+	level.Info(logger).Log("msg", "start tenant retention", "total", len(metas))
+	deleted, skipped, notExpired := 0, 0, 0
+	for id, m := range metas {
+		policy, ok := retentionByTenant[m.Thanos.GetTenant()]
+		if !ok {
+			skipped++
+			continue
+		}
+		maxTime := time.Unix(m.MaxTime/1000, 0)
+		if policy.isExpired(maxTime) {
+			level.Info(logger).Log("msg", "applying retention: marking block for deletion", "id", id, "maxTime", maxTime.String())
+			if err := block.Delete(ctx, logger, bkt, id); err != nil {
+				level.Error(logger).Log("msg", "failed to delete block", "id", id, "err", err)
+				continue // continue to next block to clean up backlogs
+			} else {
+				blocksMarkedForDeletion.Inc()
+				deleted++
+			}
+		} else {
+			notExpired++
+		}
+	}
+	level.Info(logger).Log("msg", "tenant retention apply done", "deleted", deleted, "skipped", skipped, "notExpired", notExpired)
+	return nil
+}
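
RetentionPolicy.isExpired treats the two policy forms differently: a cutoff date expires any block whose MaxTime falls before that date, while a duration expires a block once the current time is later than MaxTime plus the duration. Below is a self-contained sketch of that rule with invented times; it mirrors, but does not import, the code above.

package main

import (
	"fmt"
	"time"
)

type retentionPolicy struct {
	cutoffDate        time.Time
	retentionDuration time.Duration
}

// isExpired reproduces the expiry rule from pkg/compact/retention.go above.
func (r retentionPolicy) isExpired(blockMaxTime time.Time) bool {
	if r.cutoffDate.IsZero() {
		return time.Now().After(blockMaxTime.Add(r.retentionDuration))
	}
	return r.cutoffDate.After(blockMaxTime)
}

func main() {
	// Invented block MaxTime for illustration.
	blockMaxTime := time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC)

	byDate := retentionPolicy{cutoffDate: time.Date(2024, 7, 1, 0, 0, 0, 0, time.UTC)}
	byDuration := retentionPolicy{retentionDuration: 30 * 24 * time.Hour} // roughly a "30d" policy

	fmt.Println(byDate.isExpired(blockMaxTime))     // true: MaxTime is before the cutoff date
	fmt.Println(byDuration.isExpired(blockMaxTime)) // true once now is past MaxTime + 30 days
}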
