Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) {
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "1",
"-alertmanager.web.external-url": "http://localhost/alertmanager",
// Enable vertical sharding.
"-frontend.query-vertical-shard-size": "3",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
Expand Down Expand Up @@ -130,16 +131,20 @@ func TestParquetFuzz(t *testing.T) {
// Wait until we convert the blocks
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
found := false
foundBucketIndex := false

err := bkt.Iter(context.Background(), "", func(name string) error {
fmt.Println(name)
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
found = true
}
if name == "bucket-index.json.gz" {
foundBucketIndex = true
}
return nil
}, objstore.WithRecursiveIter())
require.NoError(t, err)
return found
return found && foundBucketIndex
})

att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
Expand Down Expand Up @@ -178,7 +183,7 @@ func TestParquetFuzz(t *testing.T) {
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)

require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
Expand Down
18 changes: 16 additions & 2 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, now, start, end, scrapeInterval, 1000, false)
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
}

func TestProtobufCodecFuzz(t *testing.T) {
Expand Down Expand Up @@ -1838,7 +1838,7 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
t.Logf("case %d results mismatch.\n%s: %s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, qt, tc.query, resultLength(tc.res1), tc.res1.String(), resultLength(tc.res2), tc.res2.String())
failures++
}
}
Expand Down Expand Up @@ -1872,3 +1872,17 @@ func isValidQuery(generatedQuery parser.Expr, skipStdAggregations bool) bool {
}
return isValid
}

func resultLength(x model.Value) int {
vx, xvec := x.(model.Vector)
if xvec {
return vx.Len()
}

mx, xMatrix := x.(model.Matrix)
if xMatrix {
return mx.Len()
}
// Other type, return 0
return 0
}
9 changes: 8 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
cortexquerysharding "github.com/cortexproject/cortex/pkg/querysharding"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
Expand Down Expand Up @@ -511,7 +512,13 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
// initQueryFrontendTripperware instantiates the tripperware used by the query frontend
// to optimize Prometheus query requests.
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
var queryAnalyzer querysharding.Analyzer
queryAnalyzer = querysharding.NewQueryAnalyzer()
if t.Cfg.Querier.EnableParquetQueryable {
// Disable vertical sharding for binary expression with ignore for parquet queryable.
queryAnalyzer = cortexquerysharding.NewDisableBinaryExpressionAnalyzer(queryAnalyzer)
}

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
Expand Down
58 changes: 47 additions & 11 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,32 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/queryable"
"github.com/prometheus-community/parquet-common/schema"
"github.com/prometheus-community/parquet-common/search"
parquet_storage "github.com/prometheus-community/parquet-common/storage"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querysharding"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/multierror"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -153,6 +154,7 @@ func NewParquetQueryable(
userID, _ := tenant.TenantID(ctx)
return int64(limits.ParquetMaxFetchedDataBytes(userID))
}),
queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback),
queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
Expand Down Expand Up @@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select")
defer span.Finish()

userID, err := tenant.TenantID(ctx)
newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
if err != nil {
return storage.ErrSeriesSet(err)
}

if q.limits.QueryVerticalShardSize(userID) > 1 {
uLogger := util_log.WithUserID(userID, q.logger)
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")

return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...)
}
defer shardMatcher.Close()

hints := storage.SelectHints{
Start: q.minT,
Expand Down Expand Up @@ -483,7 +479,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
go func() {
span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select")
defer span.Finish()
p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...)
parquetCtx := InjectBlocksIntoContext(ctx, parquet...)
if shardMatcher != nil {
parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher)
}
p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...)
}()
}

Expand Down Expand Up @@ -570,6 +570,26 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining
}
}

type shardMatcherLabelsFilter struct {
shardMatcher *storepb.ShardMatcher
}

func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool {
return f.shardMatcher.MatchesLabels(lbls)
}

func (f *shardMatcherLabelsFilter) Close() {
f.shardMatcher.Close()
}

