From 696afe94a80f96ace5cb11db8cfefcb024cb9fd0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 3 Dec 2025 08:51:07 +0530 Subject: [PATCH 1/5] Add new compaction policy to prioritize fragmented intervals --- .../compaction/BaseCandidateSearchPolicy.java | 1 - .../compaction/CompactionCandidate.java | 14 ++ .../CompactionCandidateSearchPolicy.java | 4 +- .../compaction/CompactionStatistics.java | 10 + .../server/compaction/CompactionStatus.java | 234 ++++++++++++++---- .../compaction/CompactionStatusTracker.java | 4 +- .../compaction/FixedIntervalOrderPolicy.java | 1 - .../MostFragmentedIntervalFirstPolicy.java | 130 ++++++++++ 8 files changed, 346 insertions(+), 52 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java index 7d7d117f08fc..a0393ba573c7 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java @@ -66,7 +66,6 @@ public final int compareCandidates(CompactionCandidate o1, CompactionCandidate o @Override public boolean isEligibleForCompaction( CompactionCandidate candidate, - CompactionStatus currentCompactionStatus, CompactionTaskStatus latestTaskStatus ) { diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java index f936f3d49a91..af8b32ebe6db 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java @@ -137,6 +137,20 @@ public CompactionStatistics getStats() return CompactionStatistics.create(totalBytes, numSegments(), numIntervals); } + @Nullable + public CompactionStatistics getCompactedStats() + { + return (currentStatus == null || currentStatus.getCompactedStats() == null) + ? null : currentStatus.getCompactedStats(); + } + + @Nullable + public CompactionStatistics getUncompactedStats() + { + return (currentStatus == null || currentStatus.getUncompactedStats() == null) + ? null : currentStatus.getUncompactedStats(); + } + /** * Current compaction status of the time chunk corresponding to this candidate. 
*/ diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java index cc99e03bf214..28fd7f7939a0 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java @@ -29,7 +29,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class), - @JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class) + @JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class), + @JsonSubTypes.Type(name = "mostFragmentedFirst", value = MostFragmentedIntervalFirstPolicy.class) }) public interface CompactionCandidateSearchPolicy { @@ -50,7 +51,6 @@ public interface CompactionCandidateSearchPolicy */ boolean isEligibleForCompaction( CompactionCandidate candidate, - CompactionStatus currentCompactionStatus, CompactionTaskStatus latestTaskStatus ); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java index d7e51655861b..7d43a09aed87 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java @@ -65,4 +65,14 @@ public void decrement(CompactionStatistics other) numIntervals -= other.getNumIntervals(); numSegments -= other.getNumSegments(); } + + @Override + public String toString() + { + return "CompactionStatistics{" + + "totalBytes=" + totalBytes + + ", numSegments=" + numSegments + + ", numIntervals=" + numIntervals + + '}'; + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index be4acd00e21c..84841d94f8bc 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -36,11 +36,17 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.timeline.CompactionState; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CollectionUtils; +import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -49,7 +55,7 @@ */ public class CompactionStatus { - private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null); + private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null, null, null); public enum State { @@ -64,7 +70,6 @@ public enum State private static final List> CHECKS = Arrays.asList( Evaluator::inputBytesAreWithinLimit, Evaluator::segmentsHaveBeenCompactedAtLeastOnce, - Evaluator::allCandidatesHaveSameCompactionState, Evaluator::partitionsSpecIsUpToDate, Evaluator::indexSpecIsUpToDate, Evaluator::segmentGranularityIsUpToDate, @@ -78,11 +83,20 @@ public enum 
State private final State state; private final String reason; - - private CompactionStatus(State state, String reason) + private final CompactionStatistics compactedStats; + private final CompactionStatistics uncompactedStats; + + private CompactionStatus( + State state, + String reason, + CompactionStatistics compactedStats, + CompactionStatistics uncompactedStats + ) { this.state = state; this.reason = reason; + this.compactedStats = compactedStats; + this.uncompactedStats = uncompactedStats; } public boolean isComplete() @@ -105,18 +119,45 @@ public State getState() return state; } + public CompactionStatistics getCompactedStats() + { + return compactedStats; + } + + public CompactionStatistics getUncompactedStats() + { + return uncompactedStats; + } + @Override public String toString() { return "CompactionStatus{" + "state=" + state + ", reason=" + reason + + ", compactedStats=" + compactedStats + + ", uncompactedStats=" + uncompactedStats + '}'; } public static CompactionStatus pending(String reasonFormat, Object... args) { - return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args)); + return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args), null, null); + } + + public static CompactionStatus pending( + CompactionStatistics compactedStats, + CompactionStatistics uncompactedStats, + String reasonFormat, + Object... args + ) + { + return new CompactionStatus( + State.PENDING, + StringUtils.format(reasonFormat, args), + compactedStats, + uncompactedStats + ); } /** @@ -193,34 +234,26 @@ private static String asString(PartitionsSpec partitionsSpec) public static CompactionStatus skipped(String reasonFormat, Object... args) { - return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args)); + return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args), null, null); } public static CompactionStatus running(String message) { - return new CompactionStatus(State.RUNNING, message); - } - - public static CompactionStatus complete(String message) - { - return new CompactionStatus(State.COMPLETE, message); + return new CompactionStatus(State.RUNNING, message, null, null); } /** * Determines the CompactionStatus of the given candidate segments by evaluating * the {@link #CHECKS} one by one. If any check returns an incomplete status, - * further checks are not performed and the incomplete status is returned. + * further checks are still performed to determine the number of uncompacted + * segments but only the first incomplete status is returned. 
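+   * For example, if the partitionsSpec check fails for one group of segments
+   * and the rollup check fails for another, only the first reason
+   * (partitionsSpec) is reported, but both groups are counted as uncompacted.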
*/ static CompactionStatus compute( CompactionCandidate candidateSegments, DataSourceCompactionConfig config ) { - final Evaluator evaluator = new Evaluator(candidateSegments, config); - return CHECKS.stream() - .map(f -> f.apply(evaluator)) - .filter(status -> !status.isComplete()) - .findFirst().orElse(COMPLETE); + return new Evaluator(candidateSegments, config).evaluate(); } @Nullable @@ -294,52 +327,115 @@ private static class Evaluator { private final DataSourceCompactionConfig compactionConfig; private final CompactionCandidate candidateSegments; - private final CompactionState lastCompactionState; private final ClientCompactionTaskQueryTuningConfig tuningConfig; - private final UserCompactionTaskGranularityConfig existingGranularitySpec; private final UserCompactionTaskGranularityConfig configuredGranularitySpec; + private final List uncompactedSegments = new ArrayList<>(); + private final Map> unknownStateToSegments = new HashMap<>(); + private Evaluator( CompactionCandidate candidateSegments, DataSourceCompactionConfig compactionConfig ) { this.candidateSegments = candidateSegments; - this.lastCompactionState = candidateSegments.getSegments().get(0).getLastCompactionState(); this.compactionConfig = compactionConfig; this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig); this.configuredGranularitySpec = compactionConfig.getGranularitySpec(); - if (lastCompactionState == null) { - this.existingGranularitySpec = null; + } + + private CompactionStatus evaluate() + { + final List reasonsForCompaction = + CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .map(CompactionStatus::getReason) + .collect(Collectors.toList()); + + // Consider segments which have passed all checks to be compacted + final List compactedSegments = unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + if (reasonsForCompaction.isEmpty()) { + return COMPLETE; } else { - this.existingGranularitySpec = UserCompactionTaskGranularityConfig.from( - lastCompactionState.getGranularitySpec() + return CompactionStatus.pending( + createStats(compactedSegments), + createStats(uncompactedSegments), + reasonsForCompaction.get(0) ); } } private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce() { - if (lastCompactionState == null) { - return CompactionStatus.pending("not compacted yet"); - } else { - return COMPLETE; + // Identify the compaction states of all the segments + for (DataSegment segment : candidateSegments.getSegments()) { + final CompactionState segmentState = segment.getLastCompactionState(); + if (segmentState == null) { + uncompactedSegments.add(segment); + } else { + unknownStateToSegments.computeIfAbsent(segmentState, s -> new ArrayList<>()).add(segment); + } } - } - private CompactionStatus allCandidatesHaveSameCompactionState() - { - final boolean allHaveSameCompactionState = candidateSegments.getSegments().stream().allMatch( - segment -> lastCompactionState.equals(segment.getLastCompactionState()) - ); - if (allHaveSameCompactionState) { + if (uncompactedSegments.isEmpty()) { return COMPLETE; } else { - return CompactionStatus.pending("segments have different last compaction states"); + return CompactionStatus.pending("not compacted yet"); } } private CompactionStatus partitionsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::partitionsSpecIsUpToDate); + } + + private CompactionStatus indexSpecIsUpToDate() + { + return 
evaluateForAllCompactionStates(this::indexSpecIsUpToDate); + } + + private CompactionStatus projectionsAreUpToDate() + { + return evaluateForAllCompactionStates(this::projectionsAreUpToDate); + } + + private CompactionStatus segmentGranularityIsUpToDate() + { + return evaluateForAllCompactionStates(this::segmentGranularityIsUpToDate); + } + + private CompactionStatus rollupIsUpToDate() + { + return evaluateForAllCompactionStates(this::rollupIsUpToDate); + } + + private CompactionStatus queryGranularityIsUpToDate() + { + return evaluateForAllCompactionStates(this::queryGranularityIsUpToDate); + } + + private CompactionStatus dimensionsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::dimensionsSpecIsUpToDate); + } + + private CompactionStatus metricsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::metricsSpecIsUpToDate); + } + + private CompactionStatus transformSpecFilterIsUpToDate() + { + return evaluateForAllCompactionStates(this::transformSpecFilterIsUpToDate); + } + + private CompactionStatus partitionsSpecIsUpToDate(CompactionState lastCompactionState) { PartitionsSpec existingPartionsSpec = lastCompactionState.getPartitionsSpec(); if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) { @@ -357,7 +453,7 @@ private CompactionStatus partitionsSpecIsUpToDate() ); } - private CompactionStatus indexSpecIsUpToDate() + private CompactionStatus indexSpecIsUpToDate(CompactionState lastCompactionState) { return CompactionStatus.completeIfNullOrEqual( "indexSpec", @@ -367,7 +463,7 @@ private CompactionStatus indexSpecIsUpToDate() ); } - private CompactionStatus projectionsAreUpToDate() + private CompactionStatus projectionsAreUpToDate(CompactionState lastCompactionState) { return CompactionStatus.completeIfNullOrEqual( "projections", @@ -390,7 +486,7 @@ private CompactionStatus inputBytesAreWithinLimit() } } - private CompactionStatus segmentGranularityIsUpToDate() + private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null || configuredGranularitySpec.getSegmentGranularity() == null) { @@ -398,6 +494,7 @@ private CompactionStatus segmentGranularityIsUpToDate() } final Granularity configuredSegmentGranularity = configuredGranularitySpec.getSegmentGranularity(); + final UserCompactionTaskGranularityConfig existingGranularitySpec = getGranularitySpec(lastCompactionState); final Granularity existingSegmentGranularity = existingGranularitySpec == null ? 
null : existingGranularitySpec.getSegmentGranularity(); @@ -406,7 +503,8 @@ private CompactionStatus segmentGranularityIsUpToDate() } else if (existingSegmentGranularity == null) { // Candidate segments were compacted without segment granularity specified // Check if the segments already have the desired segment granularity - boolean needsCompaction = candidateSegments.getSegments().stream().anyMatch( + final List segmentsForState = unknownStateToSegments.get(lastCompactionState); + boolean needsCompaction = segmentsForState.stream().anyMatch( segment -> !configuredSegmentGranularity.isAligned(segment.getInterval()) ); if (needsCompaction) { @@ -427,11 +525,13 @@ private CompactionStatus segmentGranularityIsUpToDate() return COMPLETE; } - private CompactionStatus rollupIsUpToDate() + private CompactionStatus rollupIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { return COMPLETE; } else { + final UserCompactionTaskGranularityConfig existingGranularitySpec + = getGranularitySpec(lastCompactionState); return CompactionStatus.completeIfNullOrEqual( "rollup", configuredGranularitySpec.isRollup(), @@ -441,11 +541,13 @@ private CompactionStatus rollupIsUpToDate() } } - private CompactionStatus queryGranularityIsUpToDate() + private CompactionStatus queryGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { return COMPLETE; } else { + final UserCompactionTaskGranularityConfig existingGranularitySpec + = getGranularitySpec(lastCompactionState); return CompactionStatus.completeIfNullOrEqual( "queryGranularity", configuredGranularitySpec.getQueryGranularity(), @@ -460,7 +562,7 @@ private CompactionStatus queryGranularityIsUpToDate() * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately * covered in {@link Evaluator#partitionsSpecIsUpToDate()} check. */ - private CompactionStatus dimensionsSpecIsUpToDate() + private CompactionStatus dimensionsSpecIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getDimensionsSpec() == null) { return COMPLETE; @@ -488,7 +590,7 @@ private CompactionStatus dimensionsSpecIsUpToDate() } } - private CompactionStatus metricsSpecIsUpToDate() + private CompactionStatus metricsSpecIsUpToDate(CompactionState lastCompactionState) { final AggregatorFactory[] configuredMetricsSpec = compactionConfig.getMetricsSpec(); if (ArrayUtils.isEmpty(configuredMetricsSpec)) { @@ -512,7 +614,7 @@ private CompactionStatus metricsSpecIsUpToDate() } } - private CompactionStatus transformSpecFilterIsUpToDate() + private CompactionStatus transformSpecFilterIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getTransformSpec() == null) { return COMPLETE; @@ -526,5 +628,45 @@ private CompactionStatus transformSpecFilterIsUpToDate() String::valueOf ); } + + /** + * Evaluates the given check for each entry in the {@link #unknownStateToSegments}. + * If any entry fails the given check by returning a status which is not + * COMPLETE, all the segments with that state are moved to {@link #uncompactedSegments}. + * + * @return The first status which is not COMPLETE. 
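+     * Returns {@link #COMPLETE} if every remaining compaction state passes
+     * the check.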
+ */ + private CompactionStatus evaluateForAllCompactionStates( + Function check + ) + { + CompactionStatus firstIncompleteStatus = null; + for (CompactionState state : List.copyOf(unknownStateToSegments.keySet())) { + final CompactionStatus status = check.apply(state); + if (!status.isComplete()) { + uncompactedSegments.addAll(unknownStateToSegments.remove(state)); + if (firstIncompleteStatus == null) { + firstIncompleteStatus = status; + } + } + } + + return firstIncompleteStatus == null ? COMPLETE : firstIncompleteStatus; + } + + private static UserCompactionTaskGranularityConfig getGranularitySpec( + CompactionState compactionState + ) + { + return UserCompactionTaskGranularityConfig.from(compactionState.getGranularitySpec()); + } + + private static CompactionStatistics createStats(List segments) + { + final Set segmentIntervals = + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()); + final long totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum(); + return CompactionStatistics.create(totalBytes, segments.size(), segmentIntervals.size()); + } } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index 401f413e7fd8..73eda34725e3 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -96,13 +96,13 @@ public CompactionStatus computeCompactionStatus( if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.SUCCESS && snapshotTime != null && snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) { - return CompactionStatus.complete( + return CompactionStatus.skipped( "Segment timeline not updated since last compaction task succeeded" ); } // Skip intervals that have been filtered out by the policy - if (!searchPolicy.isEligibleForCompaction(candidate, CompactionStatus.pending(""), lastTaskStatus)) { + if (!searchPolicy.isEligibleForCompaction(candidate, lastTaskStatus)) { return CompactionStatus.skipped("Rejected by search policy"); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java index 3e8726471b13..e52fec48d947 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java @@ -58,7 +58,6 @@ public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate @Override public boolean isEligibleForCompaction( CompactionCandidate candidate, - CompactionStatus currentCompactionStatus, CompactionTaskStatus latestTaskStatus ) { diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java new file mode 100644 index 000000000000..17e29a6c81a5 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; + +import javax.annotation.Nullable; + +/** + * {@link CompactionCandidateSearchPolicy} which prioritizes compaction of the + * intervals with the largest number of small uncompacted segments. + *
<p>
+ * This policy favors cluster stability (by prioritizing reduction of segment + * count) over performance of queries on newer intervals. For the latter, use + * {@link NewestSegmentFirstPolicy}. + */ +public class MostFragmentedIntervalFirstPolicy implements CompactionCandidateSearchPolicy +{ + private static final long SIZE_2_GB = 2_000_000_000; + private static final long SIZE_10_MB = 10_000_000; + + private final int minUncompactedCount; + private final long minUncompactedBytes; + private final long maxUncompactedSize; + + @JsonCreator + public MostFragmentedIntervalFirstPolicy( + @JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount, + @JsonProperty("minUncompactedBytes") @Nullable Long minUncompactedBytes, + @JsonProperty("maxUncompactedSize") @Nullable Long maxUncompactedSize + ) + { + this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 100); + this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, SIZE_10_MB); + this.maxUncompactedSize = Configs.valueOrDefault(maxUncompactedSize, SIZE_2_GB); + } + + /** + * Minimum number of uncompacted segments that must be present in an interval + * to make it eligible for compaction. + */ + @JsonProperty + public int getMinUncompactedCount() + { + return minUncompactedCount; + } + + /** + * Minimum total bytes of uncompacted segments that must be present in an + * interval to make it eligible for compaction. Default value is {@link #SIZE_10_MB}. + */ + @JsonProperty + public long getMinUncompactedBytes() + { + return minUncompactedBytes; + } + + /** + * Maximum average size of uncompacted segments in an interval eligible for + * compaction. Default value is {@link #SIZE_2_GB}. + */ + @JsonProperty + public long getMaxUncompactedSize() + { + return maxUncompactedSize; + } + + @Override + public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB) + { + return computePriority(candidateA) - computePriority(candidateB) > 0 + ? 1 : -1; + } + + @Override + public boolean isEligibleForCompaction( + CompactionCandidate candidate, + CompactionTaskStatus latestTaskStatus + ) + { + final CompactionStatistics uncompacted = candidate.getUncompactedStats(); + if (uncompacted == null) { + return true; + } else if (uncompacted.getNumSegments() < 1) { + return false; + } else { + return uncompacted.getNumSegments() >= minUncompactedCount + && uncompacted.getTotalBytes() >= minUncompactedBytes + && (uncompacted.getTotalBytes() / uncompacted.getNumSegments()) <= maxUncompactedSize; + } + } + + /** + * Computes the priority of the given compaction candidate by checking the + * total number and average size of uncompacted segments. 
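+   * Computed as {@code (1000 * numUncompactedSegments) / avgUncompactedSegmentBytes},
+   * so intervals containing many small uncompacted segments get the highest
+   * priority.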
+ */ + private double computePriority(CompactionCandidate candidate) + { + final CompactionStatistics compacted = candidate.getCompactedStats(); + final CompactionStatistics uncompacted = candidate.getUncompactedStats(); + if (uncompacted == null || compacted == null) { + return 0; + } + + final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / uncompacted.getNumSegments()); + + // Priority increases as size decreases and number increases + final double normalizingFactor = 1000f; + return (normalizingFactor * uncompacted.getNumSegments()) / avgUncompactedSize; + } +} From 8e450305899f675019411f851afd0991b14d0fa7 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 3 Dec 2025 12:55:32 +0530 Subject: [PATCH 2/5] Clean up the new policy --- .../MostFragmentedIntervalFirstPolicy.java | 45 ++++++++++--------- .../CompactionStatusTrackerTest.java | 2 +- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index 17e29a6c81a5..db9b7b8042b1 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.java.util.common.HumanReadableBytes; import javax.annotation.Nullable; @@ -35,23 +36,24 @@ */ public class MostFragmentedIntervalFirstPolicy implements CompactionCandidateSearchPolicy { - private static final long SIZE_2_GB = 2_000_000_000; - private static final long SIZE_10_MB = 10_000_000; + private static final HumanReadableBytes SIZE_2_GB = new HumanReadableBytes("2GiB"); + private static final HumanReadableBytes SIZE_10_MB = new HumanReadableBytes("10MiB"); private final int minUncompactedCount; - private final long minUncompactedBytes; - private final long maxUncompactedSize; + private final HumanReadableBytes minUncompactedBytes; + private final HumanReadableBytes maxAverageUncompactedBytesPerSegment; @JsonCreator public MostFragmentedIntervalFirstPolicy( @JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount, - @JsonProperty("minUncompactedBytes") @Nullable Long minUncompactedBytes, - @JsonProperty("maxUncompactedSize") @Nullable Long maxUncompactedSize + @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes minUncompactedBytes, + @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable HumanReadableBytes maxAverageUncompactedBytesPerSegment ) { this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 100); this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, SIZE_10_MB); - this.maxUncompactedSize = Configs.valueOrDefault(maxUncompactedSize, SIZE_2_GB); + this.maxAverageUncompactedBytesPerSegment + = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment, SIZE_2_GB); } /** @@ -69,7 +71,7 @@ public int getMinUncompactedCount() * interval to make it eligible for compaction. Default value is {@link #SIZE_10_MB}. */ @JsonProperty - public long getMinUncompactedBytes() + public HumanReadableBytes getMinUncompactedBytes() { return minUncompactedBytes; } @@ -79,16 +81,17 @@ public long getMinUncompactedBytes() * compaction. Default value is {@link #SIZE_2_GB}. 
*/ @JsonProperty - public long getMaxUncompactedSize() + public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment() { - return maxUncompactedSize; + return maxAverageUncompactedBytesPerSegment; } @Override public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB) { - return computePriority(candidateA) - computePriority(candidateB) > 0 - ? 1 : -1; + final double fragmentationDiff + = computeFragmentationIndex(candidateA) - computeFragmentationIndex(candidateB); + return fragmentationDiff > 0 ? 1 : -1; } @Override @@ -104,16 +107,19 @@ public boolean isEligibleForCompaction( return false; } else { return uncompacted.getNumSegments() >= minUncompactedCount - && uncompacted.getTotalBytes() >= minUncompactedBytes - && (uncompacted.getTotalBytes() / uncompacted.getNumSegments()) <= maxUncompactedSize; + && uncompacted.getTotalBytes() >= minUncompactedBytes.getBytes() + && (uncompacted.getTotalBytes() / uncompacted.getNumSegments()) + <= maxAverageUncompactedBytesPerSegment.getBytes(); } } /** - * Computes the priority of the given compaction candidate by checking the - * total number and average size of uncompacted segments. + * Computes the degree of fragmentation of the given compaction candidate by + * checking the total number and average size of uncompacted segments. + * A higher fragmentation index causes the candidate to be higher in priority + * for compaction. */ - private double computePriority(CompactionCandidate candidate) + private double computeFragmentationIndex(CompactionCandidate candidate) { final CompactionStatistics compacted = candidate.getCompactedStats(); final CompactionStatistics uncompacted = candidate.getUncompactedStats(); @@ -123,8 +129,7 @@ private double computePriority(CompactionCandidate candidate) final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / uncompacted.getNumSegments()); - // Priority increases as size decreases and number increases - final double normalizingFactor = 1000f; - return (normalizingFactor * uncompacted.getNumSegments()) / avgUncompactedSize; + // Fragmentation index increases as segment count increases and avg size decreases + return (1f * uncompacted.getNumSegments()) / avgUncompactedSize; } } diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java index c0496b0705c2..1314a1a0bc79 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java @@ -120,7 +120,7 @@ public void testComputeCompactionStatusForSuccessfulTask() statusTracker.onTaskFinished("task1", TaskStatus.success("task1")); status = statusTracker.computeCompactionStatus(candidateSegments, policy); - Assert.assertEquals(CompactionStatus.State.COMPLETE, status.getState()); + Assert.assertEquals(CompactionStatus.State.SKIPPED, status.getState()); Assert.assertEquals( "Segment timeline not updated since last compaction task succeeded", status.getReason() From 8965e62b70497a2cddd8e73ed13a8b805d9001e6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 3 Dec 2025 16:07:35 +0530 Subject: [PATCH 3/5] Fix check for input segment bytes --- .../compaction/BaseCandidateSearchPolicy.java | 4 +- .../CompactionCandidateSearchPolicy.java | 37 ++++++++++++++++++- .../server/compaction/CompactionStatus.java | 9 ++++- .../compaction/CompactionStatusTracker.java | 
10 +++-- .../compaction/FixedIntervalOrderPolicy.java | 6 ++- .../MostFragmentedIntervalFirstPolicy.java | 29 +++++++++++---- 6 files changed, 77 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java index a0393ba573c7..0a68f6a51c99 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java @@ -64,12 +64,12 @@ public final int compareCandidates(CompactionCandidate o1, CompactionCandidate o } @Override - public boolean isEligibleForCompaction( + public Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, CompactionTaskStatus latestTaskStatus ) { - return true; + return Eligibility.OK; } /** diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java index 28fd7f7939a0..4698b07c042e 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.duty.CompactSegments; /** @@ -48,9 +49,43 @@ public interface CompactionCandidateSearchPolicy * Checks if the given {@link CompactionCandidate} is eligible for compaction * in the current iteration. A policy may implement this method to skip * compacting intervals or segments that do not fulfil some required criteria. + * + * @return {@link Eligibility#OK} only if eligible. */ - boolean isEligibleForCompaction( + Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, CompactionTaskStatus latestTaskStatus ); + + /** + * Describes the eligibility of an interval for compaction. + */ + class Eligibility + { + public static final Eligibility OK = new Eligibility(true, null); + + private final boolean eligible; + private final String reason; + + private Eligibility(boolean eligible, String reason) + { + this.eligible = eligible; + this.reason = reason; + } + + public boolean isEligible() + { + return eligible; + } + + public String getReason() + { + return reason; + } + + public static Eligibility fail(String messageFormat, Object... args) + { + return new Eligibility(false, StringUtils.format(messageFormat, args)); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index 84841d94f8bc..cc52513b16c5 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -68,7 +68,6 @@ public enum State * The order of the checks must be honored while evaluating them. 
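+   * In particular, {@link Evaluator#segmentsHaveBeenCompactedAtLeastOnce} must
+   * run first, since it groups the candidate segments by their last compaction
+   * state before the remaining checks evaluate each state.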
*/ private static final List> CHECKS = Arrays.asList( - Evaluator::inputBytesAreWithinLimit, Evaluator::segmentsHaveBeenCompactedAtLeastOnce, Evaluator::partitionsSpecIsUpToDate, Evaluator::indexSpecIsUpToDate, @@ -321,7 +320,8 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan } /** - * Evaluates {@link #CHECKS} to determine the compaction status. + * Evaluates {@link #CHECKS} to determine the compaction status of a + * {@link CompactionCandidate}. */ private static class Evaluator { @@ -346,6 +346,11 @@ private Evaluator( private CompactionStatus evaluate() { + final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit(); + if (inputBytesCheck.isSkipped()) { + return inputBytesCheck; + } + final List reasonsForCompaction = CHECKS.stream() .map(f -> f.apply(this)) diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index 73eda34725e3..1dc409e7361e 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -102,11 +102,13 @@ public CompactionStatus computeCompactionStatus( } // Skip intervals that have been filtered out by the policy - if (!searchPolicy.isEligibleForCompaction(candidate, lastTaskStatus)) { - return CompactionStatus.skipped("Rejected by search policy"); + final CompactionCandidateSearchPolicy.Eligibility eligibility + = searchPolicy.checkEligibilityForCompaction(candidate, lastTaskStatus); + if (eligibility.isEligible()) { + return CompactionStatus.pending("Not compacted yet"); + } else { + return CompactionStatus.skipped("Rejected by search policy: %s", eligibility.getReason()); } - - return CompactionStatus.pending("Not compacted yet"); } /** diff --git a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java index e52fec48d947..24a2f001afe3 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java @@ -56,12 +56,14 @@ public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate } @Override - public boolean isEligibleForCompaction( + public Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, CompactionTaskStatus latestTaskStatus ) { - return findIndex(candidate) < Integer.MAX_VALUE; + return findIndex(candidate) < Integer.MAX_VALUE + ? 
Eligibility.OK + : Eligibility.fail("Datasource/Interval is not in the list of 'eligibleCandidates'"); } private int findIndex(CompactionCandidate candidate) diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index db9b7b8042b1..1a87d6a700ad 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -95,21 +95,36 @@ public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate } @Override - public boolean isEligibleForCompaction( + public Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, CompactionTaskStatus latestTaskStatus ) { final CompactionStatistics uncompacted = candidate.getUncompactedStats(); if (uncompacted == null) { - return true; + return Eligibility.OK; } else if (uncompacted.getNumSegments() < 1) { - return false; + return Eligibility.fail("No uncompacted segments in interval"); + } else if (uncompacted.getNumSegments() < minUncompactedCount) { + return Eligibility.fail( + "Uncompacted segments[%,d] in interval must be at least [%,d].", + uncompacted.getNumSegments(), minUncompactedCount + ); + } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) { + return Eligibility.fail( + "Uncompacted bytes[%,d] in interval must be at least [%,d].", + minUncompactedBytes.getBytes(), uncompacted.getTotalBytes() + ); + } + + final long avgSegmentSize = (uncompacted.getTotalBytes() / uncompacted.getNumSegments()); + if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) { + return Eligibility.fail( + "Average size[%,d] of uncompacted segments in interval must be at most [%,d].", + avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes() + ); } else { - return uncompacted.getNumSegments() >= minUncompactedCount - && uncompacted.getTotalBytes() >= minUncompactedBytes.getBytes() - && (uncompacted.getTotalBytes() / uncompacted.getNumSegments()) - <= maxAverageUncompactedBytesPerSegment.getBytes(); + return Eligibility.OK; } } From 67f5c8931a9e69092087b945df64644a14880723 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 4 Dec 2025 17:06:55 +0530 Subject: [PATCH 4/5] Update the policy, add some tests --- .../CompactionCandidateSearchPolicy.java | 34 ++- .../MostFragmentedIntervalFirstPolicy.java | 62 ++++-- ...MostFragmentedIntervalFirstPolicyTest.java | 194 ++++++++++++++++++ 3 files changed, 272 insertions(+), 18 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java index 4698b07c042e..bfb69787dd84 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.duty.CompactSegments; +import java.util.Objects; + /** * Policy used by {@link CompactSegments} duty to pick segments for compaction. 
*/ @@ -39,8 +41,8 @@ public interface CompactionCandidateSearchPolicy * Compares between two compaction candidates. Used to determine the * order in which segments and intervals should be picked for compaction. * - * @return A positive value if {@code candidateA} should be picked first, a - * negative value if {@code candidateB} should be picked first or zero if the + * @return A negative value if {@code candidateA} should be picked first, a + * positive value if {@code candidateB} should be picked first or zero if the * order does not matter. */ int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB); @@ -87,5 +89,33 @@ public static Eligibility fail(String messageFormat, Object... args) { return new Eligibility(false, StringUtils.format(messageFormat, args)); } + + @Override + public boolean equals(Object object) + { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + Eligibility that = (Eligibility) object; + return eligible == that.eligible && Objects.equals(reason, that.reason); + } + + @Override + public int hashCode() + { + return Objects.hash(eligible, reason); + } + + @Override + public String toString() + { + return "Eligibility{" + + "eligible=" + eligible + + ", reason='" + reason + '\'' + + '}'; + } } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index 1a87d6a700ad..bc128671e218 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -22,19 +22,23 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.java.util.common.HumanReadableBytes; import javax.annotation.Nullable; +import java.util.Comparator; /** - * {@link CompactionCandidateSearchPolicy} which prioritizes compaction of the - * intervals with the largest number of small uncompacted segments. + * Experimental {@link CompactionCandidateSearchPolicy} which prioritizes compaction + * of intervals with the largest number of small uncompacted segments. *
<p>
+ * This policy favors cluster stability (by prioritizing reduction of segment
+ * count) over performance of queries on newer intervals. For the latter, use
+ * {@link NewestSegmentFirstPolicy}.
  */
-public class MostFragmentedIntervalFirstPolicy implements CompactionCandidateSearchPolicy
+@UnstableApi
+public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
 {
   private static final HumanReadableBytes SIZE_2_GB = new HumanReadableBytes("2GiB");
   private static final HumanReadableBytes SIZE_10_MB = new HumanReadableBytes("10MiB");
@@ -47,9 +51,24 @@ public class MostFragmentedIntervalFirstPolicy implements CompactionCandidateSea
   public MostFragmentedIntervalFirstPolicy(
       @JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount,
       @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes minUncompactedBytes,
-      @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable HumanReadableBytes maxAverageUncompactedBytesPerSegment
+      @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
+      HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+      @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
   )
   {
+    super(priorityDatasource);
+
+    InvalidInput.conditionalException(
+        minUncompactedCount == null || minUncompactedCount > 0,
+        "'minUncompactedCount'[%s] must be greater than 0",
+        minUncompactedCount
+    );
+    InvalidInput.conditionalException(
+        maxAverageUncompactedBytesPerSegment == null || maxAverageUncompactedBytesPerSegment.getBytes() > 0,
+        "'maxAverageUncompactedBytesPerSegment'[%s] must be greater than 0",
+        maxAverageUncompactedBytesPerSegment
+    );
+
     this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 100);
     this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, SIZE_10_MB);
     this.maxAverageUncompactedBytesPerSegment
         = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment, SIZE_2_GB);
   }

   /**
@@ -87,10 +106,15 @@ public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment()
   }

   @Override
-  public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB)
+  protected Comparator<CompactionCandidate> getSegmentComparator()
+  {
+    return this::compare;
+  }
+
+  private int compare(CompactionCandidate candidateA, CompactionCandidate candidateB)
   {
     final double fragmentationDiff
-        = computeFragmentationIndex(candidateA) - computeFragmentationIndex(candidateB);
+        = computeFragmentationIndex(candidateB) - computeFragmentationIndex(candidateA);
     return fragmentationDiff > 0 ?
1 : -1; } @@ -107,20 +131,20 @@ public Eligibility checkEligibilityForCompaction( return Eligibility.fail("No uncompacted segments in interval"); } else if (uncompacted.getNumSegments() < minUncompactedCount) { return Eligibility.fail( - "Uncompacted segments[%,d] in interval must be at least [%,d].", + "Uncompacted segments[%,d] in interval must be at least [%,d]", uncompacted.getNumSegments(), minUncompactedCount ); } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) { return Eligibility.fail( - "Uncompacted bytes[%,d] in interval must be at least [%,d].", - minUncompactedBytes.getBytes(), uncompacted.getTotalBytes() + "Uncompacted bytes[%,d] in interval must be at least [%,d]", + uncompacted.getTotalBytes(), minUncompactedBytes.getBytes() ); } final long avgSegmentSize = (uncompacted.getTotalBytes() / uncompacted.getNumSegments()); if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) { return Eligibility.fail( - "Average size[%,d] of uncompacted segments in interval must be at most [%,d].", + "Average size[%,d] of uncompacted segments in interval must be at most [%,d]", avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes() ); } else { @@ -129,22 +153,28 @@ public Eligibility checkEligibilityForCompaction( } /** - * Computes the degree of fragmentation of the given compaction candidate by - * checking the total number and average size of uncompacted segments. + * Computes the degree of fragmentation in the interval of the given compaction + * candidate. Calculated as the number of uncompacted segments plus an additional + * term that captures the "smallness" of segments in that interval. * A higher fragmentation index causes the candidate to be higher in priority * for compaction. */ private double computeFragmentationIndex(CompactionCandidate candidate) { - final CompactionStatistics compacted = candidate.getCompactedStats(); final CompactionStatistics uncompacted = candidate.getUncompactedStats(); - if (uncompacted == null || compacted == null) { + if (uncompacted == null || uncompacted.getNumSegments() < 1 || uncompacted.getTotalBytes() < 1) { return 0; } final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / uncompacted.getNumSegments()); - // Fragmentation index increases as segment count increases and avg size decreases - return (1f * uncompacted.getNumSegments()) / avgUncompactedSize; + // Fragmentation index increases as uncompacted segment count increases + double segmentCountTerm = uncompacted.getNumSegments(); + + // Fragmentation index increases as avg uncompacted segment size decreases + double segmentSizeTerm = + (1.0f * minUncompactedCount * maxAverageUncompactedBytesPerSegment.getBytes()) / avgUncompactedSize; + + return segmentCountTerm + segmentSizeTerm; } } diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java new file mode 100644 index 000000000000..346106dc8600 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.timeline.DataSegment; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.List; + +public class MostFragmentedIntervalFirstPolicyTest +{ + private static final DataSegment SEGMENT = + CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100).get(0); + + @Test + public void test_thresholdValues_ofDefaultPolicy() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(null, null, null, null); + Assertions.assertEquals(100, policy.getMinUncompactedCount()); + Assertions.assertEquals(new HumanReadableBytes("10MiB"), policy.getMinUncompactedBytes()); + Assertions.assertEquals(new HumanReadableBytes("2GiB"), policy.getMaxAverageUncompactedBytesPerSegment()); + Assertions.assertNull(policy.getPriorityDatasource()); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanCutoff() + { + final int minUncompactedCount = 10_000; + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + minUncompactedCount, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.fail( + "Uncompacted segments[1] in interval must be at least [10,000]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(10_001, 100L), null) + ); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanCutoff() + { + final HumanReadableBytes minUncompactedBytes = HumanReadableBytes.valueOf(10_000); + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + minUncompactedBytes, + HumanReadableBytes.valueOf(10_000), + null + ); + + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.fail( + "Uncompacted bytes[100] in interval must be at least [10,000]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(100, 10_000L), null) + ); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThanCutoff() + { + final HumanReadableBytes maxAvgSegmentSize = HumanReadableBytes.valueOf(100); + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(100), + maxAvgSegmentSize, + null + ); + + Assertions.assertEquals( + 
CompactionCandidateSearchPolicy.Eligibility.fail( + "Average size[10,000] of uncompacted segments in interval must be at most [100]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + } + + @Test + public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifTotalBytesIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(1, 1000L); + final CompactionCandidate candidateB = createCandidate(2, 500L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0); + } + + @Test + public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifAverageSizeIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(1, 1000L); + final CompactionCandidate candidateB = createCandidate(2, 1000L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0); + } + + @Test + public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(10, 500L); + final CompactionCandidate candidateB = createCandidate(10, 1000L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) < 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) > 0); + } + + private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes) + { + final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L); + final CompactionStatistics uncompactedStats = CompactionStatistics.create( + avgSizeBytes * numSegments, + numSegments, + 1L + ); + return CompactionCandidate.from(List.of(SEGMENT), null) + .withCurrentStatus(CompactionStatus.pending(dummyCompactedStats, uncompactedStats, "")); + } + + private void verifyCandidateIsEligible(CompactionCandidate candidate, MostFragmentedIntervalFirstPolicy policy) + { + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(candidate, null) + ); + } +} From 760c5df933feb8549fbd103ac0da131fc91c0107 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 4 Dec 2025 21:28:33 +0530 Subject: [PATCH 5/5] Add more tests to verify formula --- .../MostFragmentedIntervalFirstPolicy.java | 2 +- ...MostFragmentedIntervalFirstPolicyTest.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git 
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index bc128671e218..38e534c8273f 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -115,7 +115,7 @@ private int compare(CompactionCandidate candidateA, CompactionCandidate candidat { final double fragmentationDiff = computeFragmentationIndex(candidateB) - computeFragmentationIndex(candidateA); - return fragmentationDiff > 0 ? 1 : -1; + return (int) fragmentationDiff; } @Override diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java index 346106dc8600..594fe91020b9 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java @@ -172,6 +172,26 @@ public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual() Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) > 0); } + @Test + public void test_compareCandidates_returnsZeroIfSegmentCountAndAvgSizeScaleEquivalently() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 100, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(100), + null + ); + + final CompactionCandidate candidateA = createCandidate(100, 25); + final CompactionCandidate candidateB = createCandidate(400, 100); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertEquals(0, policy.compareCandidates(candidateA, candidateB)); + Assertions.assertEquals(0, policy.compareCandidates(candidateB, candidateA)); + } + private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes) { final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L);
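
The zero-diff assertion above can be verified by hand against the fragmentation
index introduced in patch 4 together with the integer cast added in this patch.
A sketch of the arithmetic, using the thresholds configured in the test
(minUncompactedCount = 100, maxAverageUncompactedBytesPerSegment = 100 bytes):

    // index = numUncompacted + (minUncompactedCount * maxAvgBytesPerSegment) / avgUncompactedSize
    //
    //   candidateA: 100 segments of avg size 25  -> 100 + (100 * 100) / 25  = 500
    //   candidateB: 400 segments of avg size 100 -> 400 + (100 * 100) / 100 = 500
    //
    // fragmentationDiff = 500.0 - 500.0 = 0.0, and (int) 0.0 == 0, so both
    // orderings of compareCandidates() return zero.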