Commit cef84e7

Add compactor percentage based sharding (#6759)
1 parent ce1f0ea commit cef84e7

7 files changed (+202, -9 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 * [FEATURE] Ruler: Add support for group labels. #6665
 * [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
 * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
+* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.return-all-metadata` flag to make the metadata API run when the deployment. Please set this flag to `false` to use the metadata API with the limits later. #6681 #6744
 * [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 2 deletions
@@ -3708,9 +3708,10 @@ query_rejection:

 # The default tenant's shard size when the shuffle-sharding strategy is used by
 # the compactor. When this setting is specified in the per-tenant overrides, a
-# value of 0 disables shuffle sharding for the tenant.
+# value of 0 disables shuffle sharding for the tenant. If the value is < 1 and >
+# 0 the shard size will be a percentage of the total compactors
 # CLI flag: -compactor.tenant-shard-size
-[compactor_tenant_shard_size: <int> | default = 0]
+[compactor_tenant_shard_size: <float> | default = 0]

 # Index size limit in bytes for each compaction partition. 0 means no limit
 # CLI flag: -compactor.partition-index-size-bytes
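
To make the documented behaviour concrete (the ceiling rounding is not spelled out in the docs text; it is inferred from the test expectations added later in this commit): with 5 compactors in the ring, `compactor_tenant_shard_size: 0.4` resolves to ceil(0.4 × 5) = 2 compactors, and after scaling the ring to 10 instances the same tenant automatically gets ceil(0.4 × 10) = 4. A value of 6 is still treated as an absolute shard size of 6, and 0 continues to disable shuffle sharding for the tenant.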

pkg/compactor/compactor.go

Lines changed: 7 additions & 2 deletions
@@ -244,7 +244,7 @@ type BlockDeletableCheckerFactory func(

 // Limits defines limits used by the Compactor.
 type Limits interface {
-    CompactorTenantShardSize(userID string) int
+    CompactorTenantShardSize(userID string) float64
     CompactorPartitionIndexSizeBytes(userID string) int64
     CompactorPartitionSeriesCount(userID string) int64
 }
@@ -1122,6 +1122,10 @@ func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) {
     return c.ownUser(userID, true)
 }

+func (c *Compactor) getShardSizeForUser(userID string) int {
+    return util.DynamicShardSize(c.limits.CompactorTenantShardSize(userID), c.ring.InstancesCount())
+}
+
 func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
     if !c.allowedTenants.IsAllowed(userID) {
         return false, nil
@@ -1135,7 +1139,8 @@ func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
     // If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring
     // Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion
     if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
-        subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID))
+        shardSize := c.getShardSizeForUser(userID)
+        subRing := c.ring.ShuffleShard(userID, shardSize)

         rs, err := subRing.GetAllHealthy(RingOp)
         if err != nil {
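
The new `getShardSizeForUser` helper delegates the float-to-int conversion to `util.DynamicShardSize`, whose body is not part of this diff. A minimal sketch of that conversion, consistent with the call sites above and with the test expectations below (0.4 of a 5-instance ring resolves to 2, of a 10-instance ring to 4), might look roughly like this; treat it as an illustration rather than the actual Cortex implementation:

package util

import "math"

// DynamicShardSize (sketch): values in (0, 1) are interpreted as a fraction of
// the instances currently in the ring, rounded up so small tenants still get at
// least one compactor; values >= 1 keep their previous absolute meaning.
func DynamicShardSize(value float64, numInstances int) int {
    if value > 0 && value < 1 {
        return int(math.Ceil(value * float64(numInstances)))
    }
    return int(value)
}

With behaviour like this, existing integer-valued overrides keep their old meaning, while fractional overrides scale automatically as compactors join or leave the ring.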

pkg/compactor/compactor_test.go

Lines changed: 182 additions & 0 deletions
@@ -14,6 +14,7 @@ import (
     "regexp"
     "strconv"
     "strings"
+    "sync"
     "testing"
     "time"

@@ -2206,3 +2207,184 @@ func TestCompactor_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T
         return len(healthy) == 1 && len(unhealthy) == 0
     })
 }
+
+func TestCompactor_GetShardSizeForUser(t *testing.T) {
+
+    // User to shardsize
+    users := []struct {
+        userID                        string
+        tenantShardSize               float64
+        expectedShardSize             int
+        expectedShardSizeAfterScaleup int
+    }{
+        {
+            userID:                        "user-1",
+            tenantShardSize:               6,
+            expectedShardSize:             6,
+            expectedShardSizeAfterScaleup: 6,
+        },
+        {
+            userID:                        "user-2",
+            tenantShardSize:               1,
+            expectedShardSize:             1,
+            expectedShardSizeAfterScaleup: 1,
+        },
+        {
+            userID:                        "user-3",
+            tenantShardSize:               0.4,
+            expectedShardSize:             2,
+            expectedShardSizeAfterScaleup: 4,
+        },
+        {
+            userID:                        "user-4",
+            tenantShardSize:               0.01,
+            expectedShardSize:             1,
+            expectedShardSizeAfterScaleup: 1,
+        },
+    }
+
+    inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
+    tenantLimits := newMockTenantLimits(map[string]*validation.Limits{})
+
+    for _, user := range users {
+        id, err := ulid.New(ulid.Now(), rand.Reader)
+        require.NoError(t, err)
+        require.NoError(t, inmem.Upload(context.Background(), user.userID+"/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))
+        limits := validation.Limits{}
+        flagext.DefaultValues(&limits)
+        limits.CompactorTenantShardSize = user.tenantShardSize
+        tenantLimits.setLimits(user.userID, &limits)
+    }
+
+    // Create a shared KV Store
+    kvstore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+    t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+    // Create compactors
+    var compactors []*Compactor
+    for i := 0; i < 5; i++ {
+        // Setup config
+        cfg := prepareConfig()
+
+        cfg.ShardingEnabled = true
+        cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
+        cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+        cfg.ShardingRing.WaitStabilityMinDuration = time.Second
+        cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
+        cfg.ShardingRing.KVStore.Mock = kvstore
+
+        // Compactor will get its own temp dir for storing local files.
+        overrides, _ := validation.NewOverrides(validation.Limits{}, tenantLimits)
+        compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
+        compactor.limits = overrides
+        //compactor.limits.tenantLimits = tenantLimits
+        compactor.logger = log.NewNopLogger()
+        defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck
+
+        compactors = append(compactors, compactor)
+
+        // Mock the planner as if there's no compaction to do,
+        // in order to simplify tests (all in all, we just want to
+        // test our logic and not TSDB compactor which we expect to
+        // be already tested).
+        tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
+    }
+
+    // Start all compactors
+    for _, c := range compactors {
+        require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+    }
+
+    // Wait until a run has been completed on each compactor
+    for _, c := range compactors {
+        cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
+            return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
+        })
+    }
+
+    assert.Equal(t, 5, compactors[0].ring.InstancesCount())
+
+    for _, user := range users {
+        assert.Equal(t, user.expectedShardSize, compactors[0].getShardSizeForUser(user.userID))
+    }
+
+    // Scaleup compactors
+    // Create compactors
+    var compactors2 []*Compactor
+    for i := 5; i < 10; i++ {
+        // Setup config
+        cfg := prepareConfig()
+
+        cfg.ShardingEnabled = true
+        cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
+        cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+        cfg.ShardingRing.WaitStabilityMinDuration = time.Second
+        cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
+        cfg.ShardingRing.KVStore.Mock = kvstore
+
+        // Compactor will get its own temp dir for storing local files.
+        overrides, _ := validation.NewOverrides(validation.Limits{}, tenantLimits)
+        compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
+        compactor.limits = overrides
+        //compactor.limits.tenantLimits = tenantLimits
+        compactor.logger = log.NewNopLogger()
+        defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck
+
+        compactors2 = append(compactors2, compactor)
+
+        // Mock the planner as if there's no compaction to do,
+        // in order to simplify tests (all in all, we just want to
+        // test our logic and not TSDB compactor which we expect to
+        // be already tested).
+        tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
+    }
+
+    // Start all compactors
+    for _, c := range compactors2 {
+        require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+    }
+
+    // Wait until a run has been completed on each compactor
+    for _, c := range compactors2 {
+        cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
+            return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
+        })
+    }
+
+    assert.Equal(t, 10, compactors[0].ring.InstancesCount())
+
+    for _, user := range users {
+        assert.Equal(t, user.expectedShardSizeAfterScaleup, compactors[0].getShardSizeForUser(user.userID))
+    }
+}
+
+type mockTenantLimits struct {
+    limits map[string]*validation.Limits
+    m      sync.Mutex
+}
+
+// newMockTenantLimits creates a new mockTenantLimits that returns per-tenant limits based on
+// the given map
+func newMockTenantLimits(limits map[string]*validation.Limits) *mockTenantLimits {
+    return &mockTenantLimits{
+        limits: limits,
+    }
+}
+
+func (l *mockTenantLimits) ByUserID(userID string) *validation.Limits {
+    l.m.Lock()
+    defer l.m.Unlock()
+    return l.limits[userID]
+}
+
+func (l *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
+    l.m.Lock()
+    defer l.m.Unlock()
+    return l.limits
+}
+
+func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) {
+    l.m.Lock()
+    defer l.m.Unlock()
+    l.limits[userID] = limits
+}

pkg/compactor/partition_compaction_grouper.go

Lines changed: 3 additions & 1 deletion
@@ -22,6 +22,7 @@ import (

     "github.com/cortexproject/cortex/pkg/ring"
     "github.com/cortexproject/cortex/pkg/storage/tsdb"
+    "github.com/cortexproject/cortex/pkg/util"
 )

 var (
@@ -146,7 +147,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta)

 // Check whether this compactor exists on the subring based on user ID
 func (g *PartitionCompactionGrouper) checkSubringForCompactor() (bool, error) {
-    subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
+    shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
+    subRing := g.ring.ShuffleShard(g.userID, shardSize)

     rs, err := subRing.GetAllHealthy(RingOp)
     if err != nil {

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 3 additions & 1 deletion
@@ -19,6 +19,7 @@ import (
     "github.com/thanos-io/thanos/pkg/compact"

     "github.com/cortexproject/cortex/pkg/ring"
+    "github.com/cortexproject/cortex/pkg/util"
 )

 type ShuffleShardingGrouper struct {
@@ -279,7 +280,8 @@ func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, compact

 // Check whether this compactor exists on the subring based on user ID
 func (g *ShuffleShardingGrouper) checkSubringForCompactor() (bool, error) {
-    subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
+    shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
+    subRing := g.ring.ShuffleShard(g.userID, shardSize)

     rs, err := subRing.GetAllHealthy(RingOp)
     if err != nil {

pkg/util/validation/limits.go

Lines changed: 3 additions & 3 deletions
@@ -197,7 +197,7 @@ type Limits struct {

     // Compactor.
     CompactorBlocksRetentionPeriod   model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"`
-    CompactorTenantShardSize         int            `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
+    CompactorTenantShardSize         float64        `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
     CompactorPartitionIndexSizeBytes int64          `yaml:"compactor_partition_index_size_bytes" json:"compactor_partition_index_size_bytes"`
     CompactorPartitionSeriesCount    int64          `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

@@ -294,7 +294,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
     f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.")

     f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.")
-    f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
+    f.Float64Var(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total compactors")
     // Default to 64GB because this is the hard limit of index size in Cortex
     f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
     f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")

@@ -830,7 +830,7 @@ func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration
 }

 // CompactorTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
-func (o *Overrides) CompactorTenantShardSize(userID string) int {
+func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
     return o.GetOverridesForUser(userID).CompactorTenantShardSize
 }
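
To illustrate how the `float64` limit flows through the pieces above, here is a hypothetical test in the style of `TestCompactor_GetShardSizeForUser`, reusing the `newMockTenantLimits` helper from the test diff; it is a sketch, not part of this commit, and the tenant ID and the 0.25 value are arbitrary:

func TestFractionalCompactorShardSizeOverride(t *testing.T) {
    // Configure a fractional per-tenant shard size.
    limits := validation.Limits{}
    flagext.DefaultValues(&limits)
    limits.CompactorTenantShardSize = 0.25 // 25% of the compactors in the ring

    tenantLimits := newMockTenantLimits(map[string]*validation.Limits{})
    tenantLimits.setLimits("user-1", &limits)

    overrides, err := validation.NewOverrides(validation.Limits{}, tenantLimits)
    require.NoError(t, err)

    // Overrides now returns a float64; the compactor and the groupers convert it
    // to an instance count with util.DynamicShardSize at call time.
    require.Equal(t, 0.25, overrides.CompactorTenantShardSize("user-1"))
}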
