diff --git a/go.mod b/go.mod index fc488024b01..da3d57eaa70 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 + github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 8e2c87cd829..79538449cf9 100644 --- a/go.sum +++ b/go.sum @@ -1611,8 +1611,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 h1:5mm5mMmEhUdvqf4NsVvEGWry0IeXkeZEfODbZ70c1Ok= -github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= +github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 h1:jWcDrCpAU047f2NTGtm3vRPqJ8skDOkdKCC5sSfSN4Q= +github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index e5bab841604..608a7cdc801 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -159,38 +159,34 @@ func NewParquetQueryable( return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback), - queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { + queryable.WithMaterializedSeriesCallback(func(ctx context.Context, series storage.ChunkSeries) error { queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) - lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) - for _, series := range cs { - chkCount := 0 - chunkSize := 0 - lblSize := 0 - lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels()) - lbls = append(lbls, lblAdapter) - for _, lbl := range lblAdapter { - lblSize += lbl.Size() - } - iter := series.Iterator(nil) - for iter.Next() { - chk := iter.At() - chunkSize += len(chk.Chunk.Bytes()) - chkCount++ - } - if chkCount > 0 { - if err := queryLimiter.AddChunks(chkCount); err != nil { - return validation.LimitError(err.Error()) - } - if err := queryLimiter.AddChunkBytes(chunkSize); err != nil { - return validation.LimitError(err.Error()) - } + chkCount := 0 + chunkSize := 0 + lblSize := 0 + lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels()) + for _, lbl := range lblAdapter { + lblSize += lbl.Size() + } + iter := series.Iterator(nil) + for iter.Next() { + chk := iter.At() + chunkSize += len(chk.Chunk.Bytes()) + chkCount++ + } + if chkCount > 0 { + if err := queryLimiter.AddChunks(chkCount); err != nil { + return validation.LimitError(err.Error()) } - - if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil { + if err := queryLimiter.AddChunkBytes(chunkSize); err != nil { return validation.LimitError(err.Error()) } } - if err := queryLimiter.AddSeries(lbls...); err != nil { + + if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil { + return validation.LimitError(err.Error()) + } + if err := queryLimiter.AddSeries(lblAdapter); err != nil { return validation.LimitError(err.Error()) } return nil diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 70baa010b66..4ec9b87333b 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -478,7 +478,7 @@ func TestParquetQueryable_Limits(t *testing.T) { return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0), - expectedErr: fmt.Errorf("materializer failed to materialize chunks: would fetch too many chunk bytes: resource exhausted (used 1)"), + expectedErr: fmt.Errorf("materializer failed to create chunks iterator: failed to create column value iterator: would fetch too many chunk bytes: resource exhausted (used 1)"), }, "max chunk bytes per query limit hit": { limits: func() *validation.Overrides { diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 778f29d5c6b..819f9cd13d5 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -15,6 +15,7 @@ package convert import ( "context" + "encoding/binary" "fmt" "io" "math" @@ -29,6 +30,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/objstore" @@ -362,7 +364,17 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ var ( seriesSets = make([]storage.ChunkSeriesSet, 0, len(blks)) closers = make([]io.Closer, 0, len(blks)) + ok = false ) + // If we fail to build the row reader, make sure we release resources. + // This could be either a controlled error or a panic. + defer func() { + if !ok { + for i := range closers { + _ = closers[i].Close() + } + } + }() b := schema.NewBuilder(mint, maxt, colDuration) @@ -400,7 +412,7 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ return nil, fmt.Errorf("unable to get label names from block: %w", err) } - postings := sortedPostings(ctx, indexr, ops.sortedLabels...) + postings := sortedPostings(ctx, indexr, mint, maxt, ops.sortedLabels...) seriesSet := tsdb.NewBlockChunkSeriesSet(blk.Meta().ULID, indexr, chunkr, tombsr, postings, mint, maxt, false) seriesSets = append(seriesSets, seriesSet) @@ -414,7 +426,7 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ return nil, fmt.Errorf("unable to build index reader from block: %w", err) } - return &TsdbRowReader{ + rr := &TsdbRowReader{ ctx: ctx, seriesSet: cseriesSet, closers: closers, @@ -423,7 +435,9 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ rowBuilder: parquet.NewRowBuilder(s.Schema), encoder: schema.NewPrometheusParquetChunksEncoder(s, ops.maxSamplesPerChunk), - }, nil + } + ok = true + return rr, nil } func (rr *TsdbRowReader) Close() error { @@ -438,7 +452,7 @@ func (rr *TsdbRowReader) Schema() *schema.TSDBSchema { return rr.tsdbSchema } -func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, sortedLabels ...string) index.Postings { +func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, mint, maxt int64, sortedLabels ...string) index.Postings { p := tsdb.AllSortedPostings(ctx, indexr) if len(sortedLabels) == 0 { @@ -451,16 +465,25 @@ func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, sortedLabels . labels labels.Labels } series := make([]s, 0, 128) + chks := make([]chunks.Meta, 0, 128) scratchBuilder := labels.NewScratchBuilder(10) lb := labels.NewBuilder(labels.EmptyLabels()) i := 0 +P: for p.Next() { scratchBuilder.Reset() - err := indexr.Series(p.At(), &scratchBuilder, nil) - if err != nil { - return index.ErrPostings(fmt.Errorf("expand series: %w", err)) + chks = chks[:0] + if err := indexr.Series(p.At(), &scratchBuilder, &chks); err != nil { + return index.ErrPostings(fmt.Errorf("unable to expand series: %w", err)) } + hasChunks := slices.ContainsFunc(chks, func(chk chunks.Meta) bool { + return mint <= chk.MaxTime && chk.MinTime <= maxt + }) + if !hasChunks { + continue P + } + lb.Reset(scratchBuilder.Labels()) series = append(series, s{labels: lb.Keep(sortedLabels...).Labels(), ref: p.At(), idx: i}) @@ -531,10 +554,14 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { i, j := 0, 0 lblsIdxs := []int{} - colIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.ColIndexes) + colIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.ColIndexesColumn) if !ok { return 0, fmt.Errorf("unable to find indexes") } + seriesHashIndex, ok := rr.tsdbSchema.Schema.Lookup(schema.SeriesHashColumn) + if !ok { + return 0, fmt.Errorf("unable to find series hash column") + } for promise := range c { j++ @@ -548,7 +575,8 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { rr.rowBuilder.Reset() lblsIdxs = lblsIdxs[:0] - promise.s.Labels().Range(func(l labels.Label) { + seriesLabels := promise.s.Labels() + seriesLabels.Range(func(l labels.Label) { colName := schema.LabelToColumn(l.Name) lc, _ := rr.tsdbSchema.Schema.Lookup(colName) rr.rowBuilder.Add(lc.ColumnIndex, parquet.ValueOf(l.Value)) @@ -557,6 +585,12 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { rr.rowBuilder.Add(colIndex.ColumnIndex, parquet.ValueOf(schema.EncodeIntSlice(lblsIdxs))) + // Compute and store the series hash as a byte slice in big-endian format + seriesHashValue := labels.StableHash(seriesLabels) + seriesHashBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seriesHashBytes, seriesHashValue) + rr.rowBuilder.Add(seriesHashIndex.ColumnIndex, parquet.ValueOf(seriesHashBytes)) + // skip series that have no chunks in the requested time if allChunksEmpty(chkBytes) { continue diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index e5518eaf6c3..2b4d1590d88 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -18,7 +18,6 @@ import ( "runtime" "sort" "strings" - "sync" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" @@ -263,7 +262,6 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag }() span.SetAttributes( - attribute.Bool("sorted", sorted), attribute.Int64("mint", p.mint), attribute.Int64("maxt", p.maxt), attribute.String("matchers", matchersToString(matchers)), @@ -280,6 +278,11 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag if err != nil { return prom_storage.ErrSeriesSet(err) } + // If we are merging multiple shards then each shard series set needs to be sorted. + if len(shards) > 1 { + sorted = true + } + span.SetAttributes(attribute.Bool("sorted", sorted)) seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards)) minT, maxT := p.mint, p.maxt @@ -358,10 +361,13 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(b.concurrency) - results := make([]prom_storage.ChunkSeries, 0, 1024) - rMtx := sync.Mutex{} + rowGroupCount := len(b.shard.LabelsFile().RowGroups()) + results := make([][]prom_storage.ChunkSeries, rowGroupCount) + for i := range results { + results[i] = make([]prom_storage.ChunkSeries, 0, 1024/rowGroupCount) + } - for rgi := range b.shard.LabelsFile().RowGroups() { + for rgi := range rowGroupCount { errGroup.Go(func() error { cs, err := search.MatchersToConstraints(matchers...) if err != nil { @@ -380,18 +386,18 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage return nil } - series, err := b.m.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) + seriesSetIter, err := b.m.Materialize(ctx, sp, rgi, mint, maxt, skipChunks, rr) if err != nil { return err } - if len(series) == 0 { - return nil + defer func() { _ = seriesSetIter.Close() }() + for seriesSetIter.Next() { + results[rgi] = append(results[rgi], seriesSetIter.At()) } - - rMtx.Lock() - results = append(results, series...) - rMtx.Unlock() - return nil + if sorted { + sort.Sort(byLabels(results[rgi])) + } + return seriesSetIter.Err() }) } @@ -399,10 +405,20 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage return nil, err } + totalResults := 0 + for _, res := range results { + totalResults += len(res) + } + + resultsFlattened := make([]prom_storage.ChunkSeries, 0, totalResults) + for _, res := range results { + resultsFlattened = append(resultsFlattened, res...) + } if sorted { - sort.Sort(byLabels(results)) + sort.Sort(byLabels(resultsFlattened)) } - return convert.NewChunksSeriesSet(results), nil + + return convert.NewChunksSeriesSet(resultsFlattened), nil } func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers []*labels.Matcher) ([]string, error) { diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go index c3a27ccc15f..edae87bc6f9 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go @@ -27,7 +27,8 @@ import ( const ( LabelColumnPrefix = "l_" DataColumnPrefix = "s_data_" - ColIndexes = "s_col_indexes" + ColIndexesColumn = "s_col_indexes" + SeriesHashColumn = "s_series_hash" DataColSizeMd = "data_col_duration_ms" MinTMd = "minT" diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index 4d1df102eff..3f0cf8ec02b 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -58,7 +58,7 @@ func NewBuilder(mint, maxt, colDuration int64) *Builder { // It extracts metadata (mint, maxt, dataColDurationMs) from the file's key-value metadata // and reconstructs the schema by examining the file's columns to identify label columns. // Returns an error if the metadata cannot be parsed or the schema cannot be built. -func FromLabelsFile(lf *parquet.File) (*TSDBSchema, error) { +func FromLabelsFile(lf parquet.FileView) (*TSDBSchema, error) { md := MetadataToMap(lf.Metadata().KeyValueMetadata) mint, err := strconv.ParseInt(md[MinTMd], 0, 64) if err != nil { @@ -105,7 +105,8 @@ func (b *Builder) AddLabelNameColumn(lbls ...string) { func (b *Builder) Build() (*TSDBSchema, error) { colIdx := 0 - b.g[ColIndexes] = parquet.Encoded(parquet.Leaf(parquet.ByteArrayType), &parquet.DeltaByteArray) + b.g[ColIndexesColumn] = parquet.Encoded(parquet.Leaf(parquet.ByteArrayType), &parquet.DeltaByteArray) + b.g[SeriesHashColumn] = parquet.Leaf(parquet.ByteArrayType) for i := b.mint; i <= b.maxt; i += b.dataColDurationMs { b.g[DataColumn(colIdx)] = parquet.Encoded(parquet.Leaf(parquet.ByteArrayType), &parquet.DeltaLengthByteArray) colIdx++ @@ -158,7 +159,7 @@ func (s *TSDBSchema) DataColumIdx(t int64) int { // LabelsProjection creates a TSDBProjection containing only label columns and column indexes. // This projection is used for creating parquet files that contain only the label metadata // without the actual time series data columns. The resulting projection includes: -// - ColIndexes column for row indexing +// - ColIndexesColumn column for row indexing // - All label columns extracted from the original schema // // Parameters: @@ -168,11 +169,17 @@ func (s *TSDBSchema) DataColumIdx(t int64) int { func (s *TSDBSchema) LabelsProjection(opts ...CompressionOpts) (*TSDBProjection, error) { g := make(parquet.Group) - lc, ok := s.Schema.Lookup(ColIndexes) + lc, ok := s.Schema.Lookup(ColIndexesColumn) if !ok { - return nil, fmt.Errorf("column %v not found", ColIndexes) + return nil, fmt.Errorf("column %v not found", ColIndexesColumn) } - g[ColIndexes] = lc.Node + g[ColIndexesColumn] = lc.Node + + lhc, ok := s.Schema.Lookup(SeriesHashColumn) + if !ok { + return nil, fmt.Errorf("column %v not found", SeriesHashColumn) + } + g[SeriesHashColumn] = lhc.Node for _, c := range s.Schema.Columns() { if _, ok := ExtractLabelFromColumn(c[0]); !ok { @@ -189,7 +196,7 @@ func (s *TSDBSchema) LabelsProjection(opts ...CompressionOpts) (*TSDBProjection, return LabelsPfileNameForShard(name, shard) }, Schema: WithCompression(parquet.NewSchema("labels-projection", g), opts...), - ExtraOptions: []parquet.WriterOption{parquet.SkipPageBounds(ColIndexes)}, + ExtraOptions: []parquet.WriterOption{parquet.SkipPageBounds(ColIndexesColumn), parquet.SkipPageBounds(SeriesHashColumn)}, }, nil } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 1db078ebaf5..29b23317e4a 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -21,7 +21,6 @@ import ( "sort" "github.com/parquet-go/parquet-go" - "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus-community/parquet-common/schema" @@ -47,26 +46,48 @@ type Constraint interface { func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) { r := make([]Constraint, 0, len(matchers)) for _, matcher := range matchers { + var c Constraint + S: switch matcher.Type { case labels.MatchEqual: - r = append(r, Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))) + c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)) case labels.MatchNotEqual: - r = append(r, Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)))) + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))) case labels.MatchRegexp: - res, err := labels.NewFastRegexMatcher(matcher.Value) + if matcher.GetRegexString() == ".*" { + continue + } + if matcher.GetRegexString() == ".+" { + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(""))) + break S + } + if set := matcher.SetMatches(); len(set) == 1 { + c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0])) + break S + } + rc, err := Regex(schema.LabelToColumn(matcher.Name), matcher) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to construct regex matcher: %w", err) } - r = append(r, Regex(schema.LabelToColumn(matcher.Name), res)) + c = rc case labels.MatchNotRegexp: - res, err := labels.NewFastRegexMatcher(matcher.Value) + inverted, err := matcher.Inverse() if err != nil { - return nil, err + return nil, fmt.Errorf("unable to invert matcher: %w", err) + } + if set := inverted.SetMatches(); len(set) == 1 { + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0]))) + break S } - r = append(r, Not(Regex(schema.LabelToColumn(matcher.Name), res))) + rc, err := Regex(schema.LabelToColumn(matcher.Name), inverted) + if err != nil { + return nil, fmt.Errorf("unable to construct regex matcher: %w", err) + } + c = Not(rc) default: return nil, fmt.Errorf("unsupported matcher type %s", matcher.Type) } + r = append(r, c) } return r, nil } @@ -152,30 +173,56 @@ func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constr return rr, nil } -type pageToRead struct { +type PageToRead struct { + idx int + // for data pages pfrom int64 pto int64 - idx int - // for data and dictionary pages - off int - csz int + off int64 + csz int64 // compressed size } -// symbolTable is a helper that can decode the i-th value of a page. +func NewPageToRead(idx int, pfrom, pto, off, csz int64) PageToRead { + return PageToRead{ + idx: idx, + pfrom: pfrom, + pto: pto, + off: off, + csz: csz, + } +} + +func (p *PageToRead) From() int64 { + return p.pfrom +} + +func (p *PageToRead) To() int64 { + return p.pto +} + +func (p *PageToRead) Offset() int64 { + return p.off +} + +func (p *PageToRead) CompressedSize() int64 { + return p.csz +} + +// SymbolTable is a helper that can decode the i-th value of a page. // Using it we only need to allocate an int32 slice and not a slice of // string values. // It only works for optional dictionary encoded columns. All of our label // columns are that though. -type symbolTable struct { +type SymbolTable struct { dict parquet.Dictionary syms []int32 defs []byte } -func (s *symbolTable) Get(r int) parquet.Value { +func (s *SymbolTable) Get(r int) parquet.Value { i := s.GetIndex(r) switch i { case -1: @@ -185,7 +232,7 @@ func (s *symbolTable) Get(r int) parquet.Value { } } -func (s *symbolTable) GetIndex(i int) int32 { +func (s *SymbolTable) GetIndex(i int) int32 { switch s.defs[i] { case 1: return s.syms[i] @@ -194,7 +241,7 @@ func (s *symbolTable) GetIndex(i int) int32 { } } -func (s *symbolTable) Reset(pg parquet.Page) { +func (s *SymbolTable) Reset(pg parquet.Page) { dict := pg.Dictionary() data := pg.Data() syms := data.Int32() @@ -216,6 +263,33 @@ func (s *symbolTable) Reset(pg parquet.Page) { s.dict = dict } +func (s *SymbolTable) ResetWithRange(pg parquet.Page, l, r int) { + dict := pg.Dictionary() + data := pg.Data() + syms := data.Int32() + s.defs = pg.DefinitionLevels() + + if s.syms == nil { + s.syms = make([]int32, len(s.defs)) + } else { + s.syms = slices.Grow(s.syms, len(s.defs))[:len(s.defs)] + } + + sidx := 0 + for i := 0; i < l; i++ { + if s.defs[i] == 1 { + sidx++ + } + } + for i := l; i < r; i++ { + if s.defs[i] == 1 { + s.syms[i] = syms[sidx] + sidx++ + } + } + s.dict = dict +} + type equalConstraint struct { pth string @@ -268,10 +342,10 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, } res := make([]RowRange, 0) - readPgs := make([]pageToRead, 0, 10) + readPgs := make([]PageToRead, 0, 10) for i := 0; i < cidx.NumPages(); i++ { - poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i) + poff, pcsz := oidx.Offset(i), oidx.CompressedPageSize(i) // If page does not intersect from, to; we can immediately discard it pfrom := oidx.FirstRowIndex(i) @@ -309,7 +383,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, continue } // We cannot discard the page through statistics but we might need to read it to see if it has the value - readPgs = append(readPgs, pageToRead{pfrom: pfrom, pto: pto, idx: i, off: int(poff), csz: int(pcsz)}) + readPgs = append(readPgs, NewPageToRead(i, pfrom, pto, poff, pcsz)) } // Did not find any pages @@ -319,8 +393,8 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, dictOff, dictSz := ec.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) - minOffset := uint64(readPgs[0].off) - maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz + minOffset := uint64(readPgs[0].Offset()) + maxOffset := readPgs[len(readPgs)-1].Offset() + readPgs[len(readPgs)-1].CompressedSize() // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize, // we include the dic to be read in the single read @@ -335,10 +409,10 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, defer func() { _ = pgs.Close() }() - symbols := new(symbolTable) + symbols := new(SymbolTable) for _, p := range readPgs { - pfrom := p.pfrom - pto := p.pto + pfrom := p.From() + pto := p.To() if err := pgs.SeekToRow(pfrom); err != nil { return nil, fmt.Errorf("unable to seek to row: %w", err) @@ -348,8 +422,6 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, return nil, fmt.Errorf("unable to read page: %w", err) } - symbols.Reset(pg) - // The page has the value, we need to find the matching row ranges n := int(pg.NumRows()) bl := int(max(pfrom, from) - pfrom) @@ -357,6 +429,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, var l, r int switch { case cidx.IsAscending() && primary: + symbols.Reset(pg) l = sort.Search(n, func(i int) bool { return ec.comp(ec.val, symbols.Get(i)) <= 0 }) r = sort.Search(n, func(i int) bool { return ec.comp(ec.val, symbols.Get(i)) < 0 }) @@ -365,6 +438,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, } default: off, count := bl, 0 + symbols.ResetWithRange(pg, bl, br) for j := bl; j < br; j++ { if !ec.matches(symbols.Get(j)) { if count != 0 { @@ -382,6 +456,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, res = append(res, RowRange{pfrom + int64(off), int64(count)}) } } + parquet.Release(pg) } if len(res) == 0 { @@ -431,15 +506,31 @@ func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, erro return !ok, nil } -func Regex(path string, r *labels.FastRegexMatcher) Constraint { - return ®exConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r} +func Regex(path string, r *labels.Matcher) (Constraint, error) { + if r.Type != labels.MatchRegexp { + return nil, fmt.Errorf("unsupported matcher type: %s", r.Type) + } + return ®exConstraint{ + pth: path, + cache: make(map[int32]bool), + r: r, + }, nil } type regexConstraint struct { - pth string - cache map[parquet.Value]bool f storage.ParquetFileView - r *labels.FastRegexMatcher + pth string + cache map[int32]bool + + // if its a "set" or "prefix" regex + // for set, those are minv and maxv of the set, for prefix minv is the prefix, maxv is prefix+max(charset)*16 + minv parquet.Value + maxv parquet.Value + + r *labels.Matcher + matchesEmpty bool + + comp func(l, r parquet.Value) int } func (rc *regexConstraint) String() string { @@ -458,20 +549,13 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if !ok { // If match empty, return rr (filter nothing) // otherwise return empty - if rc.matches(parquet.ValueOf("")) { + if rc.matchesEmpty { return rr, nil } return []RowRange{}, nil } cc := rg.ColumnChunks()[col.ColumnIndex] - pgs, err := rc.f.GetPages(ctx, cc, 0, 0) - if err != nil { - return nil, errors.Wrap(err, "failed to get pages") - } - - defer func() { _ = pgs.Close() }() - oidx, err := cc.OffsetIndex() if err != nil { return nil, fmt.Errorf("unable to read offset index: %w", err) @@ -480,11 +564,13 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if err != nil { return nil, fmt.Errorf("unable to read column index: %w", err) } - var ( - symbols = new(symbolTable) - res = make([]RowRange, 0) - ) + res := make([]RowRange, 0) + + readPgs := make([]PageToRead, 0, 10) + for i := 0; i < cidx.NumPages(); i++ { + poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i) + // If page does not intersect from, to; we can immediately discard it pfrom := oidx.FirstRowIndex(i) pcount := rg.NumRows() - pfrom @@ -500,14 +586,61 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, } // Page intersects [from, to] but we might be able to discard it with statistics if cidx.NullPage(i) { - if rc.matches(parquet.ValueOf("")) { + if rc.matchesEmpty { res = append(res, RowRange{pfrom, pcount}) } continue } - // TODO: use setmatches / prefix for statistics + // If we have a special regular expression that works with statistics, we can use them to skip. + // This works for i.e.: 'pod_name=~"thanos-.*"' or 'status_code=~"403|404"' + minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) + if !rc.minv.IsNull() && !rc.maxv.IsNull() { + if !rc.matchesEmpty && !maxv.IsNull() && rc.comp(rc.minv, maxv) > 0 { + if cidx.IsDescending() { + break + } + continue + } + if !rc.matchesEmpty && !minv.IsNull() && rc.comp(rc.maxv, minv) < 0 { + if cidx.IsAscending() { + break + } + continue + } + } // We cannot discard the page through statistics but we might need to read it to see if it has the value + readPgs = append(readPgs, PageToRead{pfrom: pfrom, pto: pto, idx: i, off: int64(poff), csz: pcsz}) + } + + // Did not find any pages + if len(readPgs) == 0 { + return intersectRowRanges(simplify(res), rr), nil + } + + dictOff, dictSz := rc.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) + + minOffset := uint64(readPgs[0].off) + maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz + + // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize, + // we include the dic to be read in the single read + if int(minOffset-(dictOff+dictSz)) < rc.f.PagePartitioningMaxGapSize() { + minOffset = dictOff + } + + pgs, err := rc.f.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) + if err != nil { + return nil, err + } + + defer func() { _ = pgs.Close() }() + + symbols := new(SymbolTable) + for _, p := range readPgs { + pfrom := p.pfrom + pto := p.pto + if err := pgs.SeekToRow(pfrom); err != nil { return nil, fmt.Errorf("unable to seek to row: %w", err) } @@ -516,15 +649,14 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, return nil, fmt.Errorf("unable to read page: %w", err) } - symbols.Reset(pg) - // The page has the value, we need to find the matching row ranges n := int(pg.NumRows()) bl := int(max(pfrom, from) - pfrom) br := n - int(pto-min(pto, to)) off, count := bl, 0 + symbols.ResetWithRange(pg, bl, br) for j := bl; j < br; j++ { - if !rc.matches(symbols.Get(j)) { + if !rc.matches(symbols, symbols.GetIndex(j)) { if count != 0 { res = append(res, RowRange{pfrom + int64(off), int64(count)}) } @@ -539,7 +671,9 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if count != 0 { res = append(res, RowRange{pfrom + int64(off), int64(count)}) } + parquet.Release(pg) } + if len(res) == 0 { return nil, nil } @@ -549,13 +683,32 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, func (rc *regexConstraint) init(f storage.ParquetFileView) error { c, ok := f.Schema().Lookup(rc.path()) rc.f = f + rc.matchesEmpty = rc.r.Matches("") if !ok { return nil } if stringKind := parquet.String().Type().Kind(); c.Node.Type().Kind() != stringKind { return fmt.Errorf("schema: cannot search value of kind %s in column of kind %s", stringKind, c.Node.Type().Kind()) } - rc.cache = make(map[parquet.Value]bool) + rc.cache = make(map[int32]bool) + rc.comp = c.Node.Type().Compare + + // if applicable compute the minv and maxv of the implied set of matches + rc.minv = parquet.NullValue() + rc.maxv = parquet.NullValue() + if len(rc.r.SetMatches()) > 0 { + sm := make([]parquet.Value, len(rc.r.SetMatches())) + for i, m := range rc.r.SetMatches() { + sm[i] = parquet.ValueOf(m) + } + rc.minv = slices.MinFunc(sm, rc.comp) + rc.maxv = slices.MaxFunc(sm, rc.comp) + } else if len(rc.r.Prefix()) > 0 { + rc.minv = parquet.ValueOf(rc.r.Prefix()) + // 16 is the default prefix length, maybe we should read the actual value from somewhere? + rc.maxv = parquet.ValueOf(append([]byte(rc.r.Prefix()), bytes.Repeat([]byte{0xff}, 16)...)) + } + return nil } @@ -563,11 +716,18 @@ func (rc *regexConstraint) path() string { return rc.pth } -func (rc *regexConstraint) matches(v parquet.Value) bool { - accept, seen := rc.cache[v] +func (rc *regexConstraint) matches(symbols *SymbolTable, i int32) bool { + accept, seen := rc.cache[i] if !seen { - accept = rc.r.MatchString(util.YoloString(v.ByteArray())) - rc.cache[v] = accept + var v parquet.Value + switch i { + case -1: + v = parquet.NullValue() + default: + v = symbols.dict.Index(i) + } + accept = rc.r.Matches(util.YoloString(v.ByteArray())) + rc.cache[i] = accept } return accept } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/iterators.go b/vendor/github.com/prometheus-community/parquet-common/search/iterators.go new file mode 100644 index 00000000000..e3040eb4146 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/iterators.go @@ -0,0 +1,594 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package search + +import ( + "context" + "io" + + "github.com/hashicorp/go-multierror" + "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" +) + +var _ prom_storage.ChunkSeries = &ConcreteChunksSeries{} + +type ConcreteChunksSeries struct { + lbls labels.Labels + chks []chunks.Meta +} + +func (c ConcreteChunksSeries) Labels() labels.Labels { + return c.lbls +} + +func (c ConcreteChunksSeries) Iterator(_ chunks.Iterator) chunks.Iterator { + return prom_storage.NewListChunkSeriesIterator(c.chks...) +} + +type ChunkSeriesSetCloser interface { + prom_storage.ChunkSeriesSet + + // Close releases any memory buffers held by the ChunkSeriesSet or the + // underlying ChunkSeries. It is not safe to use the ChunkSeriesSet + // or any of its ChunkSeries after calling Close. + Close() error +} + +type NoChunksConcreteLabelsSeriesSet struct { + seriesSet []*ConcreteChunksSeries + currentSeriesIdx int +} + +func NewNoChunksConcreteLabelsSeriesSet(sLbls []labels.Labels) *NoChunksConcreteLabelsSeriesSet { + seriesSet := make([]*ConcreteChunksSeries, len(sLbls)) + for i, lbls := range sLbls { + seriesSet[i] = &ConcreteChunksSeries{lbls: lbls} + } + return &NoChunksConcreteLabelsSeriesSet{ + seriesSet: seriesSet, + currentSeriesIdx: -1, + } +} + +func (s *NoChunksConcreteLabelsSeriesSet) At() prom_storage.ChunkSeries { + return s.seriesSet[s.currentSeriesIdx] +} + +func (s *NoChunksConcreteLabelsSeriesSet) Next() bool { + if s.currentSeriesIdx+1 == len(s.seriesSet) { + return false + } + s.currentSeriesIdx++ + return true +} + +func (s *NoChunksConcreteLabelsSeriesSet) Err() error { + return nil +} + +func (s *NoChunksConcreteLabelsSeriesSet) Warnings() annotations.Annotations { + return nil +} + +func (s *NoChunksConcreteLabelsSeriesSet) Close() error { + return nil +} + +// FilterEmptyChunkSeriesSet is a ChunkSeriesSet that lazily filters out series with no chunks. +// It takes a set of materialized labels and a lazy iterator of chunks.Iterators; +// the labels and iterators are iterated in tandem to yield series with chunks. +// The materialized series callback is applied to each series when the iterator advances during Next(). +type FilterEmptyChunkSeriesSet struct { + ctx context.Context + lblsSet []labels.Labels + chnkSet ChunksIteratorIterator + + currentSeries *ConcreteChunksSeries + materializedSeriesCallback MaterializedSeriesFunc + err error +} + +func NewFilterEmptyChunkSeriesSet( + ctx context.Context, + lblsSet []labels.Labels, + chnkSet ChunksIteratorIterator, + materializeSeriesCallback MaterializedSeriesFunc, +) *FilterEmptyChunkSeriesSet { + return &FilterEmptyChunkSeriesSet{ + ctx: ctx, + lblsSet: lblsSet, + chnkSet: chnkSet, + materializedSeriesCallback: materializeSeriesCallback, + } +} + +func (s *FilterEmptyChunkSeriesSet) At() prom_storage.ChunkSeries { + return s.currentSeries +} + +func (s *FilterEmptyChunkSeriesSet) Next() bool { + metas := make([]chunks.Meta, 0, 128) + for s.chnkSet.Next() { + if len(s.lblsSet) == 0 { + s.err = errors.New("less labels than chunks, this should not happen") + return false + } + lbls := s.lblsSet[0] + s.lblsSet = s.lblsSet[1:] + iter := s.chnkSet.At() + for iter.Next() { + metas = append(metas, iter.At()) + } + + if iter.Err() != nil { + s.err = iter.Err() + return false + } + + if len(metas) == 0 { + // This series has no chunks, skip it and continue to the next + continue + } + metasCpy := make([]chunks.Meta, len(metas)) + copy(metasCpy, metas) // copying prevents metas from escaping to heap + + s.currentSeries = &ConcreteChunksSeries{ + lbls: lbls, + chks: metasCpy, + } + s.err = s.materializedSeriesCallback(s.ctx, s.currentSeries) + return s.err == nil + // This series has no chunks, skip it and continue to the next + } + if s.chnkSet.Err() != nil { + s.err = s.chnkSet.Err() + } + if len(s.lblsSet) > 0 { + s.err = errors.New("more labels than chunks, this should not happen") + } + return false +} + +func (s *FilterEmptyChunkSeriesSet) Err() error { + if s.err != nil { + return s.err + } + return s.chnkSet.Err() +} + +func (s *FilterEmptyChunkSeriesSet) Warnings() annotations.Annotations { + return nil +} + +func (s *FilterEmptyChunkSeriesSet) Close() error { + return s.chnkSet.Close() +} + +type ChunksIteratorIterator interface { + Next() bool + At() chunks.Iterator + Err() error + Close() error +} + +// MultiColumnChunksDecodingIterator yields a prometheus chunks.Iterator from multiple parquet Columns. +// The column iterators are called in order for each column and zipped together, +// yielding a single iterator that in turn can yield all chunks for the same row. +// +// The "column iterators" are pagesRowValueIterator which are expected to be: +// 1. in order of the columns in the parquet file +// 2. initialized with the same set of pages and row ranges +// An error is returned if the iterators have different lengths. +type MultiColumnChunksDecodingIterator struct { + mint int64 + maxt int64 + + columnValueIterators []*ColumnValueIterator + d *schema.PrometheusParquetChunksDecoder + + // current is a chunk-decoding iterator for the materialized parquet Values + // combined by calling and zipping all column iterators in order + current *ValueDecodingChunkIterator + err error +} + +func (i *MultiColumnChunksDecodingIterator) At() chunks.Iterator { + return i.current +} + +func (i *MultiColumnChunksDecodingIterator) Next() bool { + if i.err != nil || len(i.columnValueIterators) == 0 { + return false + } + + multiColumnValues := make([]parquet.Value, 0, len(i.columnValueIterators)) + for _, columnValueIter := range i.columnValueIterators { + if !columnValueIter.Next() { + i.err = columnValueIter.Err() + if i.err != nil { + return false + } + continue + } + at := columnValueIter.At() + multiColumnValues = append(multiColumnValues, at) + } + if len(multiColumnValues) == 0 { + return false + } + + i.current = &ValueDecodingChunkIterator{ + mint: i.mint, + maxt: i.maxt, + values: multiColumnValues, + d: i.d, + } + return true +} + +func (i *MultiColumnChunksDecodingIterator) Err() error { + return i.err +} + +func (i *MultiColumnChunksDecodingIterator) Close() error { + err := &multierror.Error{} + for _, iter := range i.columnValueIterators { + err = multierror.Append(err, iter.Close()) + } + return err.ErrorOrNil() +} + +// ValueDecodingChunkIterator decodes and yields chunks from a parquet Values slice. +type ValueDecodingChunkIterator struct { + mint int64 + maxt int64 + values []parquet.Value + d *schema.PrometheusParquetChunksDecoder + + decoded []chunks.Meta + current chunks.Meta + err error +} + +func (i *ValueDecodingChunkIterator) At() chunks.Meta { + return i.current +} + +func (i *ValueDecodingChunkIterator) Next() bool { + if i.err != nil { + return false + } + if len(i.values) == 0 && len(i.decoded) == 0 { + return false + } + if len(i.decoded) > 0 { + i.current = i.decoded[0] + i.decoded = i.decoded[1:] + return true + } + value := i.values[0] + i.values = i.values[1:] + + i.decoded, i.err = i.d.Decode(value.ByteArray(), i.mint, i.maxt) + return i.Next() +} + +func (i *ValueDecodingChunkIterator) Err() error { + return i.err +} + +func (c ConcreteChunksSeries) ChunkCount() (int, error) { + return len(c.chks), nil +} + +type ColumnValueIterator struct { + currentIteratorIndex int + rowRangesIterators []*RowRangesValueIterator + + current parquet.Value + err error +} + +func (i *ColumnValueIterator) At() parquet.Value { + return i.current +} + +func (i *ColumnValueIterator) Next() bool { + if i.err != nil { + return false + } + + found := false + for !found { + if i.currentIteratorIndex >= len(i.rowRangesIterators) { + return false + } + + currentIterator := i.rowRangesIterators[i.currentIteratorIndex] + hasNext := currentIterator.Next() + + if !hasNext { + if err := currentIterator.Err(); err != nil { + i.err = err + _ = currentIterator.Close() + return false + } + // Iterator exhausted without error; close and move on to the next one + _ = currentIterator.Close() + i.currentIteratorIndex++ + continue + } + i.current = currentIterator.At() + found = true + } + return found +} + +func (i *ColumnValueIterator) Err() error { + return i.err +} + +func (i *ColumnValueIterator) Close() error { + err := &multierror.Error{} + for _, iter := range i.rowRangesIterators { + err = multierror.Append(err, iter.Close()) + } + return err.ErrorOrNil() +} + +// RowRangesValueIterator yields individual parquet Values from specified row ranges in its FilePages +type RowRangesValueIterator struct { + pgs parquet.Pages + pageIterator *PageValueIterator + + remainingRr []RowRange + currentRr RowRange + next int64 + remaining int64 + currentRow int64 + + buffer []parquet.Value + currentBufferIndex int + err error +} + +func newRowRangesValueIterator( + ctx context.Context, + file storage.ParquetFileView, + cc parquet.ColumnChunk, + pageRange PageToReadWithRow, + dictOff uint64, + dictSz uint64, +) (*RowRangesValueIterator, error) { + minOffset := uint64(pageRange.Offset()) + maxOffset := uint64(pageRange.Offset() + pageRange.CompressedSize()) + + // if dictOff == 0, it means that the collum is not dictionary encoded + if dictOff > 0 && int(minOffset-(dictOff+dictSz)) < file.PagePartitioningMaxGapSize() { + minOffset = dictOff + } + + pgs, err := file.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) + if err != nil { + if pgs != nil { + _ = pgs.Close() + } + return nil, errors.Wrap(err, "failed to get pages") + } + + err = pgs.SeekToRow(pageRange.rows[0].From) + if err != nil { + _ = pgs.Close() + return nil, errors.Wrap(err, "failed to seek to row") + } + + remainingRr := pageRange.rows + + currentRr := remainingRr[0] + next := currentRr.From + remaining := currentRr.Count + currentRow := currentRr.From + + remainingRr = remainingRr[1:] + return &RowRangesValueIterator{ + pgs: pgs, + pageIterator: new(PageValueIterator), + + remainingRr: remainingRr, + currentRr: currentRr, + next: next, + remaining: remaining, + currentRow: currentRow, + }, nil +} + +func (i *RowRangesValueIterator) At() parquet.Value { + return i.buffer[i.currentBufferIndex] +} + +func (i *RowRangesValueIterator) Next() bool { + if i.err != nil { + return false + } + + if len(i.buffer) > 0 && i.currentBufferIndex < len(i.buffer)-1 { + // Still have buffered values from previous page reads to yield + i.currentBufferIndex++ + return true + } + + if len(i.remainingRr) == 0 && i.remaining == 0 { + // Done; all rows of all pages have been read + // and all buffered values from the row ranges have been yielded + return false + } + + // Read pages until we find values for the next row range. + found := false + for !found { + // Prepare inner iterator + page, err := i.pgs.ReadPage() + if err != nil { + i.err = errors.Wrap(err, "failed to read page") + return false + } + i.pageIterator.Reset(page) + // Reset page values buffer + i.currentBufferIndex = 0 + i.buffer = i.buffer[:0] + + for i.pageIterator.Next() { + if i.currentRow == i.next { + found = true + i.buffer = append(i.buffer, i.pageIterator.At()) + + i.remaining-- + if i.remaining > 0 { + i.next = i.next + 1 + } else if len(i.remainingRr) > 0 { + i.currentRr = i.remainingRr[0] + i.next = i.currentRr.From + i.remaining = i.currentRr.Count + i.remainingRr = i.remainingRr[1:] + } + } + if i.remaining > 0 { + i.currentRow += i.pageIterator.Skip(i.next - i.currentRow - 1) + } + i.currentRow++ + } + parquet.Release(page) + if i.pageIterator.Err() != nil { + i.err = errors.Wrap(i.pageIterator.Err(), "failed to read page values") + return false + } + } + return found +} + +func (i *RowRangesValueIterator) Err() error { + return i.err +} + +func (i *RowRangesValueIterator) Close() error { + return i.pgs.Close() +} + +// PageValueIterator yields individual parquet Values from its Page. +type PageValueIterator struct { + p parquet.Page + + cachedSymbols map[int32]parquet.Value + st SymbolTable + + vr parquet.ValueReader + + current int + buffer []parquet.Value + currentBufferIndex int + err error +} + +func (i *PageValueIterator) At() parquet.Value { + if i.vr == nil { + dicIndex := i.st.GetIndex(i.current) + // Cache a clone of the current symbol table entry. + // This allows us to release the original page while avoiding unnecessary future clones. + if _, ok := i.cachedSymbols[dicIndex]; !ok { + i.cachedSymbols[dicIndex] = i.st.Get(i.current).Clone() + } + return i.cachedSymbols[dicIndex] + } + return i.buffer[i.currentBufferIndex].Clone() +} + +func (i *PageValueIterator) Next() bool { + if i.err != nil { + return false + } + + i.current++ + if i.current >= int(i.p.NumRows()) { + return false + } + + // End early if no value reader available. + if i.vr == nil { + return true + } + + i.currentBufferIndex++ + + if i.currentBufferIndex == len(i.buffer) { + n, err := i.vr.ReadValues(i.buffer[:cap(i.buffer)]) + if err != nil && err != io.EOF { + i.err = err + } + i.buffer = i.buffer[:n] + i.currentBufferIndex = 0 + } + + return true +} + +func (vi *PageValueIterator) Skip(n int64) int64 { + r := min(n, vi.p.NumRows()-int64(vi.current)-1) + // Move the row index cursor. + vi.current += int(r) + if vi.vr != nil { + // Move the page values index cursor. + vi.currentBufferIndex += int(r) + // Read more values if the current buffer is exhausted. + for vi.currentBufferIndex >= len(vi.buffer) { + vi.currentBufferIndex = vi.currentBufferIndex - len(vi.buffer) + num, err := vi.vr.ReadValues(vi.buffer[:cap(vi.buffer)]) + if err != nil && err != io.EOF { + vi.err = err + } + vi.buffer = vi.buffer[:num] + } + } + + return r +} + +func (i *PageValueIterator) Reset(p parquet.Page) { + i.p = p + i.vr = nil + if p.Dictionary() != nil { + i.st.Reset(p) + i.cachedSymbols = make(map[int32]parquet.Value, p.Dictionary().Len()) + } else { + i.vr = p.Values() + if i.buffer != nil { + i.buffer = i.buffer[:0] + } else { + i.buffer = make([]parquet.Value, 0, 128) + } + i.currentBufferIndex = -1 + } + i.current = -1 +} + +func (i *PageValueIterator) Err() error { + return i.err +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 33d03825627..4d4bbf2eab3 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -16,7 +16,6 @@ package search import ( "context" "fmt" - "io" "iter" "maps" "slices" @@ -26,7 +25,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunks" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -60,10 +58,10 @@ type Materializer struct { // MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for // materialized series. -type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error +type MaterializedSeriesFunc func(ctx context.Context, series prom_storage.ChunkSeries) error // NoopMaterializedSeriesFunc is a noop callback function that does nothing. -func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) error { +func NoopMaterializedSeriesFunc(_ context.Context, _ prom_storage.ChunkSeries) error { return nil } @@ -96,9 +94,9 @@ func NewMaterializer(s *schema.TSDBSchema, materializeSeriesCallback MaterializedSeriesFunc, materializeLabelsFilterCallback MaterializedLabelsFilterCallback, ) (*Materializer, error) { - colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) + colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexesColumn) if !ok { - return nil, errors.New(fmt.Sprintf("schema index %s not found", schema.ColIndexes)) + return nil, errors.New(fmt.Sprintf("schema index %s not found", schema.ColIndexesColumn)) } dataColToIndex := make([]int, len(block.ChunksFile().Schema().Columns())) @@ -127,9 +125,10 @@ func NewMaterializer(s *schema.TSDBSchema, }, nil } -// Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). +// Materialize creates an iterator to reconstruct the ChunkSeries that belong to the specified row ranges (rr). // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. -func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) (results []prom_storage.ChunkSeries, err error) { +func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.SelectHints, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) (ChunkSeriesSetCloser, error) { + var err error ctx, span := tracer.Start(ctx, "Materializer.Materialize") defer func() { if err != nil { @@ -148,48 +147,39 @@ func (m *Materializer) Materialize(ctx context.Context, hints *prom_storage.Sele ) if err := m.checkRowCountQuota(rr); err != nil { + span.SetAttributes(attribute.String("quota_failure", "row_count")) return nil, err } - sLbls, err := m.materializeAllLabels(ctx, rgi, rr) + sLbls, err := m.MaterializeAllLabels(ctx, rgi, rr) if err != nil { return nil, errors.Wrapf(err, "error materializing labels") } - results, rr = m.filterSeries(ctx, hints, sLbls, rr) - if !skipChunks { - chks, err := m.materializeChunks(ctx, rgi, mint, maxt, rr) - if err != nil { - return nil, errors.Wrap(err, "materializer failed to materialize chunks") - } - - for i, result := range results { - result.(*concreteChunksSeries).chks = chks[i] - } + seriesSetLabels, rr := m.FilterSeriesLabels(ctx, hints, sLbls, rr) - // If we are not skipping chunks and there is no chunks for the time range queried, lets remove the series - results = slices.DeleteFunc(results, func(cs prom_storage.ChunkSeries) bool { - return len(cs.(*concreteChunksSeries).chks) == 0 - }) + if skipChunks { + return NewNoChunksConcreteLabelsSeriesSet(seriesSetLabels), nil } - if err := m.materializedSeriesCallback(ctx, results); err != nil { - return nil, err + chksIter, err := m.MaterializeChunks(ctx, rgi, mint, maxt, rr) + if err != nil { + return nil, errors.Wrap(err, "materializer failed to create chunks iterator") } - span.SetAttributes(attribute.Int("materialized_series_count", len(results))) - return results, err + seriesSetIter := NewFilterEmptyChunkSeriesSet(ctx, seriesSetLabels, chksIter, m.materializedSeriesCallback) + + span.SetAttributes(attribute.Int("materialized_series_count", len(seriesSetLabels))) + return seriesSetIter, nil } -func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.SelectHints, sLbls [][]labels.Label, rr []RowRange) ([]prom_storage.ChunkSeries, []RowRange) { - results := make([]prom_storage.ChunkSeries, 0, len(sLbls)) +func (m *Materializer) FilterSeriesLabels(ctx context.Context, hints *prom_storage.SelectHints, sLbls [][]labels.Label, rr []RowRange) ([]labels.Labels, []RowRange) { + seriesLabels := make([]labels.Labels, 0, len(sLbls)) labelsFilter, ok := m.materializedLabelsFilterCallback(ctx, hints) if !ok { for _, s := range sLbls { - results = append(results, &concreteChunksSeries{ - lbls: labels.New(s...), - }) + seriesLabels = append(seriesLabels, labels.New(s...)) } - return results, rr + return seriesLabels, rr } defer labelsFilter.Close() @@ -205,9 +195,7 @@ func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.Sel lbls := labels.New(sLbls[seriesIdx]...) if labelsFilter.Filter(lbls) { - results = append(results, &concreteChunksSeries{ - lbls: lbls, - }) + seriesLabels = append(seriesLabels, lbls) // Handle row range collection if !inRange { @@ -244,7 +232,7 @@ func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.Sel filteredRR = append(filteredRR, currentRange) } - return results, filteredRR + return seriesLabels, filteredRR } // MaterializeAllLabelNames extracts and returns all label names from the schema @@ -281,7 +269,7 @@ func (m *Materializer) MaterializeAllLabelNames() []string { func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) { labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) + colsIdxs, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rr, cc, false) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -332,7 +320,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string, return []string{}, nil } cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) + values, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rr, cc, false) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -387,8 +375,8 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin return r, nil } -func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { - ctx, span := tracer.Start(ctx, "Materializer.materializeAllLabels") +func (m *Materializer) MaterializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { + ctx, span := tracer.Start(ctx, "Materializer.MaterializeAllLabels") var err error defer func() { if err != nil { @@ -398,9 +386,11 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R span.End() }() + totalRowsRequested := totalRows(rr) span.SetAttributes( attribute.Int("row_group_index", rgi), attribute.Int("row_ranges_count", len(rr)), + attribute.Int64("total_rows_requested", totalRowsRequested), ) // Get column indexes for all rows in the specified ranges @@ -419,10 +409,23 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R results := make([][]labels.Label, len(columnIndexes)) mtx := sync.Mutex{} errGroup := &errgroup.Group{} + errGroup.SetLimit(m.concurrency) labelsRowGroup := m.b.LabelsFile().RowGroups()[rgi] + span.SetAttributes(attribute.Int("goroutine_pool_limit", m.concurrency)) + for columnIndex, rowRanges := range columnToRowRanges { errGroup.Go(func() error { + ctx, span := tracer.Start(ctx, "Materializer.materializeAllLabels.column") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + // Extract label name from column schema columnChunk := labelsRowGroup.ColumnChunks()[columnIndex] labelName, ok := schema.ExtractLabelFromColumn(m.b.LabelsFile().Schema().Columns()[columnIndex][0]) @@ -430,12 +433,20 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R return fmt.Errorf("column %d not found in schema", columnIndex) } + span.SetAttributes( + attribute.Int("column_index", columnIndex), + attribute.String("label_name", labelName), + attribute.Int("row_ranges_count", len(rowRanges)), + ) + // Materialize the actual label values for this column - labelValues, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, columnChunk, rowRanges, false) + labelValues, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rowRanges, columnChunk, false) if err != nil { return errors.Wrap(err, "failed to materialize label values") } + span.SetAttributes(attribute.Int("label_values_count", len(labelValues))) + // Assign label values to the appropriate result positions mtx.Lock() defer mtx.Unlock() @@ -454,7 +465,6 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R valueIndex++ } } - return nil }) } @@ -462,7 +472,10 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R return nil, err } - span.SetAttributes(attribute.Int("materialized_labels_count", len(results))) + span.SetAttributes( + attribute.Int("materialized_labels_count", len(results)), + attribute.Int("columns_materialized", len(columnToRowRanges)), + ) return results, nil } @@ -471,7 +484,7 @@ func (m *Materializer) getColumnIndexes(ctx context.Context, rgi int, rr []RowRa labelsRowGroup := m.b.LabelsFile().RowGroups()[rgi] columnChunk := labelsRowGroup.ColumnChunks()[m.colIdx] - columnIndexes, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, columnChunk, rr, false) + columnIndexes, err := m.materializeColumnSlice(ctx, m.b.LabelsFile(), rgi, rr, columnChunk, false) if err != nil { return nil, errors.Wrap(err, "failed to materialize column indexes") } @@ -522,8 +535,9 @@ func totalRows(rr []RowRange) int64 { return res } -func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) (r [][]chunks.Meta, err error) { - ctx, span := tracer.Start(ctx, "Materializer.materializeChunks") +func (m *Materializer) MaterializeChunks(ctx context.Context, rgi int, mint, maxt int64, rr []RowRange) (ChunksIteratorIterator, error) { + var err error + ctx, span := tracer.Start(ctx, "Materializer.MaterializeChunks") defer func() { if err != nil { span.RecordError(err) @@ -538,11 +552,9 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max attribute.Int64("maxt", maxt), attribute.Int("row_ranges_count", len(rr)), ) - minDataCol := m.s.DataColumIdx(mint) maxDataCol := m.s.DataColumIdx(maxt) rg := m.b.ChunksFile().RowGroups()[rgi] - r = make([][]chunks.Meta, totalRows(rr)) span.SetAttributes( attribute.Int("min_data_col", minDataCol), @@ -550,42 +562,162 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max attribute.Int("total_rows", int(totalRows(rr))), ) + var columnValueIterators []*ColumnValueIterator for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { - values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true) + cc := rg.ColumnChunks()[m.dataColToIndex[i]] + columnValueIter, err := m.materializeColumnIter(ctx, m.b.ChunksFile(), rgi, rr, cc, true) + if err != nil { + return nil, errors.Wrap(err, "failed to create column value iterator") + } + columnValueIterators = append(columnValueIterators, columnValueIter) + } + return &MultiColumnChunksDecodingIterator{ + mint: mint, + maxt: maxt, + columnValueIterators: columnValueIterators, + d: m.d, + }, nil +} + +func (m *Materializer) materializeColumnIter( + ctx context.Context, + file storage.ParquetFileView, + rgi int, + rr []RowRange, + cc parquet.ColumnChunk, + chunkColumn bool, +) (*ColumnValueIterator, error) { + pageRanges, err := m.GetPageRangesForColummn(cc, file, rgi, rr, chunkColumn) + if err != nil { + return nil, err + } + + dictOff, dictSz := file.DictionaryPageBounds(rgi, cc.Column()) + + rowRangesValueIterators := make([]*RowRangesValueIterator, len(pageRanges)) + for i, pageRange := range pageRanges { + rowRangesValueIter, err := newRowRangesValueIterator( + ctx, file, cc, pageRange, dictOff, dictSz, + ) if err != nil { - return r, err + return nil, errors.Wrap(err, "failed to create row value iterator for page range") } + rowRangesValueIterators[i] = rowRangesValueIter + } + return &ColumnValueIterator{ + rowRangesIterators: rowRangesValueIterators, + }, nil +} + +func (m *Materializer) materializeColumnSlice( + ctx context.Context, + file storage.ParquetFileView, + rgi int, + rr []RowRange, + cc parquet.ColumnChunk, + chunkColumn bool, +) ([]parquet.Value, error) { + ctx, span := tracer.Start(ctx, "Materializer.materializeColumnSlice") + var err error + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + + span.SetAttributes( + attribute.Int("row_group_index", rgi), + attribute.Int("row_ranges_count", len(rr)), + attribute.Int64("total_rows", totalRows(rr)), + attribute.Bool("chunk_column", chunkColumn), + attribute.Int("concurrency", m.concurrency), + ) + + pageRanges, err := m.GetPageRangesForColummn(cc, file, rgi, rr, chunkColumn) + if err != nil || len(pageRanges) == 0 { + return nil, err + } + + span.SetAttributes( + attribute.Int("page_ranges_after_coalesce", len(pageRanges)), + ) + + pageRangesValues := make([][]parquet.Value, len(pageRanges)) + + dictOff, dictSz := file.DictionaryPageBounds(rgi, cc.Column()) + errGroup := &errgroup.Group{} + errGroup.SetLimit(m.concurrency) - for vi, value := range values { - chks, err := m.d.Decode(value.ByteArray(), mint, maxt) + span.SetAttributes(attribute.Int("goroutine_pool_limit", m.concurrency)) + + for i, pageRange := range pageRanges { + errGroup.Go(func() error { + valuesIter, err := newRowRangesValueIterator(ctx, file, cc, pageRange, dictOff, dictSz) if err != nil { - return r, errors.Wrap(err, "failed to decode chunks") + return errors.Wrap(err, "failed to create row values iterator for page") } - r[vi] = append(r[vi], chks...) - } + defer func() { _ = valuesIter.Close() }() + + iterValues := make([]parquet.Value, 0, totalRows(pageRange.rows)) + for valuesIter.Next() { + iterValues = append(iterValues, valuesIter.At()) + } + if err = valuesIter.Err(); err != nil { + return err + } + + pageRangesValues[i] = iterValues + return nil + }) + } + err = errGroup.Wait() + if err != nil { + return nil, errors.Wrap(err, "failed to materialize columns") } - span.SetAttributes(attribute.Int("materialized_chunks_count", len(r))) - return r, nil + valuesFlattened := make([]parquet.Value, 0, totalRows(rr)) + for _, pageRangeValues := range pageRangesValues { + valuesFlattened = append(valuesFlattened, pageRangeValues...) + } + + span.SetAttributes(attribute.Int("materialized_values_count", len(valuesFlattened))) + return valuesFlattened, nil } -func (m *Materializer) materializeColumn(ctx context.Context, file storage.ParquetFileView, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) { +type PageToReadWithRow struct { + PageToRead + rows []RowRange +} + +func NewPageToReadWithRow(p PageToRead, rows []RowRange) PageToReadWithRow { + return PageToReadWithRow{ + PageToRead: p, + rows: rows, + } +} + +func (pr *PageToReadWithRow) Rows() []RowRange { + return pr.rows +} + +func (m *Materializer) GetPageRangesForColummn(cc parquet.ColumnChunk, file storage.ParquetFileView, rgi int, rr []RowRange, chunkColumn bool) ([]PageToReadWithRow, error) { if len(rr) == 0 { return nil, nil } oidx, err := cc.OffsetIndex() if err != nil { - return nil, errors.Wrap(err, "could not get offset index") + return nil, errors.Wrap(err, "failed to get offset index") } cidx, err := cc.ColumnIndex() if err != nil { - return nil, errors.Wrap(err, "could not get column index") + return nil, errors.Wrap(err, "failed to get column index") } group := file.RowGroups()[rgi] - pagesToRowsMap := make(map[int][]RowRange, len(rr)) for i := 0; i < cidx.NumPages(); i++ { @@ -608,114 +740,22 @@ func (m *Materializer) materializeColumn(ctx context.Context, file storage.Parqu return nil, err } - pageRanges := m.coalescePageRanges(pagesToRowsMap, oidx) - - r := make(map[RowRange][]parquet.Value, len(pageRanges)) - rMutex := &sync.Mutex{} - for _, v := range pageRanges { - for _, rs := range v.rows { - r[rs] = make([]parquet.Value, 0, rs.Count) - } - } - - errGroup := &errgroup.Group{} - errGroup.SetLimit(m.concurrency) - - dictOff, dictSz := file.DictionaryPageBounds(rgi, cc.Column()) - - for _, p := range pageRanges { - errGroup.Go(func() error { - minOffset := uint64(p.off) - maxOffset := uint64(p.off + p.csz) - - // if dictOff == 0, it means that the collum is not dictionary encoded - if dictOff > 0 && int(minOffset-(dictOff+dictSz)) < file.PagePartitioningMaxGapSize() { - minOffset = dictOff - } - - pgs, err := file.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) - if err != nil { - return errors.Wrap(err, "failed to get pages") - } - defer func() { _ = pgs.Close() }() - err = pgs.SeekToRow(p.rows[0].From) - if err != nil { - return errors.Wrap(err, "could not seek to row") - } - - vi := new(valuesIterator) - remainingRr := p.rows - currentRr := remainingRr[0] - next := currentRr.From - remaining := currentRr.Count - currentRow := currentRr.From - - remainingRr = remainingRr[1:] - for len(remainingRr) > 0 || remaining > 0 { - page, err := pgs.ReadPage() - if err != nil { - return errors.Wrap(err, "could not read page") - } - vi.Reset(page) - - for vi.Next() { - if currentRow == next { - rMutex.Lock() - r[currentRr] = append(r[currentRr], vi.At()) - rMutex.Unlock() - remaining-- - if remaining > 0 { - next = next + 1 - } else if len(remainingRr) > 0 { - currentRr = remainingRr[0] - next = currentRr.From - remaining = currentRr.Count - remainingRr = remainingRr[1:] - } - } - - if vi.CanSkip() && remaining > 0 { - currentRow += vi.Skip(next - currentRow - 1) - } - - currentRow++ - } - parquet.Release(page) - - if vi.Error() != nil { - return vi.Error() - } - } - return nil - }) - } - err = errGroup.Wait() - if err != nil { - return nil, errors.Wrap(err, "failed to materialize columns") - } - - ranges := slices.Collect(maps.Keys(r)) - slices.SortFunc(ranges, func(a, b RowRange) int { - return int(a.From - b.From) - }) - - res := make([]parquet.Value, 0, totalRows(rr)) - for _, v := range ranges { - res = append(res, r[v]...) - } - return res, nil + pageRanges := m.CoalescePageRanges(pagesToRowsMap, oidx) + return pageRanges, nil } -type pageToReadWithRow struct { - pageToRead - rows []RowRange -} - -// Merge nearby pages to enable efficient sequential reads. +// CoalescePageRanges merges nearby pages to enable efficient sequential reads. // Pages that are not close to each other will be scheduled for concurrent reads. -func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex) []pageToReadWithRow { +func (m *Materializer) CoalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex) []PageToReadWithRow { + _, span := tracer.Start(context.Background(), "Materializer.CoalescePageRanges") + defer span.End() + + span.SetAttributes( + attribute.Int("input_pages_count", len(pagedIdx)), + ) + if len(pagedIdx) == 0 { - return []pageToReadWithRow{} + return []PageToReadWithRow{} } idxs := make([]int, 0, len(pagedIdx)) for idx := range pagedIdx { @@ -728,20 +768,35 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset pa return int(offset.Offset(idxs[i])), int(offset.Offset(idxs[i]) + offset.CompressedPageSize(idxs[i])) }) - r := make([]pageToReadWithRow, 0, len(parts)) + r := make([]PageToReadWithRow, 0, len(parts)) + totalBytes := int64(0) for _, part := range parts { - pagesToRead := pageToReadWithRow{} + var rows []RowRange for i := part.ElemRng[0]; i < part.ElemRng[1]; i++ { - pagesToRead.rows = append(pagesToRead.rows, pagedIdx[idxs[i]]...) + rows = append(rows, pagedIdx[idxs[i]]...) } - pagesToRead.pfrom = int64(part.ElemRng[0]) - pagesToRead.pto = int64(part.ElemRng[1]) - pagesToRead.off = part.Start - pagesToRead.csz = part.End - part.Start - pagesToRead.rows = simplify(pagesToRead.rows) - r = append(r, pagesToRead) + + pageToReadWithRow := NewPageToReadWithRow( + NewPageToRead( + 0, + int64(part.ElemRng[0]), + int64(part.ElemRng[1]), + int64(part.Start), + int64(part.End-part.Start), + ), + rows, + ) + + r = append(r, pageToReadWithRow) + totalBytes += pageToReadWithRow.CompressedSize() } + span.SetAttributes( + attribute.Int("output_page_ranges_count", len(r)), + attribute.Int("partitions_count", len(parts)), + attribute.Int64("total_bytes_to_read", totalBytes), + ) + return r } @@ -772,105 +827,3 @@ func totalBytes(pages iter.Seq[int], oidx parquet.OffsetIndex) int64 { } return res } - -type valuesIterator struct { - p parquet.Page - - // TODO: consider using unique.Handle - cachedSymbols map[int32]parquet.Value - st symbolTable - - vr parquet.ValueReader - - current int - totalRows int - buffer []parquet.Value - currentBufferIndex int - err error -} - -func (vi *valuesIterator) Reset(p parquet.Page) { - vi.p = p - vi.vr = nil - vi.totalRows = int(p.NumRows()) - if p.Dictionary() != nil { - vi.st.Reset(p) - vi.cachedSymbols = make(map[int32]parquet.Value, p.Dictionary().Len()) - } else { - vi.vr = p.Values() - vi.buffer = make([]parquet.Value, 0, 128) - vi.currentBufferIndex = -1 - } - vi.current = -1 -} - -func (vi *valuesIterator) CanSkip() bool { - return vi.vr == nil -} - -func (vi *valuesIterator) Skip(n int64) int64 { - r := min(n, int64(vi.totalRows-vi.current-1)) - vi.current += int(r) - return r -} - -func (vi *valuesIterator) Next() bool { - if vi.err != nil { - return false - } - - vi.current++ - if vi.current >= vi.totalRows { - return false - } - - vi.currentBufferIndex++ - - if vi.currentBufferIndex == len(vi.buffer) { - n, err := vi.vr.ReadValues(vi.buffer[:cap(vi.buffer)]) - if err != nil && err != io.EOF { - vi.err = err - } - vi.buffer = vi.buffer[:n] - vi.currentBufferIndex = 0 - } - - return true -} - -func (vi *valuesIterator) Error() error { - return vi.err -} - -func (vi *valuesIterator) At() parquet.Value { - if vi.vr == nil { - dicIndex := vi.st.GetIndex(vi.current) - // Cache a clone of the current symbol table entry. - // This allows us to release the original page while avoiding unnecessary future clones. - if _, ok := vi.cachedSymbols[dicIndex]; !ok { - vi.cachedSymbols[dicIndex] = vi.st.Get(vi.current).Clone() - } - return vi.cachedSymbols[dicIndex] - } - - return vi.buffer[vi.currentBufferIndex].Clone() -} - -var _ prom_storage.ChunkSeries = &concreteChunksSeries{} - -type concreteChunksSeries struct { - lbls labels.Labels - chks []chunks.Meta -} - -func (c concreteChunksSeries) Labels() labels.Labels { - return c.lbls -} - -func (c concreteChunksSeries) Iterator(_ chunks.Iterator) chunks.Iterator { - return prom_storage.NewListChunkSeriesIterator(c.chks...) -} - -func (c concreteChunksSeries) ChunkCount() (int, error) { - return len(c.chks), nil -} diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go index 98e5e100c43..fa4dd046130 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go @@ -276,7 +276,7 @@ func (s *ParquetShardOpener) ChunksFile() ParquetFileView { func (s *ParquetShardOpener) TSDBSchema() (*schema.TSDBSchema, error) { var err error s.o.Do(func() { - s.schema, err = schema.FromLabelsFile(s.labelsFile.File) + s.schema, err = schema.FromLabelsFile(s.labelsFile) }) return s.schema, err } diff --git a/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go b/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go index b708565d42a..432d0854a48 100644 --- a/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go +++ b/vendor/github.com/prometheus-community/parquet-common/util/fixtures.go @@ -36,7 +36,7 @@ type TestData struct { MaxTime int64 } -func GenerateTestData(t *testing.T, st *teststorage.TestStorage, ctx context.Context, cfg TestConfig) TestData { +func GenerateTestData(t testing.TB, st *teststorage.TestStorage, ctx context.Context, cfg TestConfig) TestData { app := st.Appender(ctx) seriesHash := make(map[uint64]*struct{}) builder := labels.NewScratchBuilder(cfg.NumberOfLabels) diff --git a/vendor/modules.txt b/vendor/modules.txt index cdd30c96f46..70b534f55ad 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -955,7 +955,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 +# github.com/prometheus-community/parquet-common v0.0.0-20250827225610-65f0b68d35e6 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable