diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c12e8cefc00..b9d45e6aae5 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -60,6 +60,7 @@ import ( const ( compressionNone = "none" metricNamesFilter = "metric-names-filter" + extLabelsInTSDB = "ext-labels-in-tsdb" ) func registerReceive(app *extkingpin.App) { @@ -152,6 +153,11 @@ func runReceive( multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) level.Info(logger).Log("msg", "metric name filter feature enabled") } + + if feature == extLabelsInTSDB { + multiTSDBOptions = append(multiTSDBOptions, receive.WithExternalLabelsInTSDB()) + level.Info(logger).Log("msg", "external labels in TSDB feature enabled. This will make Receive dump the head block if external labels are changed.") + } } rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA, conf.rwServerTlsMinVersion) @@ -1100,7 +1106,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.otlp-enable-target-info", "Enables target information in OTLP metrics ingested by Receive. If enabled, it converts the resource to the target info metric").Default("true").BoolVar(&rc.otlpEnableTargetInfo) cmd.Flag("receive.otlp-promote-resource-attributes", "(Repeatable) Resource attributes to include in OTLP metrics ingested by Receive.").Default("").StringsVar(&rc.otlpResourceAttributes) - rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() + rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+","+extLabelsInTSDB+".").Default("").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. 
This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) diff --git a/docs/components/receive.md b/docs/components/receive.md index a1acc848781..806aa246f40 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -388,6 +388,18 @@ func (h *Handler) writeQuorum() int { So, if the replication factor is 2 then at least one write must succeed. With RF=3, two writes must succeed, and so on. +## Feature Flags + +### metric-names-filter + +If enabled, Receive queries all available metric names in each tenant every 15 seconds and builds a bloom filter from them. + +This allows filtering out certain tenants from queriers, so no goroutine has to be spawned for them. + +### ext-labels-in-tsdb + +If enabled, Receive puts the current external labels as "normal" labels inside the TSDB. It also adds a special marker to the meta files of blocks so that it is known whether external labels are part of the series inside the TSDB. Note that changing the external labels makes Receive dump the head block. + ## Flags ```$ mdox-exec="thanos receive --help" @@ -664,7 +676,7 @@ Flags: OTLP metrics ingested by Receive. --enable-feature= ... Comma separated experimental feature names to enable. The current list of features is - metric-names-filter. + metric-names-filter,ext-labels-in-tsdb. --receive.lazy-retrieval-max-buffered-responses=20 The lazy retrieval strategy can buffer up to this number of responses. This is to limit the diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 63a56202d17..217363d1371 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -59,6 +59,11 @@ const ( // that the block has been migrated to parquet format and can be safely ignored // by store gateways. 
ParquetMigratedExtensionKey = "parquet_migrated" + + // ExtLabelsInTSDBKey is the key used in block extensions to indicate that + // external labels have been put in the TSDB and the Series() API can + // stream. + ExtLabelsInTSDBKey = "ext_labels_in_tsdb" ) // Meta describes the a block's meta. It wraps the known TSDB meta structure and diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 4bcf0dfd5c1..6c3ddd2bfb8 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -72,6 +72,7 @@ type MultiTSDB struct { exemplarClients map[string]*exemplars.TSDB metricNameFilterEnabled bool + extLabelsInTSDB bool headExpandedPostingsCacheSize uint64 blockExpandedPostingsCacheSize uint64 @@ -80,6 +81,14 @@ type MultiTSDB struct { // MultiTSDBOption is a functional option for MultiTSDB. type MultiTSDBOption func(mt *MultiTSDB) +// WithExternalLabelsInTSDB enables putting external labels in the TSDB. +// This permits streaming from the TSDB to the querier. +func WithExternalLabelsInTSDB() MultiTSDBOption { + return func(s *MultiTSDB) { + s.extLabelsInTSDB = true + } +} + // WithMetricNameFilterEnabled enables metric name filtering on TSDB clients. func WithMetricNameFilterEnabled() MultiTSDBOption { return func(s *MultiTSDB) { @@ -302,10 +311,13 @@ func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { return deletable } -func newTenant() *tenant { +func newTenant(extLabels labels.Labels, addExtLabels bool) *tenant { return &tenant{ - readyS: &ReadyStorage{}, - mtx: &sync.RWMutex{}, + readyS: &ReadyStorage{ + extLabels: extLabels, + addExtLabels: addExtLabels, + }, + mtx: &sync.RWMutex{}, } } @@ -705,6 +717,71 @@ func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs .. 
return result } +// getLastBlockPath returns the directory of the block in s with the highest MinTime, or "" if s has no blocks. +// It scans instead of sorting: the slice returned by s.Blocks() may be the TSDB's own slice (NOTE(review): confirm +// against the vendored Prometheus version), so it must not be reordered from here. +func (t *MultiTSDB) getLastBlockPath(dataDir string, s *tsdb.DB) string { + var lastBlock *tsdb.Block + for _, b := range s.Blocks() { + if lastBlock == nil || b.MinTime() > lastBlock.MinTime() { + lastBlock = b + } + } + if lastBlock == nil { + return "" + } + return path.Join(dataDir, lastBlock.Meta().ULID.String()) +} + +func (t *MultiTSDB) maybePruneHead(dataDir, tenantID, lastMetaPath string, curLset labels.Labels, pruneHead func() error) error { + if !t.extLabelsInTSDB { + return nil + } + + if lastMetaPath == "" { + return nil + } + + m, err := metadata.ReadFromDir(lastMetaPath) + if err != nil { + return fmt.Errorf("reading meta %s: %w", lastMetaPath, err) + } + + oldLset := labels.FromMap(m.Thanos.Labels) + if labels.Equal(oldLset, curLset) { + return nil + } + + level.Info(t.logger).Log("msg", "changed external labelset detected, dumping the head block", "newLset", curLset.String(), "oldLset", oldLset.String()) + + if err := pruneHead(); err != nil { + return fmt.Errorf("flushing head: %w", err) + } + + if t.bucket != nil { + logger := log.With(t.logger, "tenant", tenantID, "oldLset", oldLset.String()) + reg := NewUnRegisterer(prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)) + + ship := shipper.New( + t.bucket, + dataDir, + shipper.WithLogger(logger), + shipper.WithRegisterer(reg), + shipper.WithSource(metadata.ReceiveSource), + shipper.WithHashFunc(t.hashFunc), + shipper.WithMetaFileName(shipper.DefaultMetaFilename), + shipper.WithLabels(func() labels.Labels { return oldLset }), + shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload), + shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks), + ) + if _, err := ship.Sync(context.Background()); err != nil { + return fmt.Errorf("syncing head for old label set: %w", err) + } + } + + return nil +} + func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error { reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, 
t.reg) reg = NewUnRegisterer(reg) @@ -754,19 +831,35 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.removeTenantLocked(tenantID) return err } + + if err := t.maybePruneHead(dataDir, tenantID, t.getLastBlockPath(dataDir, s), lset, func() error { return t.flushHead(s) }); err != nil { + return err + } + var ship *shipper.Shipper if t.bucket != nil { - ship = shipper.New( - t.bucket, - dataDir, - shipper.WithLogger(logger), + shipperOpts := []shipper.Option{} + + shipperOpts = append(shipperOpts, shipper.WithLogger(logger), shipper.WithRegisterer(reg), shipper.WithSource(metadata.ReceiveSource), shipper.WithHashFunc(t.hashFunc), shipper.WithMetaFileName(shipper.DefaultMetaFilename), shipper.WithLabels(func() labels.Labels { return lset }), shipper.WithAllowOutOfOrderUploads(t.allowOutOfOrderUpload), - shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks), + shipper.WithSkipCorruptedBlocks(t.skipCorruptedBlocks)) + + if t.extLabelsInTSDB { + shipperOpts = append(shipperOpts, shipper.WithExtensions( + map[string]any{ + metadata.ExtLabelsInTSDBKey: "", + }, + )) + } + ship = shipper.New( + t.bucket, + dataDir, + shipperOpts..., ) } var options []store.TSDBStoreOption @@ -776,7 +869,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant if t.matcherCache != nil { options = append(options, store.WithMatcherCacheInstance(t.matcherCache)) } + options = append(options, store.WithExtLabelsInTSDB(t.extLabelsInTSDB)) + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset), reg.(*UnRegisterer)) + t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil @@ -805,7 +901,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan return tenant, nil } - tenant = newTenant() + tenant = newTenant(t.labels, 
t.extLabelsInTSDB) t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() @@ -866,10 +962,12 @@ var ErrNotReady = errors.New("TSDB not ready") // ReadyStorage implements the Storage interface while allowing to set the actual // storage at a later point in time. -// TODO: Replace this with upstream Prometheus implementation when it is exposed. type ReadyStorage struct { mtx sync.RWMutex a *adapter + + extLabels labels.Labels + addExtLabels bool } // Set the storage. @@ -920,9 +1018,48 @@ func (s *ReadyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQue return nil, ErrNotReady } +// wrappingAppender extends the labels passed to Append and GetRef with addLabels +// before delegating to the wrapped appender. NOTE(review): every other method comes +// from the embedded storage.Appender unchanged, so labels passed to those are not +// extended - confirm this is intended, e.g. for histogram appends. +type wrappingAppender struct { + addLabels labels.Labels + storage.Appender + gr storage.GetRef +} + +var _ storage.Appender = (*wrappingAppender)(nil) +var _ storage.GetRef = (*wrappingAppender)(nil) + +// GetRef looks up the series reference for lset extended with addLabels. +// The incoming hash is expected to be the hash of lset without addLabels, so it is +// recomputed over the extended label set that is actually passed to the lookup. +func (w *wrappingAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) { + extended := labelpb.ExtendSortedLabels(lset, w.addLabels) + return w.gr.GetRef(extended, extended.Hash()) +} + +// Append extends l with addLabels and appends the sample through the wrapped appender. +func (w *wrappingAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + l = labelpb.ExtendSortedLabels(l, w.addLabels) + return w.Appender.Append(ref, l, t, v) +} + // Appender implements the Storage interface. 
func (s *ReadyStorage) Appender(ctx context.Context) (storage.Appender, error) { if x := s.get(); x != nil { + if s.addExtLabels { + app, err := x.Appender(ctx) + if err != nil { + return nil, err + } + + return &wrappingAppender{ + Appender: app, + gr: app.(storage.GetRef), + addLabels: s.extLabels, + }, nil + } return x.Appender(ctx) } return nil, ErrNotReady diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index f95cb38b9f3..580f55c674b 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "encoding/json" "fmt" "io" "math" @@ -959,3 +960,69 @@ func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { }, tenant.blocksToDelete(nil)) }) } + +func TestDumpsIfUnequalLabels(t *testing.T) { + var dumped = false + + dumpHead := func() error { //nolint:unparam + dumped = true + return nil + } + + td := t.TempDir() + + ul := ulid.MustNewDefault(time.Now()) + require.NoError(t, os.MkdirAll( + path.Join(td, ul.String()), os.ModePerm, + )) + + m := metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ + ULID: ul, + Version: 1, + }, + Thanos: metadata.Thanos{ + Version: 1, + Labels: map[string]string{ + "foo": "bar", + }, + }, + } + + mm, err := json.Marshal(m) + require.NoError(t, err) + + require.NoError(t, os.WriteFile(path.Join(td, ul.String(), metadata.MetaFilename), mm, os.ModePerm)) + + t.Run("unequal labels", func(t *testing.T) { + m := &MultiTSDB{ + extLabelsInTSDB: true, + logger: log.NewNopLogger(), + } + + require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("aa", "bb"), dumpHead)) + require.True(t, dumped) + + m.extLabelsInTSDB = false + dumped = false + + require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("aa", "bb"), dumpHead)) + require.False(t, dumped) + }) + + t.Run("equal labels", func(t *testing.T) { + m := &MultiTSDB{ + extLabelsInTSDB: true, + logger: log.NewNopLogger(), + 
} + + dumped = false + require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("foo", "bar"), dumpHead)) + require.False(t, dumped) + + m.extLabelsInTSDB = false + require.NoError(t, m.maybePruneHead(td, "foo", path.Join(td, ul.String()), labels.FromStrings("foo", "bar"), dumpHead)) + require.False(t, dumped) + }) + +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5fbabfa9900..72bd579f72a 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -85,6 +85,7 @@ type Shipper struct { allowOutOfOrderUploads bool skipCorruptedBlocks bool hashFunc metadata.HashFunc + withExtensions any labels func() labels.Labels mtx sync.RWMutex @@ -104,6 +105,7 @@ type shipperOptions struct { uploadCompacted bool allowOutOfOrderUploads bool skipCorruptedBlocks bool + withExtensions any } type Option func(*shipperOptions) @@ -171,6 +173,13 @@ func WithSkipCorruptedBlocks(skip bool) Option { } } +// WithExtensions adds the given extensions to the uploaded blocks. 
+func WithExtensions(extensions any) Option { + return func(o *shipperOptions) { + o.withExtensions = extensions + } +} + func applyOptions(opts []Option) *shipperOptions { so := new(shipperOptions) for _, o := range opts { @@ -208,6 +217,7 @@ func New(bucket objstore.Bucket, dir string, opts ...Option) *Shipper { allowOutOfOrderUploads: options.allowOutOfOrderUploads, skipCorruptedBlocks: options.skipCorruptedBlocks, uploadCompacted: options.uploadCompacted, + withExtensions: options.withExtensions, hashFunc: options.hashFunc, metadataFilePath: filepath.Join(dir, filepath.Clean(options.metaFileName)), } @@ -460,6 +470,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { } meta.Thanos.Source = s.source meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir) + meta.Thanos.Extensions = s.withExtensions if err := meta.WriteToDir(s.logger, updir); err != nil { return errors.Wrap(err, "write meta file") } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index a696b3a8804..c5ded522637 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -53,6 +53,10 @@ type mockedStartTimeDB struct { startTime int64 } +func (db *mockedStartTimeDB) Blocks() []*tsdb.Block { + return []*tsdb.Block{} +} + func (db *mockedStartTimeDB) StartTime() (int64, error) { return db.startTime, nil } func TestProxyStore_TSDBInfos(t *testing.T) { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 30a985019c8..7b44203032f 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -20,10 +20,13 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/filter" 
"github.com/thanos-io/thanos/pkg/info/infopb" @@ -42,6 +45,7 @@ const ( type TSDBReader interface { storage.ChunkQueryable StartTime() (int64, error) + Blocks() []*tsdb.Block } // TSDBStoreOption is a functional option for TSDBStore. @@ -61,6 +65,12 @@ func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { } } +func WithExtLabelsInTSDB(val bool) TSDBStoreOption { + return func(s *TSDBStore) { + s.addExtLabelsToTSDB = val + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -77,6 +87,10 @@ type TSDBStore struct { storeFilter filter.StoreFilter mtx sync.RWMutex close func() + + addExtLabelsToTSDB bool + + shouldNotBuffer atomic.Bool storepb.UnimplementedStoreServer } @@ -128,9 +142,12 @@ func NewTSDBStore( option(st) } - if st.startStoreFilterUpdate { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + st.close = cancel + + t := time.NewTicker(storeFilterUpdateInterval) + if st.startStoreFilterUpdate { updateFilter := func(ctx context.Context) { vals, err := st.LabelValues(ctx, &storepb.LabelValuesRequest{ Label: model.MetricNameLabel, @@ -143,11 +160,8 @@ func NewTSDBStore( st.storeFilter.ResetAndSet(vals.Values...) 
} - st.close = cancel updateFilter(ctx) - t := time.NewTicker(storeFilterUpdateInterval) - go func() { for { select { @@ -160,6 +174,54 @@ func NewTSDBStore( }() } + if st.addExtLabelsToTSDB { + allBlocksHaveExtension := func() { + var allBlocksHaveExtension = true + for _, b := range st.db.Blocks() { + m, err := metadata.ReadFromDir(b.Dir()) + if err != nil { + level.Error(logger).Log("msg", "failed to read meta", "err", err, "block", b.Dir()) + return + } + + exts := m.Thanos.Extensions + if exts == nil { + allBlocksHaveExtension = false + break + } + + mapExts, ok := exts.(map[string]any) + if !ok { + allBlocksHaveExtension = false + break + } + + if _, ok := mapExts[metadata.ExtLabelsInTSDBKey]; !ok { + allBlocksHaveExtension = false + break + } + } + + if !allBlocksHaveExtension { + level.Warn(logger).Log("msg", "not all blocks have the external labels in TSDB extension so buffering is still enabled; will take at least the retention period to add the extensions everywhere") + } else { + st.shouldNotBuffer.Store(true) + + } + } + + go func() { + for { + select { + case <-t.C: + allBlocksHaveExtension() + case <-ctx.Done(): + return + } + } + }() + } + return st } @@ -252,7 +314,12 @@ func (s *TSDBStore) SeriesLocal(ctx context.Context, r *storepb.SeriesRequest) ( func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { var srv flushableServer if fs, ok := seriesSrv.(flushableServer); !ok { - srv = newFlushableServer(seriesSrv, sortingStrategyStore) + var sortingStrategy = sortingStrategyStore + if s.shouldNotBuffer.Load() { + sortingStrategy = sortingStrategyNone + } + + srv = newFlushableServer(seriesSrv, sortingStrategy) } else { srv = fs } @@ -304,7 +371,13 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser for set.Next() { series := set.At() - completeLabelset := labelpb.ExtendSortedLabels(rmLabels(series.Labels(), extLsetToRemove), finalExtLset) + var completeLabelset labels.Labels 
+ if s.shouldNotBuffer.Load() { + completeLabelset = series.Labels() + } else { + completeLabelset = labelpb.ExtendSortedLabels(rmLabels(series.Labels(), extLsetToRemove), finalExtLset) + } + if !shardMatcher.MatchesLabels(completeLabelset) { continue } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index fb7db911886..15b09b64573 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -578,6 +578,8 @@ type ReceiveBuilder struct { nativeHistograms bool labels []string tenantSplitLabel string + + extLabelsInTSDB bool } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -654,6 +656,11 @@ func (r *ReceiveBuilder) WithNativeHistograms() *ReceiveBuilder { return r } +func (r *ReceiveBuilder) WithExtLabelsInTSDB() *ReceiveBuilder { + r.extLabelsInTSDB = true + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. 
@@ -676,6 +683,10 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable { "--tsdb.too-far-in-future.time-window": "5m", } + if r.extLabelsInTSDB { + args["--enable-feature"] = "ext-labels-in-tsdb" + } + if r.tenantSplitLabel != "" { args["--receive.split-tenant-label-name"] = r.tenantSplitLabel } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index c7aee18643a..8b35d611e01 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -1211,7 +1211,7 @@ func TestReceiveCpnp(t *testing.T) { testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) - i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExemplarsInMemStorage(100).Init() + i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExemplarsInMemStorage(100).WithExtLabelsInTSDB().Init() testutil.Ok(t, e2e.StartAndWaitReady(i)) h := receive.HashringConfig{ @@ -1282,5 +1282,4 @@ func TestReceiveCpnp(t *testing.T) { return nil }, ) - }