Commit d103475

Implementing Parquet Queryable with fallback (#6743)
* Implementing Parquet Queryable with fallback
* Changing modules to create the parquet queryable
* Adding ParquetConverterEnabled config
* BlockStoreQueryable: querying only blocks in the context when present
* Test parquet querier fallback logic
* Changelog
* Some bugfixes and a change in the config
* Addressing comments
* Adding cortex_parquet_queryable_selects_queried_total metric
* Update parquet common
* Creating parquet fuzz integration test
* Using the right build flag for fuzz testing

Signed-off-by: alanprot <[email protected]>
1 parent 8532bc2 commit d103475

33 files changed: +3182 −133 lines
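The core idea named in the title is a parquet queryable that serves blocks which have been converted and falls back to the existing block-store path for everything else. The real routing lives in querier.NewParquetQueryable, which is not part of the hunks shown below; the following is only a hedged sketch of the idea, with the Prometheus storage interfaces reduced to stand-ins and every name hypothetical:

package main

import "fmt"

// blockQuerier stands in for the real storage.Querier plumbing.
type blockQuerier interface {
	Select(blockIDs []string) ([]string, error)
}

// fallbackQuerier routes each block either to the parquet reader (if the
// block has a converted parquet version) or to the block-store/store-gateway
// path. This is a sketch, not the PR's implementation.
type fallbackQuerier struct {
	parquet, blockStore blockQuerier
	converted           map[string]bool // block ID -> parquet version exists
}

func (q *fallbackQuerier) Select(blockIDs []string) ([]string, error) {
	var viaParquet, viaStore []string
	for _, id := range blockIDs {
		if q.converted[id] {
			viaParquet = append(viaParquet, id) // converted: read parquet
		} else {
			viaStore = append(viaStore, id) // not converted yet: fall back
		}
	}
	a, err := q.parquet.Select(viaParquet)
	if err != nil {
		return nil, fmt.Errorf("parquet select: %w", err)
	}
	b, err := q.blockStore.Select(viaStore)
	if err != nil {
		return nil, fmt.Errorf("block store select: %w", err)
	}
	return append(a, b...), nil
}

The cortex_parquet_queryable_blocks_queried_total metric asserted in the new integration test appears to count exactly this split, labelled by type.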

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@
 * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
 * [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
 * [FEATURE] Ruler: Add support for group labels. #6665
-* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
+* [FEATURE] Experimental Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet and Parquet Queryable. #6716 #6743
 * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

go.mod

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ require (
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
 	github.com/parquet-go/parquet-go v0.25.0
-	github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d
+	github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
 	github.com/prometheus/procfs v0.15.1
 	github.com/sercand/kuberesolver/v5 v5.1.1
 	github.com/tjhop/slog-gokit v0.1.3

go.sum

Lines changed: 2 additions & 2 deletions
@@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
 github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
 github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
-github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d h1:j7d62fP5x6yUFNgNDth5JCLOoj6ZclXkBneSATbPZig=
-github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d/go.mod h1:Eo3B53ZLcfCEV06clM4UIFTgwxRXm0BHdiaRslKe3Y8=
+github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
+github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
 github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=

integration/parquet_querier_test.go

Lines changed: 177 additions & 0 deletions
//go:build integration_query_fuzz
// +build integration_query_fuzz

package integration

import (
	"context"
	"fmt"
	"math/rand"
	"path/filepath"
	"strconv"
	"testing"
	"time"

	"github.com/cortexproject/promqlsmith"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/util/log"
	cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

func TestParquetFuzz(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	consul := e2edb.NewConsulWithName("consul")
	require.NoError(t, s.StartAndWaitReady(consul))

	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
	flags := mergeFlags(
		baseFlags,
		map[string]string{
			"-target":                            "all,parquet-converter",
			"-blocks-storage.tsdb.ship-interval": "1s",
			"-blocks-storage.bucket-store.sync-interval":                           "1s",
			"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
			"-blocks-storage.bucket-store.bucket-index.idle-timeout":               "1s",
			"-blocks-storage.bucket-store.bucket-index.enabled":                    "true",
			"-blocks-storage.bucket-store.index-cache.backend":                     tsdb.IndexCacheBackendInMemory,
			"-querier.query-store-for-labels-enabled":                              "true",
			// compactor
			"-compactor.cleanup-interval": "1s",
			// Ingester.
			"-ring.store":      "consul",
			"-consul.hostname": consul.NetworkHTTPEndpoint(),
			// Distributor.
			"-distributor.replication-factor": "1",
			// Store-gateway.
			"-store-gateway.sharding-enabled":   "false",
			"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
			// alert manager
			"-alertmanager.web.external-url":      "http://localhost/alertmanager",
			"-frontend.query-vertical-shard-size": "1",
			"-frontend.max-cache-freshness":       "1m",
			// enable experimental promQL funcs
			"-querier.enable-promql-experimental-functions": "true",
			// parquet-converter
			"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
			"-parquet-converter.conversion-interval":  "1s",
			"-parquet-converter.enabled":              "true",
			// Querier
			"-querier.query-parquet-files": "true",
		},
	)

	// make alert manager config dir
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	ctx := context.Background()
	rnd := rand.New(rand.NewSource(time.Now().Unix()))
	dir := filepath.Join(s.SharedDir(), "data")
	numSeries := 10
	numSamples := 60
	lbls := make([]labels.Labels, 0, numSeries*2)
	scrapeInterval := time.Minute
	statusCodes := []string{"200", "400", "404", "500", "502"}
	now := time.Now()
	start := now.Add(-time.Hour * 24)
	end := now.Add(-time.Hour)

	for i := 0; i < numSeries; i++ {
		lbls = append(lbls, labels.Labels{
			{Name: labels.MetricName, Value: "test_series_a"},
			{Name: "job", Value: "test"},
			{Name: "series", Value: strconv.Itoa(i % 3)},
			{Name: "status_code", Value: statusCodes[i%5]},
		})

		lbls = append(lbls, labels.Labels{
			{Name: labels.MetricName, Value: "test_series_b"},
			{Name: "job", Value: "test"},
			{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
			{Name: "status_code", Value: statusCodes[(i+1)%5]},
		})
	}
	id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
	require.NoError(t, err)
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, err)
	bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

	err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
	require.NoError(t, err)

	// Wait until we convert the blocks
	cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
		found := false

		err := bkt.Iter(context.Background(), "", func(name string) error {
			fmt.Println(name)
			if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
				found = true
			}
			return nil
		}, objstore.WithRecursiveIter())
		require.NoError(t, err)
		return found
	})

	att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
	require.NoError(t, err)
	numberOfIndexesUpdate := 0
	lastUpdate := att.LastModified

	cortex_testutil.Poll(t, 30*time.Second, 5, func() interface{} {
		att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
		require.NoError(t, err)
		if lastUpdate != att.LastModified {
			lastUpdate = att.LastModified
			numberOfIndexesUpdate++
		}
		return numberOfIndexesUpdate
	})

	c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
	require.NoError(t, err)
	prom := e2edb.NewPrometheus("", map[string]string{
		"--enable-feature": "promql-experimental-functions",
	})
	require.NoError(t, s.StartAndWaitReady(prom))

	c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
	require.NoError(t, err)
	waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

	opts := []promqlsmith.Option{
		promqlsmith.WithEnableOffset(true),
		promqlsmith.WithEnableAtModifier(true),
		promqlsmith.WithEnabledFunctions(enabledFunctions),
	}
	ps := promqlsmith.New(rnd, lbls, opts...)

	runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)

	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}
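runQueryFuzzTestCases and enabledFunctions come from the integration package's shared fuzz helpers. Roughly, each round generates a random PromQL expression with promqlsmith and cross-checks Cortex (c1) against vanilla Prometheus (c2). Below is a simplified sketch of one range-query round, reusing this file's imports and assuming the e2e clients' QueryRange helper; the real helper also exercises instant queries and compares results with float tolerance rather than the strict match shown here:

func fuzzRangeQueryOnce(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2ecortex.Client, start, end time.Time, step time.Duration) {
	expr := ps.WalkRangeQuery() // random expression valid for range queries
	query := expr.Pretty(0)     // render it back to PromQL text

	res1, err1 := c1.QueryRange(query, start, end, step)
	res2, err2 := c2.QueryRange(query, start, end, step)

	// Both backends should agree on whether the query fails...
	require.Equal(t, err1 == nil, err2 == nil, "errors differ for %q: %v vs %v", query, err1, err2)
	if err1 == nil {
		// ...and on the result. Strict string equality is a simplification;
		// the real comparison tolerates floating-point error.
		require.Equal(t, res2.String(), res1.String(), "results differ for %q", query)
	}
}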

pkg/compactor/blocks_cleaner.go

Lines changed: 5 additions & 0 deletions
@@ -597,6 +597,11 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
 	// Generate an updated in-memory version of the bucket index.
 	begin = time.Now()
 	w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
+
+	if c.cfgProvider.ParquetConverterEnabled(userID) {
+		w.EnableParquet()
+	}
+
 	idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
 	if err != nil {
 		idxs.Status = bucketindex.GenericError

pkg/compactor/blocks_cleaner_test.go

Lines changed: 4 additions & 0 deletions
@@ -984,6 +984,10 @@ type mockConfigProvider struct {
 	userRetentionPeriods map[string]time.Duration
 }

+func (m *mockConfigProvider) ParquetConverterEnabled(userID string) bool {
+	return false
+}
+
 func newMockConfigProvider() *mockConfigProvider {
 	return &mockConfigProvider{
 		userRetentionPeriods: make(map[string]time.Duration),

pkg/compactor/compactor.go

Lines changed: 1 addition & 0 deletions
@@ -388,6 +388,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
 // ConfigProvider defines the per-tenant config provider for the Compactor.
 type ConfigProvider interface {
 	bucket.TenantConfigProvider
+	ParquetConverterEnabled(userID string) bool
 	CompactorBlocksRetentionPeriod(user string) time.Duration
 }
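Any compactor ConfigProvider now has to expose the per-tenant parquet toggle. A minimal sketch of a provider satisfying the widened interface — everything beyond the two method names in the diff is hypothetical, and the embedded bucket.TenantConfigProvider methods are elided:

// Hypothetical static provider, e.g. for tests: enables parquet conversion
// for an explicit set of tenants and returns a fixed retention period.
type staticConfigProvider struct {
	bucket.TenantConfigProvider // embedded to satisfy the rest of the interface

	parquetEnabled map[string]bool
	retention      time.Duration
}

func (p *staticConfigProvider) ParquetConverterEnabled(userID string) bool {
	return p.parquetEnabled[userID]
}

func (p *staticConfigProvider) CompactorBlocksRetentionPeriod(user string) time.Duration {
	return p.retention
}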

pkg/cortex/modules.go

Lines changed: 13 additions & 4 deletions
@@ -400,11 +400,20 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
 	var servs []services.Service

 	//nolint:revive // I prefer this form over removing 'else', because it allows q to have smaller scope.
-	if q, err := initQueryableForEngine(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil {
+	var queriable prom_storage.Queryable
+	if q, err := initBlockStoreQueryable(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil {
 		return nil, fmt.Errorf("failed to initialize querier: %v", err)
 	} else {
-		t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q))
-		if s, ok := q.(services.Service); ok {
+		queriable = q
+		if t.Cfg.Querier.QueryParquetFiles {
+			pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.Overrides, q, util_log.Logger, prometheus.DefaultRegisterer)
+			if err != nil {
+				return nil, fmt.Errorf("failed to initialize parquet querier: %v", err)
+			}
+			queriable = pq
+		}
+		t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(queriable))
+		if s, ok := queriable.(services.Service); ok {
 			servs = append(servs, s)
 		}
 	}
@@ -424,7 +433,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
 	}
 }

-func initQueryableForEngine(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (prom_storage.Queryable, error) {
+func initBlockStoreQueryable(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (*querier.BlocksStoreQueryable, error) {
 	// When running in single binary, if the blocks sharding is disabled and no custom
 	// store-gateway address has been configured, we can set it to the running process.
 	if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
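One subtlety in the first hunk above: the services.Service type assertion now runs against the possibly parquet-wrapped queriable, so the wrapper is presumably expected to carry the lifecycle of the BlocksStoreQueryable it decorates; otherwise wrapping would silently drop the underlying service from the started set. A stripped-down illustration of that delegation pattern, with all names hypothetical and the real interfaces reduced to stand-ins:

package main

import "fmt"

type service interface{ StartAsync() error } // stand-in for services.Service

type blockStoreQueryable struct{}

func (b *blockStoreQueryable) StartAsync() error { return nil }

// parquetQueryable decorates the block-store queryable and forwards the
// service lifecycle, so a type assertion on the wrapper still succeeds.
type parquetQueryable struct{ inner *blockStoreQueryable }

func (p *parquetQueryable) StartAsync() error { return p.inner.StartAsync() }

func main() {
	var queriable interface{} = &parquetQueryable{inner: &blockStoreQueryable{}}
	if s, ok := queriable.(service); ok {
		fmt.Println(s.StartAsync()) // the wrapped service is still registered
	}
}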

pkg/parquetconverter/converter.go

Lines changed: 11 additions & 18 deletions
@@ -29,8 +29,7 @@ import (
 	cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
-	"github.com/cortexproject/cortex/pkg/util"
-	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/tenant"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/validation"
@@ -46,10 +45,8 @@ const (
 var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

 type Config struct {
-	EnabledTenants      flagext.StringSliceCSV `yaml:"enabled_tenants"`
-	DisabledTenants     flagext.StringSliceCSV `yaml:"disabled_tenants"`
-	MetaSyncConcurrency int                    `yaml:"meta_sync_concurrency"`
-	ConversionInterval  time.Duration          `yaml:"conversion_interval"`
+	MetaSyncConcurrency int           `yaml:"meta_sync_concurrency"`
+	ConversionInterval  time.Duration `yaml:"conversion_interval"`

 	DataDir string `yaml:"data_dir"`

@@ -64,8 +61,7 @@ type Converter struct {
 	cfg        Config
 	storageCfg cortex_tsdb.BlocksStorageConfig

-	allowedTenants *util.AllowedTenants
-	limits         *validation.Overrides
+	limits *validation.Overrides

 	// Ring used for sharding compactions.
 	ringLifecycler *ring.Lifecycler
@@ -87,8 +83,6 @@
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.Ring.RegisterFlags(f)

-	f.Var(&cfg.EnabledTenants, "parquet-converter.enabled-tenants", "Comma separated list of tenants that can be converted. If specified, only these tenants will be converted, otherwise all tenants can be converted.")
-	f.Var(&cfg.DisabledTenants, "parquet-converter.disabled-tenants", "Comma separated list of tenants that cannot converted.")
 	f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
 	f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
 	f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
@@ -107,7 +101,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
 		reg:        registerer,
 		storageCfg: storageCfg,
 		logger:     logger,
-		allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
 		limits:      limits,
 		pool:        chunkenc.NewPool(),
 		blockRanges: blockRanges,
@@ -171,6 +164,10 @@ func (c *Converter) running(ctx context.Context) error {
 	}
 	ownedUsers := map[string]struct{}{}
 	for _, userID := range users {
+		if !c.limits.ParquetConverterEnabled(userID) {
+			continue
+		}
+
 		var ring ring.ReadRing
 		ring = c.ring
 		if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
@@ -375,15 +372,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 	return nil
 }

-func (c *Converter) ownUser(r ring.ReadRing, userID string) (bool, error) {
-	if !c.allowedTenants.IsAllowed(userID) {
+func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) {
+	if userId == tenant.GlobalMarkersDir {
+		// __markers__ is reserved for global markers and no tenant should be allowed to have that name.
 		return false, nil
 	}
-
-	if c.limits.ParquetConverterTenantShardSize(userID) <= 0 {
-		return true, nil
-	}
-
 	rs, err := r.GetAllHealthy(RingOp)
 	if err != nil {
 		return false, err
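The rewritten ownUser also refuses tenant.GlobalMarkersDir, since the __markers__ prefix lives alongside tenant prefixes in the same bucket namespace and must never be scanned as a tenant. A tiny hedged illustration of why such listings need the guard — the helper name and shape are hypothetical, and the constant only mirrors what the diff's comment says about tenant.GlobalMarkersDir:

// usersFromPrefixes filters bucket prefixes down to tenant IDs; the global
// markers directory shares the tenants' namespace and must be skipped.
func usersFromPrefixes(prefixes []string) []string {
	const globalMarkersDir = "__markers__" // mirrors tenant.GlobalMarkersDir
	users := make([]string, 0, len(prefixes))
	for _, p := range prefixes {
		if p == globalMarkersDir {
			continue // reserved for global markers, never a tenant
		}
		users = append(users, p)
	}
	return users
}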

pkg/parquetconverter/converter_test.go

Lines changed: 4 additions & 1 deletion
@@ -45,8 +45,11 @@ func TestConverter(t *testing.T) {
 	bucketClient, err := filesystem.NewBucket(t.TempDir())
 	require.NoError(t, err)
 	userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.ParquetConverterEnabled = true

-	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), nil)
+	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)

 	ctx := context.Background()
