diff --git a/go.mod b/go.mod index 1b88330c581..2c38d26cbc1 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 + github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 9aa18e3ac6f..d42729e5298 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg= -github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= +github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c h1:yDtT3c2klcWJj6A0osq72qM8rd1ohtl/J3rHD3FHuNw= +github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 703ba862905..ed3c06ac778 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -247,7 +247,7 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ return nil, fmt.Errorf("unable to get label names from block: %w", err) } - postings := sortedPostings(ctx, indexr, compareFunc, ops.sortedLabels...) + postings := sortedPostings(ctx, indexr, ops.sortedLabels...) seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) seriesSets = append(seriesSets, seriesSet) @@ -285,7 +285,7 @@ func (rr *TsdbRowReader) Schema() *schema.TSDBSchema { return rr.tsdbSchema } -func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, compare func(a, b labels.Labels) int, sortedLabels ...string) index.Postings { +func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, sortedLabels ...string) index.Postings { p := tsdb.AllSortedPostings(ctx, indexr) if len(sortedLabels) == 0 { @@ -294,25 +294,42 @@ func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, compare func(a type s struct { ref storage.SeriesRef + idx int labels labels.Labels } series := make([]s, 0, 128) - lb := labels.NewScratchBuilder(10) + scratchBuilder := labels.NewScratchBuilder(10) + lb := labels.NewBuilder(labels.EmptyLabels()) + i := 0 for p.Next() { - lb.Reset() - err := indexr.Series(p.At(), &lb, nil) + scratchBuilder.Reset() + err := indexr.Series(p.At(), &scratchBuilder, nil) if err != nil { return index.ErrPostings(fmt.Errorf("expand series: %w", err)) } + lb.Reset(scratchBuilder.Labels()) - series = append(series, s{labels: lb.Labels(), ref: p.At()}) + series = append(series, s{labels: lb.Keep(sortedLabels...).Labels(), ref: p.At(), idx: i}) + i++ } if err := p.Err(); err != nil { return index.ErrPostings(fmt.Errorf("expand postings: %w", err)) } - slices.SortFunc(series, func(a, b s) int { return compare(a.labels, b.labels) }) + slices.SortFunc(series, func(a, b s) int { + for _, lb := range sortedLabels { + if c := strings.Compare(a.labels.Get(lb), b.labels.Get(lb)); c != 0 { + return c + } + } + if a.idx < b.idx { + return -1 + } else if a.idx > b.idx { + return 1 + } + return 0 + }) // Convert back to list. ep := make([]storage.SeriesRef, 0, len(series)) diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index c4b2996f5ab..a674c96efc4 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -17,11 +17,15 @@ import ( "context" "runtime" "sort" + "strings" "sync" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/convert" @@ -31,22 +35,26 @@ import ( "github.com/prometheus-community/parquet-common/util" ) +var tracer = otel.Tracer("parquet-common") + type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) type queryableOpts struct { - concurrency int - rowCountLimitFunc search.QuotaLimitFunc - chunkBytesLimitFunc search.QuotaLimitFunc - dataBytesLimitFunc search.QuotaLimitFunc - materializedSeriesCallback search.MaterializedSeriesFunc + concurrency int + rowCountLimitFunc search.QuotaLimitFunc + chunkBytesLimitFunc search.QuotaLimitFunc + dataBytesLimitFunc search.QuotaLimitFunc + materializedSeriesCallback search.MaterializedSeriesFunc + materializedLabelsFilterCallback search.MaterializedLabelsFilterCallback } var DefaultQueryableOpts = queryableOpts{ - concurrency: runtime.GOMAXPROCS(0), - rowCountLimitFunc: search.NoopQuotaLimitFunc, - chunkBytesLimitFunc: search.NoopQuotaLimitFunc, - dataBytesLimitFunc: search.NoopQuotaLimitFunc, - materializedSeriesCallback: search.NoopMaterializedSeriesFunc, + concurrency: runtime.GOMAXPROCS(0), + rowCountLimitFunc: search.NoopQuotaLimitFunc, + chunkBytesLimitFunc: search.NoopQuotaLimitFunc, + dataBytesLimitFunc: search.NoopQuotaLimitFunc, + materializedSeriesCallback: search.NoopMaterializedSeriesFunc, + materializedLabelsFilterCallback: search.NoopMaterializedLabelsFilterCallback, } type QueryableOpts func(*queryableOpts) @@ -87,6 +95,14 @@ func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableO } } +// WithMaterializedLabelsFilterCallback sets a callback function to create a filter that can be used +// to filter series based on their labels before materializing chunks. +func WithMaterializedLabelsFilterCallback(cb search.MaterializedLabelsFilterCallback) QueryableOpts { + return func(opts *queryableOpts) { + opts.materializedLabelsFilterCallback = cb + } +} + type parquetQueryable struct { shardsFinder ShardsFinderFunction d *schema.PrometheusParquetChunksDecoder @@ -124,7 +140,26 @@ type parquetQuerier struct { opts *queryableOpts } -func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { +func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) (result []string, annotations annotations.Annotations, err error) { + ctx, span := tracer.Start(ctx, "parquetQuerier.LabelValues") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.String("label_name", name), + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + if hints != nil { + span.SetAttributes(attribute.Int("limit", hints.Limit)) + } + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return nil, nil, err @@ -152,10 +187,27 @@ func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *pro return nil, nil, err } - return util.MergeUnsortedSlices(int(limit), resNameValues...), nil, nil + result = util.MergeUnsortedSlices(int(limit), resNameValues...) + span.SetAttributes(attribute.Int("result_count", len(result))) + return result, nil, nil } -func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { +func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.LabelHints, matchers ...*labels.Matcher) (result []string, annotations annotations.Annotations, err error) { + ctx, span := tracer.Start(ctx, "parquetQuerier.LabelNames") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return nil, nil, err @@ -165,6 +217,7 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe if hints != nil { limit = int64(hints.Limit) + span.SetAttributes(attribute.Int("limit", hints.Limit)) } resNameSets := make([][]string, len(shards)) @@ -183,7 +236,9 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe return nil, nil, err } - return util.MergeUnsortedSlices(int(limit), resNameSets...), nil, nil + result = util.MergeUnsortedSlices(int(limit), resNameSets...) + span.SetAttributes(attribute.Int("result_count", len(result))) + return result, nil, nil } func (p parquetQuerier) Close() error { @@ -191,6 +246,30 @@ func (p parquetQuerier) Close() error { } func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storage.SelectHints, matchers ...*labels.Matcher) prom_storage.SeriesSet { + ctx, span := tracer.Start(ctx, "parquetQuerier.Select") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Bool("sorted", sorted), + attribute.Int64("mint", p.mint), + attribute.Int64("maxt", p.maxt), + attribute.String("matchers", matchersToString(matchers)), + ) + if sp != nil { + span.SetAttributes( + attribute.Int64("select_start", sp.Start), + attribute.Int64("select_end", sp.End), + attribute.String("select_func", sp.Func), + ) + } + shards, err := p.queryableShards(ctx, p.mint, p.maxt) if err != nil { return prom_storage.ErrSeriesSet(err) @@ -207,16 +286,21 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag for i, shard := range shards { errGroup.Go(func() error { - ss, err := shard.Query(ctx, sorted, minT, maxT, skipChunks, matchers) + ss, err := shard.Query(ctx, sorted, sp, minT, maxT, skipChunks, matchers) seriesSet[i] = ss return err }) } - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { return prom_storage.ErrSeriesSet(err) } + span.SetAttributes( + attribute.Int("shards_count", len(shards)), + attribute.Bool("skip_chunks", skipChunks), + ) + ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) return convert.NewSeriesSetFromChunkSeriesSet(ss, skipChunks) @@ -252,7 +336,7 @@ func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schem if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback) + m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback, opts.materializedLabelsFilterCallback) if err != nil { return nil, err } @@ -264,7 +348,7 @@ func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schem }, nil } -func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { +func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage.SelectHints, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(b.concurrency) @@ -290,10 +374,14 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 return nil } - series, err := b.m.Materialize(ctx, rgi, mint, maxt, skipChunks, rr) + series, err := b.m.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) if err != nil { return err } + if len(series) == 0 { + return nil + } + rMtx.Lock() results = append(results, series...) rMtx.Unlock() @@ -420,3 +508,14 @@ type byLabels []prom_storage.ChunkSeries func (b byLabels) Len() int { return len(b) } func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } + +func matchersToString(matchers []*labels.Matcher) string { + if len(matchers) == 0 { + return "[]" + } + var matcherStrings []string + for _, m := range matchers { + matcherStrings = append(matcherStrings, m.String()) + } + return "[" + strings.Join(matcherStrings, ",") + "]" +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 03598bd28b1..f77bcf4aa87 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -94,7 +94,7 @@ func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constr } } var err error - rr := []RowRange{{from: int64(0), count: rg.NumRows()}} + rr := []RowRange{{From: int64(0), Count: rg.NumRows()}} for i := range cs { isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] rr, err = cs[i].filter(ctx, rgIdx, isPrimary, rr) @@ -185,7 +185,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, if len(rr) == 0 { return nil, nil } - from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count rg := ec.f.RowGroups()[rgIdx] @@ -262,7 +262,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, // Did not find any pages if len(readPgs) == 0 { - return nil, nil + return intersectRowRanges(simplify(res), rr), nil } dictOff, dictSz := ec.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) @@ -398,7 +398,7 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if len(rr) == 0 { return nil, nil } - from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count rg := rc.f.RowGroups()[rgIdx] diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 2f485503e35..7de75834692 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -27,6 +27,9 @@ import ( "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/schema" @@ -34,6 +37,8 @@ import ( "github.com/prometheus-community/parquet-common/util" ) +var tracer = otel.Tracer("parquet-common") + type Materializer struct { b storage.ParquetShard s *schema.TSDBSchema @@ -49,7 +54,8 @@ type Materializer struct { chunkBytesQuota *Quota dataBytesQuota *Quota - materializedSeriesCallback MaterializedSeriesFunc + materializedSeriesCallback MaterializedSeriesFunc + materializedLabelsFilterCallback MaterializedLabelsFilterCallback } // MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for @@ -61,6 +67,25 @@ func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) return nil } +// MaterializedLabelsFilterCallback returns a filter and a boolean indicating if the filter is enabled or not. +// The filter is used to filter series based on their labels. +// The boolean if set to false then it means that the filter is a noop and we can take shortcut to include all series. +// Otherwise, the filter is used to filter series based on their labels. +type MaterializedLabelsFilterCallback func(ctx context.Context, hints *prom_storage.SelectHints) (MaterializedLabelsFilter, bool) + +// MaterializedLabelsFilter is a filter that can be used to filter series based on their labels. +type MaterializedLabelsFilter interface { + // Filter returns true if the labels should be included in the result. + Filter(ls labels.Labels) bool + // Close is used to close the filter and do some cleanup. + Close() +} + +// NoopMaterializedLabelsFilterCallback is a noop MaterializedLabelsFilterCallback function that filters nothing. +func NoopMaterializedLabelsFilterCallback(ctx context.Context, hints *prom_storage.SelectHints) (MaterializedLabelsFilter, bool) { + return nil, false +} + func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, block storage.ParquetShard, @@ -69,6 +94,7 @@ func NewMaterializer(s *schema.TSDBSchema, chunkBytesQuota *Quota, dataBytesQuota *Quota, materializeSeriesCallback MaterializedSeriesFunc, + materializeLabelsFilterCallback MaterializedLabelsFilterCallback, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) if !ok { @@ -86,23 +112,41 @@ func NewMaterializer(s *schema.TSDBSchema, } return &Materializer{ - s: s, - d: d, - b: block, - colIdx: colIdx.ColumnIndex, - concurrency: concurrency, - partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), - dataColToIndex: dataColToIndex, - rowCountQuota: rowCountQuota, - chunkBytesQuota: chunkBytesQuota, - dataBytesQuota: dataBytesQuota, - materializedSeriesCallback: materializeSeriesCallback, + s: s, + d: d, + b: block, + colIdx: colIdx.ColumnIndex, + concurrency: concurrency, + partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), + dataColToIndex: dataColToIndex, + rowCountQuota: rowCountQuota, + chunkBytesQuota: chunkBytesQuota, + dataBytesQuota: dataBytesQuota, + materializedSeriesCallback: materializeSeriesCallback, + materializedLabelsFilterCallback: materializeLabelsFilterCallback, }, nil } // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. -func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { +func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) (results []prom_storage.ChunkSeries, err error) { + ctx, span := tracer.Start(ctx, "Materializer.Materialize") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int64("mint", mint), + attribute.Int64("maxt", maxt), + attribute.Bool("skip_chunks", skipChunks), + attribute.Int("row_ranges_count", len(rr)), + ) + if err := m.checkRowCountQuota(rr); err != nil { return nil, err } @@ -111,13 +155,7 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6 return nil, errors.Wrapf(err, "error materializing labels") } - results := make([]prom_storage.ChunkSeries, len(sLbls)) - for i, s := range sLbls { - results[i] = &concreteChunksSeries{ - lbls: labels.New(s...), - } - } - + results, rr = m.filterSeries(ctx, hints, sLbls, rr) if !skipChunks { chks, err := m.materializeChunks(ctx, rgi, mint, maxt, rr) if err != nil { @@ -137,9 +175,78 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6 if err := m.materializedSeriesCallback(ctx, results); err != nil { return nil, err } + + span.SetAttributes(attribute.Int("materialized_series_count", len(results))) return results, err } +func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.SelectHints, sLbls [][]labels.Label, rr []RowRange) ([]prom_storage.ChunkSeries, []RowRange) { + results := make([]prom_storage.ChunkSeries, 0, len(sLbls)) + labelsFilter, ok := m.materializedLabelsFilterCallback(ctx, hints) + if !ok { + for _, s := range sLbls { + results = append(results, &concreteChunksSeries{ + lbls: labels.New(s...), + }) + } + return results, rr + } + + defer labelsFilter.Close() + + filteredRR := make([]RowRange, 0, len(rr)) + var currentRange RowRange + inRange := false + seriesIdx := 0 + + for _, rowRange := range rr { + for i := int64(0); i < rowRange.Count; i++ { + actualRowID := rowRange.From + i + lbls := labels.New(sLbls[seriesIdx]...) + + if labelsFilter.Filter(lbls) { + results = append(results, &concreteChunksSeries{ + lbls: lbls, + }) + + // Handle row range collection + if !inRange { + // Start new range + currentRange = RowRange{ + From: actualRowID, + Count: 1, + } + inRange = true + } else if actualRowID == currentRange.From+currentRange.Count { + // Extend current range + currentRange.Count++ + } else { + // Save current range and start new range (non-contiguous) + filteredRR = append(filteredRR, currentRange) + currentRange = RowRange{ + From: actualRowID, + Count: 1, + } + } + } else { + // Save current range and reset when we hit a non-matching series + if inRange { + filteredRR = append(filteredRR, currentRange) + inRange = false + } + } + seriesIdx++ + } + } + + // Save the final range if we have one + if inRange { + filteredRR = append(filteredRR, currentRange) + } + + return results, filteredRR +} + func (m *Materializer) MaterializeAllLabelNames() []string { r := make([]string, 0, len(m.b.LabelsFile().Schema().Columns())) for _, c := range m.b.LabelsFile().Schema().Columns() { @@ -237,6 +344,21 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin } func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { + ctx, span := tracer.Start(ctx, "Materializer.materializeAllLabels") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int("row_ranges_count", len(rr)), + ) + labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) @@ -272,7 +394,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R }) } - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { return nil, err } @@ -292,22 +414,45 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R } } + span.SetAttributes(attribute.Int("materialized_labels_count", len(results))) return results, nil } func totalRows(rr []RowRange) int64 { res := int64(0) for _, r := range rr { - res += r.count + res += r.Count } return res } -func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) ([][]chunks.Meta, error) { +func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) (r [][]chunks.Meta, err error) { + ctx, span := tracer.Start(ctx, "Materializer.materializeChunks") + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int64("mint", mint), + attribute.Int64("maxt", maxt), + attribute.Int("row_ranges_count", len(rr)), + ) + minDataCol := m.s.DataColumIdx(mint) maxDataCol := m.s.DataColumIdx(maxt) rg := m.b.ChunksFile().RowGroups()[rgi] - r := make([][]chunks.Meta, totalRows(rr)) + r = make([][]chunks.Meta, totalRows(rr)) + + span.SetAttributes( + attribute.Int("min_data_col", minDataCol), + attribute.Int("max_data_col", maxDataCol), + attribute.Int("total_rows", int(totalRows(rr))), + ) for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true) @@ -324,6 +469,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max } } + span.SetAttributes(attribute.Int("materialized_chunks_count", len(r))) return r, nil } @@ -348,12 +494,12 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq for i := 0; i < cidx.NumPages(); i++ { pageRowRange := RowRange{ - from: oidx.FirstRowIndex(i), + From: oidx.FirstRowIndex(i), } - pageRowRange.count = group.NumRows() + pageRowRange.Count = group.NumRows() if i < oidx.NumPages()-1 { - pageRowRange.count = oidx.FirstRowIndex(i+1) - pageRowRange.from + pageRowRange.Count = oidx.FirstRowIndex(i+1) - pageRowRange.From } for _, r := range rr { @@ -372,7 +518,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq rMutex := &sync.Mutex{} for _, v := range pageRanges { for _, rs := range v.rows { - r[rs] = make([]parquet.Value, 0, rs.count) + r[rs] = make([]parquet.Value, 0, rs.Count) } } @@ -397,7 +543,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq return errors.Wrap(err, "failed to get pages") } defer func() { _ = pgs.Close() }() - err = pgs.SeekToRow(p.rows[0].from) + err = pgs.SeekToRow(p.rows[0].From) if err != nil { return errors.Wrap(err, "could not seek to row") } @@ -405,9 +551,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq vi := new(valuesIterator) remainingRr := p.rows currentRr := remainingRr[0] - next := currentRr.from - remaining := currentRr.count - currentRow := currentRr.from + next := currentRr.From + remaining := currentRr.Count + currentRow := currentRr.From remainingRr = remainingRr[1:] for len(remainingRr) > 0 || remaining > 0 { @@ -426,8 +572,8 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq next = next + 1 } else if len(remainingRr) > 0 { currentRr = remainingRr[0] - next = currentRr.from - remaining = currentRr.count + next = currentRr.From + remaining = currentRr.Count remainingRr = remainingRr[1:] } } @@ -449,7 +595,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq ranges := slices.Collect(maps.Keys(r)) slices.SortFunc(ranges, func(a, b RowRange) int { - return int(a.from - b.from) + return int(a.From - b.From) }) res := make([]parquet.Value, 0, totalRows(rr)) diff --git a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go index 412bcdfeca0..9112a711b75 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go @@ -18,29 +18,29 @@ import ( ) type RowRange struct { - from int64 - count int64 + From int64 + Count int64 } func NewRowRange(from, count int64) *RowRange { return &RowRange{ - from: from, - count: count, + From: from, + Count: count, } } // Overlaps returns true if the receiver and the given RowRange share any overlapping rows. -// Both ranges are treated as half-open intervals: [from, from+count). +// Both ranges are treated as half-open intervals: [From, From+Count). func (rr RowRange) Overlaps(o RowRange) bool { - endA := rr.from + rr.count - endB := o.from + o.count - return rr.from < endB && o.from < endA + endA := rr.From + rr.Count + endB := o.From + o.Count + return rr.From < endB && o.From < endA } // Intersection returns the intersection of rr and o. Both are assumed to be overlapping func (rr RowRange) Intersection(o RowRange) RowRange { - os, oe := max(rr.from, o.from), min(rr.from+rr.count, o.from+o.count) - return RowRange{from: os, count: oe - os} + os, oe := max(rr.From, o.From), min(rr.From+rr.Count, o.From+o.Count) + return RowRange{From: os, Count: oe - os} } // intersect intersects the row ranges from left hand sight with the row ranges from rhs @@ -49,13 +49,13 @@ func (rr RowRange) Intersection(o RowRange) RowRange { func intersectRowRanges(lhs, rhs []RowRange) []RowRange { res := make([]RowRange, 0) for l, r := 0, 0; l < len(lhs) && r < len(rhs); { - al, bl := lhs[l].from, lhs[l].from+lhs[l].count - ar, br := rhs[r].from, rhs[r].from+rhs[r].count + al, bl := lhs[l].From, lhs[l].From+lhs[l].Count + ar, br := rhs[r].From, rhs[r].From+rhs[r].Count // check if rows intersect if al <= br && ar <= bl { os, oe := max(al, ar), min(bl, br) - res = append(res, RowRange{from: os, count: oe - os}) + res = append(res, RowRange{From: os, Count: oe - os}) } // advance the cursor of the range that ends first @@ -70,9 +70,9 @@ func intersectRowRanges(lhs, rhs []RowRange) []RowRange { // complementRowRanges returns the ranges that are in rhs but not in lhs. // For example, if you have: -// lhs: [{from: 1, count: 3}] // represents rows 1,2,3 -// rhs: [{from: 0, count: 5}] // represents rows 0,1,2,3,4 -// The complement would be [{from: 0, count: 1}, {from: 4, count: 1}] // represents rows 0,4 +// lhs: [{From: 1, Count: 3}] // represents rows 1,2,3 +// rhs: [{From: 0, Count: 5}] // represents rows 0,1,2,3,4 +// The complement would be [{From: 0, Count: 1}, {From: 4, Count: 1}] // represents rows 0,4 // because these are the rows in rhs that are not in lhs. // // The function assumes that lhs and rhs are simplified (no overlapping ranges) @@ -83,8 +83,8 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { l, r := 0, 0 for l < len(lhs) && r < len(rhs) { - al, bl := lhs[l].from, lhs[l].from+lhs[l].count - ar, br := rhs[r].from, rhs[r].from+rhs[r].count + al, bl := lhs[l].From, lhs[l].From+lhs[l].Count + ar, br := rhs[r].From, rhs[r].From+rhs[r].Count // check if rows intersect switch { @@ -93,7 +93,7 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { if bl <= br { l++ } else { - res = append(res, RowRange{from: ar, count: br - ar}) + res = append(res, RowRange{From: ar, Count: br - ar}) r++ } case al < ar && bl > br: @@ -102,20 +102,20 @@ func complementRowRanges(lhs, rhs []RowRange) []RowRange { case al < ar && bl <= br: // l covers r from left but has room on top oe := min(bl, br) - rhs[r].from += oe - ar - rhs[r].count -= oe - ar + rhs[r].From += oe - ar + rhs[r].Count -= oe - ar l++ case al >= ar && bl > br: // l covers r from right but has room on bottom os := max(al, ar) - res = append(res, RowRange{from: ar, count: os - ar}) + res = append(res, RowRange{From: ar, Count: os - ar}) r++ case al >= ar && bl <= br: // l is included r os, oe := max(al, ar), min(bl, br) - res = append(res, RowRange{from: rhs[r].from, count: os - rhs[r].from}) - rhs[r].from = oe - rhs[r].count = br - oe + res = append(res, RowRange{From: rhs[r].From, Count: os - rhs[r].From}) + rhs[r].From = oe + rhs[r].Count = br - oe l++ } } @@ -133,15 +133,15 @@ func simplify(rr []RowRange) []RowRange { } sort.Slice(rr, func(i, j int) bool { - return rr[i].from < rr[j].from + return rr[i].From < rr[j].From }) tmp := make([]RowRange, 0) l := rr[0] for i := 1; i < len(rr); i++ { r := rr[i] - al, bl := l.from, l.from+l.count - ar, br := r.from, r.from+r.count + al, bl := l.From, l.From+l.Count + ar, br := r.From, r.From+r.Count if bl < ar { tmp = append(tmp, l) l = r @@ -155,15 +155,15 @@ func simplify(rr []RowRange) []RowRange { } l = RowRange{ - from: from, - count: count, + From: from, + Count: count, } } tmp = append(tmp, l) res := make([]RowRange, 0, len(tmp)) for i := range tmp { - if tmp[i].count != 0 { + if tmp[i].Count != 0 { res = append(res, tmp[i]) } } diff --git a/vendor/modules.txt b/vendor/modules.txt index ca8316be582..418e65ed0a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 +# github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable