Commit 9daf065

Add NoCompactMarkCheckAfter config
Signed-off-by: SungJin1212 <[email protected]>
1 parent 0ccb743 commit 9daf065

File tree (6 files changed: +41 -15 lines)

pkg/compactor/blocks_cleaner.go
pkg/compactor/compactor.go
pkg/cortex/modules.go
pkg/parquetconverter/converter.go
pkg/storage/parquet/util.go
pkg/storage/parquet/util_test.go

pkg/compactor/blocks_cleaner.go (2 additions, 1 deletion)

@@ -45,6 +45,7 @@ type BlocksCleanerConfig struct {
     ShardingStrategy string
     CompactionStrategy string
     BlockRanges []int64
+    NoCompactMarkCheckAfter time.Duration
 }
 
 type BlocksCleaner struct {

@@ -777,7 +778,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
     c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
     remainingBlocksToConvert := 0
     for _, b := range idx.NonParquetBlocks() {
-        if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) {
+        if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) {
             remainingBlocksToConvert++
         }
     }

pkg/compactor/compactor.go (4 additions, 0 deletions)

@@ -304,6 +304,9 @@ type Config struct {
     AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
     CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
     CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"`
+
+    // Injected internally
+    NoCompactMarkCheckAfter time.Duration `yaml:"-"`
 }
 
 // RegisterFlags registers the Compactor flags.

@@ -753,6 +756,7 @@ func (c *Compactor) starting(ctx context.Context) error {
         ShardingStrategy: c.compactorCfg.ShardingStrategy,
         CompactionStrategy: c.compactorCfg.CompactionStrategy,
         BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(),
+        NoCompactMarkCheckAfter: c.compactorCfg.NoCompactMarkCheckAfter,
     }, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
         c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)

pkg/cortex/modules.go (1 addition, 0 deletions)

@@ -737,6 +737,7 @@ func (t *Cortex) initParquetConverter() (serv services.Service, err error) {
 func (t *Cortex) initCompactor() (serv services.Service, err error) {
     t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
     ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor
+    t.Cfg.Compactor.NoCompactMarkCheckAfter = t.Cfg.ParquetConverter.NoCompactMarkCheckAfter
 
     t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides, ingestionReplicationFactor)
     if err != nil {

pkg/parquetconverter/converter.go (7 additions, 5 deletions)

@@ -54,10 +54,11 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
 type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
 
 type Config struct {
-    MetaSyncConcurrency int           `yaml:"meta_sync_concurrency"`
-    ConversionInterval  time.Duration `yaml:"conversion_interval"`
-    MaxRowsPerRowGroup  int           `yaml:"max_rows_per_row_group"`
-    FileBufferEnabled   bool          `yaml:"file_buffer_enabled"`
+    MetaSyncConcurrency     int           `yaml:"meta_sync_concurrency"`
+    ConversionInterval      time.Duration `yaml:"conversion_interval"`
+    MaxRowsPerRowGroup      int           `yaml:"max_rows_per_row_group"`
+    FileBufferEnabled       bool          `yaml:"file_buffer_enabled"`
+    NoCompactMarkCheckAfter time.Duration `yaml:"no_compact_mark_check_after"`
 
     DataDir string `yaml:"data_dir"`
 

@@ -109,6 +110,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
     f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
     f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
+    f.DurationVar(&cfg.NoCompactMarkCheckAfter, "parquet-converter.no-compact-mark-check-after", time.Hour*13, "The time after which a `no-compact-mark.json` file should be checked.")
 }
 
 func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {

@@ -392,7 +394,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
             return cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
         }
 
-        if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges, b.ULID, noCompactMarkCheckFunc) {
+        if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.blockRanges, b.ULID, noCompactMarkCheckFunc) {
             continue
         }
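
The struct tag above also exposes the new setting as no_compact_mark_check_after in the parquet-converter YAML config, alongside the -parquet-converter.no-compact-mark-check-after flag. The following is a minimal sketch, not the converter's real start-up path: it registers only the new flag, with the name, 13h default, and help text copied from RegisterFlags above, to show how an operator-supplied value ends up as the millisecond threshold handed to ShouldConvertBlockToParquet.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("parquet-converter", flag.ContinueOnError)

	// Flag name, 13h default, and help text copied from RegisterFlags above.
	var noCompactMarkCheckAfter time.Duration
	fs.DurationVar(&noCompactMarkCheckAfter, "parquet-converter.no-compact-mark-check-after", 13*time.Hour,
		"The time after which a `no-compact-mark.json` file should be checked.")

	// Any Go duration string overrides the default.
	if err := fs.Parse([]string{"-parquet-converter.no-compact-mark-check-after=90m"}); err != nil {
		panic(err)
	}

	// convertUser passes the value to ShouldConvertBlockToParquet as milliseconds.
	fmt.Println(noCompactMarkCheckAfter, noCompactMarkCheckAfter.Milliseconds()) // 1h30m0s 5400000
}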

pkg/storage/parquet/util.go (9 additions, 3 deletions)

@@ -6,15 +6,21 @@ import (
 
 type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
 
-func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool {
+func ShouldConvertBlockToParquet(mint, maxt, noCompactMarkCheckAfter int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool {
     // We assume timeRanges[0] is the TSDB block duration (2h), and we don't convert them.
     blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges)
     if blockTimeRange > timeRanges[0] {
         return true
     }
 
-    if blockTimeRange == timeRanges[0] && checkFunc(bId) {
-        return true
+    // We should check if 2h blocks have a `no-compact-mark.json` file
+    // since these will never be compacted to 12h block.
+    // We check if the `no-compact-mark.json` file exists only for blocks
+    // after the noCompactMarkCheckAfter to reduce calls to `checkFunc`.
+    if mint >= noCompactMarkCheckAfter {
+        if blockTimeRange == timeRanges[0] && checkFunc(bId) {
+            return true
+        }
     }
     return false
 }
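
As a minimal sketch of the new decision logic, the call below uses illustrative values that are not part of the commit; it assumes the github.com/oklog/ulid/v2 module and that a 2h-aligned block resolves to the 2h time range, consistent with the unit tests that follow. A 2h block whose MinTime is below noCompactMarkCheckAfter skips the no-compact-mark lookup entirely, while one at or past the threshold is checked and converted.

package main

import (
	"fmt"
	"time"

	"github.com/oklog/ulid/v2" // assumption: the ulid module version used by the repo

	cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
)

func main() {
	// 2h and 12h compaction ranges in milliseconds, matching the test cases below.
	timeRanges := []int64{(2 * time.Hour).Milliseconds(), (12 * time.Hour).Milliseconds()}
	checkAfter := (13 * time.Hour).Milliseconds() // the flag default, converted the way the callers do

	// Pretend every block carries a no-compact-mark.json file.
	marked := func(ulid.ULID) bool { return true }

	// The block ID is only forwarded to checkFunc, so a zero value is enough here.
	var id ulid.ULID

	// 2h block at mint=0: below the threshold, the mark is never consulted, so no conversion.
	early := cortex_parquet.ShouldConvertBlockToParquet(0, (2 * time.Hour).Milliseconds(), checkAfter, timeRanges, id, marked)

	// 2h block at mint=14h: at or past the threshold, the mark is consulted and the block converts.
	late := cortex_parquet.ShouldConvertBlockToParquet((14 * time.Hour).Milliseconds(), (16 * time.Hour).Milliseconds(), checkAfter, timeRanges, id, marked)

	fmt.Println(early, late) // false true
}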

pkg/storage/parquet/util_test.go (18 additions, 6 deletions)

@@ -14,11 +14,12 @@ import (
 
 func TestShouldConvertBlockToParquet(t *testing.T) {
     for _, tc := range []struct {
-        name       string
-        mint, maxt int64
-        durations  tsdb.DurationList
-        expected   bool
-        checkFunc  NoCompactMarkCheckFunc
+        name                    string
+        mint, maxt              int64
+        noCompactMarkCheckAfter int64
+        durations               tsdb.DurationList
+        expected                bool
+        checkFunc               NoCompactMarkCheckFunc
     }{
         {
             name: "2h block. Don't convert",

@@ -50,6 +51,17 @@ func TestShouldConvertBlockToParquet(t *testing.T) {
                 return true
             },
         },
+        {
+            name: "2h block. Exist NoCompactMark. noCompactMarkCheckAfter is one hour. Not Convert",
+            mint: 0,
+            maxt: 2 * time.Hour.Milliseconds(),
+            noCompactMarkCheckAfter: 1 * time.Hour.Milliseconds(),
+            durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
+            expected: false,
+            checkFunc: func(bId ulid.ULID) bool {
+                return true
+            },
+        },
         {
             name: "1h block. Exist NoCompactMark. Convert",
             mint: 0,

@@ -104,7 +116,7 @@ func TestShouldConvertBlockToParquet(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             id, err := ulid.New(ulid.Now(), rand.Reader)
             require.NoError(t, err)
-            res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds(), id, tc.checkFunc)
+            res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, tc.noCompactMarkCheckAfter, (&tc.durations).ToMilliseconds(), id, tc.checkFunc)
             require.Equal(t, tc.expected, res)
         })
     }
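
For reference, the updated table test can be run on its own with standard Go tooling, for example:

    go test ./pkg/storage/parquet/ -run TestShouldConvertBlockToParquet -v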
