Commit 68a2993
Implement partition compaction planner (#6469)
* Implement partition compaction grouper
* fix comment
* replace level 1 compaction limits with ingestion replication factor
* fix doc
* update compaction_visit_marker_timeout default value
* update default value for compactor_partition_index_size_limit_in_bytes
* refactor code
* address comments and refactor
* address comment
* address comment
* update config name
* Implement partition compaction planner
* fix after rebase
* addressed comments
* updated doc and refactored metric
* fix test

Signed-off-by: Alex Le <[email protected]>
1 parent d5e9b5f commit 68a2993

7 files changed (+497, −11)

docs/blocks-storage/compactor.md

Lines changed: 6 additions & 0 deletions
@@ -285,6 +285,12 @@ compactor:
 # CLI flag: -compactor.ring.wait-active-instance-timeout
 [wait_active_instance_timeout: <duration> | default = 10m]
 
+# How long the shuffle sharding planner waits before running planning code.
+# This delay prevents double compaction when two compactors claim the same
+# partition in the grouper at the same time.
+# CLI flag: -compactor.sharding-planner-delay
+[sharding_planner_delay: <duration> | default = 10s]
+
 # The compaction strategy to use. Supported values are: default, partitioning.
 # CLI flag: -compactor.compaction-strategy
 [compaction_strategy: <string> | default = "default"]

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
@@ -2334,6 +2334,12 @@ sharding_ring:
 # CLI flag: -compactor.ring.wait-active-instance-timeout
 [wait_active_instance_timeout: <duration> | default = 10m]
 
+# How long the shuffle sharding planner waits before running planning code.
+# This delay prevents double compaction when two compactors claim the same
+# partition in the grouper at the same time.
+# CLI flag: -compactor.sharding-planner-delay
+[sharding_planner_delay: <duration> | default = 10s]
+
 # The compaction strategy to use. Supported values are: default, partitioning.
 # CLI flag: -compactor.compaction-strategy
 [compaction_strategy: <string> | default = "default"]

pkg/compactor/compactor.go

Lines changed: 7 additions & 4 deletions
@@ -153,7 +153,7 @@ var (
 	plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {
 
 		if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
-			return NewPartitionCompactionPlanner(ctx, bkt, logger)
+			return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
 		} else {
 			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
 		}
@@ -234,9 +234,10 @@ type Config struct {
 	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
 
 	// Compactors sharding.
-	ShardingEnabled  bool       `yaml:"sharding_enabled"`
-	ShardingStrategy string     `yaml:"sharding_strategy"`
-	ShardingRing     RingConfig `yaml:"sharding_ring"`
+	ShardingEnabled      bool          `yaml:"sharding_enabled"`
+	ShardingStrategy     string        `yaml:"sharding_strategy"`
+	ShardingRing         RingConfig    `yaml:"sharding_ring"`
+	ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"`
 
 	// Compaction strategy.
 	CompactionStrategy string `yaml:"compaction_strategy"`
@@ -304,6 +305,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 	f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
 	f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
+
+	f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long the shuffle sharding planner waits before running planning code. This delay prevents double compaction when two compactors claim the same partition in the grouper at the same time.")
 }
 
 func (cfg *Config) Validate(limits validation.Limits) error {
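
The wiring above threads the new ShardingPlannerDelay into the partitioning planner. To make the intent concrete, here is a minimal, self-contained sketch of the claim-then-wait-then-verify pattern the delay enables; claimTable, claimPartition, and ownerAfterDelay are hypothetical stand-ins for the visit marker machinery, not Cortex APIs.

package main

import (
    "fmt"
    "sync"
    "time"
)

// claimTable stands in for the visit marker files kept in object storage.
type claimTable struct {
    mu     sync.Mutex
    owners map[int]string // partition ID -> compactor ID
}

// claimPartition records the caller as the current owner, mirroring how a
// compactor writes a visit marker when the grouper hands it a partition.
func (c *claimTable) claimPartition(partition int, compactorID string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.owners[partition] = compactorID
}

// ownerAfterDelay waits out the configured planner delay, then re-reads the
// claim. If another compactor overwrote it during the window, the caller
// backs off instead of double-compacting the partition.
func (c *claimTable) ownerAfterDelay(partition int, compactorID string, delay time.Duration) bool {
    time.Sleep(delay) // corresponds to -compactor.sharding-planner-delay
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.owners[partition] == compactorID
}

func main() {
    table := &claimTable{owners: map[int]string{}}
    delay := 100 * time.Millisecond // the PR's default is 10s

    var wg sync.WaitGroup
    for _, id := range []string{"compactor-1", "compactor-2"} {
        wg.Add(1)
        go func(id string) {
            defer wg.Done()
            table.claimPartition(0, id) // both claim partition 0 "at the same time"
            if table.ownerAfterDelay(0, id, delay) {
                fmt.Printf("%s: still owner after delay, safe to plan\n", id)
            } else {
                fmt.Printf("%s: lost the claim during the delay, skipping\n", id)
            }
        }(id)
    }
    wg.Wait()
}

Because both compactors write their claim before either re-checks, at most one of them still sees itself as the owner after the delay, which is the race the flag is meant to defuse.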

pkg/compactor/compactor_metrics.go

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,7 @@ type compactorMetrics struct {
 	remainingPlannedCompactions *prometheus.GaugeVec
 	compactionErrorsCount       *prometheus.CounterVec
 	partitionCount              *prometheus.GaugeVec
+	compactionsNotPlanned       *prometheus.CounterVec
 }
 
 const (
@@ -174,6 +175,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
 		Name: "cortex_compactor_group_partition_count",
 		Help: "Number of partitions for each compaction group.",
 	}, compactionLabels)
+	m.compactionsNotPlanned = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_compactor_group_compactions_not_planned_total",
+		Help: "Total number of group compactions not planned due to error.",
+	}, compactionLabels)
 
 	return &m
 }
@@ -225,6 +230,7 @@ func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...str
 	m.compactionFailures.WithLabelValues(labelValue...)
 	m.verticalCompactions.WithLabelValues(labelValue...)
 	m.partitionCount.WithLabelValues(labelValue...)
+	m.compactionsNotPlanned.WithLabelValues(labelValue...)
 }
 
 func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
@@ -236,4 +242,5 @@ func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
 	m.compactionFailures.DeleteLabelValues(userID)
 	m.verticalCompactions.DeleteLabelValues(userID)
 	m.partitionCount.DeleteLabelValues(userID)
+	m.compactionsNotPlanned.DeleteLabelValues(userID)
 }
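
For readers unfamiliar with promauto, here is a minimal sketch of the CounterVec lifecycle the diff follows: register a per-tenant counter, increment it on the planner's error paths, and drop the tenant's series on deletion. Only the metric name and help text come from the diff; reducing the compaction labels to a single "user" label and the surrounding program are simplifying assumptions.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
    reg := prometheus.NewRegistry()

    // Same shape as m.compactionsNotPlanned above, assuming the compaction
    // labels reduce to just "user" here.
    compactionsNotPlanned := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
        Name: "cortex_compactor_group_compactions_not_planned_total",
        Help: "Total number of group compactions not planned due to error.",
    }, []string{"user"})

    // Incremented by the planner whenever planning fails for a tenant.
    compactionsNotPlanned.WithLabelValues("tenant-a").Inc()

    // Dropped when a tenant is deleted, mirroring deleteMetricsForDeletedTenant.
    deleted := compactionsNotPlanned.DeleteLabelValues("tenant-a")
    fmt.Println("series deleted:", deleted)
}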

pkg/compactor/compactor_metrics_test.go

Lines changed: 8 additions & 0 deletions
@@ -135,6 +135,11 @@ func TestSyncerMetrics(t *testing.T) {
 		cortex_compactor_group_partition_count{user="aaa"} 511060
 		cortex_compactor_group_partition_count{user="bbb"} 522170
 		cortex_compactor_group_partition_count{user="ccc"} 533280
+		# HELP cortex_compactor_group_compactions_not_planned_total Total number of group compactions not planned due to error.
+		# TYPE cortex_compactor_group_compactions_not_planned_total counter
+		cortex_compactor_group_compactions_not_planned_total{user="aaa"} 544390
+		cortex_compactor_group_compactions_not_planned_total{user="bbb"} 555500
+		cortex_compactor_group_compactions_not_planned_total{user="ccc"} 566610
 	`))
 	require.NoError(t, err)
 
@@ -191,4 +196,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
 	cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
 	cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
 	cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
+	cm.compactionsNotPlanned.WithLabelValues("aaa").Add(49 * base)
+	cm.compactionsNotPlanned.WithLabelValues("bbb").Add(50 * base)
+	cm.compactionsNotPlanned.WithLabelValues("ccc").Add(51 * base)
 }
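
The expected-output style used in TestSyncerMetrics can be reproduced with client_golang's testutil package. A minimal sketch, with an illustrative metric value rather than the test's generated data:

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Same metric shape as in the diff, reduced to a single "user" label.
    c := prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "cortex_compactor_group_compactions_not_planned_total",
        Help: "Total number of group compactions not planned due to error.",
    }, []string{"user"})
    c.WithLabelValues("aaa").Add(49)

    expected := `
# HELP cortex_compactor_group_compactions_not_planned_total Total number of group compactions not planned due to error.
# TYPE cortex_compactor_group_compactions_not_planned_total counter
cortex_compactor_group_compactions_not_planned_total{user="aaa"} 49
`
    // CollectAndCompare parses the expected text exposition format and
    // diffs it against what the collector currently reports.
    if err := testutil.CollectAndCompare(c, strings.NewReader(expected)); err != nil {
        fmt.Println("mismatch:", err)
        return
    }
    fmt.Println("metrics match")
}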

pkg/compactor/partition_compaction_planner.go

Lines changed: 125 additions & 7 deletions
@@ -2,30 +2,148 @@ package compactor
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+)
+
+var (
+	plannerCompletedPartitionError = errors.New("got completed partition")
+	plannerVisitedPartitionError   = errors.New("got partition visited by other compactor")
 )
 
 type PartitionCompactionPlanner struct {
-	ctx    context.Context
-	bkt    objstore.InstrumentedBucket
-	logger log.Logger
+	ctx                                    context.Context
+	bkt                                    objstore.InstrumentedBucket
+	logger                                 log.Logger
+	ranges                                 []int64
+	noCompBlocksFunc                       func() map[ulid.ULID]*metadata.NoCompactMark
+	ringLifecyclerID                       string
+	userID                                 string
+	plannerDelay                           time.Duration
+	partitionVisitMarkerTimeout            time.Duration
+	partitionVisitMarkerFileUpdateInterval time.Duration
+	compactorMetrics                       *compactorMetrics
 }
 
 func NewPartitionCompactionPlanner(
 	ctx context.Context,
 	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
+	ranges []int64,
+	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
+	ringLifecyclerID string,
+	userID string,
+	plannerDelay time.Duration,
+	partitionVisitMarkerTimeout time.Duration,
+	partitionVisitMarkerFileUpdateInterval time.Duration,
+	compactorMetrics *compactorMetrics,
 ) *PartitionCompactionPlanner {
 	return &PartitionCompactionPlanner{
-		ctx:    ctx,
-		bkt:    bkt,
-		logger: logger,
+		ctx:                                    ctx,
+		bkt:                                    bkt,
+		logger:                                 logger,
+		ranges:                                 ranges,
+		noCompBlocksFunc:                       noCompBlocksFunc,
+		ringLifecyclerID:                       ringLifecyclerID,
+		userID:                                 userID,
+		plannerDelay:                           plannerDelay,
+		partitionVisitMarkerTimeout:            partitionVisitMarkerTimeout,
+		partitionVisitMarkerFileUpdateInterval: partitionVisitMarkerFileUpdateInterval,
+		compactorMetrics:                       compactorMetrics,
 	}
 }
 
 func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
-	panic("PartitionCompactionPlanner not implemented")
+	cortexMetaExtensions, err := tsdb.ConvertToCortexMetaExtensions(extensions)
+	if err != nil {
+		return nil, err
+	}
+	if cortexMetaExtensions == nil {
+		return nil, fmt.Errorf("cortexMetaExtensions cannot be nil")
+	}
+	return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan)
+}
+
+func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
+	partitionInfo := cortexMetaExtensions.PartitionInfo
+	if partitionInfo == nil {
+		return nil, fmt.Errorf("partitionInfo cannot be nil")
+	}
+	partitionID := partitionInfo.PartitionID
+	partitionedGroupID := partitionInfo.PartitionedGroupID
+
+	// This delay prevents double compaction when two compactors
+	// claim the same partition in the grouper at the same time.
+	time.Sleep(p.plannerDelay)
+
+	visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
+	visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
+	existingPartitionVisitMarker := &partitionVisitMarker{}
+	err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
+	visitMarkerExists := true
+	if err != nil {
+		if errors.Is(err, errorVisitMarkerNotFound) {
+			visitMarkerExists = false
+		} else {
+			p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
+			return nil, fmt.Errorf("unable to get visit marker file for partition with partition ID %d, partitioned group ID %d: %s", partitionID, partitionedGroupID, err.Error())
+		}
+	}
+	if visitMarkerExists {
+		if existingPartitionVisitMarker.GetStatus() == Completed {
+			level.Warn(p.logger).Log("msg", "partition is in completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
+			return nil, plannerCompletedPartitionError
+		}
+		if !existingPartitionVisitMarker.IsPendingByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
+			level.Warn(p.logger).Log("msg", "partition is not visited by current compactor", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.String())
+			return nil, plannerVisitedPartitionError
+		}
+	}
+
+	// Ensure all blocks fit within the largest range. This is a double check
+	// to ensure there's no bug in the previous blocks grouping, given this Plan()
+	// is just a pass-through.
+	// Modified from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63
+	largestRange := p.ranges[len(p.ranges)-1]
+	rangeStart := getRangeStart(metasByMinTime[0], largestRange)
+	rangeEnd := rangeStart + largestRange
+	noCompactMarked := p.noCompBlocksFunc()
+	resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime))
+
+	for _, b := range metasByMinTime {
+		if b.ULID == DUMMY_BLOCK_ID {
+			continue
+		}
+		blockID := b.ULID.String()
+		if _, excluded := noCompactMarked[b.ULID]; excluded {
+			continue
+		}
+
+		if b.MinTime < rangeStart || b.MaxTime > rangeEnd {
+			p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
+			level.Warn(p.logger).Log("msg", "block is outside the largest expected range", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "block_id", blockID, "block_min_time", b.MinTime, "block_max_time", b.MaxTime, "range_start", rangeStart, "range_end", rangeEnd)
			return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd)
+		}
+
+		resultMetas = append(resultMetas, b)
+	}
+
+	if len(resultMetas) < 1 {
+		p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
+		level.Warn(p.logger).Log("msg", "result meta size is empty", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "group_size", len(metasByMinTime))
+		return nil, nil
+	}
+
+	go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false)
+
+	return resultMetas, nil
 }
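
The "largest range" double check in PlanWithPartition boils down to aligning the group's first block to a range boundary and rejecting anything that crosses it. Here is a standalone sketch of that arithmetic; the getRangeStart body follows the usual Prometheus/Thanos range alignment and is an assumption about the Cortex helper, not a copy of it.

package main

import "fmt"

type blockMeta struct {
    MinTime, MaxTime int64 // milliseconds since epoch
}

// getRangeStart aligns a block's MinTime down to the start of the
// enclosing time range of width tr.
func getRangeStart(m blockMeta, tr int64) int64 {
    if m.MinTime >= 0 {
        return tr * (m.MinTime / tr)
    }
    return tr * ((m.MinTime - tr + 1) / tr)
}

func main() {
    // Largest configured compaction range, e.g. 24h in milliseconds.
    largestRange := int64(24 * 3600 * 1000)

    blocks := []blockMeta{
        {MinTime: 0, MaxTime: 2 * 3600 * 1000},                 // inside the range
        {MinTime: 23 * 3600 * 1000, MaxTime: 25 * 3600 * 1000}, // crosses the boundary
    }

    // The range is anchored on the first (earliest) block, as in the planner.
    rangeStart := getRangeStart(blocks[0], largestRange)
    rangeEnd := rangeStart + largestRange

    for _, b := range blocks {
        if b.MinTime < rangeStart || b.MaxTime > rangeEnd {
            fmt.Printf("block %d:%d is outside %d:%d, abort planning\n", b.MinTime, b.MaxTime, rangeStart, rangeEnd)
            continue
        }
        fmt.Printf("block %d:%d accepted\n", b.MinTime, b.MaxTime)
    }
}

In the real planner this condition is fatal for the whole group (it returns an error and bumps the compactions-not-planned counter), since a block outside the range would indicate a grouping bug upstream.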