func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
shardMatcher, exists := extractShardMatcherFromContext(ctx)
if !exists || !shardMatcher.IsSharded() {
return nil, false
}
return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true
}

type cacheInterface[T any] interface {
Get(path string) T
Set(path string, reader T)
Expand Down Expand Up @@ -655,3 +675,19 @@ func (n noopCache[T]) Get(_ string) (r T) {
func (n noopCache[T]) Set(_ string, _ T) {

}

var (
shardMatcherCtxKey contextKey = 1
)

func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context {
return context.WithValue(ctx, shardMatcherCtxKey, sm)
}

func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) {
if sm := ctx.Value(shardMatcherCtxKey); sm != nil {
return sm.(*storepb.ShardMatcher), true
}

return nil, false
}
131 changes: 88 additions & 43 deletions pkg/querier/parquet_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -75,49 +76,6 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "user-1")

t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) {
finder := &blocksFinderMock{}
stores := createStore()

q := &blocksStoreQuerier{
minT: minT,
maxT: maxT,
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
limits: &blocksStoreLimitsMock{},

storeGatewayConsistencyCheckMaxAttempts: 3,
}

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 4),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
}

finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

t.Run("select", func(t *testing.T) {
ss := pq.Select(ctx, true, nil, matchers...)
require.NoError(t, ss.Err())
require.Len(t, stores.queriedBlocks, 2)
require.Len(t, mParquetQuerier.queriedBlocks, 0)
})
})

t.Run("should fallback all blocks", func(t *testing.T) {
finder := &blocksFinderMock{}
stores := createStore()
Expand Down Expand Up @@ -671,3 +629,90 @@ func (m *mockParquetQuerier) Reset() {
func (mockParquetQuerier) Close() error {
return nil
}

func TestMaterializedLabelsFilterCallback(t *testing.T) {
tests := []struct {
name string
setupContext func() context.Context
expectedFilterReturned bool
expectedCallbackReturned bool
}{
{
name: "no shard matcher in context",
setupContext: func() context.Context {
return context.Background()
},
expectedFilterReturned: false,
expectedCallbackReturned: false,
},
{
name: "shard matcher exists but is not sharded",
setupContext: func() context.Context {
// Create a ShardInfo with TotalShards = 0 (not sharded)
shardInfo := &storepb.ShardInfo{
ShardIndex: 0,
TotalShards: 0, // Not sharded
By: true,
Labels: []string{"__name__"},
}

buffers := &sync.Pool{New: func() interface{} {
b := make([]byte, 0, 100)
return &b
}}
shardMatcher := shardInfo.Matcher(buffers)

return injectShardMatcherIntoContext(context.Background(), shardMatcher)
},
expectedFilterReturned: false,
expectedCallbackReturned: false,
},
{
name: "shard matcher exists and is sharded",
setupContext: func() context.Context {
// Create a ShardInfo with TotalShards > 0 (sharded)
shardInfo := &storepb.ShardInfo{
ShardIndex: 0,
TotalShards: 2, // Sharded
By: true,
Labels: []string{"__name__"},
}

buffers := &sync.Pool{New: func() interface{} {
b := make([]byte, 0, 100)
return &b
}}
shardMatcher := shardInfo.Matcher(buffers)

return injectShardMatcherIntoContext(context.Background(), shardMatcher)
},
expectedFilterReturned: true,
expectedCallbackReturned: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := tt.setupContext()

filter, exists := materializedLabelsFilterCallback(ctx, nil)

require.Equal(t, tt.expectedCallbackReturned, exists)

if tt.expectedFilterReturned {
require.NotNil(t, filter)

// Test that the filter can be used
testLabels := labels.FromStrings("__name__", "test_metric", "label1", "value1")
// We can't easily test the actual filtering logic without knowing the internal
// shard matching implementation, but we can at least verify the filter interface works
_ = filter.Filter(testLabels)

// Cleanup
filter.Close()
} else {
require.Nil(t, filter)
}
})
}
}
Loading
Loading