Skip to content

Commit 469ae45

Browse files
authored
Fallback to the blocks queryable if vertical sharding is enabled (#6764)
* Fallback to the blocks queryable if vertical sharding is enabled Signed-off-by: alanprot <[email protected]> * update parquet common Signed-off-by: alanprot <[email protected]> * Opening parquet files in parallel Signed-off-by: alanprot <[email protected]> * pulling parquet common again Signed-off-by: alanprot <[email protected]> * comments Signed-off-by: alanprot <[email protected]> * enable optimistic readers Signed-off-by: alanprot <[email protected]> * creating the base converter options only 1 time Signed-off-by: alanprot <[email protected]> * Removing fallback test when vertical sharding is enabled for get labels and values Signed-off-by: alanprot <[email protected]> * lint Signed-off-by: alanprot <[email protected]> * update parquet common once again Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent 5101dfe commit 469ae45

File tree

15 files changed

+493
-164
lines changed

15 files changed

+493
-164
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ require (
8282
github.com/hashicorp/golang-lru/v2 v2.0.7
8383
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8484
github.com/parquet-go/parquet-go v0.25.0
85-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
85+
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0
8686
github.com/prometheus/procfs v0.15.1
8787
github.com/sercand/kuberesolver/v5 v5.1.1
8888
github.com/tjhop/slog-gokit v0.1.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
15731573
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
15741574
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
15751575
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
1576-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
1577-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
1576+
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0 h1:XCSo9v3if0v0G+aAO/hSUr/Ck9KJXcUPzDFt1dJnAV8=
1577+
github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
15781578
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
15791579
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
15801580
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=

pkg/parquetconverter/converter.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"flag"
66
"fmt"
77
"hash/fnv"
8+
"math/rand"
89
"os"
910
"path/filepath"
11+
"sort"
1012
"strings"
1113
"time"
1214

@@ -47,6 +49,8 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
4749
type Config struct {
4850
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
4951
ConversionInterval time.Duration `yaml:"conversion_interval"`
52+
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
53+
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
5054

5155
DataDir string `yaml:"data_dir"`
5256

@@ -78,14 +82,18 @@ type Converter struct {
7882
blockRanges []int64
7983

8084
fetcherMetrics *block.FetcherMetrics
85+
86+
baseConverterOptions []convert.ConvertOption
8187
}
8288

8389
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8490
cfg.Ring.RegisterFlags(f)
8591

8692
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
8793
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
94+
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
8895
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
96+
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
8997
}
9098

9199
func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
@@ -106,6 +114,11 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
106114
blockRanges: blockRanges,
107115
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
108116
bkt: bkt,
117+
baseConverterOptions: []convert.ConvertOption{
118+
convert.WithSortBy(labels.MetricName),
119+
convert.WithColDuration(time.Hour * 8),
120+
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
121+
},
109122
}
110123

111124
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
@@ -163,6 +176,10 @@ func (c *Converter) running(ctx context.Context) error {
163176
continue
164177
}
165178
ownedUsers := map[string]struct{}{}
179+
rand.Shuffle(len(users), func(i, j int) {
180+
users[i], users[j] = users[j], users[i]
181+
})
182+
166183
for _, userID := range users {
167184
if !c.limits.ParquetConverterEnabled(userID) {
168185
continue
@@ -293,11 +310,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
293310
return errors.Wrap(err, "error creating block fetcher")
294311
}
295312

296-
blocks, _, err := fetcher.Fetch(ctx)
313+
blks, _, err := fetcher.Fetch(ctx)
297314
if err != nil {
298315
return errors.Wrapf(err, "failed to fetch blocks for user %s", userID)
299316
}
300317

318+
blocks := make([]*metadata.Meta, 0, len(blks))
319+
for _, blk := range blks {
320+
blocks = append(blocks, blk)
321+
}
322+
323+
sort.Slice(blocks, func(i, j int) bool {
324+
return blocks[i].MinTime > blocks[j].MinTime
325+
})
326+
301327
for _, b := range blocks {
302328
ok, err := c.ownBlock(ring, b.ULID.String())
303329
if err != nil {
@@ -345,22 +371,27 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
345371
}
346372

347373
level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
374+
375+
converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))
376+
377+
if c.cfg.FileBufferEnabled {
378+
converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
379+
}
380+
348381
_, err = convert.ConvertTSDBBlock(
349382
ctx,
350383
uBucket,
351384
tsdbBlock.MinTime(),
352385
tsdbBlock.MaxTime(),
353386
[]convert.Convertible{tsdbBlock},
354-
convert.WithSortBy(labels.MetricName),
355-
convert.WithColDuration(time.Hour*8),
356-
convert.WithName(b.ULID.String()),
357-
convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
387+
converterOpts...,
358388
)
359389

360390
_ = tsdbBlock.Close()
361391

362392
if err != nil {
363393
level.Error(logger).Log("msg", "Error converting block", "err", err)
394+
continue
364395
}
365396

366397
err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)

