diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
index 7d7d117f08fc..0a68f6a51c99 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
@@ -64,13 +64,12 @@ public final int compareCandidates(CompactionCandidate o1, CompactionCandidate o
   }
 
   @Override
-  public boolean isEligibleForCompaction(
+  public Eligibility checkEligibilityForCompaction(
       CompactionCandidate candidate,
-      CompactionStatus currentCompactionStatus,
       CompactionTaskStatus latestTaskStatus
   )
   {
-    return true;
+    return Eligibility.OK;
   }
 
   /**
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index f936f3d49a91..af8b32ebe6db 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -137,6 +137,20 @@ public CompactionStatistics getStats()
     return CompactionStatistics.create(totalBytes, numSegments(), numIntervals);
   }
 
+  @Nullable
+  public CompactionStatistics getCompactedStats()
+  {
+    return currentStatus == null
+           ? null : currentStatus.getCompactedStats();
+  }
+
+  @Nullable
+  public CompactionStatistics getUncompactedStats()
+  {
+    return currentStatus == null
+           ? null : currentStatus.getUncompactedStats();
+  }
+
   /**
    * Current compaction status of the time chunk corresponding to this candidate.
    */
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
index cc99e03bf214..bfb69787dd84 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
@@ -21,15 +21,19 @@
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 
+import java.util.Objects;
+
 /**
  * Policy used by {@link CompactSegments} duty to pick segments for compaction.
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class),
-    @JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class)
+    @JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class),
+    @JsonSubTypes.Type(name = "mostFragmentedFirst", value = MostFragmentedIntervalFirstPolicy.class)
 })
 public interface CompactionCandidateSearchPolicy
 {
@@ -37,8 +41,8 @@ public interface CompactionCandidateSearchPolicy
    * Compares between two compaction candidates. Used to determine the
    * order in which segments and intervals should be picked for compaction.
* - * @return A positive value if {@code candidateA} should be picked first, a - * negative value if {@code candidateB} should be picked first or zero if the + * @return A negative value if {@code candidateA} should be picked first, a + * positive value if {@code candidateB} should be picked first or zero if the * order does not matter. */ int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB); @@ -47,10 +51,71 @@ public interface CompactionCandidateSearchPolicy * Checks if the given {@link CompactionCandidate} is eligible for compaction * in the current iteration. A policy may implement this method to skip * compacting intervals or segments that do not fulfil some required criteria. + * + * @return {@link Eligibility#OK} only if eligible. */ - boolean isEligibleForCompaction( + Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, - CompactionStatus currentCompactionStatus, CompactionTaskStatus latestTaskStatus ); + + /** + * Describes the eligibility of an interval for compaction. + */ + class Eligibility + { + public static final Eligibility OK = new Eligibility(true, null); + + private final boolean eligible; + private final String reason; + + private Eligibility(boolean eligible, String reason) + { + this.eligible = eligible; + this.reason = reason; + } + + public boolean isEligible() + { + return eligible; + } + + public String getReason() + { + return reason; + } + + public static Eligibility fail(String messageFormat, Object... args) + { + return new Eligibility(false, StringUtils.format(messageFormat, args)); + } + + @Override + public boolean equals(Object object) + { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + Eligibility that = (Eligibility) object; + return eligible == that.eligible && Objects.equals(reason, that.reason); + } + + @Override + public int hashCode() + { + return Objects.hash(eligible, reason); + } + + @Override + public String toString() + { + return "Eligibility{" + + "eligible=" + eligible + + ", reason='" + reason + '\'' + + '}'; + } + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java index d7e51655861b..7d43a09aed87 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java @@ -65,4 +65,14 @@ public void decrement(CompactionStatistics other) numIntervals -= other.getNumIntervals(); numSegments -= other.getNumSegments(); } + + @Override + public String toString() + { + return "CompactionStatistics{" + + "totalBytes=" + totalBytes + + ", numSegments=" + numSegments + + ", numIntervals=" + numIntervals + + '}'; + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index be4acd00e21c..cc52513b16c5 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -36,11 +36,17 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.timeline.CompactionState; +import org.apache.druid.timeline.DataSegment; import 
org.apache.druid.utils.CollectionUtils; +import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -49,7 +55,7 @@ */ public class CompactionStatus { - private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null); + private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null, null, null); public enum State { @@ -62,9 +68,7 @@ public enum State * The order of the checks must be honored while evaluating them. */ private static final List> CHECKS = Arrays.asList( - Evaluator::inputBytesAreWithinLimit, Evaluator::segmentsHaveBeenCompactedAtLeastOnce, - Evaluator::allCandidatesHaveSameCompactionState, Evaluator::partitionsSpecIsUpToDate, Evaluator::indexSpecIsUpToDate, Evaluator::segmentGranularityIsUpToDate, @@ -78,11 +82,20 @@ public enum State private final State state; private final String reason; - - private CompactionStatus(State state, String reason) + private final CompactionStatistics compactedStats; + private final CompactionStatistics uncompactedStats; + + private CompactionStatus( + State state, + String reason, + CompactionStatistics compactedStats, + CompactionStatistics uncompactedStats + ) { this.state = state; this.reason = reason; + this.compactedStats = compactedStats; + this.uncompactedStats = uncompactedStats; } public boolean isComplete() @@ -105,18 +118,45 @@ public State getState() return state; } + public CompactionStatistics getCompactedStats() + { + return compactedStats; + } + + public CompactionStatistics getUncompactedStats() + { + return uncompactedStats; + } + @Override public String toString() { return "CompactionStatus{" + "state=" + state + ", reason=" + reason + + ", compactedStats=" + compactedStats + + ", uncompactedStats=" + uncompactedStats + '}'; } public static CompactionStatus pending(String reasonFormat, Object... args) { - return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args)); + return new CompactionStatus(State.PENDING, StringUtils.format(reasonFormat, args), null, null); + } + + public static CompactionStatus pending( + CompactionStatistics compactedStats, + CompactionStatistics uncompactedStats, + String reasonFormat, + Object... args + ) + { + return new CompactionStatus( + State.PENDING, + StringUtils.format(reasonFormat, args), + compactedStats, + uncompactedStats + ); } /** @@ -193,34 +233,26 @@ private static String asString(PartitionsSpec partitionsSpec) public static CompactionStatus skipped(String reasonFormat, Object... args) { - return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args)); + return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args), null, null); } public static CompactionStatus running(String message) { - return new CompactionStatus(State.RUNNING, message); - } - - public static CompactionStatus complete(String message) - { - return new CompactionStatus(State.COMPLETE, message); + return new CompactionStatus(State.RUNNING, message, null, null); } /** * Determines the CompactionStatus of the given candidate segments by evaluating * the {@link #CHECKS} one by one. If any check returns an incomplete status, - * further checks are not performed and the incomplete status is returned. 
+ * further checks are still performed to determine the number of uncompacted + * segments but only the first incomplete status is returned. */ static CompactionStatus compute( CompactionCandidate candidateSegments, DataSourceCompactionConfig config ) { - final Evaluator evaluator = new Evaluator(candidateSegments, config); - return CHECKS.stream() - .map(f -> f.apply(evaluator)) - .filter(status -> !status.isComplete()) - .findFirst().orElse(COMPLETE); + return new Evaluator(candidateSegments, config).evaluate(); } @Nullable @@ -288,58 +320,127 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan } /** - * Evaluates {@link #CHECKS} to determine the compaction status. + * Evaluates {@link #CHECKS} to determine the compaction status of a + * {@link CompactionCandidate}. */ private static class Evaluator { private final DataSourceCompactionConfig compactionConfig; private final CompactionCandidate candidateSegments; - private final CompactionState lastCompactionState; private final ClientCompactionTaskQueryTuningConfig tuningConfig; - private final UserCompactionTaskGranularityConfig existingGranularitySpec; private final UserCompactionTaskGranularityConfig configuredGranularitySpec; + private final List uncompactedSegments = new ArrayList<>(); + private final Map> unknownStateToSegments = new HashMap<>(); + private Evaluator( CompactionCandidate candidateSegments, DataSourceCompactionConfig compactionConfig ) { this.candidateSegments = candidateSegments; - this.lastCompactionState = candidateSegments.getSegments().get(0).getLastCompactionState(); this.compactionConfig = compactionConfig; this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig); this.configuredGranularitySpec = compactionConfig.getGranularitySpec(); - if (lastCompactionState == null) { - this.existingGranularitySpec = null; + } + + private CompactionStatus evaluate() + { + final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit(); + if (inputBytesCheck.isSkipped()) { + return inputBytesCheck; + } + + final List reasonsForCompaction = + CHECKS.stream() + .map(f -> f.apply(this)) + .filter(status -> !status.isComplete()) + .map(CompactionStatus::getReason) + .collect(Collectors.toList()); + + // Consider segments which have passed all checks to be compacted + final List compactedSegments = unknownStateToSegments + .values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + + if (reasonsForCompaction.isEmpty()) { + return COMPLETE; } else { - this.existingGranularitySpec = UserCompactionTaskGranularityConfig.from( - lastCompactionState.getGranularitySpec() + return CompactionStatus.pending( + createStats(compactedSegments), + createStats(uncompactedSegments), + reasonsForCompaction.get(0) ); } } private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce() { - if (lastCompactionState == null) { - return CompactionStatus.pending("not compacted yet"); - } else { - return COMPLETE; + // Identify the compaction states of all the segments + for (DataSegment segment : candidateSegments.getSegments()) { + final CompactionState segmentState = segment.getLastCompactionState(); + if (segmentState == null) { + uncompactedSegments.add(segment); + } else { + unknownStateToSegments.computeIfAbsent(segmentState, s -> new ArrayList<>()).add(segment); + } } - } - private CompactionStatus allCandidatesHaveSameCompactionState() - { - final boolean allHaveSameCompactionState = candidateSegments.getSegments().stream().allMatch( - segment -> 
lastCompactionState.equals(segment.getLastCompactionState()) - ); - if (allHaveSameCompactionState) { + if (uncompactedSegments.isEmpty()) { return COMPLETE; } else { - return CompactionStatus.pending("segments have different last compaction states"); + return CompactionStatus.pending("not compacted yet"); } } private CompactionStatus partitionsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::partitionsSpecIsUpToDate); + } + + private CompactionStatus indexSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::indexSpecIsUpToDate); + } + + private CompactionStatus projectionsAreUpToDate() + { + return evaluateForAllCompactionStates(this::projectionsAreUpToDate); + } + + private CompactionStatus segmentGranularityIsUpToDate() + { + return evaluateForAllCompactionStates(this::segmentGranularityIsUpToDate); + } + + private CompactionStatus rollupIsUpToDate() + { + return evaluateForAllCompactionStates(this::rollupIsUpToDate); + } + + private CompactionStatus queryGranularityIsUpToDate() + { + return evaluateForAllCompactionStates(this::queryGranularityIsUpToDate); + } + + private CompactionStatus dimensionsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::dimensionsSpecIsUpToDate); + } + + private CompactionStatus metricsSpecIsUpToDate() + { + return evaluateForAllCompactionStates(this::metricsSpecIsUpToDate); + } + + private CompactionStatus transformSpecFilterIsUpToDate() + { + return evaluateForAllCompactionStates(this::transformSpecFilterIsUpToDate); + } + + private CompactionStatus partitionsSpecIsUpToDate(CompactionState lastCompactionState) { PartitionsSpec existingPartionsSpec = lastCompactionState.getPartitionsSpec(); if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) { @@ -357,7 +458,7 @@ private CompactionStatus partitionsSpecIsUpToDate() ); } - private CompactionStatus indexSpecIsUpToDate() + private CompactionStatus indexSpecIsUpToDate(CompactionState lastCompactionState) { return CompactionStatus.completeIfNullOrEqual( "indexSpec", @@ -367,7 +468,7 @@ private CompactionStatus indexSpecIsUpToDate() ); } - private CompactionStatus projectionsAreUpToDate() + private CompactionStatus projectionsAreUpToDate(CompactionState lastCompactionState) { return CompactionStatus.completeIfNullOrEqual( "projections", @@ -390,7 +491,7 @@ private CompactionStatus inputBytesAreWithinLimit() } } - private CompactionStatus segmentGranularityIsUpToDate() + private CompactionStatus segmentGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null || configuredGranularitySpec.getSegmentGranularity() == null) { @@ -398,6 +499,7 @@ private CompactionStatus segmentGranularityIsUpToDate() } final Granularity configuredSegmentGranularity = configuredGranularitySpec.getSegmentGranularity(); + final UserCompactionTaskGranularityConfig existingGranularitySpec = getGranularitySpec(lastCompactionState); final Granularity existingSegmentGranularity = existingGranularitySpec == null ? 
null : existingGranularitySpec.getSegmentGranularity(); @@ -406,7 +508,8 @@ private CompactionStatus segmentGranularityIsUpToDate() } else if (existingSegmentGranularity == null) { // Candidate segments were compacted without segment granularity specified // Check if the segments already have the desired segment granularity - boolean needsCompaction = candidateSegments.getSegments().stream().anyMatch( + final List segmentsForState = unknownStateToSegments.get(lastCompactionState); + boolean needsCompaction = segmentsForState.stream().anyMatch( segment -> !configuredSegmentGranularity.isAligned(segment.getInterval()) ); if (needsCompaction) { @@ -427,11 +530,13 @@ private CompactionStatus segmentGranularityIsUpToDate() return COMPLETE; } - private CompactionStatus rollupIsUpToDate() + private CompactionStatus rollupIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { return COMPLETE; } else { + final UserCompactionTaskGranularityConfig existingGranularitySpec + = getGranularitySpec(lastCompactionState); return CompactionStatus.completeIfNullOrEqual( "rollup", configuredGranularitySpec.isRollup(), @@ -441,11 +546,13 @@ private CompactionStatus rollupIsUpToDate() } } - private CompactionStatus queryGranularityIsUpToDate() + private CompactionStatus queryGranularityIsUpToDate(CompactionState lastCompactionState) { if (configuredGranularitySpec == null) { return COMPLETE; } else { + final UserCompactionTaskGranularityConfig existingGranularitySpec + = getGranularitySpec(lastCompactionState); return CompactionStatus.completeIfNullOrEqual( "queryGranularity", configuredGranularitySpec.getQueryGranularity(), @@ -460,7 +567,7 @@ private CompactionStatus queryGranularityIsUpToDate() * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately * covered in {@link Evaluator#partitionsSpecIsUpToDate()} check. */ - private CompactionStatus dimensionsSpecIsUpToDate() + private CompactionStatus dimensionsSpecIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getDimensionsSpec() == null) { return COMPLETE; @@ -488,7 +595,7 @@ private CompactionStatus dimensionsSpecIsUpToDate() } } - private CompactionStatus metricsSpecIsUpToDate() + private CompactionStatus metricsSpecIsUpToDate(CompactionState lastCompactionState) { final AggregatorFactory[] configuredMetricsSpec = compactionConfig.getMetricsSpec(); if (ArrayUtils.isEmpty(configuredMetricsSpec)) { @@ -512,7 +619,7 @@ private CompactionStatus metricsSpecIsUpToDate() } } - private CompactionStatus transformSpecFilterIsUpToDate() + private CompactionStatus transformSpecFilterIsUpToDate(CompactionState lastCompactionState) { if (compactionConfig.getTransformSpec() == null) { return COMPLETE; @@ -526,5 +633,45 @@ private CompactionStatus transformSpecFilterIsUpToDate() String::valueOf ); } + + /** + * Evaluates the given check for each entry in the {@link #unknownStateToSegments}. + * If any entry fails the given check by returning a status which is not + * COMPLETE, all the segments with that state are moved to {@link #uncompactedSegments}. + * + * @return The first status which is not COMPLETE. 
+ */ + private CompactionStatus evaluateForAllCompactionStates( + Function check + ) + { + CompactionStatus firstIncompleteStatus = null; + for (CompactionState state : List.copyOf(unknownStateToSegments.keySet())) { + final CompactionStatus status = check.apply(state); + if (!status.isComplete()) { + uncompactedSegments.addAll(unknownStateToSegments.remove(state)); + if (firstIncompleteStatus == null) { + firstIncompleteStatus = status; + } + } + } + + return firstIncompleteStatus == null ? COMPLETE : firstIncompleteStatus; + } + + private static UserCompactionTaskGranularityConfig getGranularitySpec( + CompactionState compactionState + ) + { + return UserCompactionTaskGranularityConfig.from(compactionState.getGranularitySpec()); + } + + private static CompactionStatistics createStats(List segments) + { + final Set segmentIntervals = + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()); + final long totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum(); + return CompactionStatistics.create(totalBytes, segments.size(), segmentIntervals.size()); + } } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index 401f413e7fd8..1dc409e7361e 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -96,17 +96,19 @@ public CompactionStatus computeCompactionStatus( if (lastTaskStatus != null && lastTaskStatus.getState() == TaskState.SUCCESS && snapshotTime != null && snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) { - return CompactionStatus.complete( + return CompactionStatus.skipped( "Segment timeline not updated since last compaction task succeeded" ); } // Skip intervals that have been filtered out by the policy - if (!searchPolicy.isEligibleForCompaction(candidate, CompactionStatus.pending(""), lastTaskStatus)) { - return CompactionStatus.skipped("Rejected by search policy"); + final CompactionCandidateSearchPolicy.Eligibility eligibility + = searchPolicy.checkEligibilityForCompaction(candidate, lastTaskStatus); + if (eligibility.isEligible()) { + return CompactionStatus.pending("Not compacted yet"); + } else { + return CompactionStatus.skipped("Rejected by search policy: %s", eligibility.getReason()); } - - return CompactionStatus.pending("Not compacted yet"); } /** diff --git a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java index 3e8726471b13..24a2f001afe3 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java @@ -56,13 +56,14 @@ public int compareCandidates(CompactionCandidate candidateA, CompactionCandidate } @Override - public boolean isEligibleForCompaction( + public Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, - CompactionStatus currentCompactionStatus, CompactionTaskStatus latestTaskStatus ) { - return findIndex(candidate) < Integer.MAX_VALUE; + return findIndex(candidate) < Integer.MAX_VALUE + ? 
Eligibility.OK + : Eligibility.fail("Datasource/Interval is not in the list of 'eligibleCandidates'"); } private int findIndex(CompactionCandidate candidate) diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java new file mode 100644 index 000000000000..38e534c8273f --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.java.util.common.HumanReadableBytes; + +import javax.annotation.Nullable; +import java.util.Comparator; + +/** + * Experimental {@link CompactionCandidateSearchPolicy} which prioritizes compaction + * of intervals with the largest number of small uncompacted segments. + *

+ * This policy favors cluster stability (by prioritizing reduction of segment
+ * count) over performance of queries on newer intervals. For the latter, use
+ * {@link NewestSegmentFirstPolicy}.
+ */
+@UnstableApi
+public class MostFragmentedIntervalFirstPolicy extends BaseCandidateSearchPolicy
+{
+  private static final HumanReadableBytes SIZE_2_GB = new HumanReadableBytes("2GiB");
+  private static final HumanReadableBytes SIZE_10_MB = new HumanReadableBytes("10MiB");
+
+  private final int minUncompactedCount;
+  private final HumanReadableBytes minUncompactedBytes;
+  private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
+
+  @JsonCreator
+  public MostFragmentedIntervalFirstPolicy(
+      @JsonProperty("minUncompactedCount") @Nullable Integer minUncompactedCount,
+      @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes minUncompactedBytes,
+      @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
+      HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+      @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
+  )
+  {
+    super(priorityDatasource);
+
+    InvalidInput.conditionalException(
+        minUncompactedCount == null || minUncompactedCount > 0,
+        "'minUncompactedCount'[%s] must be greater than 0",
+        minUncompactedCount
+    );
+    InvalidInput.conditionalException(
+        maxAverageUncompactedBytesPerSegment == null || maxAverageUncompactedBytesPerSegment.getBytes() > 0,
+        "'maxAverageUncompactedBytesPerSegment'[%s] must be greater than 0",
+        maxAverageUncompactedBytesPerSegment
+    );
+
+    this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount, 100);
+    this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes, SIZE_10_MB);
+    this.maxAverageUncompactedBytesPerSegment
+        = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment, SIZE_2_GB);
+  }
+
+  /**
+   * Minimum number of uncompacted segments that must be present in an interval
+   * to make it eligible for compaction. Default value is 100.
+   */
+  @JsonProperty
+  public int getMinUncompactedCount()
+  {
+    return minUncompactedCount;
+  }
+
+  /**
+   * Minimum total bytes of uncompacted segments that must be present in an
+   * interval to make it eligible for compaction. Default value is {@link #SIZE_10_MB}.
+   */
+  @JsonProperty
+  public HumanReadableBytes getMinUncompactedBytes()
+  {
+    return minUncompactedBytes;
+  }
+
+  /**
+   * Maximum average size of uncompacted segments in an interval eligible for
+   * compaction. Default value is {@link #SIZE_2_GB}.
+ */ + @JsonProperty + public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment() + { + return maxAverageUncompactedBytesPerSegment; + } + + @Override + protected Comparator getSegmentComparator() + { + return this::compare; + } + + private int compare(CompactionCandidate candidateA, CompactionCandidate candidateB) + { + final double fragmentationDiff + = computeFragmentationIndex(candidateB) - computeFragmentationIndex(candidateA); + return (int) fragmentationDiff; + } + + @Override + public Eligibility checkEligibilityForCompaction( + CompactionCandidate candidate, + CompactionTaskStatus latestTaskStatus + ) + { + final CompactionStatistics uncompacted = candidate.getUncompactedStats(); + if (uncompacted == null) { + return Eligibility.OK; + } else if (uncompacted.getNumSegments() < 1) { + return Eligibility.fail("No uncompacted segments in interval"); + } else if (uncompacted.getNumSegments() < minUncompactedCount) { + return Eligibility.fail( + "Uncompacted segments[%,d] in interval must be at least [%,d]", + uncompacted.getNumSegments(), minUncompactedCount + ); + } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) { + return Eligibility.fail( + "Uncompacted bytes[%,d] in interval must be at least [%,d]", + uncompacted.getTotalBytes(), minUncompactedBytes.getBytes() + ); + } + + final long avgSegmentSize = (uncompacted.getTotalBytes() / uncompacted.getNumSegments()); + if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) { + return Eligibility.fail( + "Average size[%,d] of uncompacted segments in interval must be at most [%,d]", + avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes() + ); + } else { + return Eligibility.OK; + } + } + + /** + * Computes the degree of fragmentation in the interval of the given compaction + * candidate. Calculated as the number of uncompacted segments plus an additional + * term that captures the "smallness" of segments in that interval. + * A higher fragmentation index causes the candidate to be higher in priority + * for compaction. 
+   */
+  private double computeFragmentationIndex(CompactionCandidate candidate)
+  {
+    final CompactionStatistics uncompacted = candidate.getUncompactedStats();
+    if (uncompacted == null || uncompacted.getNumSegments() < 1 || uncompacted.getTotalBytes() < 1) {
+      return 0;
+    }
+
+    final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() / uncompacted.getNumSegments());
+
+    // Fragmentation index increases as uncompacted segment count increases
+    double segmentCountTerm = uncompacted.getNumSegments();
+
+    // Fragmentation index increases as avg uncompacted segment size decreases
+    double segmentSizeTerm =
+        (1.0 * minUncompactedCount * maxAverageUncompactedBytesPerSegment.getBytes()) / avgUncompactedSize;
+
+    return segmentCountTerm + segmentSizeTerm;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
new file mode 100644
index 000000000000..594fe91020b9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.timeline.DataSegment; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.List; + +public class MostFragmentedIntervalFirstPolicyTest +{ + private static final DataSegment SEGMENT = + CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100).get(0); + + @Test + public void test_thresholdValues_ofDefaultPolicy() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy(null, null, null, null); + Assertions.assertEquals(100, policy.getMinUncompactedCount()); + Assertions.assertEquals(new HumanReadableBytes("10MiB"), policy.getMinUncompactedBytes()); + Assertions.assertEquals(new HumanReadableBytes("2GiB"), policy.getMaxAverageUncompactedBytesPerSegment()); + Assertions.assertNull(policy.getPriorityDatasource()); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanCutoff() + { + final int minUncompactedCount = 10_000; + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + minUncompactedCount, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.fail( + "Uncompacted segments[1] in interval must be at least [10,000]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(10_001, 100L), null) + ); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanCutoff() + { + final HumanReadableBytes minUncompactedBytes = HumanReadableBytes.valueOf(10_000); + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + minUncompactedBytes, + HumanReadableBytes.valueOf(10_000), + null + ); + + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.fail( + "Uncompacted bytes[100] in interval must be at least [10,000]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(100, 10_000L), null) + ); + } + + @Test + public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThanCutoff() + { + final HumanReadableBytes maxAvgSegmentSize = HumanReadableBytes.valueOf(100); + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(100), + maxAvgSegmentSize, + null + ); + + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.fail( + "Average size[10,000] of uncompacted segments in interval must be at most [100]" + ), + policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null) + ); + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) + ); + } + + @Test + public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifTotalBytesIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + 
HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(1, 1000L); + final CompactionCandidate candidateB = createCandidate(2, 500L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0); + } + + @Test + public void test_policy_favorsIntervalWithMoreUncompactedSegments_ifAverageSizeIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(1, 1000L); + final CompactionCandidate candidateB = createCandidate(2, 1000L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) > 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) < 0); + } + + @Test + public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 1, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(10_000), + null + ); + + final CompactionCandidate candidateA = createCandidate(10, 500L); + final CompactionCandidate candidateB = createCandidate(10, 1000L); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) < 0); + Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) > 0); + } + + @Test + public void test_compareCandidates_returnsZeroIfSegmentCountAndAvgSizeScaleEquivalently() + { + final MostFragmentedIntervalFirstPolicy policy = new MostFragmentedIntervalFirstPolicy( + 100, + HumanReadableBytes.valueOf(1), + HumanReadableBytes.valueOf(100), + null + ); + + final CompactionCandidate candidateA = createCandidate(100, 25); + final CompactionCandidate candidateB = createCandidate(400, 100); + + verifyCandidateIsEligible(candidateA, policy); + verifyCandidateIsEligible(candidateB, policy); + + Assertions.assertEquals(0, policy.compareCandidates(candidateA, candidateB)); + Assertions.assertEquals(0, policy.compareCandidates(candidateB, candidateA)); + } + + private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes) + { + final CompactionStatistics dummyCompactedStats = CompactionStatistics.create(1L, 1L, 1L); + final CompactionStatistics uncompactedStats = CompactionStatistics.create( + avgSizeBytes * numSegments, + numSegments, + 1L + ); + return CompactionCandidate.from(List.of(SEGMENT), null) + .withCurrentStatus(CompactionStatus.pending(dummyCompactedStats, uncompactedStats, "")); + } + + private void verifyCandidateIsEligible(CompactionCandidate candidate, MostFragmentedIntervalFirstPolicy policy) + { + Assertions.assertEquals( + CompactionCandidateSearchPolicy.Eligibility.OK, + policy.checkEligibilityForCompaction(candidate, null) + ); + } +}
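
A quick illustration of how the new policy is wired up and how its ordering behaves. The policy is registered in CompactionCandidateSearchPolicy above under the JSON type name "mostFragmentedFirst", so it can be supplied wherever a CompactionCandidateSearchPolicy is accepted in the compaction config. A minimal sketch of constructing it directly with the defaults from this patch; the datasource name "wiki" is hypothetical, not something the patch introduces:

    import org.apache.druid.java.util.common.HumanReadableBytes;
    import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
    import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy;

    // Threshold values below are the documented defaults; passing null for
    // any of them would yield the same configuration.
    CompactionCandidateSearchPolicy policy = new MostFragmentedIntervalFirstPolicy(
        100,                              // minUncompactedCount
        new HumanReadableBytes("10MiB"),  // minUncompactedBytes
        new HumanReadableBytes("2GiB"),   // maxAverageUncompactedBytesPerSegment
        "wiki"                            // priorityDatasource (hypothetical)
    );

With these defaults, an interval holding 500 uncompacted segments of about 10 MiB each gets a fragmentation index of 500 + (100 * 2 GiB) / 10 MiB = 500 + 20480 = 20980, while an interval holding 1000 uncompacted segments of about 1 GiB each gets 1000 + (100 * 2 GiB) / 1 GiB = 1200. The comparator therefore picks the small-segment interval first, even though it has half as many segments.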