Skip to content

Commit 90ad777

Browse files
authored
Create feature flag to switch between current shuffle sharding group planner and partition compaction group planner (#6141)
* Create feature flag to switch between current shuffle sharding group planner and partition compaction group planner
  Signed-off-by: Alex Le <[email protected]>
* rename
  Signed-off-by: Alex Le <[email protected]>
* update doc
  Signed-off-by: Alex Le <[email protected]>
---------
Signed-off-by: Alex Le <[email protected]>
1 parent a467830 commit 90ad777

File tree

6 files changed

+132
-27
lines changed

6 files changed

+132
-27
lines changed

docs/blocks-storage/compactor.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ compactor:
285285
# CLI flag: -compactor.ring.wait-active-instance-timeout
286286
[wait_active_instance_timeout: <duration> | default = 10m]
287287

288+
# The compaction strategy to use. Supported values are: default, partitioning.
289+
# CLI flag: -compactor.compaction-mode
290+
[compaction_mode: <string> | default = "default"]
291+
288292
# How long block visit marker file should be considered as expired and able to
289293
# be picked up by compactor again.
290294
# CLI flag: -compactor.block-visit-marker-timeout

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,6 +2216,10 @@ sharding_ring:
22162216
# CLI flag: -compactor.ring.wait-active-instance-timeout
22172217
[wait_active_instance_timeout: <duration> | default = 10m]
22182218
2219+
# The compaction strategy to use. Supported values are: default, partitioning.
2220+
# CLI flag: -compactor.compaction-mode
2221+
[compaction_mode: <string> | default = "default"]
2222+
22192223
# How long block visit marker file should be considered as expired and able to
22202224
# be picked up by compactor again.
22212225
# CLI flag: -compactor.block-visit-marker-timeout

pkg/compactor/compactor.go

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,12 @@ var (
5353
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
5454
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
5555

56-
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
57-
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
58-
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
56+
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
57+
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
58+
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
59+
supportedCompactionStrategies = []string{util.CompactionStrategyDefault, util.CompactionStrategyPartitioning}
60+
errInvalidCompactionStrategy = errors.New("invalid compaction strategy")
61+
errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled")
5962

6063
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
6164
return compact.NewDefaultGrouperWithMetrics(
@@ -77,29 +80,33 @@ var (
7780
}
7881

7982
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
80-
return NewShuffleShardingGrouper(
81-
ctx,
82-
logger,
83-
bkt,
84-
cfg.AcceptMalformedIndex,
85-
true, // Enable vertical compaction
86-
blocksMarkedForNoCompaction,
87-
metadata.NoneFunc,
88-
syncerMetrics,
89-
compactorMetrics,
90-
cfg,
91-
ring,
92-
ringLifecycle.Addr,
93-
ringLifecycle.ID,
94-
limits,
95-
userID,
96-
cfg.BlockFilesConcurrency,
97-
cfg.BlocksFetchConcurrency,
98-
cfg.CompactionConcurrency,
99-
cfg.BlockVisitMarkerTimeout,
100-
blockVisitMarkerReadFailed,
101-
blockVisitMarkerWriteFailed,
102-
noCompactionMarkFilter.NoCompactMarkedBlocks)
83+
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
84+
return NewPartitionCompactionGrouper(ctx, logger, bkt)
85+
} else {
86+
return NewShuffleShardingGrouper(
87+
ctx,
88+
logger,
89+
bkt,
90+
cfg.AcceptMalformedIndex,
91+
true, // Enable vertical compaction
92+
blocksMarkedForNoCompaction,
93+
metadata.NoneFunc,
94+
syncerMetrics,
95+
compactorMetrics,
96+
cfg,
97+
ring,
98+
ringLifecycle.Addr,
99+
ringLifecycle.ID,
100+
limits,
101+
userID,
102+
cfg.BlockFilesConcurrency,
103+
cfg.BlocksFetchConcurrency,
104+
cfg.CompactionConcurrency,
105+
cfg.BlockVisitMarkerTimeout,
106+
blockVisitMarkerReadFailed,
107+
blockVisitMarkerWriteFailed,
108+
noCompactionMarkFilter.NoCompactMarkedBlocks)
109+
}
103110
}
104111

105112
DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -123,7 +130,11 @@ var (
123130

124131
plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {
125132

126-
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
133+
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
134+
return NewPartitionCompactionPlanner(ctx, bkt, logger)
135+
} else {
136+
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
137+
}
127138
}
128139
return compactor, plannerFactory, nil
129140
}
@@ -202,6 +213,9 @@ type Config struct {
202213
ShardingStrategy string `yaml:"sharding_strategy"`
203214
ShardingRing RingConfig `yaml:"sharding_ring"`
204215

216+
// Compaction mode.
217+
CompactionStrategy string `yaml:"compaction_mode"`
218+
205219
// No need to add options to customize the retry backoff,
206220
// given the defaults should be fine, but allow to override
207221
// it in tests.
@@ -244,6 +258,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
244258
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
245259
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
246260
f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
261+
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
247262
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
248263
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
249264
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
@@ -290,6 +305,15 @@ func (cfg *Config) Validate(limits validation.Limits) error {
290305
}
291306
}
292307

308+
// Make sure a valid compaction mode is being used
309+
if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) {
310+
return errInvalidCompactionStrategy
311+
}
312+
313+
if !cfg.ShardingEnabled && cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
314+
return errInvalidCompactionStrategyPartitioning
315+
}
316+
293317
return nil
294318
}
295319

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package compactor
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/oklog/ulid"
8+
"github.com/thanos-io/objstore"
9+
"github.com/thanos-io/thanos/pkg/block/metadata"
10+
"github.com/thanos-io/thanos/pkg/compact"
11+
)
12+
13+
type PartitionCompactionGrouper struct {
14+
ctx context.Context
15+
logger log.Logger
16+
bkt objstore.InstrumentedBucket
17+
}
18+
19+
func NewPartitionCompactionGrouper(
20+
ctx context.Context,
21+
logger log.Logger,
22+
bkt objstore.InstrumentedBucket,
23+
) *PartitionCompactionGrouper {
24+
if logger == nil {
25+
logger = log.NewNopLogger()
26+
}
27+
28+
return &PartitionCompactionGrouper{
29+
ctx: ctx,
30+
logger: logger,
31+
bkt: bkt,
32+
}
33+
}
34+
35+
// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
36+
func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
37+
panic("PartitionCompactionGrouper not implemented")
38+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package compactor
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/thanos-io/objstore"
8+
"github.com/thanos-io/thanos/pkg/block/metadata"
9+
)
10+
11+
type PartitionCompactionPlanner struct {
12+
ctx context.Context
13+
bkt objstore.InstrumentedBucket
14+
logger log.Logger
15+
}
16+
17+
func NewPartitionCompactionPlanner(
18+
ctx context.Context,
19+
bkt objstore.InstrumentedBucket,
20+
logger log.Logger,
21+
) *PartitionCompactionPlanner {
22+
return &PartitionCompactionPlanner{
23+
ctx: ctx,
24+
bkt: bkt,
25+
logger: logger,
26+
}
27+
}
28+
29+
func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) {
30+
panic("PartitionCompactionPlanner not implemented")
31+
}

pkg/util/shard.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ const (
1010
// Sharding strategies.
1111
ShardingStrategyDefault = "default"
1212
ShardingStrategyShuffle = "shuffle-sharding"
13+
14+
// Compaction mode
15+
CompactionStrategyDefault = "default"
16+
CompactionStrategyPartitioning = "partitioning"
1317
)
1418

1519
var (

0 commit comments

Comments
 (0)