From 0459249b778fb29094e0978e2d12a252febc2818 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 8 Jul 2025 11:05:30 -0700 Subject: [PATCH 1/2] Updating parquet common to latest Signed-off-by: alanprot --- go.mod | 2 +- go.sum | 4 +- pkg/querier/parquet_queryable.go | 17 +- .../parquet-common/convert/convert.go | 17 +- .../parquet-common/convert/merge.go | 3 +- .../parquet-common/convert/writer.go | 148 +++++++++---- .../parquet_queryable.go | 60 +++--- .../parquet-common/schema/schema_builder.go | 15 +- .../parquet-common/search/constraint.go | 83 ++++++-- .../parquet-common/search/materialize.go | 67 +++--- .../parquet-common/storage/parquet_shard.go | 199 ++++++++++++------ .../storage/{bucket_read_at.go => read_at.go} | 67 ++++-- .../parquet-common/util/fixtures.go | 77 +++++++ vendor/modules.txt | 3 +- 14 files changed, 540 insertions(+), 222 deletions(-) rename vendor/github.com/prometheus-community/parquet-common/{search => queryable}/parquet_queryable.go (84%) rename vendor/github.com/prometheus-community/parquet-common/storage/{bucket_read_at.go => read_at.go} (56%) create mode 100644 vendor/github.com/prometheus-community/parquet-common/util/fixtures.go diff --git a/go.mod b/go.mod index 4676189520d..c64c150b833 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 + github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 257aa56de30..30a810fd7ad 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 h1:xGnXldBSTFPopLYi7ce+kJb+A1h1mPTeF4SLlRTEek0= -github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309/go.mod h1:MwYpD+FKot7LWBMFaPS6FeM8oqo77u5erRlNkSSFPA0= +github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 h1:GNUP6g2eSqZbzGTdFK9D1RLQLjZxCkuuA/MkgfB/enQ= +github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 7542c148f1f..f302a69512b 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -11,8 +11,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/queryable" "github.com/prometheus-community/parquet-common/schema" - "github.com/prometheus-community/parquet-common/search" parquet_storage 
"github.com/prometheus-community/parquet-common/storage" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -125,14 +125,14 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[*parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg)) + cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg)) if err != nil { return nil, err } cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()) - parquetQueryable, err := search.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) { + parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -144,7 +144,7 @@ func NewParquetQueryable( } userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits) - shards := make([]*parquet_storage.ParquetShard, len(blocks)) + shards := make([]parquet_storage.ParquetShard, len(blocks)) errGroup := &errgroup.Group{} span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.OpenShards") @@ -155,18 +155,21 @@ func NewParquetQueryable( cacheKey := fmt.Sprintf("%v-%v", userID, block.ID) shard := cache.Get(cacheKey) if shard == nil { + bucketOpener := parquet_storage.NewParquetBucketOpener(userBkt) // we always only have 1 shard - shard 0 // Use context.Background() here as the file can be cached and live after the request ends. - shard, err = parquet_storage.OpenParquetShard(context.WithoutCancel(ctx), - userBkt, + shard, err := parquet_storage.NewParquetShardOpener( + context.WithoutCancel(ctx), block.ID.String(), + bucketOpener, + bucketOpener, 0, parquet_storage.WithFileOptions( parquet.SkipMagicBytes(true), parquet.ReadBufferSize(100*1024), parquet.SkipBloomFilters(true), + parquet.OptimisticRead(true), ), - parquet_storage.WithOptimisticReader(true), ) if err != nil { return errors.Wrapf(err, "failed to open parquet shard. 
block: %v", block.ID.String()) diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index aa6d49d9e61..227e1e30741 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -169,9 +169,22 @@ func ConvertTSDBBlock( if err != nil { return 0, err } - defer func() { _ = rr.Close() }() - w := NewShardedWrite(rr, rr.Schema(), bkt, &cfg) + + labelsProjection, err := rr.Schema().LabelsProjection() + if err != nil { + return 0, errors.Wrap(err, "error getting labels projection from tsdb schema") + } + chunksProjection, err := rr.Schema().ChunksProjection() + if err != nil { + return 0, errors.Wrap(err, "error getting chunks projection from tsdb schema") + } + outSchemaProjections := []*schema.TSDBProjection{ + labelsProjection, chunksProjection, + } + + pipeReaderWriter := NewPipeReaderBucketWriter(bkt) + w := NewShardedWrite(rr, rr.Schema(), outSchemaProjections, pipeReaderWriter, &cfg) return w.currentShard, errors.Wrap(w.Write(ctx), "error writing block") } diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/merge.go b/vendor/github.com/prometheus-community/parquet-common/convert/merge.go index 90db2ec54eb..84fcb692e31 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/merge.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/merge.go @@ -16,10 +16,9 @@ package convert import ( "container/heap" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" ) diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go index af60bdb182a..ddc0ab41758 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go @@ -17,6 +17,8 @@ import ( "context" "fmt" "io" + "os" + "path/filepath" "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" @@ -37,23 +39,31 @@ type ShardedWriter struct { currentShard int - rr parquet.RowReader - s *schema.TSDBSchema - bkt objstore.Bucket + rr parquet.RowReader + s *schema.TSDBSchema + outSchemaProjections []*schema.TSDBProjection + pipeReaderWriter PipeReaderWriter - ops *convertOpts + opts *convertOpts } -func NewShardedWrite(rr parquet.RowReader, s *schema.TSDBSchema, bkt objstore.Bucket, ops *convertOpts) *ShardedWriter { +func NewShardedWrite( + rr parquet.RowReader, + s *schema.TSDBSchema, + outSchemaProjections []*schema.TSDBProjection, + pipeReaderWriter PipeReaderWriter, + opts *convertOpts, +) *ShardedWriter { return &ShardedWriter{ - name: ops.name, - rowGroupSize: ops.rowGroupSize, - numRowGroups: ops.numRowGroups, - currentShard: 0, - rr: rr, - s: s, - bkt: bkt, - ops: ops, + name: opts.name, + rowGroupSize: opts.rowGroupSize, + numRowGroups: opts.numRowGroups, + currentShard: 0, + rr: rr, + outSchemaProjections: outSchemaProjections, + s: s, + pipeReaderWriter: pipeReaderWriter, + opts: opts, } } @@ -98,26 +108,21 @@ func (c *ShardedWriter) writeFile(ctx context.Context, schema *schema.TSDBSchema fileOpts := []parquet.WriterOption{ parquet.SortingWriterConfig( - 
parquet.SortingColumns(c.ops.buildSortingColumns()...), + parquet.SortingColumns(c.opts.buildSortingColumns()...), ), parquet.MaxRowsPerRowGroup(int64(c.rowGroupSize)), - parquet.BloomFilters(c.ops.buildBloomfilterColumns()...), - parquet.PageBufferSize(c.ops.pageBufferSize), - parquet.WriteBufferSize(c.ops.writeBufferSize), - parquet.ColumnPageBuffers(c.ops.columnPageBuffers), + parquet.BloomFilters(c.opts.buildBloomfilterColumns()...), + parquet.PageBufferSize(c.opts.pageBufferSize), + parquet.WriteBufferSize(c.opts.writeBufferSize), + parquet.ColumnPageBuffers(c.opts.columnPageBuffers), } for k, v := range schema.Metadata { fileOpts = append(fileOpts, parquet.KeyValueMetadata(k, v)) } - transformations, err := c.transformations() - if err != nil { - return 0, err - } - - writer, err := newSplitFileWriter(ctx, c.bkt, schema.Schema, transformations, - fileOpts..., + writer, err := newSplitFileWriter( + ctx, schema.Schema, c.outSchemasForCurrentShard(), c.pipeReaderWriter, fileOpts..., ) if err != nil { return 0, fmt.Errorf("unable to create row writer: %s", err) @@ -136,21 +141,66 @@ func (c *ShardedWriter) writeFile(ctx context.Context, schema *schema.TSDBSchema return n, nil } -func (c *ShardedWriter) transformations() (map[string]*schema.TSDBProjection, error) { - lblsProjection, err := c.s.LabelsProjection() +func (c *ShardedWriter) outSchemasForCurrentShard() map[string]*schema.TSDBProjection { + outSchemas := make(map[string]*schema.TSDBProjection, len(c.outSchemaProjections)) + + for _, projection := range c.outSchemaProjections { + outSchemas[projection.FilenameFunc(c.name, c.currentShard)] = projection + } + return outSchemas +} + +// PipeReaderWriter is used to write serialized data from an io.Reader to the final output destination. +type PipeReaderWriter interface { + // Write writes data to the output path and returns any error encountered during the write. + Write(ctx context.Context, r io.Reader, outPath string) error +} + +type PipeReaderBucketWriter struct { + bkt objstore.Bucket +} + +func NewPipeReaderBucketWriter(bkt objstore.Bucket) *PipeReaderBucketWriter { + return &PipeReaderBucketWriter{ + bkt: bkt, + } +} + +func (w *PipeReaderBucketWriter) Write(ctx context.Context, r io.Reader, outPath string) error { + return w.bkt.Upload(ctx, outPath, r) +} + +type PipeReaderFileWriter struct { + outDir string +} + +func NewPipeReaderFileWriter(outDir string) *PipeReaderFileWriter { + return &PipeReaderFileWriter{ + outDir: outDir, + } +} + +func (w *PipeReaderFileWriter) Write(_ context.Context, r io.Reader, outPath string) error { + // outPath may include parent path segments in addition to the filename; + // join with w.outDir to get the full path for creating any necessary parent directories. 
+ outPath = filepath.Join(w.outDir, outPath) + outPathDir := filepath.Dir(outPath) + err := os.MkdirAll(outPathDir, os.ModePerm) if err != nil { - return nil, errors.Wrap(err, "unable to get label projection") + return errors.Wrap(err, "error creating directory for writing") } - chksProjection, err := c.s.ChunksProjection() + fileWriterCloser, err := os.Create(outPath) if err != nil { - return nil, errors.Wrap(err, "unable to create chunk projection") + return errors.Wrap(err, "error opening file for writing") } + defer func() { _ = fileWriterCloser.Close() }() - return map[string]*schema.TSDBProjection{ - schema.LabelsPfileNameForShard(c.name, c.currentShard): lblsProjection, - schema.ChunksPfileNameForShard(c.name, c.currentShard): chksProjection, - }, nil + _, err = io.Copy(fileWriterCloser, r) + if err != nil { + return errors.Wrap(err, "error copying from reader to file writer") + } + return nil } var _ parquet.RowWriter = &splitPipeFileWriter{} @@ -162,17 +212,24 @@ type fileWriter struct { r io.ReadCloser } +// splitPipeFileWriter creates a paired io.Reader and io.Writer from an io.Pipe for each output file. +// The writer receives the serialized data from parquet.GenericWriter and forwards through the pipe +// to the reader, which can be read from to write the data to any destination. type splitPipeFileWriter struct { fileWriters map[string]*fileWriter errGroup *errgroup.Group } -func newSplitFileWriter(ctx context.Context, bkt objstore.Bucket, inSchema *parquet.Schema, - files map[string]*schema.TSDBProjection, options ...parquet.WriterOption, +func newSplitFileWriter( + ctx context.Context, + inSchema *parquet.Schema, + outSchemas map[string]*schema.TSDBProjection, + pipeReaderWriter PipeReaderWriter, + options ...parquet.WriterOption, ) (*splitPipeFileWriter, error) { fileWriters := make(map[string]*fileWriter) errGroup, ctx := errgroup.WithContext(ctx) - for file, projection := range files { + for file, projection := range outSchemas { conv, err := parquet.Convert(projection.Schema, inSchema) if err != nil { return nil, fmt.Errorf("unable to convert schemas") @@ -188,7 +245,7 @@ func newSplitFileWriter(ctx context.Context, bkt objstore.Bucket, inSchema *parq } errGroup.Go(func() error { defer func() { _ = r.Close() }() - return bkt.Upload(ctx, file, r) + return pipeReaderWriter.Write(ctx, r, file) }) } return &splitPipeFileWriter{ @@ -307,14 +364,21 @@ func (b *bufferedReader) ReadRows(rows []parquet.Row) (int, error) { } current := b.current[b.currentIndex:] - i := min(len(current), len(rows)) - copy(rows[:i], current[:i]) - b.currentIndex += i + numRows := min(len(current), len(rows)) + + for i := 0; i < numRows; i++ { + // deep copy slice contents to avoid data race; + // current may return to the pool while rows is still being read by the caller + rows[i] = current[i].Clone() + } + + b.currentIndex += numRows if b.currentIndex >= len(b.current) { + // already read all rows in current; return it to the pool b.rowPool.Put(b.current[0:cap(b.current)]) b.current = nil } - return i, nil + return numRows, nil } func (b *bufferedReader) Close() { diff --git a/vendor/github.com/prometheus-community/parquet-common/search/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go similarity index 84% rename from vendor/github.com/prometheus-community/parquet-common/search/parquet_queryable.go rename to vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index 6be79b938db..3bcd8bcbd47 100644 --- 
a/vendor/github.com/prometheus-community/parquet-common/search/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package search +package queryable import ( "context" @@ -26,20 +26,19 @@ import ( "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" "github.com/prometheus-community/parquet-common/storage" "github.com/prometheus-community/parquet-common/util" ) -type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]*storage.ParquetShard, error) +type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) type queryableOpts struct { - concurrency int - pagePartitioningMaxGapSize int + concurrency int } var DefaultQueryableOpts = queryableOpts{ - concurrency: runtime.GOMAXPROCS(0), - pagePartitioningMaxGapSize: 10 * 1024, + concurrency: runtime.GOMAXPROCS(0), } type QueryableOpts func(*queryableOpts) @@ -51,13 +50,6 @@ func WithConcurrency(concurrency int) QueryableOpts { } } -// WithPageMaxGapSize set the max gap size between pages that should be downloaded together in a single read call -func WithPageMaxGapSize(pagePartitioningMaxGapSize int) QueryableOpts { - return func(opts *queryableOpts) { - opts.pagePartitioningMaxGapSize = pagePartitioningMaxGapSize - } -} - type parquetQueryable struct { shardsFinder ShardsFinderFunction d *schema.PrometheusParquetChunksDecoder @@ -210,17 +202,17 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ( } type queryableShard struct { - shard *storage.ParquetShard - m *Materializer + shard storage.ParquetShard + m *search.Materializer concurrency int } -func newQueryableShard(opts *queryableOpts, block *storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) { +func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) { s, err := block.TSDBSchema() if err != nil { return nil, err } - m, err := NewMaterializer(s, d, block, opts.concurrency, opts.pagePartitioningMaxGapSize) + m, err := search.NewMaterializer(s, d, block, opts.concurrency) if err != nil { return nil, err } @@ -239,17 +231,17 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 results := make([]prom_storage.ChunkSeries, 0, 1024) rMtx := sync.Mutex{} - for i, group := range b.shard.LabelsFile().RowGroups() { + for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraint(matchers...) if err != nil { return err } - err = Initialize(b.shard.LabelsFile(), cs...) + err = search.Initialize(b.shard.LabelsFile(), cs...) if err != nil { return err } - rr, err := Filter(ctx, group, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, cs...) 
if err != nil { return err } @@ -258,7 +250,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 return nil } - series, err := b.m.Materialize(ctx, i, mint, maxt, skipChunks, rr) + series, err := b.m.Materialize(ctx, rgi, mint, maxt, skipChunks, rr) if err != nil { return err } @@ -289,25 +281,25 @@ func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers [] results := make([][]string, len(b.shard.LabelsFile().RowGroups())) - for i, group := range b.shard.LabelsFile().RowGroups() { + for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraint(matchers...) if err != nil { return err } - err = Initialize(b.shard.LabelsFile(), cs...) + err = search.Initialize(b.shard.LabelsFile(), cs...) if err != nil { return err } - rr, err := Filter(ctx, group, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, cs...) if err != nil { return err } - series, err := b.m.MaterializeLabelNames(ctx, i, rr) + series, err := b.m.MaterializeLabelNames(ctx, rgi, rr) if err != nil { return err } - results[i] = series + results[rgi] = series return nil }) } @@ -329,25 +321,25 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, limit int6 results := make([][]string, len(b.shard.LabelsFile().RowGroups())) - for i, group := range b.shard.LabelsFile().RowGroups() { + for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraint(matchers...) if err != nil { return err } - err = Initialize(b.shard.LabelsFile(), cs...) + err = search.Initialize(b.shard.LabelsFile(), cs...) if err != nil { return err } - rr, err := Filter(ctx, group, cs...) + rr, err := search.Filter(ctx, b.shard, rgi, cs...) 
if err != nil { return err } - series, err := b.m.MaterializeLabelValues(ctx, name, i, rr) + series, err := b.m.MaterializeLabelValues(ctx, name, rgi, rr) if err != nil { return err } - results[i] = series + results[rgi] = series return nil }) } diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index b06db0d46ef..4b6b86fe77f 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -129,18 +129,17 @@ type TSDBSchema struct { } type TSDBProjection struct { + FilenameFunc func(name string, shard int) string Schema *parquet.Schema ExtraOptions []parquet.WriterOption } func (s *TSDBSchema) DataColumIdx(t int64) int { - colIdx := 0 - - for i := s.MinTs + s.DataColDurationMs; i <= t; i += s.DataColDurationMs { - colIdx++ + if t < s.MinTs { + return 0 } - return colIdx + return int((t - s.MinTs) / s.DataColDurationMs) } func (s *TSDBSchema) LabelsProjection() (*TSDBProjection, error) { @@ -163,6 +162,9 @@ func (s *TSDBSchema) LabelsProjection() (*TSDBProjection, error) { g[c[0]] = lc.Node } return &TSDBProjection{ + FilenameFunc: func(name string, shard int) string { + return LabelsPfileNameForShard(name, shard) + }, Schema: WithCompression(parquet.NewSchema("labels-projection", g)), ExtraOptions: []parquet.WriterOption{parquet.SkipPageBounds(ColIndexes)}, }, nil @@ -185,6 +187,9 @@ func (s *TSDBSchema) ChunksProjection() (*TSDBProjection, error) { } return &TSDBProjection{ + FilenameFunc: func(name string, shard int) string { + return ChunksPfileNameForShard(name, shard) + }, Schema: WithCompression(parquet.NewSchema("chunk-projection", g)), ExtraOptions: writeOptions, }, nil diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index f5d8398b15d..03598bd28b1 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -33,7 +33,7 @@ type Constraint interface { fmt.Stringer // filter returns a set of non-overlapping increasing row indexes that may satisfy the constraint. - filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) + filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) // init initializes the constraint with respect to the file schema and projections. init(f *storage.ParquetFile) error // path is the path for the column that is constrained @@ -76,7 +76,8 @@ func Initialize(f *storage.ParquetFile, cs ...Constraint) error { return nil } -func Filter(ctx context.Context, rg parquet.RowGroup, cs ...Constraint) ([]RowRange, error) { +func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) { + rg := s.LabelsFile().RowGroups()[rgIdx] // Constraints for sorting columns are cheaper to evaluate, so we sort them first. 
sc := rg.SortingColumns() @@ -96,7 +97,7 @@ func Filter(ctx context.Context, rg parquet.RowGroup, cs ...Constraint) ([]RowRa rr := []RowRange{{from: int64(0), count: rg.NumRows()}} for i := range cs { isPrimary := len(sc) > 0 && cs[i].path() == sc[0].Path()[0] - rr, err = cs[i].filter(ctx, rg, isPrimary, rr) + rr, err = cs[i].filter(ctx, rgIdx, isPrimary, rr) if err != nil { return nil, fmt.Errorf("unable to filter with constraint %d: %w", i, err) } @@ -104,6 +105,18 @@ func Filter(ctx context.Context, rg parquet.RowGroup, cs ...Constraint) ([]RowRa return rr, nil } +type pageToRead struct { + // for data pages + pfrom int64 + pto int64 + + idx int + + // for data and dictionary pages + off int + csz int +} + // symbolTable is a helper that can decode the i-th value of a page. // Using it we only need to allocate an int32 slice and not a slice of // string values. @@ -168,12 +181,14 @@ func Equal(path string, value parquet.Value) Constraint { return &equalConstraint{pth: path, val: value} } -func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { +func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { if len(rr) == 0 { return nil, nil } from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + rg := ec.f.RowGroups()[rgIdx] + col, ok := rg.Schema().Lookup(ec.path()) if !ok { // If match empty, return rr (filter nothing) @@ -191,12 +206,6 @@ func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim return nil, nil } - pgs, err := ec.f.GetPages(ctx, cc) - if err != nil { - return nil, errors.Wrap(err, "failed to get pages") - } - defer func() { _ = pgs.Close() }() - oidx, err := cc.OffsetIndex() if err != nil { return nil, fmt.Errorf("unable to read offset index: %w", err) @@ -205,11 +214,13 @@ func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim if err != nil { return nil, fmt.Errorf("unable to read column index: %w", err) } - var ( - symbols = new(symbolTable) - res = make([]RowRange, 0) - ) + res := make([]RowRange, 0) + + readPgs := make([]pageToRead, 0, 10) + for i := 0; i < cidx.NumPages(); i++ { + poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i) + // If page does not intersect from, to; we can immediately discard it pfrom := oidx.FirstRowIndex(i) pcount := rg.NumRows() - pfrom @@ -246,6 +257,37 @@ func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim continue } // We cannot discard the page through statistics but we might need to read it to see if it has the value + readPgs = append(readPgs, pageToRead{pfrom: pfrom, pto: pto, idx: i, off: int(poff), csz: int(pcsz)}) + } + + // Did not find any pages + if len(readPgs) == 0 { + return nil, nil + } + + dictOff, dictSz := ec.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) + + minOffset := uint64(readPgs[0].off) + maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz + + // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize, + // we include the dic to be read in the single read + if int(minOffset-(dictOff+dictSz)) < ec.f.Cfg.PagePartitioningMaxGapSize { + minOffset = dictOff + } + + pgs, err := ec.f.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) + if err != nil { + return nil, err + } + + defer func() { _ = pgs.Close() }() + + symbols := new(symbolTable) + for _, p := range readPgs { + pfrom := p.pfrom + pto := p.pto + if err := 
pgs.SeekToRow(pfrom); err != nil { return nil, fmt.Errorf("unable to seek to row: %w", err) } @@ -289,6 +331,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim } } } + if len(res) == 0 { return nil, nil } @@ -321,7 +364,7 @@ func (ec *equalConstraint) matches(v parquet.Value) bool { } func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, error) { - if !ec.f.BloomFiltersLoaded { + if ec.f.Cfg.SkipBloomFilters { return false, nil } @@ -351,12 +394,14 @@ func (rc *regexConstraint) String() string { return fmt.Sprintf("regex(%v,%v)", rc.pth, rc.r.GetRegexString()) } -func (rc *regexConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { +func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { if len(rr) == 0 { return nil, nil } from, to := rr[0].from, rr[len(rr)-1].from+rr[len(rr)-1].count + rg := rc.f.RowGroups()[rgIdx] + col, ok := rg.Schema().Lookup(rc.path()) if !ok { // If match empty, return rr (filter nothing) @@ -368,7 +413,7 @@ func (rc *regexConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim } cc := rg.ColumnChunks()[col.ColumnIndex] - pgs, err := rc.f.GetPages(ctx, cc) + pgs, err := rc.f.GetPages(ctx, cc, 0, 0) if err != nil { return nil, errors.Wrap(err, "failed to get pages") } @@ -487,8 +532,8 @@ func (nc *notConstraint) String() string { return fmt.Sprintf("not(%v)", nc.c.String()) } -func (nc *notConstraint) filter(ctx context.Context, rg parquet.RowGroup, primary bool, rr []RowRange) ([]RowRange, error) { - base, err := nc.c.filter(ctx, rg, primary, rr) +func (nc *notConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error) { + base, err := nc.c.filter(ctx, rgIdx, primary, rr) if err != nil { return nil, fmt.Errorf("unable to compute child constraint: %w", err) } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index f12b5f615d7..51538182b0a 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -19,7 +19,6 @@ import ( "io" "maps" "slices" - "sort" "sync" "github.com/parquet-go/parquet-go" @@ -35,7 +34,7 @@ import ( ) type Materializer struct { - b *storage.ParquetShard + b storage.ParquetShard s *schema.TSDBSchema d *schema.PrometheusParquetChunksDecoder partitioner util.Partitioner @@ -48,9 +47,8 @@ type Materializer struct { func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, - block *storage.ParquetShard, + block storage.ParquetShard, concurrency int, - maxGapPartitioning int, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) if !ok { @@ -73,7 +71,7 @@ func NewMaterializer(s *schema.TSDBSchema, b: block, colIdx: colIdx.ColumnIndex, concurrency: concurrency, - partitioner: util.NewGapBasedPartitioner(maxGapPartitioning), + partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), dataColToIndex: dataColToIndex, }, nil } @@ -88,9 +86,8 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6 results := make([]prom_storage.ChunkSeries, len(sLbls)) for i, s := range sLbls { - sort.Sort(s) results[i] = &concreteChunksSeries{ - lbls: s, + lbls: labels.New(s...), } } @@ -128,7 +125,7 
@@ func (m *Materializer) MaterializeAllLabelNames() []string { func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) { labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -167,7 +164,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string, return []string{}, nil } cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -191,7 +188,7 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin return []string{}, nil } cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] - pages, err := m.b.LabelsFile().GetPages(ctx, cc) + pages, err := m.b.LabelsFile().GetPages(ctx, cc, 0, 0) if err != nil { return nil, errors.Wrap(err, "failed to get pages") } @@ -208,16 +205,16 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin return r, nil } -func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([]labels.Labels, error) { +func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } colsMap := make(map[int]*[]parquet.Value, 10) - results := make([]labels.Labels, len(colsIdxs)) + results := make([][]labels.Label, len(colsIdxs)) for _, colsIdx := range colsIdxs { idxs, err := schema.DecodeUintSlice(colsIdx.ByteArray()) @@ -235,7 +232,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R for cIdx, v := range colsMap { errGroup.Go(func() error { cc := labelsRg.ColumnChunks()[cIdx] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), labelsRg, cc, rr) + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) if err != nil { return errors.Wrap(err, "failed to materialize labels values") } @@ -282,7 +279,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max r := make([][]chunks.Meta, totalRows(rr)) for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { - values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rg, rg.ColumnChunks()[m.dataColToIndex[i]], rr) + values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr) if err != nil { return r, err } @@ -299,7 +296,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max return r, nil } -func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, group parquet.RowGroup, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) { +func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, 
error) { if len(rr) == 0 { return nil, nil } @@ -314,6 +311,8 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq return nil, errors.Wrap(err, "could not get column index") } + group := file.RowGroups()[rgi] + pagesToRowsMap := make(map[int][]RowRange, len(rr)) for i := 0; i < cidx.NumPages(); i++ { @@ -346,9 +345,20 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq errGroup := &errgroup.Group{} errGroup.SetLimit(m.concurrency) + dictOff, dictSz := file.DictionaryPageBounds(rgi, cc.Column()) + cc.Type() + for _, p := range pageRanges { errGroup.Go(func() error { - pgs, err := file.GetPages(ctx, cc, p.pages...) + minOffset := uint64(p.off) + maxOffset := uint64(p.off + p.csz) + + // if dictOff == 0, it means that the column is not dictionary encoded + if dictOff > 0 && int(minOffset-(dictOff+dictSz)) < file.Cfg.PagePartitioningMaxGapSize { + minOffset = dictOff + } + + pgs, err := file.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) if err != nil { return errors.Wrap(err, "failed to get pages") } @@ -415,16 +425,16 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq return res, nil } -type pageEntryRead struct { - pages []int - rows []RowRange +type pageToReadWithRow struct { + pageToRead + rows []RowRange } // Merge nearby pages to enable efficient sequential reads. // Pages that are not close to each other will be scheduled for concurrent reads. -func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex) []pageEntryRead { +func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex) []pageToReadWithRow { if len(pagedIdx) == 0 { - return []pageEntryRead{} + return []pageToReadWithRow{} } idxs := make([]int, 0, len(pagedIdx)) for idx := range pagedIdx { @@ -437,13 +447,16 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset pa return int(offset.Offset(idxs[i])), int(offset.Offset(idxs[i]) + offset.CompressedPageSize(idxs[i])) }) - r := make([]pageEntryRead, 0, len(parts)) + r := make([]pageToReadWithRow, 0, len(parts)) for _, part := range parts { - pagesToRead := pageEntryRead{} + pagesToRead := pageToReadWithRow{} for i := part.ElemRng[0]; i < part.ElemRng[1]; i++ { - pagesToRead.pages = append(pagesToRead.pages, idxs[i]) pagesToRead.rows = append(pagesToRead.rows, pagedIdx[idxs[i]]...) } + pagesToRead.pfrom = int64(part.ElemRng[0]) + pagesToRead.pto = int64(part.ElemRng[1]) + pagesToRead.off = part.Start + pagesToRead.csz = part.End - part.Start pagesToRead.rows = simplify(pagesToRead.rows) r = append(r, pagesToRead) } @@ -536,3 +549,7 @@ func (c concreteChunksSeries) Labels() labels.Labels { func (c concreteChunksSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return prom_storage.NewListChunkSeriesIterator(c.chks...)
} + +func (c concreteChunksSeries) ChunkCount() (int, error) { + return len(c.chks), nil +} diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go index 3fd0cee19ba..4eba9de8c2a 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go @@ -15,8 +15,10 @@ package storage import ( "context" + "os" "sync" + "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" @@ -24,89 +26,167 @@ import ( "github.com/prometheus-community/parquet-common/schema" ) -var DefaultShardOptions = shardOptions{ - optimisticReader: true, +var DefaultFileOptions = ExtendedFileConfig{ + FileConfig: &parquet.FileConfig{ + SkipPageIndex: parquet.DefaultSkipPageIndex, + ReadMode: parquet.DefaultReadMode, + SkipMagicBytes: true, + SkipBloomFilters: true, + ReadBufferSize: 4096, + OptimisticRead: true, + }, + PagePartitioningMaxGapSize: 10 * 1024, } -type shardOptions struct { - fileOptions []parquet.FileOption - optimisticReader bool +type ExtendedFileConfig struct { + *parquet.FileConfig + PagePartitioningMaxGapSize int } type ParquetFile struct { *parquet.File - ReadAtWithContext - BloomFiltersLoaded bool - - optimisticReader bool + ReadAtWithContextCloser + Cfg ExtendedFileConfig } -type ShardOption func(*shardOptions) +type FileOption func(*ExtendedFileConfig) -func WithFileOptions(fileOptions ...parquet.FileOption) ShardOption { - return func(opts *shardOptions) { - opts.fileOptions = append(opts.fileOptions, fileOptions...) +func WithFileOptions(options ...parquet.FileOption) FileOption { + config := parquet.DefaultFileConfig() + config.Apply(options...) 
+ return func(opts *ExtendedFileConfig) { + opts.FileConfig = config } } -func WithOptimisticReader(optimisticReader bool) ShardOption { - return func(opts *shardOptions) { - opts.optimisticReader = optimisticReader +// WithPageMaxGapSize sets the max gap size between pages that should be downloaded together in a single read call +func WithPageMaxGapSize(pagePartitioningMaxGapSize int) FileOption { + return func(opts *ExtendedFileConfig) { + opts.PagePartitioningMaxGapSize = pagePartitioningMaxGapSize } } -func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, pagesToRead ...int) (*parquet.FilePages, error) { +func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, minOffset, maxOffset int64) (*parquet.FilePages, error) { colChunk := cc.(*parquet.FileColumnChunk) reader := f.WithContext(ctx) - if len(pagesToRead) > 0 && f.optimisticReader { - offset, err := cc.OffsetIndex() - if err != nil { - return nil, err - } - minOffset := offset.Offset(pagesToRead[0]) - maxOffset := offset.Offset(pagesToRead[len(pagesToRead)-1]) + offset.CompressedPageSize(pagesToRead[len(pagesToRead)-1]) - reader = newOptimisticReaderAt(reader, minOffset, maxOffset) + if f.Cfg.OptimisticRead { + reader = NewOptimisticReaderAt(reader, minOffset, maxOffset) } pages := colChunk.PagesFrom(reader) return pages, nil } -func OpenFile(r ReadAtWithContext, size int64, opts ...ShardOption) (*ParquetFile, error) { - cfg := DefaultShardOptions +func (f *ParquetFile) DictionaryPageBounds(rgIdx, colIdx int) (uint64, uint64) { + colMeta := f.Metadata().RowGroups[rgIdx].Columns[colIdx].MetaData + + return uint64(colMeta.DictionaryPageOffset), uint64(colMeta.DataPageOffset - colMeta.DictionaryPageOffset) +} + +func Open(ctx context.Context, r ReadAtWithContextCloser, size int64, opts ...FileOption) (*ParquetFile, error) { + cfg := DefaultFileOptions for _, opt := range opts { opt(&cfg) } - c, err := parquet.NewFileConfig(cfg.fileOptions...) + file, err := parquet.OpenFile(r.WithContext(ctx), size, cfg.FileConfig) if err != nil { return nil, err } - file, err := parquet.OpenFile(r, size, cfg.fileOptions...) + return &ParquetFile{ + File: file, + ReadAtWithContextCloser: r, + Cfg: cfg, + }, nil +} + +func OpenFromBucket(ctx context.Context, bkt objstore.BucketReader, name string, opts ...FileOption) (*ParquetFile, error) { + attr, err := bkt.Attributes(ctx, name) if err != nil { return nil, err } - return &ParquetFile{ - File: file, - ReadAtWithContext: r, - BloomFiltersLoaded: !c.SkipBloomFilters, - optimisticReader: cfg.optimisticReader, - }, nil + r := NewBucketReadAt(name, bkt) + return Open(ctx, r, attr.Size, opts...) +} + +func OpenFromFile(ctx context.Context, path string, opts ...FileOption) (*ParquetFile, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + _ = f.Close() + return nil, err + } + r := NewFileReadAt(f) + pf, err := Open(ctx, r, stat.Size(), opts...) 
+ if err != nil { + _ = r.Close() + return nil, err + } + // At this point, the file's lifecycle is managed by the ParquetFile + return pf, nil +} + +type ParquetShard interface { + LabelsFile() *ParquetFile + ChunksFile() *ParquetFile + TSDBSchema() (*schema.TSDBSchema, error) +} + +type ParquetOpener interface { + Open(ctx context.Context, path string, opts ...FileOption) (*ParquetFile, error) +} + +type ParquetBucketOpener struct { + bkt objstore.BucketReader +} + +func NewParquetBucketOpener(bkt objstore.BucketReader) *ParquetBucketOpener { + return &ParquetBucketOpener{ + bkt: bkt, + } +} + +func (o *ParquetBucketOpener) Open(ctx context.Context, name string, opts ...FileOption) (*ParquetFile, error) { + return OpenFromBucket(ctx, o.bkt, name, opts...) +} + +type ParquetLocalFileOpener struct{} + +func NewParquetLocalFileOpener() *ParquetLocalFileOpener { + return &ParquetLocalFileOpener{} +} + +func (o *ParquetLocalFileOpener) Open(ctx context.Context, name string, opts ...FileOption) (*ParquetFile, error) { + return OpenFromFile(ctx, name, opts...) } -type ParquetShard struct { +type ParquetShardOpener struct { labelsFile, chunksFile *ParquetFile schema *schema.TSDBSchema o sync.Once } -// OpenParquetShard opens the sharded parquet block, -// using the options param. -func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, shard int, opts ...ShardOption) (*ParquetShard, error) { +func NewParquetShardOpener( + ctx context.Context, + name string, + labelsFileOpener ParquetOpener, + chunksFileOpener ParquetOpener, + shard int, + opts ...FileOption, +) (*ParquetShardOpener, error) { + cfg := DefaultFileOptions + + for _, opt := range opts { + opt(&cfg) + } + labelsFileName := schema.LabelsPfileNameForShard(name, shard) chunksFileName := schema.ChunksPfileNameForShard(name, shard) @@ -114,21 +194,13 @@ func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, sha var labelsFile, chunksFile *ParquetFile - errGroup.Go(func() error { - labelsAttr, err := bkt.Attributes(ctx, labelsFileName) - if err != nil { - return err - } - labelsFile, err = OpenFile(NewBucketReadAt(ctx, labelsFileName, bkt), labelsAttr.Size, opts...) + errGroup.Go(func() (err error) { + labelsFile, err = labelsFileOpener.Open(ctx, labelsFileName, opts...) return err }) - errGroup.Go(func() error { - chunksFileAttr, err := bkt.Attributes(ctx, chunksFileName) - if err != nil { - return err - } - chunksFile, err = OpenFile(NewBucketReadAt(ctx, chunksFileName, bkt), chunksFileAttr.Size, opts...) + errGroup.Go(func() (err error) { + chunksFile, err = chunksFileOpener.Open(ctx, chunksFileName, opts...) 
return err }) @@ -136,24 +208,31 @@ func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, sha return nil, err } - return &ParquetShard{ + return &ParquetShardOpener{ labelsFile: labelsFile, chunksFile: chunksFile, }, nil } -func (b *ParquetShard) LabelsFile() *ParquetFile { - return b.labelsFile +func (s *ParquetShardOpener) LabelsFile() *ParquetFile { + return s.labelsFile } -func (b *ParquetShard) ChunksFile() *ParquetFile { - return b.chunksFile +func (s *ParquetShardOpener) ChunksFile() *ParquetFile { + return s.chunksFile } -func (b *ParquetShard) TSDBSchema() (*schema.TSDBSchema, error) { +func (s *ParquetShardOpener) TSDBSchema() (*schema.TSDBSchema, error) { var err error - b.o.Do(func() { - b.schema, err = schema.FromLabelsFile(b.labelsFile.File) + s.o.Do(func() { + s.schema, err = schema.FromLabelsFile(s.labelsFile.File) }) - return b.schema, err + return s.schema, err +} + +func (s *ParquetShardOpener) Close() error { + err := &multierror.Error{} + err = multierror.Append(err, s.labelsFile.Close()) + err = multierror.Append(err, s.chunksFile.Close()) + return err.ErrorOrNil() } diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go similarity index 56% rename from vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go rename to vendor/github.com/prometheus-community/parquet-common/storage/read_at.go index 53125fbba25..9f7e7fadf2a 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go @@ -16,48 +16,71 @@ package storage import ( "context" "io" + "os" "github.com/thanos-io/objstore" ) -type ReadAtWithContext interface { - io.ReaderAt +type ReadAtWithContextCloser interface { + io.Closer WithContext(ctx context.Context) io.ReaderAt } +type fileReadAt struct { + *os.File +} + +// NewFileReadAt returns a ReadAtWithContextCloser for reading from a local file. +func NewFileReadAt(f *os.File) ReadAtWithContextCloser { + return &fileReadAt{ + File: f, + } +} + +func (f *fileReadAt) WithContext(_ context.Context) io.ReaderAt { + return f.File +} + type bReadAt struct { path string - obj objstore.Bucket - ctx context.Context + obj objstore.BucketReader } -func NewBucketReadAt(ctx context.Context, path string, obj objstore.Bucket) ReadAtWithContext { +// NewBucketReadAt returns a ReadAtWithContextCloser for reading from a bucket. 
+func NewBucketReadAt(path string, obj objstore.BucketReader) ReadAtWithContextCloser { return &bReadAt{ path: path, obj: obj, - ctx: ctx, } } func (b *bReadAt) WithContext(ctx context.Context) io.ReaderAt { - return &bReadAt{ - path: b.path, - obj: b.obj, - ctx: ctx, + return readAtFunc{ + f: func(p []byte, off int64) (n int, err error) { + rc, err := b.obj.GetRange(ctx, b.path, off, int64(len(p))) + if err != nil { + return 0, err + } + defer func() { _ = rc.Close() }() + n, err = io.ReadFull(rc, p) + if err == io.EOF { + err = nil + } + return + }, } } -func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) { - rc, err := b.obj.GetRange(b.ctx, b.path, off, int64(len(p))) - if err != nil { - return 0, err - } - defer func() { _ = rc.Close() }() - n, err = io.ReadFull(rc, p) - if err == io.EOF { - err = nil - } - return +func (b *bReadAt) Close() error { + return nil +} + +type readAtFunc struct { + f func([]byte, int64) (n int, err error) +} + +func (r readAtFunc) ReadAt(p []byte, off int64) (n int, err error) { + return r.f(p, off) } type optimisticReaderAt struct { @@ -76,7 +99,7 @@ func (b optimisticReaderAt) ReadAt(p []byte, off int64) (n int, err error) { return b.r.ReadAt(p, off) } -func newOptimisticReaderAt(r io.ReaderAt, minOffset, maxOffset int64) io.ReaderAt { +func NewOptimisticReaderAt(r io.ReaderAt, minOffset, maxOffset int64) io.ReaderAt { if minOffset < maxOffset { b := make([]byte, maxOffset-minOffset) n, err := r.ReadAt(b, minOffset) diff --git a/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go b/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go new file mode 100644 index 00000000000..b708565d42a --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go @@ -0,0 +1,77 @@ +package util + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/util/teststorage" + "github.com/stretchr/testify/require" +) + +type TestConfig struct { + TotalMetricNames int + MetricsPerMetricName int + NumberOfLabels int + RandomLabels int + NumberOfSamples int +} + +func DefaultTestConfig() TestConfig { + return TestConfig{ + TotalMetricNames: 1_000, + MetricsPerMetricName: 20, + NumberOfLabels: 5, + RandomLabels: 3, + NumberOfSamples: 250, + } +} + +type TestData struct { + SeriesHash map[uint64]*struct{} + MinTime int64 + MaxTime int64 +} + +func GenerateTestData(t *testing.T, st *teststorage.TestStorage, ctx context.Context, cfg TestConfig) TestData { + app := st.Appender(ctx) + seriesHash := make(map[uint64]*struct{}) + builder := labels.NewScratchBuilder(cfg.NumberOfLabels) + + for i := 0; i < cfg.TotalMetricNames; i++ { + for n := 0; n < cfg.MetricsPerMetricName; n++ { + builder.Reset() + builder.Add(labels.MetricName, fmt.Sprintf("metric_%d", i)) + builder.Add("unique", fmt.Sprintf("unique_%d", n)) + + for j := 0; j < cfg.NumberOfLabels; j++ { + builder.Add(fmt.Sprintf("label_name_%v", j), fmt.Sprintf("label_value_%v", j)) + } + + firstRandom := rand.Int() % 10 + for k := firstRandom; k < firstRandom+cfg.RandomLabels; k++ { + builder.Add(fmt.Sprintf("random_name_%v", k), fmt.Sprintf("random_value_%v", k)) + } + + builder.Sort() + lbls := builder.Labels() + seriesHash[lbls.Hash()] = &struct{}{} + for s := 0; s < cfg.NumberOfSamples; s++ { + _, err := app.Append(0, lbls, (1 * time.Minute * time.Duration(s)).Milliseconds(), float64(i)) + require.NoError(t, err) + } + } + } + + require.NoError(t, app.Commit()) 
+ h := st.Head() + + return TestData{ + SeriesHash: seriesHash, + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e61fa11ff04..47c1dd06fd0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,9 +947,10 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 +# github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert +github.com/prometheus-community/parquet-common/queryable github.com/prometheus-community/parquet-common/schema github.com/prometheus-community/parquet-common/search github.com/prometheus-community/parquet-common/storage From 7193d3f910dfdc569b1eeffa8c5b72fedf4533be Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 8 Jul 2025 12:55:47 -0700 Subject: [PATCH 2/2] fix shard creation Signed-off-by: alanprot --- pkg/querier/parquet_queryable.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index f302a69512b..481034ef4d3 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -143,7 +143,7 @@ func NewParquetQueryable( return nil, errors.Errorf("failed to extract blocks from context") } userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits) - + bucketOpener := parquet_storage.NewParquetBucketOpener(userBkt) shards := make([]parquet_storage.ParquetShard, len(blocks)) errGroup := &errgroup.Group{} @@ -155,10 +155,9 @@ func NewParquetQueryable( cacheKey := fmt.Sprintf("%v-%v", userID, block.ID) shard := cache.Get(cacheKey) if shard == nil { - bucketOpener := parquet_storage.NewParquetBucketOpener(userBkt) // we always only have 1 shard - shard 0 // Use context.Background() here as the file can be cached and live after the request ends. - shard, err := parquet_storage.NewParquetShardOpener( + shard, err = parquet_storage.NewParquetShardOpener( context.WithoutCancel(ctx), block.ID.String(), bucketOpener,