Skip to content

CNDB-14577: Compact all SSTables of a level shard if their number reaches a limit (#1873) #1939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main-5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ public enum CassandraRelevantProperties
UCS_L0_SHARDS_ENABLED("unified_compaction.l0_shards_enabled", "true"),
UCS_MAX_ADAPTIVE_COMPACTIONS("unified_compaction.max_adaptive_compactions", "5"),
UCS_MAX_SPACE_OVERHEAD("unified_compaction.max_space_overhead", "0.2"),
UCS_MAX_SSTABLES_PER_SHARD_FACTOR("unified_compaction.max_sstables_per_shard_factor", "10"),
UCS_MIN_SSTABLE_SIZE("unified_compaction.min_sstable_size", "100MiB"),
UCS_NUM_SHARDS("unified_compaction.num_shards"),
UCS_OVERLAP_INCLUSION_METHOD("unified_compaction.overlap_inclusion_method"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ private List<CompactionAggregate.UnifiedAggregate> getPendingCompactionAggregate

for (Level level : entry.getValue())
{
Collection<CompactionAggregate.UnifiedAggregate> aggregates = level.getCompactionAggregates(arena, controller, spaceAvailable);
Collection<CompactionAggregate.UnifiedAggregate> aggregates = level.getCompactionAggregates(arena, controller, getShardManager(), spaceAvailable);
// Note: We allow empty aggregates into the list of pending compactions. The pending compactions list
// is for progress tracking only, and it is helpful to see empty levels there.
pending.addAll(aggregates);
Expand Down Expand Up @@ -1506,14 +1506,96 @@ void complete()
logger.trace("Level: {}", this);
}

private List<CompactionAggregate.UnifiedAggregate> getOversizeShardsAggregates(Arena arena,
Controller controller,
ShardManager shardManager)
{
List<CompactionAggregate.UnifiedAggregate> aggregates = new ArrayList<>();
double shardThreshold = fanout * controller.getMaxSstablesPerShardFactor();
if (sstables.size() > shardThreshold)
{
List<Set<CompactionSSTable>> groups = shardManager.splitSSTablesInShards(sstables,
controller.getNumShards(max),
(sstableShard, shardRange) -> Sets.newHashSet(sstableShard));

Set<CompactionSSTable> sstablesInOversizeGroup = new HashSet<>();
for (Set<CompactionSSTable> ssTables : groups)
{
if (ssTables.size() > shardThreshold)
{
sstablesInOversizeGroup.addAll(ssTables);
}
}

if (!sstablesInOversizeGroup.isEmpty())
{
// Now combine the groups that share an sstable so that we have valid independent transactions.
// Only keep the groups that were combined with an oversize group.
groups = Overlaps.combineSetsWithCommonElement(groups);
List<CompactionSSTable> unbucketed = new ArrayList<>();

for (Set<CompactionSSTable> group : groups)
{
boolean inOverSizeGroup = false;
for (CompactionSSTable sstable : group)
{
if (sstablesInOversizeGroup.contains(sstable))
{
inOverSizeGroup = true;
break;
}
}
if (inOverSizeGroup)
{
aggregates.add(
CompactionAggregate.createUnified(group,
Overlaps.maxOverlap(group,
CompactionSSTable.startsAfter,
CompactionSSTable.firstKeyComparator,
CompactionSSTable.lastKeyComparator),
createPick(controller, nextTimeUUID(), index, group),
Collections.emptyList(),
arena,
this)
);
}
else
{
unbucketed.addAll(group);
}
}
// Add all unbucketed sstables separately. Note that this will list the level (with its set of sstables)
// even if it does not need compaction.
if (!unbucketed.isEmpty())
aggregates.add(CompactionAggregate.createUnified(unbucketed,
maxOverlap,
CompactionPick.EMPTY,
Collections.emptySet(),
arena,
this));
return aggregates;
}
}
return aggregates;
}

/// Return the compaction aggregate
Collection<CompactionAggregate.UnifiedAggregate> getCompactionAggregates(Arena arena,
Controller controller,
ShardManager shardManager,
long spaceAvailable)
{
if (logger.isTraceEnabled())
logger.trace("Creating compaction aggregate with sstable set {}", sstables);

List<CompactionAggregate.UnifiedAggregate> aggregates = new ArrayList<>();

if (sstables.isEmpty())
{
if (logger.isTraceEnabled())
logger.trace("No sstables in level {} of arena {}, skipping compaction", this, arena);
return aggregates;
}

// Note that adjacent overlap sets may include deduplicated sstable
List<Set<CompactionSSTable>> overlaps = Overlaps.constructOverlapSets(sstables,
Expand All @@ -1530,9 +1612,19 @@ Collection<CompactionAggregate.UnifiedAggregate> getCompactionAggregates(Arena a
this::makeBucket,
unbucketed::addAll);

List<CompactionAggregate.UnifiedAggregate> aggregates = new ArrayList<>();
for (Bucket bucket : buckets)
aggregates.add(bucket.constructAggregate(controller, spaceAvailable, arena));
if (!buckets.isEmpty())
{
for (Bucket bucket : buckets)
aggregates.add(bucket.constructAggregate(controller, spaceAvailable, arena));
}
else
{
// CNDB-14577: If there are no overlaps, we look if some shards have too many SSTables.
// If that's the case, we perform a major compaction on those shards.
List<CompactionAggregate.UnifiedAggregate> oversizeShardsAggregates = getOversizeShardsAggregates(arena, controller, shardManager);
if (!oversizeShardsAggregates.isEmpty())
return oversizeShardsAggregates;
}

// Add all unbucketed sstables separately. Note that this will list the level (with its set of sstables)
// even if it does not need compaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,24 @@ on the number of overlapping sources we compact; in that case we use the collect
select at most limit-many in any included overlap set, making sure that if an sstable is included in this compaction,
all older ones are also included to maintain time order.

## Non-overlapping sstables trigger

In some scenarios it is possible for (small) non-overlapping sstables to accumulate in numbers that can cause problems
due to the sheer number of sstables present. For example, in tables with regularly scheduled snapshots, which also use
time-based partitioning, that regular snapshot will often flush single-partition sstables. When the partition time
window passes, the normal overlap processing will no longer find newly flushed data that overlaps with older sstables
and will thus leave those sstables alone. Eventually we can end up with thousands of sstables on a lower level that are
never compacted.

This can be a problem, especially in combination with SAI indexing, which prefers a lower number of sstables overall.
To address it, the strategy offers a threshold for the number of sstables that can be present on any of the shards that
the sharding strategy assigns for a given level. The threshold is specified as a multiple of a level's fan factor: if
there are no normal compactions to perform on a level, and we can find a shard that has more sstables than the
threshold, we perform the equivalent of a major compaction for the smallest set of shards that contains it. The result
is split on each output shard boundary and results in a single sstable for each of the output shards. This should
create sstables that span many partitions and that will thus progress nicely through the normal processing in the
next levels of the hierarchy.

## Prioritization of compactions

Compaction strategies aim to minimize the read amplification of queries, which is defined by the number of sstables
Expand Down Expand Up @@ -533,6 +551,14 @@ UCS accepts these compaction strategy parameters:
Sets $b$ to the specified value, $\lambda$ to 1, and the default minimum sstable size to 'auto'.
Disabled by default and cannot be used in combination with `base_shard_count`, `target_sstable_size` or
`sstable_growth`.
* `max_sstables_per_shard_factor` Limits the number of SSTables per shard. If the number of sstables in a shard
exceeds this factor times the shard compaction threshold, a major compaction of the shard will be triggered.
Some conditions like slow writes can lead to SSTables being very small, and never overlap with enough other SSTables
to be compacted.
So this setting is useful to prevent the number of SSTables in a shard from growing too large, which can cause
problems due to the per-sstable overhead. Also these small SSTables may still have overlaps even if under the
compaction threshold (eg. due to write replicas) and never compacting them wastes storage space.
The default value is 10.

All UCS options can also be supplied as system properties, using the prefix `unified_compaction.`, e.g.
`-Dunified_compaction.sstable_growth=0.5` sets the default `sstable_growth` to 0.5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public AdaptiveController(MonotonicClock clock,
Overlaps.InclusionMethod overlapInclusionMethod,
boolean parallelizeOutputShards,
boolean hasVectorType,
double maxSstablesPerShardFactor,
int intervalSec,
int minScalingParameter,
int maxScalingParameter,
Expand Down Expand Up @@ -154,6 +155,7 @@ public AdaptiveController(MonotonicClock clock,
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
metadata);

this.scalingParameters = scalingParameters;
Expand Down Expand Up @@ -184,6 +186,7 @@ static Controller fromOptions(Environment env,
Overlaps.InclusionMethod overlapInclusionMethod,
boolean parallelizeOutputShards,
boolean hasVectorType,
double maxSstablesPerShardFactor,
TableMetadata metadata,
Map<String, String> options)
{
Expand Down Expand Up @@ -292,6 +295,7 @@ else if (staticScalingFactors != null)
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
intervalSec,
minScalingParameter,
maxScalingParameter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ public abstract class Controller
@Deprecated(since = "CC 4.0")
static final String STATIC_SCALING_FACTORS_OPTION = "static_scaling_factors";

static final String MAX_SSTABLES_PER_SHARD_FACTOR_OPTION = "max_sstables_per_shard_factor";
static final double DEFAULT_MAX_SSTABLES_PER_SHARD_FACTOR = UCS_MAX_SSTABLES_PER_SHARD_FACTOR.getDoubleWithLegacyFallback();

protected final MonotonicClock clock;
protected final Environment env;
protected final double[] survivalFactors;
Expand Down Expand Up @@ -349,6 +352,8 @@ public abstract class Controller
final boolean l0ShardsEnabled;
final boolean hasVectorType;

final double maxSstablesPerShardFactor;

Controller(MonotonicClock clock,
Environment env,
double[] survivalFactors,
Expand All @@ -369,6 +374,7 @@ public abstract class Controller
Overlaps.InclusionMethod overlapInclusionMethod,
boolean parallelizeOutputShards,
boolean hasVectorType,
double maxSstablesPerShardFactor,
TableMetadata metadata)
{
this.clock = clock;
Expand All @@ -390,6 +396,7 @@ public abstract class Controller
this.l0ShardsEnabled = UCS_L0_SHARDS_ENABLED.getBooleanWithLegacyFallback(false); // FIXME VECTOR-23
this.parallelizeOutputShards = parallelizeOutputShards;
this.hasVectorType = hasVectorType;
this.maxSstablesPerShardFactor = maxSstablesPerShardFactor;
this.metadata = metadata;

if (maxSSTablesToCompact <= 0) // use half the maximum permitted compaction size as upper bound by default
Expand Down Expand Up @@ -800,6 +807,11 @@ public boolean hasVectorType()
return hasVectorType;
}

public double getMaxSstablesPerShardFactor()
{
return maxSstablesPerShardFactor;
}

/**
* @return true if the controller is running
*/
Expand Down Expand Up @@ -1049,6 +1061,10 @@ public static Controller fromOptions(CompactionRealm realm, Map<String, String>
? Boolean.parseBoolean(options.get(PARALLELIZE_OUTPUT_SHARDS_OPTION))
: DEFAULT_PARALLELIZE_OUTPUT_SHARDS;

double maxSstablesPerShardFactor = options.containsKey(MAX_SSTABLES_PER_SHARD_FACTOR_OPTION)
? Double.parseDouble(options.get(MAX_SSTABLES_PER_SHARD_FACTOR_OPTION))
: DEFAULT_MAX_SSTABLES_PER_SHARD_FACTOR;

return adaptive
? AdaptiveController.fromOptions(env,
survivalFactors,
Expand All @@ -1068,6 +1084,7 @@ public static Controller fromOptions(CompactionRealm realm, Map<String, String>
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
realm.metadata(),
options)
: StaticController.fromOptions(env,
Expand All @@ -1088,6 +1105,7 @@ public static Controller fromOptions(CompactionRealm realm, Map<String, String>
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
realm.metadata(),
options,
useVectorOptions);
Expand Down Expand Up @@ -1317,6 +1335,26 @@ public static Map<String, String> validateOptions(Map<String, String> options) t
}
}

s = options.remove(MAX_SSTABLES_PER_SHARD_FACTOR_OPTION);
if (s != null)
{
try
{
double maxSstablesPerShardFactor = Double.parseDouble(s);
if (maxSstablesPerShardFactor < 1)
throw new ConfigurationException(String.format("%s %s must be a float >= 1",
MAX_SSTABLES_PER_SHARD_FACTOR_OPTION,
s));
}
catch (NumberFormatException e)
{
throw new ConfigurationException(String.format(floatParseErr,
s,
MAX_SSTABLES_PER_SHARD_FACTOR_OPTION),
e);
}
}

return adaptive ? AdaptiveController.validateOptions(options) : StaticController.validateOptions(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public StaticController(Environment env,
Overlaps.InclusionMethod overlapInclusionMethod,
boolean parallelizeOutputShards,
boolean hasVectorType,
double maxSstablesPerShardFactor,
TableMetadata metadata)
{
super(MonotonicClock.Global.preciseTime,
Expand All @@ -98,6 +99,7 @@ public StaticController(Environment env,
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
metadata);
this.scalingParameters = scalingParameters;
}
Expand All @@ -120,6 +122,7 @@ static Controller fromOptions(Environment env,
Overlaps.InclusionMethod overlapInclusionMethod,
boolean parallelizeOutputShards,
boolean hasVectorType,
double maxSstablesPerShardFactor,
TableMetadata metadata,
Map<String, String> options,
boolean useVectorOptions)
Expand Down Expand Up @@ -182,6 +185,7 @@ static Controller fromOptions(Environment env,
overlapInclusionMethod,
parallelizeOutputShards,
hasVectorType,
maxSstablesPerShardFactor,
metadata);
}

Expand Down
Loading