1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -55,6 +55,7 @@
* [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
2 changes: 1 addition & 1 deletion go.mod
@@ -83,7 +83,7 @@ require (
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
	github.com/oklog/ulid/v2 v2.1.1
	github.com/parquet-go/parquet-go v0.25.1
	github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994
	github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643
	github.com/prometheus/procfs v0.16.1
	github.com/sercand/kuberesolver/v5 v5.1.1
	github.com/tjhop/slog-gokit v0.1.4
4 changes: 2 additions & 2 deletions go.sum
@@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss=
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg=
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
6 changes: 6 additions & 0 deletions pkg/querier/error_translate_queryable.go
@@ -5,6 +5,7 @@ import (

"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/search"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
@@ -48,6 +49,11 @@ func TranslateToPromqlAPIError(err error) error {
		return err // 422
	}

	if search.IsResourceExhausted(err) {
		cause := errors.Cause(err)
		return cause // 422
	}

	s, ok := status.FromError(err)

	if !ok {
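
A minimal standalone sketch (not part of this PR) of the behaviour the new branch handles: over-reserving a parquet-common quota returns a resource-exhausted error, which `TranslateToPromqlAPIError` now unwraps to its cause so the query API reports a 422 instead of a 500. The printed values are assumptions inferred from the test case added below.

package main

import (
	"fmt"

	"github.com/prometheus-community/parquet-common/search"
)

func main() {
	// Over-reserve against a quota of 1; Reserve is expected to return a
	// resource-exhausted error (assumption based on the test case in this PR).
	err := search.NewQuota(1).Reserve(2)
	fmt.Println(search.IsResourceExhausted(err)) // assumed: true
	fmt.Println(err)                             // assumed: "resource exhausted (used 1)"
}
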
9 changes: 7 additions & 2 deletions pkg/querier/error_translate_queryable_test.go
@@ -10,6 +10,7 @@ import (

"github.com/grafana/regexp"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/search"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/route"
@@ -50,8 +51,7 @@ func TestApiStatusCodes(t *testing.T) {
			expectedCode: 422,
		},

		{
			err: promql.ErrTooManySamples("query execution"),
		{err: promql.ErrTooManySamples("query execution"),
			expectedString: "too many samples",
			expectedCode: 422,
		},
@@ -79,6 +79,11 @@
		{
			err: httpgrpc.Errorf(http.StatusNotFound, "not found"),
			expectedString: "not found",
		},

		{
			err: search.NewQuota(1).Reserve(2),
			expectedString: "resource exhausted (used 1)",
			expectedCode: 422,
		},

62 changes: 60 additions & 2 deletions pkg/querier/parquet_queryable.go
@@ -23,11 +23,13 @@ import (
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/multierror"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -132,6 +134,62 @@ func NewParquetQueryable(

	cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())

	parquetQueryableOpts := []queryable.QueryableOpts{
		queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 {
			// Ignore error as this shouldn't happen.
			// If failed to resolve tenant we will just use the default limit value.
			userID, _ := tenant.TenantID(ctx)
			return int64(limits.ParquetMaxFetchedRowCount(userID))
		}),
		queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 {
			// Ignore error as this shouldn't happen.
			// If failed to resolve tenant we will just use the default limit value.
			userID, _ := tenant.TenantID(ctx)
			return int64(limits.ParquetMaxFetchedChunkBytes(userID))
		}),
		queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 {
			// Ignore error as this shouldn't happen.
			// If failed to resolve tenant we will just use the default limit value.
			userID, _ := tenant.TenantID(ctx)
			return int64(limits.ParquetMaxFetchedDataBytes(userID))
		}),
		queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
			queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
			lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
			for _, series := range cs {
				chkCount := 0
				chunkSize := 0
				lblSize := 0
				lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels())
				lbls = append(lbls, lblAdapter)
				for _, lbl := range lblAdapter {
					lblSize += lbl.Size()
				}
				iter := series.Iterator(nil)
				for iter.Next() {
					chk := iter.At()
					chunkSize += len(chk.Chunk.Bytes())
					chkCount++
				}
				if chkCount > 0 {
					if err := queryLimiter.AddChunks(chkCount); err != nil {
						return validation.LimitError(err.Error())
					}
					if err := queryLimiter.AddChunkBytes(chunkSize); err != nil {
						return validation.LimitError(err.Error())
					}
				}

				if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil {
					return validation.LimitError(err.Error())
				}
			}
			if err := queryLimiter.AddSeries(lbls...); err != nil {
				return validation.LimitError(err.Error())
			}
			return nil
		}),
	}
	parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) {
		userID, err := tenant.TenantID(ctx)
		if err != nil {
@@ -182,7 +240,7 @@ func NewParquetQueryable(
		}

		return shards, errGroup.Wait()
	})
	}, parquetQueryableOpts...)

	p := &parquetQueryableWithFallback{
		subservices: manager,
@@ -376,7 +434,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool

	userID, err := tenant.TenantID(ctx)
	if err != nil {
		storage.ErrSeriesSet(err)
		return storage.ErrSeriesSet(err)
	}

	if q.limits.QueryVerticalShardSize(userID) > 1 {
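
For reference, a hypothetical standalone helper (the name `countSeriesBytes` and its free-standing form are illustration-only assumptions, not code from this PR) mirroring the per-series accounting the `WithMaterializedSeriesCallback` above performs before charging the query limiter:

package querierexample

import (
	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/prometheus/prometheus/storage"
)

// countSeriesBytes sums the chunk count, chunk bytes and label bytes of one
// materialized series, the same quantities the callback charges via
// AddChunks, AddChunkBytes and AddDataBytes.
func countSeriesBytes(s storage.ChunkSeries) (chkCount, chunkSize, lblSize int) {
	for _, lbl := range cortexpb.FromLabelsToLabelAdapters(s.Labels()) {
		lblSize += lbl.Size()
	}
	iter := s.Iterator(nil)
	for iter.Next() {
		chk := iter.At()
		chunkSize += len(chk.Chunk.Bytes())
		chkCount++
	}
	return chkCount, chunkSize, lblSize
}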