
Commit 506be73

yeya24 authored
Add cleaner and converter metrics for parquet (#6809)
* add cleaner and converter metrics for parquet

Signed-off-by: yeya24 <[email protected]>

* changelog

Signed-off-by: yeya24 <[email protected]>

---------

Signed-off-by: yeya24 <[email protected]>
1 parent 648356c commit 506be73

File tree: 8 files changed (+234, -10 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@
 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
 * [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
+* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/compactor/blocks_cleaner.go

Lines changed: 12 additions & 1 deletion
@@ -72,6 +72,7 @@ type BlocksCleaner struct {
     blocksFailedTotal prometheus.Counter
     blocksMarkedForDeletion *prometheus.CounterVec
     tenantBlocks *prometheus.GaugeVec
+    tenantParquetBlocks *prometheus.GaugeVec
     tenantBlocksMarkedForDelete *prometheus.GaugeVec
     tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
     tenantPartialBlocks *prometheus.GaugeVec
@@ -154,6 +155,10 @@ func NewBlocksCleaner(
            Name: "cortex_bucket_blocks_count",
            Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
        }, commonLabels),
+       tenantParquetBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+           Name: "cortex_bucket_parquet_blocks_count",
+           Help: "Total number of parquet blocks in the bucket. Blocks marked for deletion are included.",
+       }, commonLabels),
        tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
            Name: "cortex_bucket_blocks_marked_for_deletion_count",
            Help: "Total number of blocks marked for deletion in the bucket.",
@@ -354,6 +359,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
    for _, userID := range c.lastOwnedUsers {
        if !isActive[userID] && !isMarkedForDeletion[userID] {
            c.tenantBlocks.DeleteLabelValues(userID)
+           c.tenantParquetBlocks.DeleteLabelValues(userID)
            c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
            c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
            c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -451,6 +457,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
 
    // Given all blocks have been deleted, we can also remove the metrics.
    c.tenantBlocks.DeleteLabelValues(userID)
+   c.tenantParquetBlocks.DeleteLabelValues(userID)
    c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
    c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
    c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -602,7 +609,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
    begin = time.Now()
    w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
 
-   if c.cfgProvider.ParquetConverterEnabled(userID) {
+   parquetEnabled := c.cfgProvider.ParquetConverterEnabled(userID)
+   if parquetEnabled {
        w.EnableParquet()
    }
 
@@ -676,6 +684,9 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
    c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
    c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
    c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
+   if parquetEnabled {
+       c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
+   }
 
    if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
        begin = time.Now()

pkg/compactor/blocks_cleaner_test.go

Lines changed: 18 additions & 3 deletions
@@ -163,7 +163,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
    createBlockVisitMarker(t, bucketClient, "user-1", block11) // Partial block only has visit marker.
    createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
 
-   // Blocks for user-3, marked for deletion.
+   // Blocks for user-3, tenant marked for deletion.
    require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3", tsdb.NewTenantDeletionMark(time.Now())))
    block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
    block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)
@@ -206,6 +206,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
    }, bucketClient, logger, reg)
    require.NoError(t, err)
    cfgProvider := newMockConfigProvider()
+   cfgProvider.parquetConverterEnabled = map[string]bool{
+       "user-3": true,
+       "user-5": true,
+       "user-6": true,
+   }
    blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
        Name: blocksMarkedForDeletionName,
        Help: blocksMarkedForDeletionHelp,
@@ -333,8 +338,13 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
        cortex_bucket_blocks_partials_count{user="user-2"} 0
        cortex_bucket_blocks_partials_count{user="user-5"} 0
        cortex_bucket_blocks_partials_count{user="user-6"} 0
+       # HELP cortex_bucket_parquet_blocks_count Total number of parquet blocks in the bucket. Blocks marked for deletion are included.
+       # TYPE cortex_bucket_parquet_blocks_count gauge
+       cortex_bucket_parquet_blocks_count{user="user-5"} 0
+       cortex_bucket_parquet_blocks_count{user="user-6"} 1
        `),
        "cortex_bucket_blocks_count",
+       "cortex_bucket_parquet_blocks_count",
        "cortex_bucket_blocks_marked_for_deletion_count",
        "cortex_bucket_blocks_marked_for_no_compaction_count",
        "cortex_bucket_blocks_partials_count",
@@ -1013,16 +1023,21 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
 }
 
 type mockConfigProvider struct {
-   userRetentionPeriods map[string]time.Duration
+   userRetentionPeriods    map[string]time.Duration
+   parquetConverterEnabled map[string]bool
 }
 
 func (m *mockConfigProvider) ParquetConverterEnabled(userID string) bool {
+   if result, ok := m.parquetConverterEnabled[userID]; ok {
+       return result
+   }
    return false
 }
 
 func newMockConfigProvider() *mockConfigProvider {
    return &mockConfigProvider{
-       userRetentionPeriods: make(map[string]time.Duration),
+       userRetentionPeriods:    make(map[string]time.Duration),
+       parquetConverterEnabled: make(map[string]bool),
    }
 }

pkg/parquetconverter/converter.go

Lines changed: 38 additions & 6 deletions
@@ -88,6 +88,12 @@ type Converter struct {
    fetcherMetrics *block.FetcherMetrics
 
    baseConverterOptions []convert.ConvertOption
+
+   metrics *metrics
+
+   // Keep track of the last owned users.
+   // This is not thread safe now.
+   lastOwnedUsers map[string]struct{}
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -125,6 +131,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
        pool:           chunkenc.NewPool(),
        blockRanges:    blockRanges,
        fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
+       metrics:        newMetrics(registerer),
        bkt:            bkt,
        baseConverterOptions: []convert.ConvertOption{
            convert.WithSortBy(labels.MetricName),
@@ -194,6 +201,9 @@ func (c *Converter) running(ctx context.Context) error {
 
    for _, userID := range users {
        if !c.limits.ParquetConverterEnabled(userID) {
+           // It is possible that parquet is disabled for the userID so we
+           // need to check if the user was owned last time.
+           c.cleanupMetricsForNotOwnedUser(userID)
            continue
        }
 
@@ -211,13 +221,15 @@ func (c *Converter) running(ctx context.Context) error {
            continue
        }
        if !owned {
+           c.cleanupMetricsForNotOwnedUser(userID)
            continue
        }
 
        if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bkt, userID); err != nil {
            level.Warn(userLogger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err)
            continue
        } else if markedForDeletion {
+           c.metrics.deleteMetricsForTenant(userID)
            level.Info(userLogger).Log("msg", "skipping user because it is marked for deletion", "user", userID)
            continue
        }
@@ -229,6 +241,8 @@ func (c *Converter) running(ctx context.Context) error {
            level.Error(userLogger).Log("msg", "failed to convert user", "err", err)
        }
    }
+   c.lastOwnedUsers = ownedUsers
+   c.metrics.ownedUsers.Set(float64(len(ownedUsers)))
 
    // Delete local files for unowned tenants, if there are any. This cleans up
    // leftover local files for tenants that belong to different converter now,
@@ -269,8 +283,15 @@ func (c *Converter) stopping(_ error) error {
 }
 
 func (c *Converter) discoverUsers(ctx context.Context) ([]string, error) {
-   // Only active users are considered.
-   active, _, _, err := c.usersScanner.ScanUsers(ctx)
+   // Only active users are considered for conversion.
+   // We still check deleting and deleted users just to clean up metrics.
+   active, deleting, deleted, err := c.usersScanner.ScanUsers(ctx)
+   for _, userID := range deleting {
+       c.cleanupMetricsForNotOwnedUser(userID)
+   }
+   for _, userID := range deleted {
+       c.cleanupMetricsForNotOwnedUser(userID)
+   }
    return active, err
 }
 
@@ -378,6 +399,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
        }
 
        level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
+       start := time.Now()
 
        converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))
 
@@ -397,14 +419,18 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
        _ = tsdbBlock.Close()
 
        if err != nil {
-           level.Error(logger).Log("msg", "Error converting block", "err", err)
+           level.Error(logger).Log("msg", "Error converting block", "block", b.ULID.String(), "err", err)
            continue
        }
+       duration := time.Since(start)
+       c.metrics.convertBlockDuration.WithLabelValues(userID).Set(duration.Seconds())
+       level.Info(logger).Log("msg", "successfully converted block", "block", b.ULID.String(), "duration", duration)
 
-       err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)
-       if err != nil {
-           level.Error(logger).Log("msg", "Error writing block", "err", err)
+       if err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket); err != nil {
+           level.Error(logger).Log("msg", "Error writing block", "block", b.ULID.String(), "err", err)
+           continue
        }
+       c.metrics.convertedBlocks.WithLabelValues(userID).Inc()
    }
 
    return nil
@@ -442,6 +468,12 @@ func (c *Converter) ownBlock(ring ring.ReadRing, blockId string) (bool, error) {
    return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil
 }
 
+func (c *Converter) cleanupMetricsForNotOwnedUser(userID string) {
+   if _, ok := c.lastOwnedUsers[userID]; ok {
+       c.metrics.deleteMetricsForTenant(userID)
+   }
+}
+
 func (c *Converter) compactRootDir() string {
    return filepath.Join(c.cfg.DataDir, "compact")
 }

pkg/parquetconverter/converter_test.go

Lines changed: 59 additions & 0 deletions
@@ -10,6 +10,7 @@ import (
    "github.com/go-kit/log"
    "github.com/oklog/ulid"
    "github.com/prometheus/client_golang/prometheus"
+   "github.com/prometheus/client_golang/prometheus/testutil"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/tsdb"
    "github.com/stretchr/testify/assert"
@@ -96,6 +97,11 @@ func TestConverter(t *testing.T) {
        return len(blocksConverted)
    })
 
+   // Verify metrics after conversion
+   require.Equal(t, float64(len(blocksConverted)), testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
+   require.Greater(t, testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)), 0.0)
+   require.Equal(t, 1.0, testutil.ToFloat64(c.metrics.ownedUsers))
+
    // Verify all files are there
    for _, block := range blocksConverted {
        for _, file := range []string{
@@ -121,6 +127,20 @@ func TestConverter(t *testing.T) {
    test.Poll(t, time.Minute, 0, func() interface{} {
        return len(c.listTenantsWithMetaSyncDirectories())
    })
+
+   // Verify metrics after user deletion
+   test.Poll(t, time.Minute*10, true, func() interface{} {
+       if testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)) != 0.0 {
+           return false
+       }
+       if testutil.ToFloat64(c.metrics.convertBlockDuration.WithLabelValues(user)) != 0.0 {
+           return false
+       }
+       if testutil.ToFloat64(c.metrics.ownedUsers) != 0.0 {
+           return false
+       }
+       return true
+   })
 }
 
 func prepareConfig() Config {
@@ -159,3 +179,42 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
    c := newConverter(cfg, bucketClient, storageCfg, blockRanges.ToMilliseconds(), logger, registry, overrides, scanner)
    return c, logger, registry
 }
+
+func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) {
+   // Create a new registry for testing
+   reg := prometheus.NewRegistry()
+
+   // Create a new converter with test configuration
+   cfg := Config{}
+   storageCfg := cortex_tsdb.BlocksStorageConfig{}
+   limits := &validation.Overrides{}
+   converter := newConverter(cfg, nil, storageCfg, []int64{7200000}, nil, reg, limits, nil)
+
+   // Add some test metrics for a user
+   userID := "test-user"
+   converter.metrics.convertedBlocks.WithLabelValues(userID).Inc()
+   converter.metrics.convertBlockDuration.WithLabelValues(userID).Set(1.0)
+
+   // Verify metrics exist before cleanup
+   assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
+   assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))
+
+   // Set lastOwnedUsers to empty (user was never owned)
+   converter.lastOwnedUsers = map[string]struct{}{}
+   // Clean up metrics for the user will do nothing as the user was never owned
+   converter.cleanupMetricsForNotOwnedUser(userID)
+   assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
+   assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))
+
+   // Mark the user as previously owned
+   converter.lastOwnedUsers = map[string]struct{}{
+       userID: {},
+   }
+
+   // Clean up metrics for the user
+   converter.cleanupMetricsForNotOwnedUser(userID)
+
+   // Verify metrics are deleted
+   assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertedBlocks.WithLabelValues(userID)))
+   assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockDuration.WithLabelValues(userID)))
+}

pkg/parquetconverter/metrics.go

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+package parquetconverter
+
+import (
+   "github.com/prometheus/client_golang/prometheus"
+   "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type metrics struct {
+   convertedBlocks      *prometheus.CounterVec
+   convertBlockDuration *prometheus.GaugeVec
+   ownedUsers           prometheus.Gauge
+}
+
+func newMetrics(reg prometheus.Registerer) *metrics {
+   return &metrics{
+       convertedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+           Name: "cortex_parquet_converter_converted_blocks_total",
+           Help: "Total number of converted blocks per user.",
+       }, []string{"user"}),
+       convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+           Name: "cortex_parquet_converter_convert_block_duration_seconds",
+           Help: "Time taken to for the latest block conversion for the user.",
+       }, []string{"user"}),
+       ownedUsers: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+           Name: "cortex_parquet_converter_users_owned",
+           Help: "Number of users that the parquet converter owns.",
+       }),
+   }
+}
+
+func (m *metrics) deleteMetricsForTenant(userID string) {
+   m.convertedBlocks.DeleteLabelValues(userID)
+   m.convertBlockDuration.DeleteLabelValues(userID)
+}
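The file above defines the per-tenant metric pattern the converter relies on: a counter/gauge vector keyed by a user label, plus DeleteLabelValues-based cleanup when a tenant goes away. As a minimal standalone sketch (not part of this commit; the main wrapper and the tenant name "user-1" are illustrative only), the same pattern with plain client_golang looks like this:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    reg := prometheus.NewRegistry()

    // Per-tenant metrics, mirroring convertedBlocks / convertBlockDuration above.
    convertedBlocks := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
        Name: "cortex_parquet_converter_converted_blocks_total",
        Help: "Total number of converted blocks per user.",
    }, []string{"user"})
    convertBlockDuration := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
        Name: "cortex_parquet_converter_convert_block_duration_seconds",
        Help: "Time taken for the latest block conversion for the user.",
    }, []string{"user"})

    // Record one conversion for an illustrative tenant.
    convertedBlocks.WithLabelValues("user-1").Inc()
    convertBlockDuration.WithLabelValues("user-1").Set(12.5)
    fmt.Println(testutil.ToFloat64(convertedBlocks.WithLabelValues("user-1"))) // 1

    // When the tenant is no longer owned, delete its label values so stale
    // series stop being exported (this is what deleteMetricsForTenant does).
    convertedBlocks.DeleteLabelValues("user-1")
    convertBlockDuration.DeleteLabelValues("user-1")
    fmt.Println(testutil.ToFloat64(convertedBlocks.WithLabelValues("user-1"))) // 0 (fresh series)
}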

pkg/storage/tsdb/bucketindex/index.go

Lines changed: 10 additions & 0 deletions
@@ -70,6 +70,16 @@ func (idx *Index) IsEmpty() bool {
    return len(idx.Blocks) == 0 && len(idx.BlockDeletionMarks) == 0
 }
 
+func (idx *Index) ParquetBlocks() []*Block {
+   blocks := make([]*Block, 0, len(idx.Blocks))
+   for _, b := range idx.Blocks {
+       if b.Parquet != nil {
+           blocks = append(blocks, b)
+       }
+   }
+   return blocks
+}
+
 // Block holds the information about a block in the index.
 type Block struct {
    // Block ID.
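ParquetBlocks simply filters the bucket index for blocks whose Parquet marker metadata is non-nil; the cleaner then sets cortex_bucket_parquet_blocks_count to the length of that slice. Below is a tiny self-contained sketch of the same filtering, using hypothetical miniature types rather than the real bucketindex structs:

package main

import "fmt"

// Hypothetical miniatures of bucketindex.Index / bucketindex.Block, only to
// illustrate the rule: a block counts as a parquet block when its Parquet
// marker metadata is non-nil.
type parquetMeta struct{ Version int }

type block struct {
    ID      string
    Parquet *parquetMeta
}

type index struct{ Blocks []*block }

func (idx *index) parquetBlocks() []*block {
    out := make([]*block, 0, len(idx.Blocks))
    for _, b := range idx.Blocks {
        if b.Parquet != nil {
            out = append(out, b)
        }
    }
    return out
}

func main() {
    idx := &index{Blocks: []*block{
        {ID: "block-1", Parquet: &parquetMeta{Version: 1}}, // converted
        {ID: "block-2"},                                    // not converted yet
    }}
    // The cleaner sets the per-tenant gauge to this length.
    fmt.Println(len(idx.parquetBlocks())) // 1
}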