pkg/querier/parquet_queryable.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66

77
"github.com/go-kit/log"
8+
"github.com/go-kit/log/level"
9+
"github.com/parquet-go/parquet-go"
810
"github.com/pkg/errors"
911
"github.com/prometheus-community/parquet-common/schema"
1012
"github.com/prometheus-community/parquet-common/search"
@@ -16,14 +18,17 @@ import (
1618
"github.com/prometheus/prometheus/tsdb/chunkenc"
1719
"github.com/prometheus/prometheus/util/annotations"
1820
"github.com/thanos-io/thanos/pkg/strutil"
21+
"golang.org/x/sync/errgroup"
1922

2023
"github.com/cortexproject/cortex/pkg/storage/bucket"
2124
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2225
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2326
"github.com/cortexproject/cortex/pkg/tenant"
2427
"github.com/cortexproject/cortex/pkg/util"
28+
util_log "github.com/cortexproject/cortex/pkg/util/log"
2529
"github.com/cortexproject/cortex/pkg/util/multierror"
2630
"github.com/cortexproject/cortex/pkg/util/services"
31+
"github.com/cortexproject/cortex/pkg/util/validation"
2732
)
2833

2934
type parquetQueryableFallbackMetrics struct {
@@ -59,12 +64,15 @@ type parquetQueryableWithFallback struct {
5964

6065
// metrics
6166
metrics *parquetQueryableFallbackMetrics
67+
68+
limits *validation.Overrides
69+
logger log.Logger
6270
}
6371

6472
func NewParquetQueryable(
6573
config Config,
6674
storageCfg cortex_tsdb.BlocksStorageConfig,
67-
limits BlocksStoreLimits,
75+
limits *validation.Overrides,
6876
blockStorageQueryable *BlocksStoreQueryable,
6977
logger log.Logger,
7078
reg prometheus.Registerer,
@@ -93,18 +101,29 @@ func NewParquetQueryable(
93101
}
94102
userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits)
95103

96-
shards := make([]*parquet_storage.ParquetShard, 0, len(blocks))
97-
98-
for _, block := range blocks {
99-
// we always only have 1 shard - shard 0
100-
shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, block.ID.String(), 0)
101-
if err != nil {
102-
return nil, err
103-
}
104-
shards = append(shards, shard)
104+
shards := make([]*parquet_storage.ParquetShard, len(blocks))
105+
errGroup := &errgroup.Group{}
106+
107+
for i, block := range blocks {
108+
errGroup.Go(func() error {
109+
// we always only have 1 shard - shard 0
110+
shard, err := parquet_storage.OpenParquetShard(ctx,
111+
userBkt,
112+
block.ID.String(),
113+
0,
114+
parquet_storage.WithFileOptions(
115+
parquet.SkipMagicBytes(true),
116+
parquet.ReadBufferSize(100*1024),
117+
parquet.SkipBloomFilters(true),
118+
),
119+
parquet_storage.WithOptimisticReader(true),
120+
)
121+
shards[i] = shard
122+
return err
123+
})
105124
}
106125

107-
return shards, nil
126+
return shards, errGroup.Wait()
108127
})
109128

110129
p := &parquetQueryableWithFallback{
@@ -115,6 +134,8 @@ func NewParquetQueryable(
115134
subservicesWatcher: services.NewFailureWatcher(),
116135
finder: blockStorageQueryable.finder,
117136
metrics: newParquetQueryableFallbackMetrics(reg),
137+
limits: limits,
138+
logger: logger,
118139
}
119140

120141
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -164,6 +185,8 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
164185
blocksStoreQuerier: bsq,
165186
finder: p.finder,
166187
metrics: p.metrics,
188+
limits: p.limits,
189+
logger: p.logger,
167190
}, nil
168191
}
169192

@@ -181,6 +204,9 @@ type parquetQuerierWithFallback struct {
181204

182205
// metrics
183206
metrics *parquetQueryableFallbackMetrics
207+
208+
limits *validation.Overrides
209+
logger log.Logger
184210
}
185211

186212
func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
@@ -275,6 +301,18 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
275301
}
276302

277303
func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
304+
userID, err := tenant.TenantID(ctx)
305+
if err != nil {
306+
storage.ErrSeriesSet(err)
307+
}
308+
309+
if q.limits.QueryVerticalShardSize(userID) > 1 {
310+
uLogger := util_log.WithUserID(userID, q.logger)
311+
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")
312+
313+
return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...)
314+
}
315+
278316
mint, maxt, limit := q.minT, q.maxT, 0
279317

280318
if hints != nil {
@@ -288,6 +326,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
288326

289327
serieSets := []storage.SeriesSet{}
290328

329+
// Lets sort the series to merge
330+
if len(parquet) > 0 && len(remaining) > 0 {
331+
sortSeries = true
332+
}
333+
291334
if len(parquet) > 0 {
292335
serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...))
293336
}

0 commit comments

Comments
 (0)