From 835e0b5a2cd4335fca4faf7a162a4282cc368b69 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 9 Jul 2025 14:54:50 -0700 Subject: [PATCH 1/7] add query limiter support to parquet querier Signed-off-by: yeya24 --- go.mod | 2 + go.sum | 6 +- pkg/querier/error_translate_queryable.go | 6 ++ pkg/querier/parquet_queryable.go | 58 ++++++++++++- .../queryable/parquet_queryable.go | 47 +++++++++-- .../parquet-common/search/limits.go | 76 +++++++++++++++++ .../parquet-common/search/materialize.go | 83 ++++++++++++++++--- vendor/modules.txt | 3 +- 8 files changed, 258 insertions(+), 23 deletions(-) create mode 100644 vendor/github.com/prometheus-community/parquet-common/search/limits.go diff --git a/go.mod b/go.mod index fe03be51c3e..041fe57bcbf 100644 --- a/go.mod +++ b/go.mod @@ -325,3 +325,5 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20 // v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. (same as thanos) replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 + +replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 diff --git a/go.sum b/go.sum index 26277164a42..082fc52a229 100644 --- a/go.sum +++ b/go.sum @@ -778,7 +778,7 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing-contrib/go-grpc v0.1.2 h1:MP16Ozc59kqqwn1v18aQxpeGZhsBanJ2iurZYaQSZ+g= github.com/opentracing-contrib/go-grpc v0.1.2/go.mod h1:glU6rl1Fhfp9aXUHkE36K2mR4ht8vih0ekOVlWKEUHM= -github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:hSJ8yYaiAO/k2YZUeWJWpQCPE2wRCDtxRnir0gU6wbA= github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= @@ -814,8 +814,6 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss= -github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= @@ -955,6 +953,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 h1:hvkx/1uPrrYN3sNhA1rdU4ri0Z7L31838nvLu2NVtD0= +github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index 0c55a15c588..7e0418d463c 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -5,6 +5,7 @@ import ( "github.com/gogo/status" "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/search" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" @@ -48,6 +49,11 @@ func TranslateToPromqlAPIError(err error) error { return err // 422 } + if search.IsResourceExhausted(err) { + cause := errors.Cause(err) + return cause // 422 + } + s, ok := status.FromError(err) if !ok { diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 481034ef4d3..ced154a86ac 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -23,11 +23,13 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/multierror" "github.com/cortexproject/cortex/pkg/util/services" @@ -132,6 +134,60 @@ func NewParquetQueryable( cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()) + parquetQueryableOpts := []queryable.QueryableOpts{ + queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { + userID, err := tenant.TenantID(ctx) + if err != nil { + // This shouldn't happen + return 0 + } + return int64(limits.MaxFetchedSeriesPerQuery(userID)) + }), + queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 { + userID, err := tenant.TenantID(ctx) + if err != nil { + // This shouldn't happen + return 0 + } + return int64(limits.MaxFetchedChunkBytesPerQuery(userID)) + }), + queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 { + userID, err := tenant.TenantID(ctx) + if err != nil { + // This shouldn't happen + return 0 + } + return int64(limits.MaxFetchedDataBytesPerQuery(userID)) + }), + queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { + queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) + lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) + for _, series := range cs { + chunkSize := 0 + lblSize := 0 + lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels()) + lbls = append(lbls, lblAdapter) + for _, lbl := range lblAdapter { + lblSize += lbl.Size() + } + iter := series.Iterator(nil) + for iter.Next() { + chk := iter.At() + chunkSize += len(chk.Chunk.Bytes()) + } + if err := queryLimiter.AddChunkBytes(chunkSize); err != nil { + return validation.LimitError(err.Error()) + } + if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil { + return validation.LimitError(err.Error()) + } + } + if err := queryLimiter.AddSeries(lbls...); err != nil { + return validation.LimitError(err.Error()) + } + return nil + }), + } parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) { userID, err := tenant.TenantID(ctx) if err != nil { @@ -182,7 +238,7 @@ func NewParquetQueryable( } return shards, errGroup.Wait() - }) + }, parquetQueryableOpts...) p := &parquetQueryableWithFallback{ subservices: manager, diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index 3bcd8bcbd47..74f4ec26ba0 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -34,11 +34,21 @@ import ( type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error) type queryableOpts struct { - concurrency int + concurrency int + rowCountLimitFunc search.QuotaLimitFunc + chunkBytesLimitFunc search.QuotaLimitFunc + dataBytesLimitFunc search.QuotaLimitFunc + materializedSeriesCallback search.MaterializedSeriesFunc } var DefaultQueryableOpts = queryableOpts{ - concurrency: runtime.GOMAXPROCS(0), + concurrency: runtime.GOMAXPROCS(0), + rowCountLimitFunc: search.NoopQuotaLimitFunc, + chunkBytesLimitFunc: search.NoopQuotaLimitFunc, + dataBytesLimitFunc: search.NoopQuotaLimitFunc, + materializedSeriesCallback: func(_ context.Context, _ []prom_storage.ChunkSeries) error { + return nil + }, } type QueryableOpts func(*queryableOpts) @@ -50,6 +60,30 @@ func WithConcurrency(concurrency int) QueryableOpts { } } +func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { + return func(opts *queryableOpts) { + opts.rowCountLimitFunc = fn + } +} + +func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { + return func(opts *queryableOpts) { + opts.chunkBytesLimitFunc = fn + } +} + +func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { + return func(opts *queryableOpts) { + opts.dataBytesLimitFunc = fn + } +} + +func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts { + return func(opts *queryableOpts) { + opts.materializedSeriesCallback = fn + } +} + type parquetQueryable struct { shardsFinder ShardsFinderFunction d *schema.PrometheusParquetChunksDecoder @@ -191,8 +225,11 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ( return nil, err } qBlocks := make([]*queryableShard, len(shards)) + rowCountQuota := search.NewQuota(p.opts.rowCountLimitFunc(ctx)) + chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx)) + dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx)) for i, shard := range shards { - qb, err := newQueryableShard(p.opts, shard, p.d) + qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota) if err != nil { return nil, err } @@ -207,12 +244,12 @@ type queryableShard struct { concurrency int } -func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) { +func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) { s, err := block.TSDBSchema() if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, block, opts.concurrency) + m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback) if err != nil { return nil, err } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/limits.go b/vendor/github.com/prometheus-community/parquet-common/search/limits.go new file mode 100644 index 00000000000..12e1d027bb7 --- /dev/null +++ b/vendor/github.com/prometheus-community/parquet-common/search/limits.go @@ -0,0 +1,76 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) The Thanos Authors. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +// This package is a modified copy from +// https://github.com/thanos-io/thanos-parquet-gateway/blob/cfc1279f605d1c629c4afe8b1e2a340e8b15ecdc/internal/limits/limit.go. + +package search + +import ( + "context" + "errors" + "fmt" + "sync" +) + +type resourceExhausted struct { + used int64 +} + +func (re *resourceExhausted) Error() string { + return fmt.Sprintf("resource exhausted (used %d)", re.used) +} + +func IsResourceExhausted(err error) bool { + var re *resourceExhausted + return errors.As(err, &re) +} + +type Quota struct { + mu sync.Mutex + q int64 + u int64 +} + +func NewQuota(n int64) *Quota { + return &Quota{q: n, u: n} +} + +func UnlimitedQuota() *Quota { + return NewQuota(0) +} + +func (q *Quota) Reserve(n int64) error { + if q.q == 0 { + return nil + } + + q.mu.Lock() + defer q.mu.Unlock() + + if q.u-n < 0 { + return &resourceExhausted{used: q.q} + } + q.u -= n + return nil +} + +type QuotaLimitFunc func(ctx context.Context) int64 + +func NoopQuotaLimitFunc(ctx context.Context) int64 { + return 0 +} diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 51538182b0a..ee194a8f97d 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "iter" "maps" "slices" "sync" @@ -43,12 +44,26 @@ type Materializer struct { concurrency int dataColToIndex []int + + rowCountQuota *Quota + chunkBytesQuota *Quota + dataBytesQuota *Quota + + materializedSeriesCallback MaterializedSeriesFunc } +// MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for +// materialized series. +type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error + func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, block storage.ParquetShard, concurrency int, + rowCountQuota *Quota, + chunkBytesQuota *Quota, + dataBytesQuota *Quota, + materializeSeriesCallback MaterializedSeriesFunc, ) (*Materializer, error) { colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes) if !ok { @@ -66,19 +81,26 @@ func NewMaterializer(s *schema.TSDBSchema, } return &Materializer{ - s: s, - d: d, - b: block, - colIdx: colIdx.ColumnIndex, - concurrency: concurrency, - partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), - dataColToIndex: dataColToIndex, + s: s, + d: d, + b: block, + colIdx: colIdx.ColumnIndex, + concurrency: concurrency, + partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize), + dataColToIndex: dataColToIndex, + rowCountQuota: rowCountQuota, + chunkBytesQuota: chunkBytesQuota, + dataBytesQuota: dataBytesQuota, + materializedSeriesCallback: materializeSeriesCallback, }, nil } // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr). // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series. func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) { + if err := m.checkRowCountQuota(rr); err != nil { + return nil, err + } sLbls, err := m.materializeAllLabels(ctx, rgi, rr) if err != nil { return nil, errors.Wrapf(err, "error materializing labels") @@ -106,6 +128,10 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int6 return len(cs.(*concreteChunksSeries).chks) == 0 }) } + + if err := m.materializedSeriesCallback(ctx, results); err != nil { + return nil, err + } return results, err } @@ -125,7 +151,7 @@ func (m *Materializer) MaterializeAllLabelNames() []string { func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) { labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -164,7 +190,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string, return []string{}, nil } cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -208,7 +234,7 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) { labelsRg := m.b.LabelsFile().RowGroups()[rgi] cc := labelsRg.ColumnChunks()[m.colIdx] - colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) + colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) if err != nil { return nil, errors.Wrap(err, "materializer failed to materialize columns") } @@ -232,7 +258,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R for cIdx, v := range colsMap { errGroup.Go(func() error { cc := labelsRg.ColumnChunks()[cIdx] - values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr) + values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false) if err != nil { return errors.Wrap(err, "failed to materialize labels values") } @@ -279,7 +305,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max r := make([][]chunks.Meta, totalRows(rr)) for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ { - values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr) + values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true) if err != nil { return r, err } @@ -296,7 +322,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max return r, nil } -func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) { +func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) { if len(rr) == 0 { return nil, nil } @@ -331,6 +357,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq } } } + if err := m.checkBytesQuota(maps.Keys(pagesToRowsMap), oidx, chunkColumn); err != nil { + return nil, err + } pageRanges := m.coalescePageRanges(pagesToRowsMap, oidx) @@ -464,6 +493,34 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset pa return r } +func (m *Materializer) checkRowCountQuota(rr []RowRange) error { + if err := m.rowCountQuota.Reserve(totalRows(rr)); err != nil { + return fmt.Errorf("would fetch too many rows: %w", err) + } + return nil +} + +func (m *Materializer) checkBytesQuota(pages iter.Seq[int], oidx parquet.OffsetIndex, chunkColumn bool) error { + total := totalBytes(pages, oidx) + if chunkColumn { + if err := m.chunkBytesQuota.Reserve(total); err != nil { + return fmt.Errorf("would fetch too many chunk bytes: %w", err) + } + } + if err := m.dataBytesQuota.Reserve(total); err != nil { + return fmt.Errorf("would fetch too many data bytes: %w", err) + } + return nil +} + +func totalBytes(pages iter.Seq[int], oidx parquet.OffsetIndex) int64 { + res := int64(0) + for i := range pages { + res += oidx.CompressedPageSize(i) + } + return res +} + type valuesIterator struct { p parquet.Page diff --git a/vendor/modules.txt b/vendor/modules.txt index 58a73b08224..01bedabbd89 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 +# github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable @@ -1988,3 +1988,4 @@ sigs.k8s.io/yaml/goyaml.v3 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 # github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 +# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 From 478c10eaf1d5311f95cf4589be7fcd9dcdf7dcf6 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 10 Jul 2025 00:15:30 -0700 Subject: [PATCH 2/7] add unit tests Signed-off-by: yeya24 --- pkg/querier/error_translate_queryable_test.go | 37 +-- pkg/querier/parquet_queryable.go | 12 +- pkg/querier/parquet_queryable_test.go | 220 ++++++++++++++++++ 3 files changed, 233 insertions(+), 36 deletions(-) diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index b5d13a96e4f..41940e40ede 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/regexp" "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/search" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/promslog" "github.com/prometheus/common/route" @@ -45,40 +46,8 @@ func TestApiStatusCodes(t *testing.T) { }, { - err: validation.AccessDeniedError("access denied"), - expectedString: "access denied", - expectedCode: 422, - }, - - { - err: promql.ErrTooManySamples("query execution"), - expectedString: "too many samples", - expectedCode: 422, - }, - - { - err: promql.ErrQueryCanceled("query execution"), - expectedString: "query was canceled", - expectedCode: 499, - }, - - { - err: promql.ErrQueryTimeout("query execution"), - expectedString: "query timed out", - expectedCode: 503, - }, - - // Status code 400 is remapped to 422 (only choice we have) - { - err: httpgrpc.Errorf(http.StatusBadRequest, "test string"), - expectedString: "test string", - expectedCode: 422, - }, - - // 404 is also translated to 422 - { - err: httpgrpc.Errorf(http.StatusNotFound, "not found"), - expectedString: "not found", + err: search.NewQuota(1).Reserve(2), + expectedString: "resource exhausted (used 1)", expectedCode: 422, }, diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index ced154a86ac..9fc33956bae 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -163,6 +163,7 @@ func NewParquetQueryable( queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) for _, series := range cs { + chkCount := 0 chunkSize := 0 lblSize := 0 lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels()) @@ -174,10 +175,17 @@ func NewParquetQueryable( for iter.Next() { chk := iter.At() chunkSize += len(chk.Chunk.Bytes()) + chkCount++ } - if err := queryLimiter.AddChunkBytes(chunkSize); err != nil { - return validation.LimitError(err.Error()) + if chkCount > 0 { + if err := queryLimiter.AddChunks(chkCount); err != nil { + return validation.LimitError(err.Error()) + } + if err := queryLimiter.AddChunkBytes(chunkSize); err != nil { + return validation.LimitError(err.Error()) + } } + if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil { return validation.LimitError(err.Error()) } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 33e8f73bedb..453650e5bf6 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -2,26 +2,42 @@ package querier import ( "context" + "fmt" + "math/rand" + "path/filepath" "testing" "time" "github.com/go-kit/log" "github.com/oklog/ulid/v2" + "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/integration/e2e" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" "github.com/cortexproject/cortex/pkg/storage/parquet" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -409,6 +425,210 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { }) } +func TestParquetQueryable_Limits(t *testing.T) { + t.Parallel() + + const ( + metricName = "test_metric" + minT = int64(0) + maxT = int64(1000) + ) + + bkt, tempDir := cortex_testutil.PrepareFilesystemBucket(t) + + config := Config{ + QueryStoreAfter: 0, + StoreGatewayQueryStatsEnabled: false, + StoreGatewayConsistencyCheckMaxAttempts: 3, + ParquetQueryableShardCacheSize: 100, + ParquetQueryableDefaultBlockStore: "parquet", + } + + storageCfg := cortex_tsdb.BlocksStorageConfig{ + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: tempDir, + }, + }, + } + + ctx := context.Background() + seriesCount := 100 + lbls := make([]labels.Labels, seriesCount) + for i := 0; i < seriesCount; i++ { + lbls[i] = labels.Labels{ + {Name: labels.MetricName, Value: metricName}, + {Name: "series", Value: fmt.Sprintf("%d", i)}, + } + } + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + blockID, err := e2e.CreateBlock(ctx, rnd, tempDir, lbls, 100, 0, 1000, 10, 1000) + require.NoError(t, err) + + blockDir := filepath.Join(tempDir, blockID.String()) + userBkt := bucket.NewUserBucketClient("user-1", bkt, nil) + err = block.Upload(ctx, log.NewNopLogger(), userBkt, blockDir, metadata.NoneFunc) + require.NoError(t, err) + + err = convertBlockToParquet(t, ctx, userBkt, blockID, blockDir) + require.NoError(t, err) + + // Create a mocked bucket index blocks finder + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: blockID, Parquet: &parquet.ConverterMarkMeta{Version: parquet.CurrentVersion}}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + tests := map[string]struct { + limits *validation.Overrides + queryLimiter *limiter.QueryLimiter + expectedErr error + }{ + "row count limit hit - Parquet Queryable": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.MaxFetchedSeriesPerQuery = 1 + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), + expectedErr: fmt.Errorf("would fetch too many rows: resource exhausted (used 1)"), + }, + "max series per query limit hit": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)), + }, + "max chunks per query limit hit": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.MaxChunksPerQuery = 1 + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(0, 0, 1, 0), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)), + }, + "max chunk page size limit hit - Parquet Queryable": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.MaxFetchedChunkBytesPerQuery = 1 + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0), + expectedErr: fmt.Errorf("materializer failed to materialize chunks: would fetch too many chunk bytes: resource exhausted (used 1)"), + }, + "max chunk bytes per query limit hit": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 1)), + }, + "max data bytes per query limit hit": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.MaxFetchedDataBytesPerQuery = 1 + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1), + expectedErr: fmt.Errorf("error materializing labels: materializer failed to materialize columns: would fetch too many data bytes: resource exhausted (used 1)"), + }, + "limits within bounds - should succeed": { + limits: func() *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.MaxFetchedSeriesPerQuery = 1000 + limits.MaxFetchedChunkBytesPerQuery = 1000000 + limits.MaxFetchedDataBytesPerQuery = 1000000 + return validation.NewOverrides(limits, nil) + }(), + queryLimiter: limiter.NewQueryLimiter(1000, 1000000, 1000, 1000000), + expectedErr: nil, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "user-1") + ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter) + + mockBlocksStoreQueryable := &BlocksStoreQueryable{finder: finder, Service: services.NewIdleService(func(_ context.Context) error { + return nil + }, func(_ error) error { + return nil + })} + + parquetQueryable, err := NewParquetQueryable(config, storageCfg, testData.limits, mockBlocksStoreQueryable, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + err = services.StartAndAwaitRunning(ctx, parquetQueryable.(*parquetQueryableWithFallback)) + require.NoError(t, err) + + querier, err := parquetQueryable.Querier(minT, maxT) + require.NoError(t, err) + defer querier.Close() + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + set := querier.Select(ctx, true, nil, matchers...) + if testData.expectedErr != nil { + require.False(t, set.Next()) + err = set.Err() + require.EqualError(t, err, testData.expectedErr.Error()) + return + } + + require.NoError(t, set.Err()) + }) + } +} + +// convertBlockToParquet converts a TSDB block to parquet and uploads it to the bucket +func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient objstore.Bucket, blockID ulid.ULID, blockDir string) error { + tsdbBlock, err := tsdb.OpenBlock(nil, blockDir, chunkenc.NewPool(), tsdb.DefaultPostingsDecoderFactory) + require.NoError(t, err) + + converterOpts := []convert.ConvertOption{ + convert.WithSortBy(labels.MetricName), + convert.WithColDuration(time.Hour * 8), + convert.WithRowGroupSize(1000), + convert.WithName(blockID.String()), + } + + _, err = convert.ConvertTSDBBlock( + ctx, + userBucketClient, + tsdbBlock.MinTime(), + tsdbBlock.MaxTime(), + []convert.Convertible{tsdbBlock}, + converterOpts..., + ) + require.NoError(t, err) + + _ = tsdbBlock.Close() + + // Write parquet converter marker + err = parquet.WriteConverterMark(ctx, blockID, userBucketClient) + require.NoError(t, err) + + return nil +} + func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) From e7f696cf27e21ca24eeaf30e3381407a6f016bcf Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 10 Jul 2025 00:16:56 -0700 Subject: [PATCH 3/7] changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + go.sum | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff1a4b2ce3..30d8d16d672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ * [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830 * [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834 * [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835 +* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/go.sum b/go.sum index 082fc52a229..cc5393c6353 100644 --- a/go.sum +++ b/go.sum @@ -778,7 +778,7 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/opentracing-contrib/go-grpc v0.1.2 h1:MP16Ozc59kqqwn1v18aQxpeGZhsBanJ2iurZYaQSZ+g= github.com/opentracing-contrib/go-grpc v0.1.2/go.mod h1:glU6rl1Fhfp9aXUHkE36K2mR4ht8vih0ekOVlWKEUHM= -github.com/opentracing-contrib/go-stdlib v1.1.0 h1:hSJ8yYaiAO/k2YZUeWJWpQCPE2wRCDtxRnir0gU6wbA= +github.com/opentracing-contrib/go-stdlib v1.1.0 h1:cZBWc4pA4e65tqTJddbflK435S0tDImj6c9BMvkdUH0= github.com/opentracing-contrib/go-stdlib v1.1.0/go.mod h1:S0p+X9p6dcBkoMTL+Qq2VOvxKs9ys5PpYWXWqlCS0bQ= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= From fbdc11ee1d28d8a77bf99bcb6a46829e1fd34110 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 10 Jul 2025 08:35:31 -0700 Subject: [PATCH 4/7] use upstream code instead of fork Signed-off-by: yeya24 --- go.mod | 4 +--- go.sum | 4 ++-- .../queryable/parquet_queryable.go | 17 ++++++++++------- .../parquet-common/search/limits.go | 6 ++++++ .../parquet-common/search/materialize.go | 5 +++++ vendor/modules.txt | 3 +-- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 041fe57bcbf..24210961be2 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 + github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 @@ -325,5 +325,3 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20 // v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. (same as thanos) replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 - -replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 diff --git a/go.sum b/go.sum index cc5393c6353..3e0c1eff203 100644 --- a/go.sum +++ b/go.sum @@ -814,6 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg= +github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= @@ -953,8 +955,6 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 h1:hvkx/1uPrrYN3sNhA1rdU4ri0Z7L31838nvLu2NVtD0= -github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index 74f4ec26ba0..c4b2996f5ab 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -42,13 +42,11 @@ type queryableOpts struct { } var DefaultQueryableOpts = queryableOpts{ - concurrency: runtime.GOMAXPROCS(0), - rowCountLimitFunc: search.NoopQuotaLimitFunc, - chunkBytesLimitFunc: search.NoopQuotaLimitFunc, - dataBytesLimitFunc: search.NoopQuotaLimitFunc, - materializedSeriesCallback: func(_ context.Context, _ []prom_storage.ChunkSeries) error { - return nil - }, + concurrency: runtime.GOMAXPROCS(0), + rowCountLimitFunc: search.NoopQuotaLimitFunc, + chunkBytesLimitFunc: search.NoopQuotaLimitFunc, + dataBytesLimitFunc: search.NoopQuotaLimitFunc, + materializedSeriesCallback: search.NoopMaterializedSeriesFunc, } type QueryableOpts func(*queryableOpts) @@ -60,24 +58,29 @@ func WithConcurrency(concurrency int) QueryableOpts { } } +// WithRowCountLimitFunc sets a callback function to get limit for matched row count. func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { return func(opts *queryableOpts) { opts.rowCountLimitFunc = fn } } +// WithChunkBytesLimitFunc sets a callback function to get limit for chunk column page bytes fetched. func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { return func(opts *queryableOpts) { opts.chunkBytesLimitFunc = fn } } +// WithDataBytesLimitFunc sets a callback function to get limit for data (including label and chunk) +// column page bytes fetched. func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts { return func(opts *queryableOpts) { opts.dataBytesLimitFunc = fn } } +// WithMaterializedSeriesCallback sets a callback function to process the materialized series. func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts { return func(opts *queryableOpts) { opts.materializedSeriesCallback = fn diff --git a/vendor/github.com/prometheus-community/parquet-common/search/limits.go b/vendor/github.com/prometheus-community/parquet-common/search/limits.go index 12e1d027bb7..dbb96aada90 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/limits.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/limits.go @@ -35,21 +35,25 @@ func (re *resourceExhausted) Error() string { return fmt.Sprintf("resource exhausted (used %d)", re.used) } +// IsResourceExhausted checks if the error is a resource exhausted error. func IsResourceExhausted(err error) bool { var re *resourceExhausted return errors.As(err, &re) } +// Quota is a limiter for a resource. type Quota struct { mu sync.Mutex q int64 u int64 } +// NewQuota creates a new quota with the given limit. func NewQuota(n int64) *Quota { return &Quota{q: n, u: n} } +// UnlimitedQuota creates a new quota with no limit. func UnlimitedQuota() *Quota { return NewQuota(0) } @@ -69,8 +73,10 @@ func (q *Quota) Reserve(n int64) error { return nil } +// QuotaLimitFunc is a function that returns the limit value. type QuotaLimitFunc func(ctx context.Context) int64 +// NoopQuotaLimitFunc returns 0 which means no limit. func NoopQuotaLimitFunc(ctx context.Context) int64 { return 0 } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index ee194a8f97d..2f485503e35 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -56,6 +56,11 @@ type Materializer struct { // materialized series. type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error +// NoopMaterializedSeriesFunc is a noop callback function that does nothing. +func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) error { + return nil +} + func NewMaterializer(s *schema.TSDBSchema, d *schema.PrometheusParquetChunksDecoder, block storage.ParquetShard, diff --git a/vendor/modules.txt b/vendor/modules.txt index 01bedabbd89..8f3f592537a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 +# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable @@ -1988,4 +1988,3 @@ sigs.k8s.io/yaml/goyaml.v3 # gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 # github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 # github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88 -# github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250709074935-71782c65c3d4 From 2cb9d3def9cc5f6bff1a175fb30d063e39bd82fd Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 10 Jul 2025 13:21:24 -0700 Subject: [PATCH 5/7] add dedicated limit configurations for new parquet storage limits Signed-off-by: yeya24 --- pkg/querier/parquet_queryable.go | 8 ++++---- pkg/querier/parquet_queryable_test.go | 7 +++---- pkg/util/validation/limits.go | 25 +++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 9fc33956bae..58cd3d56109 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -141,7 +141,7 @@ func NewParquetQueryable( // This shouldn't happen return 0 } - return int64(limits.MaxFetchedSeriesPerQuery(userID)) + return int64(limits.ParquetMaxFetchedRowCount(userID)) }), queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 { userID, err := tenant.TenantID(ctx) @@ -149,7 +149,7 @@ func NewParquetQueryable( // This shouldn't happen return 0 } - return int64(limits.MaxFetchedChunkBytesPerQuery(userID)) + return int64(limits.ParquetMaxFetchedChunkBytes(userID)) }), queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 { userID, err := tenant.TenantID(ctx) @@ -157,7 +157,7 @@ func NewParquetQueryable( // This shouldn't happen return 0 } - return int64(limits.MaxFetchedDataBytesPerQuery(userID)) + return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) @@ -440,7 +440,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool userID, err := tenant.TenantID(ctx) if err != nil { - storage.ErrSeriesSet(err) + return storage.ErrSeriesSet(err) } if q.limits.QueryVerticalShardSize(userID) > 1 { diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 453650e5bf6..13cdde6cd57 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -490,7 +490,7 @@ func TestParquetQueryable_Limits(t *testing.T) { limits: func() *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) - limits.MaxFetchedSeriesPerQuery = 1 + limits.ParquetMaxFetchedRowCount = 1 return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), @@ -509,7 +509,6 @@ func TestParquetQueryable_Limits(t *testing.T) { limits: func() *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) - limits.MaxChunksPerQuery = 1 return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 0, 1, 0), @@ -519,7 +518,7 @@ func TestParquetQueryable_Limits(t *testing.T) { limits: func() *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) - limits.MaxFetchedChunkBytesPerQuery = 1 + limits.ParquetMaxFetchedChunkBytes = 1 return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0), @@ -538,7 +537,7 @@ func TestParquetQueryable_Limits(t *testing.T) { limits: func() *validation.Overrides { limits := validation.Limits{} flagext.DefaultValues(&limits) - limits.MaxFetchedDataBytesPerQuery = 1 + limits.ParquetMaxFetchedDataBytes = 1 return validation.NewOverrides(limits, nil) }(), queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1), diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 372c1b2c1e6..7fa217fa49c 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -187,6 +187,11 @@ type Limits struct { QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"` QueryPartialData bool `yaml:"query_partial_data" json:"query_partial_data" doc:"nocli|description=Enable to allow queries to be evaluated with data from a single zone, if other zones are not available.|default=false"` + // Parquet Queryable enforced limits. + ParquetMaxFetchedRowCount int `yaml:"parquet_max_fetched_row_count" json:"parquet_max_fetched_row_count" doc:"hidden"` + ParquetMaxFetchedChunkBytes int `yaml:"parquet_max_fetched_chunk_bytes" json:"parquet_max_fetched_chunk_bytes" doc:"hidden"` + ParquetMaxFetchedDataBytes int `yaml:"parquet_max_fetched_data_bytes" json:"parquet_max_fetched_data_bytes" doc:"hidden"` + // Query Frontend / Scheduler enforced limits. MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"` QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."` @@ -320,6 +325,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") + // Parquet Queryable enforced limits. + f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. 0 to disable.") + f.IntVar(&l.ParquetMaxFetchedChunkBytes, "querier.parquet-queryable.max-fetched-chunk-bytes", 0, "The maximum number of bytes that can be used to fetch chunk column pages when querying parquet storage. 0 to disable.") + f.IntVar(&l.ParquetMaxFetchedDataBytes, "querier.parquet-queryable.max-fetched-data-bytes", 0, "The maximum number of bytes that can be used to fetch all column pages when querying parquet storage. 0 to disable.") + // Store-gateway. f.Float64Var(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size will be a percentage of the total store-gateways.") f.IntVar(&l.MaxDownloadedBytesPerRequest, "store-gateway.max-downloaded-bytes-per-request", 0, "The maximum number of data bytes to download per gRPC request in Store Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.") @@ -894,6 +904,21 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool { return o.GetOverridesForUser(userID).ParquetConverterEnabled } +// ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage. +func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int { + return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount +} + +// ParquetMaxFetchedChunkBytes returns the maximum number of bytes that can be used to fetch chunk column pages when querying parquet storage. +func (o *Overrides) ParquetMaxFetchedChunkBytes(userID string) int { + return o.GetOverridesForUser(userID).ParquetMaxFetchedChunkBytes +} + +// ParquetMaxFetchedDataBytes returns the maximum number of bytes that can be used to fetch all column pages when querying parquet storage. +func (o *Overrides) ParquetMaxFetchedDataBytes(userID string) int { + return o.GetOverridesForUser(userID).ParquetMaxFetchedDataBytes +} + // CompactorPartitionIndexSizeBytes returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy. func (o *Overrides) CompactorPartitionIndexSizeBytes(userID string) int64 { return o.GetOverridesForUser(userID).CompactorPartitionIndexSizeBytes From a98f50a373b091461cd798204e7b30d6d154dc7a Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 11 Jul 2025 10:13:41 -0700 Subject: [PATCH 6/7] address comments Signed-off-by: yeya24 --- pkg/querier/error_translate_queryable_test.go | 36 +++++++++++++++++++ pkg/querier/parquet_queryable.go | 24 +++++-------- pkg/util/validation/limits.go | 2 +- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index 41940e40ede..a89e7752619 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -45,6 +45,42 @@ func TestApiStatusCodes(t *testing.T) { expectedCode: 422, }, + { + err: validation.AccessDeniedError("access denied"), + expectedString: "access denied", + expectedCode: 422, + }, + + {err: promql.ErrTooManySamples("query execution"), + expectedString: "too many samples", + expectedCode: 422, + }, + + { + err: promql.ErrQueryCanceled("query execution"), + expectedString: "query was canceled", + expectedCode: 499, + }, + + { + err: promql.ErrQueryTimeout("query execution"), + expectedString: "query timed out", + expectedCode: 503, + }, + + // Status code 400 is remapped to 422 (only choice we have) + { + err: httpgrpc.Errorf(http.StatusBadRequest, "test string"), + expectedString: "test string", + expectedCode: 422, + }, + + // 404 is also translated to 422 + { + err: httpgrpc.Errorf(http.StatusNotFound, "not found"), + expectedString: "not found", + }, + { err: search.NewQuota(1).Reserve(2), expectedString: "resource exhausted (used 1)", diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 58cd3d56109..8d7fe7152ed 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -136,27 +136,21 @@ func NewParquetQueryable( parquetQueryableOpts := []queryable.QueryableOpts{ queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { - userID, err := tenant.TenantID(ctx) - if err != nil { - // This shouldn't happen - return 0 - } + // Ignore error as this shouldn't happen. + // If failed to resolve tenant we will just use the default limit value. + userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedRowCount(userID)) }), queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 { - userID, err := tenant.TenantID(ctx) - if err != nil { - // This shouldn't happen - return 0 - } + // Ignore error as this shouldn't happen. + // If failed to resolve tenant we will just use the default limit value. + userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedChunkBytes(userID)) }), queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 { - userID, err := tenant.TenantID(ctx) - if err != nil { - // This shouldn't happen - return 0 - } + // Ignore error as this shouldn't happen. + // If failed to resolve tenant we will just use the default limit value. + userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 7fa217fa49c..fcd96fea36b 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -326,7 +326,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") // Parquet Queryable enforced limits. - f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. 0 to disable.") + f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.") f.IntVar(&l.ParquetMaxFetchedChunkBytes, "querier.parquet-queryable.max-fetched-chunk-bytes", 0, "The maximum number of bytes that can be used to fetch chunk column pages when querying parquet storage. 0 to disable.") f.IntVar(&l.ParquetMaxFetchedDataBytes, "querier.parquet-queryable.max-fetched-data-bytes", 0, "The maximum number of bytes that can be used to fetch all column pages when querying parquet storage. 0 to disable.") From 6e3d97ebdf883c24bbab42f3db9c468430e5e60d Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 11 Jul 2025 14:26:55 -0700 Subject: [PATCH 7/7] fix test Signed-off-by: yeya24 --- pkg/querier/error_translate_queryable_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index a89e7752619..b1b34149096 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -51,7 +51,8 @@ func TestApiStatusCodes(t *testing.T) { expectedCode: 422, }, - {err: promql.ErrTooManySamples("query execution"), + { + err: promql.ErrTooManySamples("query execution"), expectedString: "too many samples", expectedCode: 422, }, @@ -79,6 +80,7 @@ func TestApiStatusCodes(t *testing.T) { { err: httpgrpc.Errorf(http.StatusNotFound, "not found"), expectedString: "not found", + expectedCode: 422, }, {