Skip to content

Commit 0ccb743

Browse files
committed
Evaluate compact mark lazily
Signed-off-by: SungJin1212 <[email protected]>
1 parent 528d1af commit 0ccb743

File tree

4 files changed

+63
-24
lines changed

4 files changed

+63
-24
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
767767
return nil
768768
}
769769

770-
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc func(blockID ulid.ULID) bool) {
770+
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc cortex_parquet.NoCompactMarkCheckFunc) {
771771
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
772772
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
773773
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
@@ -777,7 +777,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
777777
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
778778
remainingBlocksToConvert := 0
779779
for _, b := range idx.NonParquetBlocks() {
780-
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges, noCompactMarkCheckFunc(b.ID)) {
780+
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) {
781781
remainingBlocksToConvert++
782782
}
783783
}

pkg/parquetconverter/converter.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/go-kit/log"
1717
"github.com/go-kit/log/level"
18+
"github.com/oklog/ulid/v2"
1819
"github.com/parquet-go/parquet-go"
1920
"github.com/pkg/errors"
2021
"github.com/prometheus-community/parquet-common/convert"
@@ -50,6 +51,8 @@ const (
5051

5152
var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
5253

54+
type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
55+
5356
type Config struct {
5457
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
5558
ConversionInterval time.Duration `yaml:"conversion_interval"`
@@ -385,8 +388,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
385388
continue
386389
}
387390

388-
noCompactMarkExist := cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
389-
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges, noCompactMarkExist) {
391+
noCompactMarkCheckFunc := func(bId ulid.ULID) bool {
392+
return cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
393+
}
394+
395+
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges, b.ULID, noCompactMarkCheckFunc) {
390396
continue
391397
}
392398

pkg/storage/parquet/util.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package parquet
22

3-
func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64, noCompactMarkExist bool) bool {
3+
import (
4+
"github.com/oklog/ulid/v2"
5+
)
6+
7+
type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
8+
9+
func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool {
410
// We assume timeRanges[0] is the TSDB block duration (2h), and we don't convert them.
511
blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges)
612
if blockTimeRange > timeRanges[0] {
713
return true
814
}
915

10-
if blockTimeRange == timeRanges[0] && noCompactMarkExist {
16+
if blockTimeRange == timeRanges[0] && checkFunc(bId) {
1117
return true
1218
}
1319
return false

pkg/storage/parquet/util_test.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,80 +4,107 @@ import (
44
"testing"
55
"time"
66

7+
"crypto/rand"
8+
9+
"github.com/oklog/ulid/v2"
710
"github.com/stretchr/testify/require"
811

912
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1013
)
1114

1215
func TestShouldConvertBlockToParquet(t *testing.T) {
1316
for _, tc := range []struct {
14-
name string
15-
mint, maxt int64
16-
durations tsdb.DurationList
17-
expected bool
18-
noCompactMarkExist bool
17+
name string
18+
mint, maxt int64
19+
durations tsdb.DurationList
20+
expected bool
21+
checkFunc NoCompactMarkCheckFunc
1922
}{
2023
{
2124
name: "2h block. Don't convert",
2225
mint: 0,
2326
maxt: 2 * time.Hour.Milliseconds(),
2427
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
2528
expected: false,
29+
checkFunc: func(bId ulid.ULID) bool {
30+
return false
31+
},
2632
},
2733
{
2834
name: "1h block. Don't convert",
2935
mint: 0,
3036
maxt: 1 * time.Hour.Milliseconds(),
3137
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
3238
expected: false,
39+
checkFunc: func(bId ulid.ULID) bool {
40+
return false
41+
},
3342
},
3443
{
35-
name: "2h block. Exist NoCompactMark. Convert",
36-
mint: 0,
37-
maxt: 2 * time.Hour.Milliseconds(),
38-
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
39-
expected: true,
40-
noCompactMarkExist: true,
44+
name: "2h block. Exist NoCompactMark. Convert",
45+
mint: 0,
46+
maxt: 2 * time.Hour.Milliseconds(),
47+
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
48+
expected: true,
49+
checkFunc: func(bId ulid.ULID) bool {
50+
return true
51+
},
4152
},
4253
{
43-
name: "1h block. Exist NoCompactMark. Convert",
44-
mint: 0,
45-
maxt: 1 * time.Hour.Milliseconds(),
46-
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
47-
expected: true,
48-
noCompactMarkExist: true,
54+
name: "1h block. Exist NoCompactMark. Convert",
55+
mint: 0,
56+
maxt: 1 * time.Hour.Milliseconds(),
57+
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
58+
expected: true,
59+
checkFunc: func(bId ulid.ULID) bool {
60+
return true
61+
},
4962
},
5063
{
5164
name: "3h block. Convert",
5265
mint: 0,
5366
maxt: 3 * time.Hour.Milliseconds(),
5467
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
5568
expected: true,
69+
checkFunc: func(bId ulid.ULID) bool {
70+
return false
71+
},
5672
},
5773
{
5874
name: "12h block. Convert",
5975
mint: 0,
6076
maxt: 12 * time.Hour.Milliseconds(),
6177
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
6278
expected: true,
79+
checkFunc: func(bId ulid.ULID) bool {
80+
return false
81+
},
6382
},
6483
{
6584
name: "12h block with 1h offset. Convert",
6685
mint: time.Hour.Milliseconds(),
6786
maxt: 13 * time.Hour.Milliseconds(),
6887
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
6988
expected: true,
89+
checkFunc: func(bId ulid.ULID) bool {
90+
return false
91+
},
7092
},
7193
{
7294
name: "24h block. Convert",
7395
mint: 0,
7496
maxt: 24 * time.Hour.Milliseconds(),
7597
durations: tsdb.DurationList{2 * time.Hour, 12 * time.Hour},
7698
expected: true,
99+
checkFunc: func(bId ulid.ULID) bool {
100+
return false
101+
},
77102
},
78103
} {
79104
t.Run(tc.name, func(t *testing.T) {
80-
res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds(), tc.noCompactMarkExist)
105+
id, err := ulid.New(ulid.Now(), rand.Reader)
106+
require.NoError(t, err)
107+
res := ShouldConvertBlockToParquet(tc.mint, tc.maxt, (&tc.durations).ToMilliseconds(), id, tc.checkFunc)
81108
require.Equal(t, tc.expected, res)
82109
})
83110
}

0 commit comments

Comments
 (0)