From dfc581128afa44f9b2c1127098bccc986a3e7d30 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 7 Aug 2025 10:16:42 -0700 Subject: [PATCH 1/2] update parquet common library to latest Signed-off-by: yeya24 --- go.mod | 2 +- go.sum | 4 +- .../parquet-common/search/constraint.go | 27 ++-- .../parquet-common/search/materialize.go | 135 ++++++++++++------ .../parquet-common/util/bitmap.go | 53 +++++++ vendor/modules.txt | 2 +- 6 files changed, 167 insertions(+), 56 deletions(-) create mode 100644 vendor/github.com/prometheus-community/parquet-common/util/bitmap.go diff --git a/go.mod b/go.mod index 42346d2d5cc..b4adb6e2987 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4 + github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 58c9e31a639..b18f8637c01 100644 --- a/go.sum +++ b/go.sum @@ -824,8 +824,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4 h1:xNWjbJzXJ+/YhTFyFIh6qgZj8sV2DufhcR1CSW+oswE= -github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= +github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 h1:5mm5mMmEhUdvqf4NsVvEGWry0IeXkeZEfODbZ70c1Ok= +github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 159c639c6f6..1db078ebaf5 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -172,40 +172,45 @@ type pageToRead struct { type symbolTable struct { dict parquet.Dictionary syms []int32 + defs []byte } -func (s *symbolTable) Get(i int) parquet.Value { - switch s.syms[i] { +func (s *symbolTable) Get(r int) parquet.Value { + i := s.GetIndex(r) + switch i { case -1: return parquet.NullValue() default: - return s.dict.Index(s.syms[i]) + return s.dict.Index(i) } } func (s *symbolTable) GetIndex(i int) int32 { - return s.syms[i] + switch s.defs[i] { + case 1: + return s.syms[i] + default: + return -1 + } } func (s *symbolTable) Reset(pg parquet.Page) { dict := pg.Dictionary() data := pg.Data() syms := data.Int32() - defs := pg.DefinitionLevels() + s.defs = pg.DefinitionLevels() if s.syms == nil { - s.syms = make([]int32, len(defs)) + s.syms = make([]int32, len(s.defs)) } else { - s.syms = slices.Grow(s.syms, len(defs))[:len(defs)] + s.syms = slices.Grow(s.syms, len(s.defs))[:len(s.defs)] } sidx := 0 - for i := range defs { - if defs[i] == 1 { + for i := range s.defs { + if s.defs[i] == 1 { s.syms[i] = syms[sidx] sidx++ - } else { - s.syms[i] = -1 } } s.dict = dict diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 8c4a5a463bd..33d03825627 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -403,63 +403,115 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R attribute.Int("row_ranges_count", len(rr)), ) - labelsRg := m.b.LabelsFile().RowGroups()[rgi] - cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) + // Get column indexes for all rows in the specified ranges + columnIndexes, err := m.getColumnIndexes(ctx, rgi, rr) if err != nil { - return nil, errors.Wrap(err, "materializer failed to materialize columns") + return nil, errors.Wrap(err, "failed to get column indexes") } - colsMap := make(map[int]*[]parquet.Value, 10) - results := make([][]labels.Label, len(colsIdxs)) - - for _, colsIdx := range colsIdxs { - idxs, err := schema.DecodeUintSlice(colsIdx.ByteArray()) - if err != nil { - return nil, errors.Wrap(err, "materializer failed to decode column index") - } - for _, idx := range idxs { - colsMap[idx] = &[]parquet.Value{} - } + // Build mapping of which columns are needed for which row ranges + columnToRowRanges, rowRangeToStartIndex, err := m.buildColumnMappings(rr, columnIndexes) + if err != nil { + return nil, errors.Wrap(err, "failed to build collum mapping") } - errGroup, ctx := errgroup.WithContext(ctx) - errGroup.SetLimit(m.concurrency) + // Materialize label values for each column concurrently + results := make([][]labels.Label, len(columnIndexes)) + mtx := sync.Mutex{} + errGroup := &errgroup.Group{} + labelsRowGroup := m.b.LabelsFile().RowGroups()[rgi] - for cIdx, v := range colsMap { + for columnIndex, rowRanges := range columnToRowRanges { errGroup.Go(func() error { - cc := labelsRg.ColumnChunks()[cIdx] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) + // Extract label name from column schema + columnChunk := labelsRowGroup.ColumnChunks()[columnIndex] + labelName, ok := schema.ExtractLabelFromColumn(m.b.LabelsFile().Schema().Columns()[columnIndex][0]) + if !ok { + return fmt.Errorf("column %d not found in schema", columnIndex) + } + + // Materialize the actual label values for this column + labelValues, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, columnChunk, rowRanges, false) if err != nil { - return errors.Wrap(err, "failed to materialize labels values") + return errors.Wrap(err, "failed to materialize label values") } - *v = values + + // Assign label values to the appropriate result positions + mtx.Lock() + defer mtx.Unlock() + + valueIndex := 0 + for _, rowRange := range rowRanges { + startIndex := rowRangeToStartIndex[rowRange] + + for rowInRange := 0; rowInRange < int(rowRange.Count); rowInRange++ { + if !labelValues[valueIndex].IsNull() { + results[startIndex+rowInRange] = append(results[startIndex+rowInRange], labels.Label{ + Name: labelName, + Value: util.YoloString(labelValues[valueIndex].ByteArray()), + }) + } + valueIndex++ + } + } + return nil }) } - - if err = errGroup.Wait(); err != nil { + if err := errGroup.Wait(); err != nil { return nil, err } - for cIdx, values := range colsMap { - labelName, ok := schema.ExtractLabelFromColumn(m.b.LabelsFile().Schema().Columns()[cIdx][0]) - if !ok { - return nil, fmt.Errorf("column %d not found in schema", cIdx) - } - for i, value := range *values { - if value.IsNull() { - continue + span.SetAttributes(attribute.Int("materialized_labels_count", len(results))) + return results, nil +} + +// getColumnIndexes retrieves the column index data for all rows in the specified ranges +func (m *Materializer) getColumnIndexes(ctx context.Context, rgi int, rr []RowRange) ([]parquet.Value, error) { + labelsRowGroup := m.b.LabelsFile().RowGroups()[rgi] + columnChunk := labelsRowGroup.ColumnChunks()[m.colIdx] + + columnIndexes, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, columnChunk, rr, false) + if err != nil { + return nil, errors.Wrap(err, "failed to materialize column indexes") + } + + return columnIndexes, nil +} + +// buildColumnMappings creates mappings between columns and row ranges based on column indexes +func (m *Materializer) buildColumnMappings(rr []RowRange, columnIndexes []parquet.Value) (map[int][]RowRange, map[RowRange]int, error) { + columnToRowRanges := make(map[int][]RowRange, 10) + rowRangeToStartIndex := make(map[RowRange]int, len(rr)) + + columnIndexPos := 0 + resultIndex := 0 + + for _, rowRange := range rr { + rowRangeToStartIndex[rowRange] = resultIndex + seenColumns := util.NewBitmap(len(m.b.LabelsFile().ColumnIndexes())) + + // Process each row in the current range + for rowInRange := int64(0); rowInRange < rowRange.Count; rowInRange++ { + columnIds, err := schema.DecodeUintSlice(columnIndexes[columnIndexPos].ByteArray()) + columnIndexPos++ + + if err != nil { + return nil, nil, err } - results[i] = append(results[i], labels.Label{ - Name: labelName, - Value: util.YoloString(value.ByteArray()), - }) + + // Track which columns are needed for this row range + for _, columnId := range columnIds { + if !seenColumns.Get(columnId) { + columnToRowRanges[columnId] = append(columnToRowRanges[columnId], rowRange) + seenColumns.Set(columnId) + } + } + resultIndex++ } } - span.SetAttributes(attribute.Int("materialized_labels_count", len(results))) - return results, nil + return columnToRowRanges, rowRangeToStartIndex, nil } func totalRows(rr []RowRange) int64 { @@ -570,7 +622,6 @@ func (m *Materializer) materializeColumn(ctx context.Context, file storage.Parqu errGroup.SetLimit(m.concurrency) dictOff, dictSz := file.DictionaryPageBounds(rgi, cc.Column()) - cc.Type() for _, p := range pageRanges { errGroup.Go(func() error { @@ -732,6 +783,7 @@ type valuesIterator struct { vr parquet.ValueReader current int + totalRows int buffer []parquet.Value currentBufferIndex int err error @@ -740,6 +792,7 @@ type valuesIterator struct { func (vi *valuesIterator) Reset(p parquet.Page) { vi.p = p vi.vr = nil + vi.totalRows = int(p.NumRows()) if p.Dictionary() != nil { vi.st.Reset(p) vi.cachedSymbols = make(map[int32]parquet.Value, p.Dictionary().Len()) @@ -756,7 +809,7 @@ func (vi *valuesIterator) CanSkip() bool { } func (vi *valuesIterator) Skip(n int64) int64 { - r := min(n, vi.p.NumRows()-int64(vi.current)-1) + r := min(n, int64(vi.totalRows-vi.current-1)) vi.current += int(r) return r } @@ -767,7 +820,7 @@ func (vi *valuesIterator) Next() bool { } vi.current++ - if vi.current >= int(vi.p.NumRows()) { + if vi.current >= vi.totalRows { return false } diff --git a/vendor/github.com/prometheus-community/parquet-common/util/bitmap.go b/vendor/github.com/prometheus-community/parquet-common/util/bitmap.go new file mode 100644 index 00000000000..69a6ac2b648 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/util/bitmap.go @@ -0,0 +1,53 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +const bitsPerWord = 64 + +type Bitmap struct { + size int + bits []uint64 +} + +// NewBitmap initializes a bitmap +func NewBitmap(size int) *Bitmap { + return &Bitmap{ + size: size, + bits: make([]uint64, (size+bitsPerWord-1)/bitsPerWord), + } +} + +// Set sets the bit at position i to 1 +func (bm *Bitmap) Set(i int) { + if i < 0 || i >= bm.size { + return + } + bm.bits[i/bitsPerWord] |= 1 << (i % bitsPerWord) +} + +// Clear sets the bit at position i to 0 +func (bm *Bitmap) Clear(i int) { + if i < 0 || i >= bm.size { + return + } + bm.bits[i/bitsPerWord] &^= 1 << (i % bitsPerWord) +} + +// Get returns true if the bit at position i is set +func (bm *Bitmap) Get(i int) bool { + if i < 0 || i >= bm.size { + return false + } + return (bm.bits[i/bitsPerWord] & (1 << (i % bitsPerWord))) != 0 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f0fcd6ec7b4..c44481ae754 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -960,7 +960,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4 +# github.com/prometheus-community/parquet-common v0.0.0-20250807102632-2aeeceacebf0 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable From 92f6b60b91b55b0afc4658d8baae9591d8642831 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 7 Aug 2025 16:50:38 -0700 Subject: [PATCH 2/2] fix test Signed-off-by: yeya24 --- pkg/querier/parquet_queryable_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 6c52e97d143..5bb6bed1c83 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -497,7 +497,7 @@ func TestParquetQueryable_Limits(t *testing.T) { return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1), - expectedErr: fmt.Errorf("error materializing labels: materializer failed to materialize columns: would fetch too many data bytes: resource exhausted (used 1)"), + expectedErr: fmt.Errorf("error materializing labels: failed to get column indexes: failed to materialize column indexes: would fetch too many data bytes: resource exhausted (used 1)"), }, "limits within bounds - should succeed": { limits: func() *validation.Overrides {