From ef02b94ba9912dd99b2ec873846310b8c0faeae7 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 8 Dec 2025 22:22:17 -0800 Subject: [PATCH] upgrade parquet common version Signed-off-by: yeya24 --- go.mod | 2 +- go.sum | 4 +- pkg/querier/parquet_queryable.go | 9 +- pkg/storegateway/parquet_bucket_stores.go | 11 +- .../parquet-common/convert/convert.go | 159 ++++++++++++++++- .../queryable/parquet_queryable.go | 122 +++++++++---- .../parquet-common/search/constraint.go | 11 +- .../parquet-common/search/constraint_cache.go | 71 ++++++++ .../parquet-common/search/materialize.go | 166 ++++++++++++++++-- .../parquet-common/storage/parquet_shard.go | 14 ++ vendor/modules.txt | 2 +- 11 files changed, 505 insertions(+), 66 deletions(-) create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/constraint_cache.go diff --git a/go.mod b/go.mod index d96966be12e..a5f5e66dee1 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 + github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index 197819d6a8d..e8d973c9536 100644 --- a/go.sum +++ b/go.sum @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 h1:ZzUcddfRLCewtFsx1d/XeyKVmQDsrJLYnlcamNopoYk= -github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= +github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk= +github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc= diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index db737782569..40be5ff8997 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -141,6 +141,11 @@ func NewParquetQueryable( } cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()) + // This is a noop cache for now as we didn't expose any config to enable this cache. 
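// (The cache instance created below is the in-process sync.Map implementation added in
// search/constraint_cache.go further down in this patch; entries are keyed by the shard
// name, the row-group index, and each constraint's String() form, and the same instance
// is shared by every query served through this queryable.)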
+ rrConstraintsCache := search.NewConstraintRowRangeCacheSyncMap() + constraintCacheFunc := func(ctx context.Context) (search.RowRangesForConstraintsCache, error) { + return rrConstraintsCache, nil + } parquetQueryableOpts := []queryable.QueryableOpts{ queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { @@ -195,7 +200,7 @@ func NewParquetQueryable( return nil }), } - parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) { + parquetQueryable, err := queryable.NewParquetQueryable(func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) { userID, err := users.TenantID(ctx) if err != nil { return nil, err @@ -245,7 +250,7 @@ func NewParquetQueryable( } return shards, errGroup.Wait() - }, parquetQueryableOpts...) + }, constraintCacheFunc, cDecoder, parquetQueryableOpts...) p := &parquetQueryableWithFallback{ subservices: manager, diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index c82cf606183..636e9e9c0c9 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -286,7 +286,7 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, l if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback) + m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback, false) if err != nil { return nil, err } @@ -361,7 +361,8 @@ func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks b if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + // TODO: Add cache. + rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) if err != nil { return err } @@ -421,7 +422,8 @@ func (b *parquetBlock) LabelNames(ctx context.Context, limit int64, matchers []* if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + // TODO: Add cache. + rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) if err != nil { return err } @@ -461,7 +463,8 @@ func (b *parquetBlock) LabelValues(ctx context.Context, name string, limit int64 if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + // TODO: Add cache. + rr, err := search.Filter(ctx, b.shard, rgi, nil, cs...) if err != nil { return err } diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 41b800b65af..3780b4c3909 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -358,11 +358,26 @@ func ConvertTSDBBlock( opt(&cfg) } - logger.Info("sharding input series") - shardedRowReaders, err := shardedTSDBRowReaders(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) - if err != nil { - return 0, errors.Wrap(err, "failed to create sharded TSDB row readers") + var ( + rr *TSDBRowReader + shardedRowReaders []*TSDBRowReader + err error + ) + // If numRowGroups is not specified, we can use a single row reader. 
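// (numRowGroups appears to default to math.MaxInt32, i.e. effectively unbounded, which is
// why the equality check below detects the "not specified" case; the single reader merges
// all blocks into one sorted series set and skips the more expensive series-sharding pass
// that shardedTSDBRowReaders performs.)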
+ if cfg.numRowGroups == math.MaxInt32 { + rr, err = singleTSDBRowReader(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) + if err != nil { + return 0, errors.Wrap(err, "failed to create TSDB row readers") + } + shardedRowReaders = []*TSDBRowReader{rr} + } else { + logger.Info("sharding input series") + shardedRowReaders, err = shardedTSDBRowReaders(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) + if err != nil { + return 0, errors.Wrap(err, "failed to create sharded TSDB row readers") + } } + defer func() { for _, rr := range shardedRowReaders { _ = rr.Close() @@ -424,6 +439,80 @@ type blockSeries struct { labels labels.Labels } +// singleTSDBRowReader is a shortcut when we know there is only one final shard. +// This can happen when numRowGroups is not specified. +func singleTSDBRowReader( + ctx context.Context, + mint, maxt, colDuration int64, + blocks []Convertible, + opts convertOpts, +) (*TSDBRowReader, error) { + var ( + seriesSets = make([]storage.ChunkSeriesSet, 0, len(blocks)) + closers = make([]io.Closer, 0, len(blocks)) + ok = false + ) + // If we fail to build the row reader, make sure we release resources. + // This could be either a controlled error or a panic. + defer func() { + if !ok { + for i := range closers { + _ = closers[i].Close() + } + } + }() + + b := schema.NewBuilder(mint, maxt, colDuration) + + compareFunc := func(a, b labels.Labels) int { + for _, lb := range opts.sortedLabels { + if c := strings.Compare(a.Get(lb), b.Get(lb)); c != 0 { + return c + } + } + + return labels.Compare(a, b) + } + + for _, blk := range blocks { + indexr, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get index reader from block") + } + closers = append(closers, indexr) + + chunkr, err := blk.Chunks() + if err != nil { + return nil, errors.Wrap(err, "failed to get chunk reader from block") + } + closers = append(closers, chunkr) + + tombsr, err := blk.Tombstones() + if err != nil { + return nil, errors.Wrap(err, "failed to get tombstone reader from block") + } + closers = append(closers, tombsr) + lblns, err := indexr.LabelNames(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get label names from index reader") + } + postings := sortedPostings(ctx, indexr, mint, maxt, opts.sortedLabels...) + seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) + seriesSets = append(seriesSets, seriesSet) + + b.AddLabelNameColumn(lblns...) 
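// (Label names from every input block are registered with the schema builder above,
// so the schema built below reflects all labels present in any of the blocks.)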
+ } + + cseriesSet := NewMergeChunkSeriesSet(seriesSets, compareFunc, storage.NewConcatenatingChunkSeriesMerger()) + + s, err := b.Build() + if err != nil { + return nil, fmt.Errorf("unable to build schema reader from block: %w", err) + } + ok = true + return newTSDBRowReader(ctx, closers, cseriesSet, s, opts), nil +} + func shardedTSDBRowReaders( ctx context.Context, mint, maxt, colDuration int64, @@ -673,3 +762,65 @@ func allChunksEmpty(chkBytes [][]byte) bool { } return true } + +func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, mint, maxt int64, sortedLabels ...string) index.Postings { + p := tsdb.AllSortedPostings(ctx, indexr) + if len(sortedLabels) == 0 { + return p + } + + type s struct { + ref storage.SeriesRef + idx int + labels labels.Labels + } + series := make([]s, 0, 128) + chks := make([]chunks.Meta, 0, 128) + scratchBuilder := labels.NewScratchBuilder(10) + lb := labels.NewBuilder(labels.EmptyLabels()) + i := 0 +P: + for p.Next() { + scratchBuilder.Reset() + chks = chks[:0] + if err := indexr.Series(p.At(), &scratchBuilder, &chks); err != nil { + return index.ErrPostings(fmt.Errorf("unable to expand series: %w", err)) + } + hasChunks := slices.ContainsFunc(chks, func(chk chunks.Meta) bool { + return mint <= chk.MaxTime && chk.MinTime <= maxt + }) + if !hasChunks { + continue P + } + + lb.Reset(scratchBuilder.Labels()) + + series = append(series, s{labels: lb.Keep(sortedLabels...).Labels(), ref: p.At(), idx: i}) + i++ + } + + if err := p.Err(); err != nil { + return index.ErrPostings(fmt.Errorf("expand postings: %w", err)) + } + + slices.SortFunc(series, func(a, b s) int { + for _, lb := range sortedLabels { + if c := strings.Compare(a.labels.Get(lb), b.labels.Get(lb)); c != 0 { + return c + } + } + if a.idx < b.idx { + return -1 + } else if a.idx > b.idx { + return 1 + } + return 0 + }) + + // Convert back to list. 
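// (The postings come out in the order established by the sort above: primarily by the
// values of the requested sortedLabels, with the original postings position (idx) as a
// tiebreaker.)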
+ ep := make([]storage.SeriesRef, 0, len(series)) + for _, p := range series { + ep = append(ep, p.ref) + } + return index.NewListPostings(ep) +} diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index 2b4d1590d88..f55a97523de 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -38,6 +38,8 @@ var tracer = otel.Tracer("parquet-common") type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) +type ConstraintCacheFunction func(ctx context.Context) (search.RowRangesForConstraintsCache, error) + type queryableOpts struct { concurrency int rowCountLimitFunc search.QuotaLimitFunc @@ -45,6 +47,8 @@ type queryableOpts struct { dataBytesLimitFunc search.QuotaLimitFunc materializedSeriesCallback search.MaterializedSeriesFunc materializedLabelsFilterCallback search.MaterializedLabelsFilterCallback + cacheRowRangesForConstraints bool + honorProjectionHints bool } var DefaultQueryableOpts = queryableOpts{ @@ -54,6 +58,8 @@ var DefaultQueryableOpts = queryableOpts{ dataBytesLimitFunc: search.NoopQuotaLimitFunc, materializedSeriesCallback: search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback: search.NoopMaterializedLabelsFilterCallback, + cacheRowRangesForConstraints: false, + honorProjectionHints: false, } type QueryableOpts func(*queryableOpts) @@ -102,13 +108,34 @@ func WithMaterializedLabelsFilterCallback(cb search.MaterializedLabelsFilterCall } } +// WithCacheRowRangesForConstraints set the concurrency that can be used to run the query +func WithCacheRowRangesForConstraints(cache bool) QueryableOpts { + return func(opts *queryableOpts) { + opts.cacheRowRangesForConstraints = cache + } +} + +// WithHonorProjectionHints enables or disables projection pushdown optimization. +// When enabled, only the labels specified in SelectHints.ProjectionLabels will be materialized. 
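// When ProjectionInclude is false, the listed labels are excluded instead and all other
// labels are materialized (see Materializer.MaterializeLabels). A rough usage sketch —
// the shardFinder, cacheFn and decoder values are assumed to exist:
//
//	q, _ := NewParquetQueryable(shardFinder, cacheFn, decoder, WithHonorProjectionHints(true))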
+func WithHonorProjectionHints(honor bool) QueryableOpts { + return func(opts *queryableOpts) { + opts.honorProjectionHints = honor + } +} + type parquetQueryable struct { - shardsFinder ShardsFinderFunction - d *schema.PrometheusParquetChunksDecoder - opts *queryableOpts + shardsFinder ShardsFinderFunction + constraintCacheFunc ConstraintCacheFunction + chunksDecoder *schema.PrometheusParquetChunksDecoder + opts *queryableOpts } -func NewParquetQueryable(d *schema.PrometheusParquetChunksDecoder, shardFinder ShardsFinderFunction, opts ...QueryableOpts) (prom_storage.Queryable, error) { +func NewParquetQueryable( + shardFinder ShardsFinderFunction, + constraintCacheFunc ConstraintCacheFunction, + chunksDecoder *schema.PrometheusParquetChunksDecoder, + opts ...QueryableOpts, +) (prom_storage.Queryable, error) { cfg := DefaultQueryableOpts for _, opt := range opts { @@ -116,27 +143,30 @@ func NewParquetQueryable(d *schema.PrometheusParquetChunksDecoder, shardFinder S } return &parquetQueryable{ - shardsFinder: shardFinder, - d: d, - opts: &cfg, + shardsFinder: shardFinder, + constraintCacheFunc: constraintCacheFunc, + chunksDecoder: chunksDecoder, + opts: &cfg, }, nil } func (p parquetQueryable) Querier(mint, maxt int64) (prom_storage.Querier, error) { return &parquetQuerier{ - mint: mint, - maxt: maxt, - shardsFinder: p.shardsFinder, - d: p.d, - opts: p.opts, + mint: mint, + maxt: maxt, + shardsFinder: p.shardsFinder, + constraintCacheFunc: p.constraintCacheFunc, + chunksDecoder: p.chunksDecoder, + opts: p.opts, }, nil } type parquetQuerier struct { - mint, maxt int64 - shardsFinder ShardsFinderFunction - d *schema.PrometheusParquetChunksDecoder - opts *queryableOpts + mint, maxt int64 + shardsFinder ShardsFinderFunction + constraintCacheFunc ConstraintCacheFunction + chunksDecoder *schema.PrometheusParquetChunksDecoder + opts *queryableOpts } func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) (result []string, annotations annotations.Annotations, err error) { @@ -325,7 +355,20 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ( chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx)) dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx)) for i, shard := range shards { - qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota) + var err error + + var constraintCache search.RowRangesForConstraintsCache + if p.constraintCacheFunc != nil { + constraintCache, err = p.constraintCacheFunc(ctx) + if err != nil { + return nil, err + } + } + + qb, err := newQueryableShard( + shard, constraintCache, p.chunksDecoder, + p.opts, rowCountQuota, chunkBytesQuota, dataBytesQuota, + ) if err != nil { return nil, err } @@ -335,25 +378,36 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ( } type queryableShard struct { - shard storage.ParquetShard - m *search.Materializer - concurrency int + shard storage.ParquetShard + constraintCache search.RowRangesForConstraintsCache + materializer *search.Materializer + concurrency int } -func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) { - s, err := block.TSDBSchema() +func newQueryableShard( + shard storage.ParquetShard, + constraintCache search.RowRangesForConstraintsCache, + chunksDecoder 
*schema.PrometheusParquetChunksDecoder, + opts *queryableOpts, + rowCountQuota *search.Quota, + chunkBytesQuota *search.Quota, + dataBytesQuota *search.Quota, +) (*queryableShard, error) { + shardSchema, err := shard.TSDBSchema() if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback) + materializer, err := search.NewMaterializer( + shardSchema, chunksDecoder, shard, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback, opts.honorProjectionHints) if err != nil { return nil, err } return &queryableShard{ - shard: block, - m: m, - concurrency: opts.concurrency, + shard: shard, + constraintCache: constraintCache, + materializer: materializer, + concurrency: opts.concurrency, }, nil } @@ -377,7 +431,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, b.constraintCache, cs...) if err != nil { return err } @@ -386,7 +440,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage return nil } - seriesSetIter, err := b.m.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) + seriesSetIter, err := b.materializer.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) if err != nil { return err } @@ -423,7 +477,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers []*labels.Matcher) ([]string, error) { if len(matchers) == 0 { - return b.m.MaterializeAllLabelNames(), nil + return b.materializer.MaterializeAllLabelNames(), nil } errGroup, ctx := errgroup.WithContext(ctx) @@ -441,11 +495,11 @@ func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers [] if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, b.constraintCache, cs...) if err != nil { return err } - series, err := b.m.MaterializeLabelNames(ctx, rgi, rr) + series, err := b.materializer.MaterializeLabelNames(ctx, rgi, rr) if err != nil { return err } @@ -481,11 +535,11 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, limit int6 if err != nil { return err } - rr, err := search.Filter(ctx, b.shard, rgi, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, b.constraintCache, cs...) 
if err != nil { return err } - series, err := b.m.MaterializeLabelValues(ctx, name, rgi, rr) + series, err := b.materializer.MaterializeLabelValues(ctx, name, rgi, rr) if err != nil { return err } @@ -509,7 +563,7 @@ func (b queryableShard) allLabelValues(ctx context.Context, name string, limit i for i := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - series, err := b.m.MaterializeAllLabelValues(ctx, name, i) + series, err := b.materializer.MaterializeAllLabelValues(ctx, name, i) if err != nil { return err } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index cd5f1e29617..98037471098 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -163,7 +163,7 @@ func sortConstraintsBySortingColumns(cs []Constraint, sc []parquet.SortingColumn // // Returns a slice of RowRange that represent the rows satisfying all constraints, // or an error if any constraint fails to filter. -func Filter(ctx context.Context, f storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) { +func Filter(ctx context.Context, f storage.ParquetShard, rgIdx int, cache RowRangesForConstraintsCache, cs ...Constraint) ([]RowRange, error) { rg := f.LabelsFile().RowGroups()[rgIdx] // Constraints for sorting columns are cheaper to evaluate, so we sort them first. @@ -177,6 +177,12 @@ func Filter(ctx context.Context, f storage.ParquetShard, rgIdx int, cs ...Constr g errgroup.Group ) + if cache != nil { + if rr, ok := cache.Get(ctx, f, rgIdx, cs); ok { + return rr, nil + } + } + // First pass prefilter with a quick index scan to find a superset of matching rows rr := []RowRange{{From: int64(0), Count: rg.NumRows()}} for i := range cs { @@ -211,6 +217,9 @@ func Filter(ctx context.Context, f storage.ParquetShard, rgIdx int, cs ...Constr return nil, fmt.Errorf("unable to do second pass filter: %w", err) } + if cache != nil { + _ = cache.Set(ctx, f, rgIdx, cs, res) + } return res, nil } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint_cache.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint_cache.go new file mode 100644 index 00000000000..35dfe5750bf --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint_cache.go @@ -0,0 +1,71 @@ +package search + +import ( + "context" + "io" + "strconv" + "strings" + "sync" + + "github.com/prometheus-community/parquet-common/storage" +) + +// RowRangesForConstraintsCache can be implemented to cache filter results for Constraint types +// The cache is keyed to the shard + row group, as row range indexes are zeroed to the row group. 
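// A cache key therefore looks roughly like
// "<shard name>:rgidx-<row group index>:<constraint 1 String()>:<constraint 2 String()>...",
// as built by constraintsCacheKey below. Implementations may add eviction or size bounds;
// the sync.Map variant in this file keeps entries until Close is called.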
+ +type RowRangesForConstraintsCache interface { + Get(ctx context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint) ([]RowRange, bool) + Set(ctx context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint, rr []RowRange) error + Delete(ctx context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint) error + io.Closer +} + +func constraintsCacheKey(shard storage.ParquetShard, rgIdx int, cs []Constraint) string { + // :rgidx-:::...: + s := make([]string, len(cs)+2) + s[0] = shard.Name() + s[1] = "rgidx-" + strconv.Itoa(rgIdx) + for i, c := range cs { + s[i+2] = c.String() + } + return strings.Join(s, ":") +} + +// ConstraintRowRangeCacheSyncMap implements a basic RowRangesForConstraintsCache with sync.Map. +// The sync.Map is optimized to offer write-once-read-many usage with minimal lock contention, +// which matches constraint filtering on the Parquet shard labels files which are not expected to change. +type ConstraintRowRangeCacheSyncMap struct { + m *sync.Map +} + +func NewConstraintRowRangeCacheSyncMap() *ConstraintRowRangeCacheSyncMap { + return &ConstraintRowRangeCacheSyncMap{ + m: &sync.Map{}, + } +} + +func (c ConstraintRowRangeCacheSyncMap) Get(_ context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint) ([]RowRange, bool) { + key := constraintsCacheKey(shard, rgIdx, cs) + rr, ok := c.m.Load(key) + if ok { + return rr.([]RowRange), ok + } + return nil, ok +} + +func (c ConstraintRowRangeCacheSyncMap) Set(_ context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint, rr []RowRange) error { + key := constraintsCacheKey(shard, rgIdx, cs) + c.m.Store(key, rr) + return nil +} + +func (c ConstraintRowRangeCacheSyncMap) Delete(_ context.Context, shard storage.ParquetShard, rgIdx int, cs []Constraint) error { + key := constraintsCacheKey(shard, rgIdx, cs) + c.m.Delete(key) + return nil +} + +func (c ConstraintRowRangeCacheSyncMap) Close() error { + c.m.Clear() + return nil +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index b8baa92d179..24daad7ef26 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -54,6 +54,8 @@ type Materializer struct { materializedSeriesCallback MaterializedSeriesFunc materializedLabelsFilterCallback MaterializedLabelsFilterCallback + + honorProjectionHints bool } // MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for @@ -93,6 +95,7 @@ func NewMaterializer(s *schema.TSDBSchema, dataBytesQuota *Quota, materializeSeriesCallback MaterializedSeriesFunc, materializeLabelsFilterCallback MaterializedLabelsFilterCallback, + honorProjectionHints bool, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexesColumn) if !ok { @@ -122,6 +125,7 @@ func NewMaterializer(s *schema.TSDBSchema, dataBytesQuota: dataBytesQuota, materializedSeriesCallback: materializeSeriesCallback, materializedLabelsFilterCallback: materializeLabelsFilterCallback, + honorProjectionHints: honorProjectionHints, }, nil } @@ -146,7 +150,11 @@ func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.Sele attribute.Int("row_ranges_count", len(rr)), ) - sLbls, err := m.MaterializeAllLabels(ctx, rgi, rr) + if err := m.checkRowCountQuota(rr); err != nil { + 
span.SetAttributes(attribute.String("quota_failure", "row_count")) + return nil, err + } + sLbls, err := m.MaterializeLabels(ctx, hints, rgi, rr) if err != nil { return nil, errors.Wrapf(err, "error materializing labels") } @@ -371,8 +379,11 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin return r, nil } -func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { - ctx, span := tracer.Start(ctx, "Materializer.MaterializeAllLabels") +// MaterializeLabels retrieves series labels, optionally filtered by projection hints. +// Returns all labels when projection is disabled, or only requested labels in the hints when enabled. +// The s_series_hash column is included only when explicitly requested. +func (m *Materializer) MaterializeLabels(ctx context.Context, hints *prom_storage.SelectHints, rgi int, rr []RowRange) ([][]labels.Label, error) { + ctx, span := tracer.Start(ctx, "Materializer.MaterializeLabels") var err error defer func() { if err != nil { @@ -387,11 +398,13 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R attribute.Int("row_group_index", rgi), attribute.Int("row_ranges_count", len(rr)), attribute.Int64("total_rows_requested", totalRowsRequested), + attribute.Bool("honor_projection_hints", m.honorProjectionHints), ) - if err := m.checkRowCountQuota(rr); err != nil { - span.SetAttributes(attribute.String("quota_failure", "row_count")) - return nil, err + // If projection hints are not enabled, materialize all labels by letting it fall through to default case + if !m.honorProjectionHints { + span.SetAttributes(attribute.String("projection_mode", "all_labels")) + hints = nil } // Get column indexes for all rows in the specified ranges @@ -400,13 +413,87 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R return nil, errors.Wrap(err, "failed to get column indexes") } - // Build mapping of which columns are needed for which row ranges - columnToRowRanges, rowRangeToStartIndex, err := m.buildColumnMappings(rr, columnIndexes) + var ( + colsMap = make(map[int][]RowRange, 10) + needsHash bool + ) + + labelsSchema := m.b.LabelsFile().Schema() + + columnToRowRanges, _, err := m.buildColumnMappings(rr, columnIndexes) if err != nil { - return nil, errors.Wrap(err, "failed to build collum mapping") + return nil, errors.Wrap(err, "failed to build column mapping") } - // Materialize label values for each column concurrently + switch { + case hints != nil && hints.ProjectionInclude: + span.SetAttributes(attribute.String("projection_mode", "include")) + span.SetAttributes(attribute.StringSlice("projection_labels", hints.ProjectionLabels)) + + for _, labelName := range hints.ProjectionLabels { + if labelName == schema.SeriesHashColumn { + needsHash = true + col, ok := labelsSchema.Lookup(schema.SeriesHashColumn) + if ok { + colsMap[col.ColumnIndex] = []RowRange{} + } + } else { + col, ok := labelsSchema.Lookup(schema.LabelToColumn(labelName)) + if !ok { + continue + } + colsMap[col.ColumnIndex] = []RowRange{} + } + } + + for columnId := range colsMap { + if rowRanges, exists := columnToRowRanges[columnId]; exists { + colsMap[columnId] = rowRanges + } + } + + case hints != nil && !hints.ProjectionInclude: + span.SetAttributes(attribute.String("projection_mode", "exclude")) + span.SetAttributes(attribute.StringSlice("projection_labels", hints.ProjectionLabels)) + + for columnId, rowRanges := range columnToRowRanges { + colsMap[columnId] = 
rowRanges + } + + seriesHashExcluded := false + for _, labelName := range hints.ProjectionLabels { + if labelName == schema.SeriesHashColumn { + seriesHashExcluded = true + break + } + } + needsHash = !seriesHashExcluded + + for _, labelName := range hints.ProjectionLabels { + if labelName == schema.SeriesHashColumn { + col, ok := labelsSchema.Lookup(schema.SeriesHashColumn) + if ok { + delete(colsMap, col.ColumnIndex) + } + } else { + col, ok := labelsSchema.Lookup(schema.LabelToColumn(labelName)) + if !ok { + continue + } + delete(colsMap, col.ColumnIndex) + } + } + + default: + span.SetAttributes(attribute.String("projection_mode", "all_labels_fallback")) + // Materialize all columns when no projection hints are provided + for columnId, rowRanges := range columnToRowRanges { + colsMap[columnId] = rowRanges + } + } + + span.SetAttributes(attribute.Int("columns_to_materialize", len(colsMap))) + results := make([][]labels.Label, len(columnIndexes)) mtx := sync.Mutex{} errGroup := &errgroup.Group{} @@ -415,9 +502,16 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R span.SetAttributes(attribute.Int("goroutine_pool_limit", m.concurrency)) - for columnIndex, rowRanges := range columnToRowRanges { + rowRangeToStartIndex := make(map[RowRange]int, len(rr)) + resultIndex := 0 + for _, rowRange := range rr { + rowRangeToStartIndex[rowRange] = resultIndex + resultIndex += int(rowRange.Count) + } + + for columnIndex, rowRanges := range colsMap { errGroup.Go(func() error { - ctx, span := tracer.Start(ctx, "Materializer.materializeAllLabels.column") + ctx, span := tracer.Start(ctx, "Materializer.MaterializeLabels.column") var err error defer func() { if err != nil { @@ -427,9 +521,19 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R span.End() }() - // Extract label name from column schema columnChunk := labelsRowGroup.ColumnChunks()[columnIndex] - labelName, ok := schema.ExtractLabelFromColumn(m.b.LabelsFile().Schema().Columns()[columnIndex][0]) + columnName := labelsSchema.Columns()[columnIndex][0] + + var labelName string + var ok bool + + if columnName == schema.SeriesHashColumn { + labelName = schema.SeriesHashColumn + ok = true + } else { + labelName, ok = schema.ExtractLabelFromColumn(columnName) + } + if !ok { return fmt.Errorf("column %d not found in schema", columnIndex) } @@ -440,7 +544,6 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R attribute.Int("row_ranges_count", len(rowRanges)), ) - // Materialize the actual label values for this column labelValues, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rowRanges, columnChunk, false) if err != nil { return errors.Wrap(err, "failed to materialize label values") @@ -448,7 +551,6 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R span.SetAttributes(attribute.Int("label_values_count", len(labelValues))) - // Assign label values to the appropriate result positions mtx.Lock() defer mtx.Unlock() @@ -469,13 +571,43 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R return nil }) } + + var hashes []parquet.Value + if needsHash { + errGroup.Go(func() error { + col, ok := labelsSchema.Lookup(schema.SeriesHashColumn) + if !ok { + return fmt.Errorf("unable to find series hash column %q", schema.SeriesHashColumn) + } + columnChunk := labelsRowGroup.ColumnChunks()[col.ColumnIndex] + + h, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rr, columnChunk, false) + if 
err != nil { + return fmt.Errorf("unable to materialize hash values: %w", err) + } + hashes = h + return nil + }) + } + if err := errGroup.Wait(); err != nil { return nil, err } + if needsHash { + for i := range hashes { + if !hashes[i].IsNull() { + results[i] = append(results[i], labels.Label{ + Name: schema.SeriesHashColumn, + Value: util.YoloString(hashes[i].ByteArray()), + }) + } + } + } + span.SetAttributes( attribute.Int("materialized_labels_count", len(results)), - attribute.Int("columns_materialized", len(columnToRowRanges)), + attribute.Int("columns_materialized", len(colsMap)), ) return results, nil } diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go index fa4dd046130..ac5cee19aed 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go @@ -185,6 +185,8 @@ func OpenFromFile(ctx context.Context, path string, opts ...FileOption) (*Parque } type ParquetShard interface { + Name() string + ShardIdx() int LabelsFile() ParquetFileView ChunksFile() ParquetFileView TSDBSchema() (*schema.TSDBSchema, error) @@ -219,6 +221,8 @@ func (o *ParquetLocalFileOpener) Open(ctx context.Context, name string, opts ... } type ParquetShardOpener struct { + name string + shardIdx int labelsFile, chunksFile *ParquetFile schema *schema.TSDBSchema o sync.Once @@ -260,11 +264,21 @@ func NewParquetShardOpener( } return &ParquetShardOpener{ + name: name, + shardIdx: shard, labelsFile: labelsFile, chunksFile: chunksFile, }, nil } +func (s *ParquetShardOpener) Name() string { + return s.name +} + +func (s *ParquetShardOpener) ShardIdx() int { + return s.shardIdx +} + func (s *ParquetShardOpener) LabelsFile() ParquetFileView { return s.labelsFile } diff --git a/vendor/modules.txt b/vendor/modules.txt index ee3d60fe3ec..d4aecb1b3e7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -952,7 +952,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 +# github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 ## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable
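For downstream callers, the visible API change in this upgrade is the NewParquetQueryable signature: the shards finder comes first, followed by a ConstraintCacheFunction (a nil function is tolerated and simply disables row-range caching) and the chunks decoder, plus the new WithHonorProjectionHints option — mirroring the adjustment made in pkg/querier/parquet_queryable.go above. A minimal wiring sketch under those assumptions; openShards is a hypothetical shard-lookup helper and the option choice is illustrative, while the signatures mirror the vendored code in this patch:

// Hypothetical caller of the upgraded parquet-common API; only the signatures are
// taken from this patch, the helper names and option values are illustrative.
package example

import (
	"context"

	"github.com/prometheus-community/parquet-common/queryable"
	"github.com/prometheus-community/parquet-common/schema"
	"github.com/prometheus-community/parquet-common/search"
	parquet_storage "github.com/prometheus-community/parquet-common/storage"
	prom_storage "github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func newParquetQueryable(
	openShards func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error),
) (prom_storage.Queryable, error) {
	// One shared in-process cache of Filter results, keyed by shard, row group and constraints.
	rowRangeCache := search.NewConstraintRowRangeCacheSyncMap()
	cacheFn := func(ctx context.Context) (search.RowRangesForConstraintsCache, error) {
		return rowRangeCache, nil
	}

	decoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())

	return queryable.NewParquetQueryable(
		openShards, // resolves the parquet shards overlapping [mint, maxt]
		cacheFn,    // constraint cache lookup; a nil function disables caching
		decoder,    // chunks decoder now follows the cache function
		queryable.WithHonorProjectionHints(true), // new option; defaults to false
	)
}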