From 62fa38175bfe364a85764aa0437a7de4fd1ccf97 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 12:24:31 -0700 Subject: [PATCH 1/6] upgrade parquet common to latest main Signed-off-by: yeya24 --- go.mod | 7 +- go.sum | 4 +- pkg/parquetconverter/converter.go | 1 + .../parquet-common/convert/convert.go | 557 ++++++++++-------- .../parquet-common/convert/reader.go | 166 ++++++ .../parquet-common/convert/writer.go | 105 ++-- .../parquet-common/schema/schema_builder.go | 4 + .../parquet-common/search/constraint.go | 268 +++++++-- .../parquet-common/search/materialize.go | 11 +- .../parquet-common/search/rowrange.go | 7 + .../parquet-common/storage/read_at.go | 2 +- vendor/modules.txt | 7 +- 12 files changed, 766 insertions(+), 373 deletions(-) create mode 100644 vendor/github.com/prometheus-community/parquet-common/convert/reader.go diff --git a/go.mod b/go.mod index bde9ec2ebb5..e9833933267 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 + github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 @@ -230,7 +230,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus-community/prom-label-proxy v0.11.1 // indirect github.com/prometheus/exporter-toolkit v0.14.0 // indirect - github.com/prometheus/otlptranslator v0.0.0-20250620074007-94f535e0c588 // indirect + github.com/prometheus/otlptranslator v0.0.2 // indirect github.com/prometheus/sigv4 v0.2.0 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect github.com/rantav/go-grpc-channelz v0.0.4 // indirect @@ -334,3 +334,6 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.71.2 // See https://github.com/envoyproxy/go-control-plane/issues/1083 as this version introduces checksum mismatch. 
exclude github.com/envoyproxy/go-control-plane/envoy v1.32.3 + +// TODO: update it in next PR +replace github.com/prometheus/otlptranslator => github.com/prometheus/otlptranslator v0.0.0-20250620074007-94f535e0c588 diff --git a/go.sum b/go.sum index 6f3f229d4e2..98d6b6cbf88 100644 --- a/go.sum +++ b/go.sum @@ -1613,8 +1613,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 h1:jWcDrCpAU047f2NTGtm3vRPqJ8skDOkdKCC5sSfSN4Q= -github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= +github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 h1:ZzUcddfRLCewtFsx1d/XeyKVmQDsrJLYnlcamNopoYk= +github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 477149705c7..3464f4bf3e9 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -444,6 +444,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, + util_log.GoKitLogToSlog(logger), converterOpts..., ) diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 819f9cd13d5..41b800b65af 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -15,16 +15,16 @@ package convert import ( "context" - "encoding/binary" "fmt" "io" + "log/slog" "math" "runtime" "slices" "strings" "time" - "github.com/hashicorp/go-multierror" + "github.com/oklog/ulid/v2" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/objstore" + "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/schema" ) @@ -48,7 +49,8 @@ var DefaultConvertOpts = convertOpts{ pageBufferSize: parquet.DefaultPageBufferSize, writeBufferSize: parquet.DefaultWriteBufferSize, columnPageBuffers: parquet.DefaultWriterConfig().ColumnPageBuffers, - concurrency: runtime.GOMAXPROCS(0), + readConcurrency: runtime.GOMAXPROCS(0), + writeConcurrency: 1, maxSamplesPerChunk: tsdb.DefaultSamplesPerChunk, } @@ -69,7 +71,8 @@ type convertOpts struct { pageBufferSize int writeBufferSize int columnPageBuffers parquet.BufferPool - concurrency int + readConcurrency int + writeConcurrency int maxSamplesPerChunk int labelsCompressionOpts []schema.CompressionOpts chunksCompressionOpts []schema.CompressionOpts @@ 
-112,6 +115,23 @@ func WithSortBy(labels ...string) ConvertOption { } } +// WithBloomFilterLabels configures which labels should have bloom filters created during conversion. +// Bloom filters enable fast filtering during queries by allowing quick elimination of row groups +// that definitely don't contain a specific label value. This significantly improves query performance +// for high-cardinality labels. By default, bloom filters are created for __name__. +// +// Parameters: +// - labels: Label names to create bloom filters for +// +// Example: +// +// WithBloomFilterLabels("__name__", "job", "instance") +func WithBloomFilterLabels(labels ...string) ConvertOption { + return func(opts *convertOpts) { + opts.bloomfilterLabels = labels + } +} + // WithColDuration sets the time duration for each column in the Parquet schema. // This determines how time series data is partitioned across columns, affecting // both storage efficiency and query performance. Shorter durations create more @@ -209,7 +229,7 @@ func WithRowGroupSize(size int) ConvertOption { } } -// WithConcurrency sets the number of concurrent goroutines used during conversion. +// WithReadConcurrency sets the number of concurrent goroutines used to read TSDB series during conversion. // Higher concurrency can improve performance on multi-core systems but increases // memory usage. The optimal value depends on available CPU cores and memory. // @@ -218,10 +238,26 @@ func WithRowGroupSize(size int) ConvertOption { // // Example: // -// WithConcurrency(8) // Use 8 concurrent workers -func WithConcurrency(concurrency int) ConvertOption { +// WithReadConcurrency(8) // Use 8 concurrent workers +func WithReadConcurrency(concurrency int) ConvertOption { return func(opts *convertOpts) { - opts.concurrency = concurrency + opts.readConcurrency = concurrency + } +} + +// WithWriteConcurrency sets the number of concurrent goroutines used to write Parquet shards during conversion. +// Higher concurrency can improve conversion time on multi-core systems but increases +// CPU and memory usage. The optimal value depends on available CPU cores and memory. +// +// Parameters: +// - concurrency: Number of concurrent workers (default: runtime.GOMAXPROCS(0)) +// +// Example: +// +// WithWriteConcurrency(8) // Use 8 concurrent workers +func WithWriteConcurrency(concurrency int) ConvertOption { + return func(opts *convertOpts) { + opts.writeConcurrency = concurrency } } @@ -301,321 +337,332 @@ func WithChunksCompression(compressionOpts ...schema.CompressionOpts) ConvertOpt // - opts: Optional configuration options to customize the conversion process // // Returns: -// - int: The current shard number after conversion +// - int: The number of shards written for a successful conversion // - error: Any error that occurred during the conversion process // -// The function creates a row reader from the TSDB blocks, generates both labels and chunks -// projections with optional compression, and writes the data to the bucket using a sharded -// writer approach for better performance and parallelization. +// The function creates a row reader for each TSDB block and identifies the unique input series. +// Input series are divided into shards based on the configured max row groups and row group size. +// The labels file schema is independently generated from the series present in each shard, +// in order to avoid writing (and later reading) footer and index data for blank columns. +// Shards will be written in parallel if configured by the ConvertOptions. 
func ConvertTSDBBlock( ctx context.Context, bkt objstore.Bucket, mint, maxt int64, - blks []Convertible, + blocks []Convertible, + logger *slog.Logger, opts ...ConvertOption, ) (int, error) { cfg := DefaultConvertOpts - for _, opt := range opts { opt(&cfg) } - rr, err := NewTsdbRowReader(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blks, cfg) + logger.Info("sharding input series") + shardedRowReaders, err := shardedTSDBRowReaders(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) if err != nil { - return 0, err + return 0, errors.Wrap(err, "failed to create sharded TSDB row readers") } - defer func() { _ = rr.Close() }() + defer func() { + for _, rr := range shardedRowReaders { + _ = rr.Close() + } + }() - labelsProjection, err := rr.Schema().LabelsProjection(cfg.labelsCompressionOpts...) - if err != nil { - return 0, errors.Wrap(err, "error getting labels projection from tsdb schema") + logger.Info("starting parallel block conversion", "shards", len(shardedRowReaders), "write_concurrency", cfg.writeConcurrency) + errGroup := &errgroup.Group{} + errGroup.SetLimit(cfg.writeConcurrency) + for shard, rr := range shardedRowReaders { + errGroup.Go(func() error { + labelsProjection, err := rr.Schema().LabelsProjection(cfg.labelsCompressionOpts...) + if err != nil { + return errors.Wrap(err, "error getting labels projection from tsdb schema") + } + chunksProjection, err := rr.Schema().ChunksProjection(cfg.chunksCompressionOpts...) + if err != nil { + return errors.Wrap(err, "error getting chunks projection from tsdb schema") + } + outSchemaProjections := []*schema.TSDBProjection{ + labelsProjection, chunksProjection, + } + + w := &PreShardedWriter{ + shard: shard, + rr: rr, + schema: rr.Schema(), + outSchemaProjections: outSchemaProjections, + pipeReaderWriter: NewPipeReaderBucketWriter(bkt), + opts: &cfg, + logger: logger, + } + err = w.Write(ctx) + if err != nil { + return errors.Wrap(err, "error writing shard for block") + } + return nil + }) } - chunksProjection, err := rr.Schema().ChunksProjection(cfg.chunksCompressionOpts...) + + err = errGroup.Wait() if err != nil { - return 0, errors.Wrap(err, "error getting chunks projection from tsdb schema") - } - outSchemaProjections := []*schema.TSDBProjection{ - labelsProjection, chunksProjection, + return 0, errors.Wrap(err, "failed to convert shards in parallel") } - - pipeReaderWriter := NewPipeReaderBucketWriter(bkt) - w := NewShardedWrite(rr, rr.Schema(), outSchemaProjections, pipeReaderWriter, &cfg) - return w.currentShard, errors.Wrap(w.Write(ctx), "error writing block") + return len(shardedRowReaders), nil } -var _ parquet.RowReader = &TsdbRowReader{} - -type TsdbRowReader struct { - ctx context.Context +type blockIndexReader struct { + blockID ulid.ULID + idx int // index of the block in the input slice + reader tsdb.IndexReader + postings index.Postings +} - closers []io.Closer +type blockSeries struct { + blockIdx int // index of the block in the input slice + seriesIdx int // index of the series in the block postings + ref storage.SeriesRef + labels labels.Labels +} - seriesSet storage.ChunkSeriesSet +func shardedTSDBRowReaders( + ctx context.Context, + mint, maxt, colDuration int64, + blocks []Convertible, + opts convertOpts, +) ([]*TSDBRowReader, error) { + // Blocks can have multiple entries with the same of ULID in the case of head blocks; + // track all blocks by their index in the input slice rather than assuming unique ULIDs. 
+ indexReaders := make([]blockIndexReader, len(blocks)) + // Simpler to track and close these readers separate from those used by shard conversion reader/writers. + defer func() { + for _, indexReader := range indexReaders { + _ = indexReader.reader.Close() + } + }() + for i, blk := range blocks { + indexReader, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get index reader from block") + } + indexReaders[i] = blockIndexReader{ + blockID: blk.Meta().ULID, + idx: i, + reader: indexReader, + postings: tsdb.AllSortedPostings(ctx, indexReader), + } + } - rowBuilder *parquet.RowBuilder - tsdbSchema *schema.TSDBSchema + uniqueSeriesCount, shardedSeries, err := shardSeries(indexReaders, mint, maxt, opts) + if err != nil { + return nil, errors.Wrap(err, "failed to determine unique series count") + } + if uniqueSeriesCount == 0 { + return nil, errors.Wrap(err, "no series found in the specified time range") + } - encoder *schema.PrometheusParquetChunksEncoder - totalRead int64 - concurrency int -} + shardTSDBRowReaders := make([]*TSDBRowReader, len(shardedSeries)) -func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks []Convertible, ops convertOpts) (*TsdbRowReader, error) { - var ( - seriesSets = make([]storage.ChunkSeriesSet, 0, len(blks)) - closers = make([]io.Closer, 0, len(blks)) - ok = false - ) - // If we fail to build the row reader, make sure we release resources. - // This could be either a controlled error or a panic. + // We close everything if any errors or panic occur + allClosers := make([]io.Closer, 0, len(blocks)*3) + ok := false defer func() { if !ok { - for i := range closers { - _ = closers[i].Close() + for _, closer := range allClosers { + _ = closer.Close() } } }() - b := schema.NewBuilder(mint, maxt, colDuration) - - compareFunc := func(a, b labels.Labels) int { - for _, lb := range ops.sortedLabels { - if c := strings.Compare(a.Get(lb), b.Get(lb)); c != 0 { - return c + // For each shard, create a TSDBRowReader with: + // * a MergeChunkSeriesSet of all blocks' series sets for the shard + // * a schema built from only the label names present in the shard + for shardIdx, shardSeries := range shardedSeries { + // An index, chunk, and tombstone reader per block each must be closed after usage + // in order for the prometheus block reader to not hang indefinitely when closed. + closers := make([]io.Closer, 0, len(shardSeries)*3) + seriesSets := make([]storage.ChunkSeriesSet, 0, len(blocks)) + schemaBuilder := schema.NewBuilder(mint, maxt, colDuration) + + // For each block with series in the shard, + // init readers and postings list required to create a tsdb.blockChunkSeriesSet; + // series sets from all blocks for the shard will be merged by mergeChunkSeriesSet. 
+ for _, blockSeries := range shardSeries { + blk := blocks[blockSeries[0].blockIdx] + // Init all readers for block & add to closers + + // Init separate index readers from above indexReaders to simplify closing logic + indexr, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get index reader from block") } - } + closers = append(closers, indexr) + allClosers = append(allClosers, indexr) - return labels.Compare(a, b) - } - - for _, blk := range blks { - indexr, err := blk.Index() - if err != nil { - return nil, fmt.Errorf("unable to get index reader from block: %w", err) - } - closers = append(closers, indexr) + chunkr, err := blk.Chunks() + if err != nil { + return nil, errors.Wrap(err, "failed to get chunk reader from block") + } + closers = append(closers, chunkr) + allClosers = append(allClosers, chunkr) - chunkr, err := blk.Chunks() - if err != nil { - return nil, fmt.Errorf("unable to get chunk reader from block: %w", err) + tombsr, err := blk.Tombstones() + if err != nil { + return nil, errors.Wrap(err, "failed to get tombstone reader from block") + } + closers = append(closers, tombsr) + allClosers = append(allClosers, tombsr) + + // Flatten series refs and add all label columns to schema for the shard + refs := make([]storage.SeriesRef, 0, len(blockSeries)) + for _, series := range blockSeries { + refs = append(refs, series.ref) + series.labels.Range(func(l labels.Label) { + schemaBuilder.AddLabelNameColumn(l.Name) + }) + } + postings := index.NewListPostings(refs) + seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) + seriesSets = append(seriesSets, seriesSet) } - closers = append(closers, chunkr) - tombsr, err := blk.Tombstones() - if err != nil { - return nil, fmt.Errorf("unable to get tombstone reader from block: %w", err) - } - closers = append(closers, tombsr) + mergeSeriesSet := NewMergeChunkSeriesSet( + seriesSets, compareBySortedLabelsFunc(opts.sortedLabels), storage.NewConcatenatingChunkSeriesMerger(), + ) - lblns, err := indexr.LabelNames(ctx) + tsdbSchema, err := schemaBuilder.Build() if err != nil { - return nil, fmt.Errorf("unable to get label names from block: %w", err) + return nil, fmt.Errorf("unable to build schema reader from block: %w", err) } - - postings := sortedPostings(ctx, indexr, mint, maxt, ops.sortedLabels...) - seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) - seriesSets = append(seriesSets, seriesSet) - - b.AddLabelNameColumn(lblns...) 
- } - - cseriesSet := NewMergeChunkSeriesSet(seriesSets, compareFunc, storage.NewConcatenatingChunkSeriesMerger()) - - s, err := b.Build() - if err != nil { - return nil, fmt.Errorf("unable to build index reader from block: %w", err) - } - - rr := &TsdbRowReader{ - ctx: ctx, - seriesSet: cseriesSet, - closers: closers, - tsdbSchema: s, - concurrency: ops.concurrency, - - rowBuilder: parquet.NewRowBuilder(s.Schema), - encoder: schema.NewPrometheusParquetChunksEncoder(s, ops.maxSamplesPerChunk), + shardTSDBRowReaders[shardIdx] = newTSDBRowReader( + ctx, closers, mergeSeriesSet, tsdbSchema, opts, + ) } ok = true - return rr, nil + return shardTSDBRowReaders, nil } -func (rr *TsdbRowReader) Close() error { - err := &multierror.Error{} - for i := range rr.closers { - err = multierror.Append(err, rr.closers[i].Close()) - } - return err.ErrorOrNil() -} - -func (rr *TsdbRowReader) Schema() *schema.TSDBSchema { - return rr.tsdbSchema -} - -func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, mint, maxt int64, sortedLabels ...string) index.Postings { - p := tsdb.AllSortedPostings(ctx, indexr) - - if len(sortedLabels) == 0 { - return p - } - - type s struct { - ref storage.SeriesRef - idx int - labels labels.Labels - } - series := make([]s, 0, 128) +func shardSeries( + blockIndexReaders []blockIndexReader, + mint, maxt int64, + opts convertOpts, +) (int, []map[int][]blockSeries, error) { chks := make([]chunks.Meta, 0, 128) + allSeries := make([]blockSeries, 0, 128*len(blockIndexReaders)) + // Collect all series from all blocks with chunks in the time range + for _, blockIndexReader := range blockIndexReaders { + i := 0 + scratchBuilder := labels.NewScratchBuilder(10) - scratchBuilder := labels.NewScratchBuilder(10) - lb := labels.NewBuilder(labels.EmptyLabels()) - i := 0 -P: - for p.Next() { - scratchBuilder.Reset() - chks = chks[:0] - if err := indexr.Series(p.At(), &scratchBuilder, &chks); err != nil { - return index.ErrPostings(fmt.Errorf("unable to expand series: %w", err)) - } - hasChunks := slices.ContainsFunc(chks, func(chk chunks.Meta) bool { - return mint <= chk.MaxTime && chk.MinTime <= maxt - }) - if !hasChunks { - continue P - } - - lb.Reset(scratchBuilder.Labels()) + for blockIndexReader.postings.Next() { + scratchBuilder.Reset() + chks = chks[:0] - series = append(series, s{labels: lb.Keep(sortedLabels...).Labels(), ref: p.At(), idx: i}) - i++ - } - if err := p.Err(); err != nil { - return index.ErrPostings(fmt.Errorf("expand postings: %w", err)) - } + if err := blockIndexReader.reader.Series(blockIndexReader.postings.At(), &scratchBuilder, &chks); err != nil { + return 0, nil, errors.Wrap(err, "unable to expand series") + } - slices.SortFunc(series, func(a, b s) int { - for _, lb := range sortedLabels { - if c := strings.Compare(a.labels.Get(lb), b.labels.Get(lb)); c != 0 { - return c + hasChunks := slices.ContainsFunc(chks, func(chk chunks.Meta) bool { + return mint <= chk.MaxTime && chk.MinTime <= maxt + }) + if !hasChunks { + continue } - } - if a.idx < b.idx { - return -1 - } else if a.idx > b.idx { - return 1 - } - return 0 - }) - // Convert back to list. 
- ep := make([]storage.SeriesRef, 0, len(series)) - for _, p := range series { - ep = append(ep, p.ref) + scratchBuilderLabels := scratchBuilder.Labels() + allSeries = append(allSeries, blockSeries{ + blockIdx: blockIndexReader.idx, + seriesIdx: i, + ref: blockIndexReader.postings.At(), + labels: scratchBuilderLabels, + }) + } } - return index.NewListPostings(ep) -} -func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { - type chkBytesOrError struct { - chkBytes [][]byte - err error - } - type chunkSeriesPromise struct { - s storage.ChunkSeries - c chan chkBytesOrError + if len(allSeries) == 0 { + return 0, nil, nil } - c := make(chan chunkSeriesPromise, rr.concurrency) + slices.SortFunc(allSeries, compareBlockSeriesBySortedLabelsFunc(opts.sortedLabels)) - go func() { - i := 0 - defer close(c) - for i < len(buf) && rr.seriesSet.Next() { - s := rr.seriesSet.At() - it := s.Iterator(nil) - - promise := chunkSeriesPromise{ - s: s, - c: make(chan chkBytesOrError, 1), - } + // Count how many unique series will exist after merging across blocks. + uniqueSeriesCount := 1 + for i := 1; i < len(allSeries); i++ { + if labels.Compare(allSeries[i].labels, allSeries[i-1].labels) != 0 { + uniqueSeriesCount++ + } + } - select { - case c <- promise: - case <-rr.ctx.Done(): - return + // Divide rows evenly across shards to avoid one small shard at the end; + // Use (a + b - 1) / b equivalence to math.Ceil(a / b) + // so integer division does not cut off the remainder series and to avoid floating point issues. + totalShards := (uniqueSeriesCount + (opts.numRowGroups * opts.rowGroupSize) - 1) / (opts.numRowGroups * opts.rowGroupSize) + rowsPerShard := (uniqueSeriesCount + totalShards - 1) / totalShards + + // For each shard index i, shardSeries[i] is a map of blockIdx -> []series. + shardSeries := make([]map[int][]blockSeries, totalShards) + for i := range shardSeries { + shardSeries[i] = make(map[int][]blockSeries) + } + + shardIdx, allSeriesIdx := 0, 0 + for shardIdx < totalShards { + seriesToShard := allSeries[allSeriesIdx:] + + i, uniqueCount := 0, 0 + matchLabels := labels.Labels{} + for i < len(seriesToShard) { + current := seriesToShard[i] + if labels.Compare(current.labels, matchLabels) != 0 { + // New unique series + + if uniqueCount >= rowsPerShard { + // Stop before adding current series if it would exceed the unique series count for the shard. + // Do not increment, we will start the next shard with this series. + break + } + + // Unique series limit is not hit yet for the shard; add the series. 
+ shardSeries[shardIdx][current.blockIdx] = append(shardSeries[shardIdx][current.blockIdx], current) + // Increment unique count, update labels to compare against, and move on to next series + uniqueCount++ + matchLabels = current.labels + i++ + } else { + // Same labelset as previous series, add it to the shard but do not increment unique count + shardSeries[shardIdx][current.blockIdx] = append(shardSeries[shardIdx][current.blockIdx], current) + // Move on to next series + i++ } - go func() { - chkBytes, err := rr.encoder.Encode(it) - promise.c <- chkBytesOrError{chkBytes: chkBytes, err: err} - }() - i++ + allSeriesIdx++ } - }() - - i, j := 0, 0 - lblsIdxs := []int{} - colIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.ColIndexesColumn) - if !ok { - return 0, fmt.Errorf("unable to find indexes") - } - seriesHashIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.SeriesHashColumn) - if !ok { - return 0, fmt.Errorf("unable to find series hash column") + shardIdx++ } - for promise := range c { - j++ + return uniqueSeriesCount, shardSeries, nil +} - chkBytesOrErr := <-promise.c - if err := chkBytesOrErr.err; err != nil { - return 0, fmt.Errorf("unable encode chunks: %w", err) +func compareBlockSeriesBySortedLabelsFunc(sortedLabels []string) func(a, b blockSeries) int { + return func(a, b blockSeries) int { + for _, lb := range sortedLabels { + if c := strings.Compare(a.labels.Get(lb), b.labels.Get(lb)); c != 0 { + return c + } } - chkBytes := chkBytesOrErr.chkBytes - - rr.rowBuilder.Reset() - lblsIdxs = lblsIdxs[:0] - seriesLabels := promise.s.Labels() - seriesLabels.Range(func(l labels.Label) { - colName := schema.LabelToColumn(l.Name) - lc, _ := rr.tsdbSchema.Schema.Lookup(colName) - rr.rowBuilder.Add(lc.ColumnIndex, parquet.ValueOf(l.Value)) - lblsIdxs = append(lblsIdxs, lc.ColumnIndex) - }) - - rr.rowBuilder.Add(colIndex.ColumnIndex, parquet.ValueOf(schema.EncodeIntSlice(lblsIdxs))) - - // Compute and store the series hash as a byte slice in big-endian format - seriesHashValue := labels.StableHash(seriesLabels) - seriesHashBytes := make([]byte, 8) - binary.BigEndian.PutUint64(seriesHashBytes, seriesHashValue) - rr.rowBuilder.Add(seriesHashIndex.ColumnIndex, parquet.ValueOf(seriesHashBytes)) - - // skip series that have no chunks in the requested time - if allChunksEmpty(chkBytes) { - continue - } + return labels.Compare(a.labels, b.labels) + } +} - for idx, chk := range chkBytes { - if len(chk) == 0 { - continue +func compareBySortedLabelsFunc(sortedLabels []string) func(a, b labels.Labels) int { + return func(a, b labels.Labels) int { + for _, lb := range sortedLabels { + if c := strings.Compare(a.Get(lb), b.Get(lb)); c != 0 { + return c } - rr.rowBuilder.Add(rr.tsdbSchema.DataColsIndexes[idx], parquet.ValueOf(chk)) } - buf[i] = rr.rowBuilder.AppendRow(buf[i][:0]) - i++ - } - rr.totalRead += int64(i) - if rr.ctx.Err() != nil { - return i, rr.ctx.Err() - } - - if j < len(buf) { - return i, io.EOF + return labels.Compare(a, b) } - - return i, rr.seriesSet.Err() } func allChunksEmpty(chkBytes [][]byte) bool { diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/reader.go b/vendor/github.com/prometheus-community/parquet-common/convert/reader.go new file mode 100644 index 00000000000..3acdf423f9c --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/convert/reader.go @@ -0,0 +1,166 @@ +package convert + +import ( + "context" + "encoding/binary" + "fmt" + "io" + + "github.com/hashicorp/go-multierror" + "github.com/parquet-go/parquet-go" + 
"github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + "github.com/prometheus-community/parquet-common/schema" +) + +var _ parquet.RowReader = &TSDBRowReader{} + +type TSDBRowReader struct { + ctx context.Context + + closers []io.Closer + + seriesSet storage.ChunkSeriesSet + + rowBuilder *parquet.RowBuilder + tsdbSchema *schema.TSDBSchema + + encoder *schema.PrometheusParquetChunksEncoder + totalRead int64 + concurrency int +} + +func newTSDBRowReader( + ctx context.Context, + closers []io.Closer, + seriesSet storage.ChunkSeriesSet, + tsdbSchema *schema.TSDBSchema, + opts convertOpts, +) *TSDBRowReader { + return &TSDBRowReader{ + ctx: ctx, + seriesSet: seriesSet, + closers: closers, + tsdbSchema: tsdbSchema, + concurrency: opts.readConcurrency, + + rowBuilder: parquet.NewRowBuilder(tsdbSchema.Schema), + encoder: schema.NewPrometheusParquetChunksEncoder(tsdbSchema, opts.maxSamplesPerChunk), + } +} + +func (rr *TSDBRowReader) Close() error { + err := &multierror.Error{} + for i := range rr.closers { + err = multierror.Append(err, rr.closers[i].Close()) + } + return err.ErrorOrNil() +} + +func (rr *TSDBRowReader) Schema() *schema.TSDBSchema { + return rr.tsdbSchema +} + +func (rr *TSDBRowReader) ReadRows(buf []parquet.Row) (int, error) { + type chkBytesOrError struct { + chkBytes [][]byte + err error + } + type chunkSeriesPromise struct { + s storage.ChunkSeries + c chan chkBytesOrError + } + + c := make(chan chunkSeriesPromise, rr.concurrency) + + go func() { + i := 0 + defer close(c) + for i < len(buf) && rr.seriesSet.Next() { + s := rr.seriesSet.At() + it := s.Iterator(nil) + + promise := chunkSeriesPromise{ + s: s, + c: make(chan chkBytesOrError, 1), + } + + select { + case c <- promise: + case <-rr.ctx.Done(): + return + } + go func() { + chkBytes, err := rr.encoder.Encode(it) + promise.c <- chkBytesOrError{chkBytes: chkBytes, err: err} + }() + i++ + } + }() + + i, j := 0, 0 + lblsIdxs := []int{} + colIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.ColIndexesColumn) + if !ok { + return 0, fmt.Errorf("unable to find indexes") + } + seriesHashIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.SeriesHashColumn) + if !ok { + return 0, fmt.Errorf("unable to find series hash column") + } + + for promise := range c { + j++ + + chkBytesOrErr := <-promise.c + if err := chkBytesOrErr.err; err != nil { + return 0, fmt.Errorf("unable encode chunks: %w", err) + } + chkBytes := chkBytesOrErr.chkBytes + + rr.rowBuilder.Reset() + lblsIdxs = lblsIdxs[:0] + + seriesLabels := promise.s.Labels() + seriesLabels.Range(func(l labels.Label) { + colName := schema.LabelToColumn(l.Name) + lc, _ := rr.tsdbSchema.Schema.Lookup(colName) + rr.rowBuilder.Add(lc.ColumnIndex, parquet.ValueOf(l.Value)) + lblsIdxs = append(lblsIdxs, lc.ColumnIndex) + }) + + rr.rowBuilder.Add(colIndex.ColumnIndex, parquet.ValueOf(schema.EncodeIntSlice(lblsIdxs))) + + // Compute and store the series hash as a byte slice in big-endian format + seriesHashValue := labels.StableHash(seriesLabels) + seriesHashBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seriesHashBytes, seriesHashValue) + rr.rowBuilder.Add(seriesHashIndex.ColumnIndex, parquet.ValueOf(seriesHashBytes)) + + // skip series that have no chunks in the requested time + if allChunksEmpty(chkBytes) { + continue + } + + for idx, chk := range chkBytes { + if len(chk) == 0 { + continue + } + rr.rowBuilder.Add(rr.tsdbSchema.DataColsIndexes[idx], parquet.ValueOf(chk)) + } + buf[i] = rr.rowBuilder.AppendRow(buf[i][:0]) + i++ + } + rr.totalRead 
+= int64(i) + + if rr.ctx.Err() != nil { + return i, rr.ctx.Err() + } + + if j < len(buf) { + return i, io.EOF + } + + return i, rr.seriesSet.Err() +} diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go index 63b4b304fee..4b1fd8565d9 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "log/slog" "os" "path/filepath" @@ -31,104 +32,68 @@ import ( "github.com/prometheus-community/parquet-common/util" ) -type ShardedWriter struct { - name string - - rowGroupSize int - numRowGroups int - - currentShard int +type PreShardedWriter struct { + shard int rr parquet.RowReader - s *schema.TSDBSchema + schema *schema.TSDBSchema outSchemaProjections []*schema.TSDBProjection pipeReaderWriter PipeReaderWriter opts *convertOpts -} -func NewShardedWrite( - rr parquet.RowReader, - s *schema.TSDBSchema, - outSchemaProjections []*schema.TSDBProjection, - pipeReaderWriter PipeReaderWriter, - opts *convertOpts, -) *ShardedWriter { - return &ShardedWriter{ - name: opts.name, - rowGroupSize: opts.rowGroupSize, - numRowGroups: opts.numRowGroups, - currentShard: 0, - rr: rr, - outSchemaProjections: outSchemaProjections, - s: s, - pipeReaderWriter: pipeReaderWriter, - opts: opts, - } + logger *slog.Logger } -func (c *ShardedWriter) Write(ctx context.Context) error { - if err := c.convertShards(ctx); err != nil { - return fmt.Errorf("unable to convert shards: %w", err) - } - return nil -} - -func (c *ShardedWriter) convertShards(ctx context.Context) error { - for { - if ok, err := c.convertShard(ctx); err != nil { - return fmt.Errorf("unable to convert shard: %w", err) - } else if !ok { - break - } +func (c *PreShardedWriter) Write(ctx context.Context) error { + c.logger.Info("starting conversion for shard", "shard", c.shard) + if err := c.convertShard(ctx); err != nil { + return errors.Wrap(err, "failed to convert shard") } + c.logger.Info("finished conversion for shard", "shard", c.shard) return nil } -func (c *ShardedWriter) convertShard(ctx context.Context) (bool, error) { - rowsToWrite := c.numRowGroups * c.rowGroupSize - - n, err := c.writeFile(ctx, c.s, rowsToWrite) - if err != nil { - return false, err - } - - c.currentShard++ - - if n < int64(rowsToWrite) { - return false, nil - } - - return true, nil +func (c *PreShardedWriter) convertShard(ctx context.Context) error { + outSchemas := outSchemasForShard(c.opts.name, c.shard, c.outSchemaProjections) + _, err := writeFile(ctx, c.schema, outSchemas, c.rr, c.pipeReaderWriter, c.opts) + return err } -func (c *ShardedWriter) writeFile(ctx context.Context, schema *schema.TSDBSchema, rowsToWrite int) (int64, error) { +func writeFile( + ctx context.Context, + inSchema *schema.TSDBSchema, + outSchemas map[string]*schema.TSDBProjection, + rr parquet.RowReader, + pipeReaderWriter PipeReaderWriter, + opts *convertOpts, +) (int64, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() fileOpts := []parquet.WriterOption{ parquet.SortingWriterConfig( - parquet.SortingColumns(c.opts.buildSortingColumns()...), + parquet.SortingColumns(opts.buildSortingColumns()...), ), - parquet.MaxRowsPerRowGroup(int64(c.rowGroupSize)), - parquet.BloomFilters(c.opts.buildBloomfilterColumns()...), - parquet.PageBufferSize(c.opts.pageBufferSize), - parquet.WriteBufferSize(c.opts.writeBufferSize), - 
parquet.ColumnPageBuffers(c.opts.columnPageBuffers), + parquet.MaxRowsPerRowGroup(int64(opts.rowGroupSize)), + parquet.BloomFilters(opts.buildBloomfilterColumns()...), + parquet.PageBufferSize(opts.pageBufferSize), + parquet.WriteBufferSize(opts.writeBufferSize), + parquet.ColumnPageBuffers(opts.columnPageBuffers), } - for k, v := range schema.Metadata { + for k, v := range inSchema.Metadata { fileOpts = append(fileOpts, parquet.KeyValueMetadata(k, v)) } writer, err := newSplitFileWriter( - ctx, schema.Schema, c.outSchemasForCurrentShard(), c.pipeReaderWriter, fileOpts..., + ctx, inSchema.Schema, outSchemas, pipeReaderWriter, fileOpts..., ) if err != nil { return 0, fmt.Errorf("unable to create row writer: %w", err) } - n, err := parquet.CopyRows(writer, newBufferedReader(ctx, newLimitReader(c.rr, rowsToWrite))) + n, err := parquet.CopyRows(writer, newBufferedReader(ctx, newLimitReader(rr, opts.numRowGroups*opts.rowGroupSize))) if err != nil { return 0, fmt.Errorf("unable to copy rows: %w", err) } @@ -141,11 +106,11 @@ func (c *ShardedWriter) writeFile(ctx context.Context, schema *schema.TSDBSchema return n, nil } -func (c *ShardedWriter) outSchemasForCurrentShard() map[string]*schema.TSDBProjection { - outSchemas := make(map[string]*schema.TSDBProjection, len(c.outSchemaProjections)) +func outSchemasForShard(name string, shard int, outSchemaProjections []*schema.TSDBProjection) map[string]*schema.TSDBProjection { + outSchemas := make(map[string]*schema.TSDBProjection, len(outSchemaProjections)) - for _, projection := range c.outSchemaProjections { - outSchemas[projection.FilenameFunc(c.name, c.currentShard)] = projection + for _, projection := range outSchemaProjections { + outSchemas[projection.FilenameFunc(name, shard)] = projection } return outSchemas } diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index 3f0cf8ec02b..0aa97c9f952 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -98,6 +98,10 @@ func FromLabelsFile(lf parquet.FileView) (*TSDBSchema, error) { func (b *Builder) AddLabelNameColumn(lbls ...string) { for _, lbl := range lbls { + col := LabelToColumn(lbl) + if _, ok := b.g[col]; ok { + continue + } b.g[LabelToColumn(lbl)] = parquet.Optional(parquet.Encoded(parquet.String(), &parquet.RLEDictionary)) } } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 29b23317e4a..cd5f1e29617 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -19,9 +19,11 @@ import ( "fmt" "slices" "sort" + "sync" "github.com/parquet-go/parquet-go" "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/schema" "github.com/prometheus-community/parquet-common/storage" @@ -31,12 +33,19 @@ import ( type Constraint interface { fmt.Stringer - // filter returns a set of non-overlapping increasing row indexes that may satisfy the constraint. - filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) // init initializes the constraint with respect to the file schema and projections. 
init(f storage.ParquetFileView) error + // path is the path for the column that is constrained path() string + + // prefilter returns a set of non-overlapping increasing row indexes that may satisfy the constraint. + // This MUST be a superset of the real set of matching rows. + prefilter(rgIdx int, rr []RowRange) ([]RowRange, error) + + // filter returns a set of non-overlapping increasing row indexes that do satisfy the constraint. + // This MUST be the precise set of matching rows. + filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) } // MatchersToConstraints converts Prometheus label matchers into parquet search constraints. @@ -154,23 +163,55 @@ func sortConstraintsBySortingColumns(cs []Constraint, sc []parquet.SortingColumn // // Returns a slice of RowRange that represent the rows satisfying all constraints, // or an error if any constraint fails to filter. -func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) { - rg := s.LabelsFile().RowGroups()[rgIdx] +func Filter(ctx context.Context, f storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) { + rg := f.LabelsFile().RowGroups()[rgIdx] + // Constraints for sorting columns are cheaper to evaluate, so we sort them first. sc := rg.SortingColumns() sortConstraintsBySortingColumns(cs, sc) - var err error + var ( + err error + mu sync.Mutex + g errgroup.Group + ) + + // First pass prefilter with a quick index scan to find a superset of matching rows rr := []RowRange{{From: int64(0), Count: rg.NumRows()}} for i := range cs { - isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] - rr, err = cs[i].filter(ctx, rgIdx, isPrimary, rr) + rr, err = cs[i].prefilter(rgIdx, rr) if err != nil { - return nil, fmt.Errorf("unable to filter with constraint %d: %w", i, err) + return nil, fmt.Errorf("unable to prefilter with constraint %d: %w", i, err) } } - return rr, nil + res := slices.Clone(rr) + + if len(res) == 0 { + return nil, nil + } + + // Second pass page filter find the real set of matching rows, done concurrently because it involves IO + for i := range cs { + g.Go(func() error { + isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] + + srr, err := cs[i].filter(ctx, rgIdx, isPrimary, rr) + if err != nil { + return fmt.Errorf("unable to filter with constraint %d: %w", i, err) + } + mu.Lock() + res = intersectRowRanges(res, srr) + mu.Unlock() + + return nil + }) + } + if err = g.Wait(); err != nil { + return nil, fmt.Errorf("unable to do second pass filter: %w", err) + } + + return res, nil } type PageToRead struct { @@ -276,7 +317,7 @@ func (s *SymbolTable) ResetWithRange(pg parquet.Page, l, r int) { } sidx := 0 - for i := 0; i < l; i++ { + for i := range l { if s.defs[i] == 1 { sidx++ } @@ -290,6 +331,10 @@ func (s *SymbolTable) ResetWithRange(pg parquet.Page, l, r int) { s.dict = dict } +func Equal(path string, value parquet.Value) Constraint { + return &equalConstraint{pth: path, val: value} +} + type equalConstraint struct { pth string @@ -303,11 +348,7 @@ func (ec *equalConstraint) String() string { return fmt.Sprintf("equal(%q,%q)", ec.pth, ec.val) } -func Equal(path string, value parquet.Value) Constraint { - return &equalConstraint{pth: path, val: value} -} - -func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { +func (ec *equalConstraint) prefilter(rgIdx int, rr []RowRange) ([]RowRange, error) { if len(rr) == 0 { return nil, nil } @@ -320,7 +361,7 @@ func 
(ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, // If match empty, return rr (filter nothing) // otherwise return empty if ec.matches(parquet.ValueOf("")) { - return rr, nil + return slices.Clone(rr), nil } return []RowRange{}, nil } @@ -341,10 +382,90 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, return nil, fmt.Errorf("unable to read column index: %w", err) } res := make([]RowRange, 0) + for i := range cidx.NumPages() { + // If page does not intersect from, to; we can immediately discard it + pfrom := oidx.FirstRowIndex(i) + pcount := rg.NumRows() - pfrom + if i < oidx.NumPages()-1 { + pcount = oidx.FirstRowIndex(i+1) - pfrom + } + pto := pfrom + pcount + if pfrom > to { + break + } + if pto < from { + continue + } + // Page intersects [from, to] but we might be able to discard it with statistics + if cidx.NullPage(i) { + if ec.matches(parquet.ValueOf("")) { + res = append(res, RowRange{pfrom, pcount}) + } + continue + } - readPgs := make([]PageToRead, 0, 10) + // If we are not matching the empty string ( which would be satisfied by Null too ), we can + // use page statistics to skip rows + minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) + if !ec.matches(parquet.ValueOf("")) && !maxv.IsNull() && ec.comp(ec.val, maxv) > 0 { + if cidx.IsDescending() { + break + } + continue + } + if !ec.matches(parquet.ValueOf("")) && !minv.IsNull() && ec.comp(ec.val, minv) < 0 { + if cidx.IsAscending() { + break + } + continue + } + // We cannot discard the page through statistics but we might need to read it to see if it has the value + res = append(res, RowRange{From: pfrom, Count: pto - pfrom}) + } + if len(res) == 0 { + return nil, nil + } + return intersectRowRanges(simplify(res), rr), nil +} + +func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { + if len(rr) == 0 { + return nil, nil + } + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count + + rg := ec.f.RowGroups()[rgIdx] + + col, ok := rg.Schema().Lookup(ec.path()) + if !ok { + // If match empty, return rr (filter nothing) + // otherwise return empty + if ec.matches(parquet.ValueOf("")) { + return slices.Clone(rr), nil + } + return []RowRange{}, nil + } - for i := 0; i < cidx.NumPages(); i++ { + cc := rg.ColumnChunks()[col.ColumnIndex] + if skip, err := ec.skipByBloomfilter(cc); err != nil { + return nil, fmt.Errorf("unable to skip by bloomfilter: %w", err) + } else if skip { + return nil, nil + } + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, fmt.Errorf("unable to read offset index: %w", err) + } + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, fmt.Errorf("unable to read column index: %w", err) + } + var ( + res = make([]RowRange, 0) + readPgs = make([]PageToRead, 0, 10) + ) + for i := range cidx.NumPages() { poff, pcsz := oidx.Offset(i), oidx.CompressedPageSize(i) // If page does not intersect from, to; we can immediately discard it @@ -367,6 +488,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, } continue } + // If we are not matching the empty string ( which would be satisfied by Null too ), we can // use page statistics to skip rows minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) @@ -386,7 +508,6 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, readPgs = append(readPgs, NewPageToRead(i, pfrom, pto, poff, pcsz)) } - // Did not find any pages if len(readPgs) == 0 { return 
intersectRowRanges(simplify(res), rr), nil } @@ -406,7 +527,6 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, if err != nil { return nil, err } - defer func() { _ = pgs.Close() }() symbols := new(SymbolTable) @@ -506,6 +626,7 @@ func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, erro return !ok, nil } +// r MUST be a matcher of type Regex func Regex(path string, r *labels.Matcher) (Constraint, error) { if r.Type != labels.MatchRegexp { return nil, fmt.Errorf("unsupported matcher type: %s", r.Type) @@ -537,7 +658,7 @@ func (rc *regexConstraint) String() string { return fmt.Sprintf("regex(%v,%v)", rc.pth, rc.r.GetRegexString()) } -func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { +func (rc *regexConstraint) prefilter(rgIdx int, rr []RowRange) ([]RowRange, error) { if len(rr) == 0 { return nil, nil } @@ -550,11 +671,11 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, // If match empty, return rr (filter nothing) // otherwise return empty if rc.matchesEmpty { - return rr, nil + return slices.Clone(rr), nil } return []RowRange{}, nil } - cc := rg.ColumnChunks()[col.ColumnIndex] + cc := rg.ColumnChunks()[col.ColumnIndex].(*parquet.FileColumnChunk) oidx, err := cc.OffsetIndex() if err != nil { @@ -565,12 +686,85 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, return nil, fmt.Errorf("unable to read column index: %w", err) } res := make([]RowRange, 0) + for i := range cidx.NumPages() { + // If page does not intersect from, to; we can immediately discard it + pfrom := oidx.FirstRowIndex(i) + pcount := rg.NumRows() - pfrom + if i < oidx.NumPages()-1 { + pcount = oidx.FirstRowIndex(i+1) - pfrom + } + pto := pfrom + pcount + if pfrom > to { + break + } + if pto < from { + continue + } + // Page intersects [from, to] but we might be able to discard it with statistics + if cidx.NullPage(i) { + if rc.matchesEmpty { + res = append(res, RowRange{pfrom, pcount}) + } + continue + } + // If we have a special regular expression that works with statistics, we can use them to skip. 
+ // This works for i.e.: 'pod_name=~"thanos-.*"' or 'status_code=~"403|404"' + minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) + if !rc.minv.IsNull() && !rc.maxv.IsNull() { + if !rc.matchesEmpty && !maxv.IsNull() && rc.comp(rc.minv, maxv) > 0 { + if cidx.IsDescending() { + break + } + continue + } + if !rc.matchesEmpty && !minv.IsNull() && rc.comp(rc.maxv, minv) < 0 { + if cidx.IsAscending() { + break + } + continue + } + } + res = append(res, RowRange{From: pfrom, Count: pto - pfrom}) + } + if len(res) == 0 { + return nil, nil + } + return intersectRowRanges(simplify(res), rr), nil +} - readPgs := make([]PageToRead, 0, 10) +func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, isPrimary bool, rr []RowRange) ([]RowRange, error) { + if len(rr) == 0 { + return nil, nil + } + from, to := rr[0].From, rr[len(rr)-1].From+rr[len(rr)-1].Count - for i := 0; i < cidx.NumPages(); i++ { - poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i) + rg := rc.f.RowGroups()[rgIdx] + col, ok := rg.Schema().Lookup(rc.path()) + if !ok { + // If match empty, return rr (filter nothing) + // otherwise return empty + if rc.matchesEmpty { + return slices.Clone(rr), nil + } + return []RowRange{}, nil + } + cc := rg.ColumnChunks()[col.ColumnIndex].(*parquet.FileColumnChunk) + + oidx, err := cc.OffsetIndex() + if err != nil { + return nil, fmt.Errorf("unable to read offset index: %w", err) + } + cidx, err := cc.ColumnIndex() + if err != nil { + return nil, fmt.Errorf("unable to read column index: %w", err) + } + var ( + res = make([]RowRange, 0) + readPgs = make([]PageToRead, 0, 10) + ) + for i := range cidx.NumPages() { + poff, pcsz := oidx.Offset(i), oidx.CompressedPageSize(i) // If page does not intersect from, to; we can immediately discard it pfrom := oidx.FirstRowIndex(i) pcount := rg.NumRows() - pfrom @@ -608,20 +802,16 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, continue } } - - // We cannot discard the page through statistics but we might need to read it to see if it has the value - readPgs = append(readPgs, PageToRead{pfrom: pfrom, pto: pto, idx: i, off: int64(poff), csz: pcsz}) + readPgs = append(readPgs, NewPageToRead(i, pfrom, pto, poff, pcsz)) } - - // Did not find any pages if len(readPgs) == 0 { return intersectRowRanges(simplify(res), rr), nil } dictOff, dictSz := rc.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) - minOffset := uint64(readPgs[0].off) - maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz + minOffset := uint64(readPgs[0].Offset()) + maxOffset := readPgs[len(readPgs)-1].Offset() + readPgs[len(readPgs)-1].CompressedSize() // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize, // we include the dic to be read in the single read @@ -633,7 +823,6 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if err != nil { return nil, err } - defer func() { _ = pgs.Close() }() symbols := new(SymbolTable) @@ -705,8 +894,9 @@ func (rc *regexConstraint) init(f storage.ParquetFileView) error { rc.maxv = slices.MaxFunc(sm, rc.comp) } else if len(rc.r.Prefix()) > 0 { rc.minv = parquet.ValueOf(rc.r.Prefix()) - // 16 is the default prefix length, maybe we should read the actual value from somewhere? 
- rc.maxv = parquet.ValueOf(append([]byte(rc.r.Prefix()), bytes.Repeat([]byte{0xff}, 16)...)) + // 16 is the default prefix length and the schema builder uses it by default, if we ever change it this would + // break, and we'd need to read the length from the schema or from the metadata. + rc.maxv = parquet.ValueOf(append([]byte(rc.r.Prefix()), bytes.Repeat([]byte{0xff}, parquet.DefaultColumnIndexSizeLimit)...)) } return nil @@ -744,6 +934,12 @@ func (nc *notConstraint) String() string { return fmt.Sprintf("not(%v)", nc.c.String()) } +func (nc *notConstraint) prefilter(_ int, rr []RowRange) ([]RowRange, error) { + // NOT constraints cannot be prefiltered since the child constraint returns a superset of the matching row range, + // if we were to complement this row range the result here would be a subset and this would violate our interface. + return slices.Clone(rr), nil +} + func (nc *notConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { base, err := nc.c.filter(ctx, rgIdx, primary, rr) if err != nil { diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 4d4bbf2eab3..b8baa92d179 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -146,10 +146,6 @@ func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.Sele attribute.Int("row_ranges_count", len(rr)), ) - if err := m.checkRowCountQuota(rr); err != nil { - span.SetAttributes(attribute.String("quota_failure", "row_count")) - return nil, err - } sLbls, err := m.MaterializeAllLabels(ctx, rgi, rr) if err != nil { return nil, errors.Wrapf(err, "error materializing labels") @@ -393,6 +389,11 @@ func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []R attribute.Int64("total_rows_requested", totalRowsRequested), ) + if err := m.checkRowCountQuota(rr); err != nil { + span.SetAttributes(attribute.String("quota_failure", "row_count")) + return nil, err + } + // Get column indexes for all rows in the specified ranges columnIndexes, err := m.getColumnIndexes(ctx, rgi, rr) if err != nil { @@ -535,6 +536,8 @@ func totalRows(rr []RowRange) int64 { return res } +// MaterializeChunks returns an iterator that materializes chunks for the given row ranges lazily. +// It does not consume row count quotas, as opposed to the materialization of labels. 
func (m *Materializer) MaterializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) (ChunksIteratorIterator, error) { var err error ctx, span := tracer.Start(ctx, "Materializer.MaterializeChunks") diff --git a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go index 9112a711b75..1531ecc29b1 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/rowrange.go @@ -14,6 +14,7 @@ package search import ( + "slices" "sort" ) @@ -81,6 +82,9 @@ func intersectRowRanges(lhs, rhs []RowRange) []RowRange { func complementRowRanges(lhs, rhs []RowRange) []RowRange { res := make([]RowRange, 0) + // rhs is modified in place, to make it concurrency safe we need to clone it + rhs = slices.Clone(rhs) + l, r := 0, 0 for l < len(lhs) && r < len(rhs) { al, bl := lhs[l].From, lhs[l].From+lhs[l].Count @@ -132,6 +136,9 @@ func simplify(rr []RowRange) []RowRange { return nil } + // rr is modified in place, to make it concurrency safe we need to clone it + rr = slices.Clone(rr) + sort.Slice(rr, func(i, j int) bool { return rr[i].From < rr[j].From }) diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go index 4738f607349..341e235c058 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go @@ -102,7 +102,7 @@ func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) { if err == io.EOF { err = nil } - return + return n, err } type optimisticReaderAt struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8cc82a18c7d..41074bf8fc3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -967,8 +967,8 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 -## explicit; go 1.23.4 +# github.com/prometheus-community/parquet-common v0.0.0-20251023184424-4f977ece2a46 +## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable github.com/prometheus-community/parquet-common/schema @@ -1071,7 +1071,7 @@ github.com/prometheus/common/version # github.com/prometheus/exporter-toolkit v0.14.0 ## explicit; go 1.22 github.com/prometheus/exporter-toolkit/web -# github.com/prometheus/otlptranslator v0.0.0-20250620074007-94f535e0c588 +# github.com/prometheus/otlptranslator v0.0.2 => github.com/prometheus/otlptranslator v0.0.0-20250620074007-94f535e0c588 ## explicit; go 1.23.0 github.com/prometheus/otlptranslator # github.com/prometheus/procfs v0.16.1 @@ -2009,3 +2009,4 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # google.golang.org/grpc => google.golang.org/grpc v1.71.2 +# github.com/prometheus/otlptranslator => github.com/prometheus/otlptranslator v0.0.0-20250620074007-94f535e0c588 From 68f12fe232eda01d2844344ebc8a0cc6c6ed206c Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 15:25:32 -0700 Subject: [PATCH 2/6] fix lint Signed-off-by: yeya24 --- 
pkg/querier/parquet_queryable_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index e842a69dda8..0d21e0938c6 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -3,6 +3,7 @@ package querier import ( "context" "fmt" + util_log "github.com/cortexproject/cortex/pkg/util/log" "math/rand" "path/filepath" "strconv" @@ -570,6 +571,7 @@ func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient o tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, + util_log.SLogger, converterOpts..., ) require.NoError(t, err) From 9fa4d7ce5a73511f02a7dd85786aa34710577741 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 15:41:00 -0700 Subject: [PATCH 3/6] lint Signed-off-by: yeya24 --- pkg/querier/parquet_queryable_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 0d21e0938c6..9b5436955e6 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -3,7 +3,6 @@ package querier import ( "context" "fmt" - util_log "github.com/cortexproject/cortex/pkg/util/log" "math/rand" "path/filepath" "strconv" @@ -40,6 +39,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) From ff6584279fdd7b2c5817b7679e1c1dddbf314021 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 16:18:23 -0700 Subject: [PATCH 4/6] fix test Signed-off-by: yeya24 --- pkg/querier/parquet_queryable_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 9b5436955e6..263bb361490 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -544,7 +544,7 @@ func TestParquetQueryable_Limits(t *testing.T) { if testData.expectedErr != nil { require.False(t, set.Next()) err = set.Err() - require.EqualError(t, err, testData.expectedErr.Error()) + require.ErrorContains(t, err, testData.expectedErr.Error()) return } From f5f4ec9008fa0ae2a1c7515cbfb9beb86b8fdcb2 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 21:19:32 -0700 Subject: [PATCH 5/6] fix unit test Signed-off-by: yeya24 --- pkg/parquetconverter/converter_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 81caa86c63d..1f23ffa382f 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -247,6 +247,7 @@ func TestConverter_BlockConversionFailure(t *testing.T) { // Create a new converter with test configuration cfg := Config{ + MaxRowsPerRowGroup: 1e6, MetaSyncConcurrency: 1, DataDir: t.TempDir(), } From e5290e5f0f28c7dac070b68316fc0016c4b4df60 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 29 Oct 2025 22:01:31 -0700 Subject: [PATCH 6/6] fix test Signed-off-by: yeya24 --- pkg/parquetconverter/converter_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 1f23ffa382f..98020340f53 100644 --- a/pkg/parquetconverter/converter_test.go 
+++ b/pkg/parquetconverter/converter_test.go @@ -303,6 +303,7 @@ func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) { // Create a new converter with test configuration cfg := Config{ + MaxRowsPerRowGroup: 1e6, MetaSyncConcurrency: 1, DataDir: t.TempDir(), }
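For reference, below is a minimal, hypothetical sketch (not part of the patch series) of how a caller invokes convert.ConvertTSDBBlock after this upgrade: the function now takes an *slog.Logger and returns the number of shards written, the former WithConcurrency option is split into WithReadConcurrency and WithWriteConcurrency, and WithBloomFilterLabels controls which label columns get bloom filters. The helper name convertOneBlock, the concurrency values, and the label list are illustrative assumptions; only the ConvertTSDBBlock signature and option names are taken from the diff above.

package converterexample

import (
	"context"
	"log/slog"
	"os"

	"github.com/prometheus-community/parquet-common/convert"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/objstore"
)

// convertOneBlock converts a single open TSDB block into sharded Parquet files
// stored in bkt, using the post-upgrade ConvertTSDBBlock signature.
func convertOneBlock(ctx context.Context, bkt objstore.Bucket, blk *tsdb.Block) (int, error) {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// ConvertTSDBBlock now requires a *slog.Logger and reports how many shards it wrote.
	return convert.ConvertTSDBBlock(
		ctx,
		bkt,
		blk.MinTime(),
		blk.MaxTime(),
		[]convert.Convertible{blk}, // *tsdb.Block satisfies convert.Convertible, as in pkg/parquetconverter/converter.go
		logger,
		convert.WithReadConcurrency(8),            // replaces the former WithConcurrency option
		convert.WithWriteConcurrency(2),           // shards are now written in parallel
		convert.WithBloomFilterLabels("__name__"), // the default, shown explicitly for clarity
	)
}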