Skip to content

Commit 36f3413

Browse files
authored
Add new compaction policy to prioritize fragmented intervals (#18802)
1 parent 12f6f31 commit 36f3413

File tree

10 files changed

+697
-65
lines changed

10 files changed

+697
-65
lines changed

server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,12 @@ public final int compareCandidates(CompactionCandidate o1, CompactionCandidate o
6464
}
6565

6666
@Override
67-
public boolean isEligibleForCompaction(
67+
public Eligibility checkEligibilityForCompaction(
6868
CompactionCandidate candidate,
69-
CompactionStatus currentCompactionStatus,
7069
CompactionTaskStatus latestTaskStatus
7170
)
7271
{
73-
return true;
72+
return Eligibility.OK;
7473
}
7574

7675
/**

server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,20 @@ public CompactionStatistics getStats()
137137
return CompactionStatistics.create(totalBytes, numSegments(), numIntervals);
138138
}
139139

140+
@Nullable
141+
public CompactionStatistics getCompactedStats()
142+
{
143+
return (currentStatus == null || currentStatus.getCompactedStats() == null)
144+
? null : currentStatus.getCompactedStats();
145+
}
146+
147+
@Nullable
148+
public CompactionStatistics getUncompactedStats()
149+
{
150+
return (currentStatus == null || currentStatus.getUncompactedStats() == null)
151+
? null : currentStatus.getUncompactedStats();
152+
}
153+
140154
/**
141155
* Current compaction status of the time chunk corresponding to this candidate.
142156
*/

server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,28 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonSubTypes;
2323
import com.fasterxml.jackson.annotation.JsonTypeInfo;
24+
import org.apache.druid.java.util.common.StringUtils;
2425
import org.apache.druid.server.coordinator.duty.CompactSegments;
2526

27+
import java.util.Objects;
28+
2629
/**
2730
* Policy used by {@link CompactSegments} duty to pick segments for compaction.
2831
*/
2932
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
3033
@JsonSubTypes(value = {
3134
@JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class),
32-
@JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class)
35+
@JsonSubTypes.Type(name = "fixedIntervalOrder", value = FixedIntervalOrderPolicy.class),
36+
@JsonSubTypes.Type(name = "mostFragmentedFirst", value = MostFragmentedIntervalFirstPolicy.class)
3337
})
3438
public interface CompactionCandidateSearchPolicy
3539
{
3640
/**
3741
* Compares between two compaction candidates. Used to determine the
3842
* order in which segments and intervals should be picked for compaction.
3943
*
40-
* @return A positive value if {@code candidateA} should be picked first, a
41-
* negative value if {@code candidateB} should be picked first or zero if the
44+
* @return A negative value if {@code candidateA} should be picked first, a
45+
* positive value if {@code candidateB} should be picked first or zero if the
4246
* order does not matter.
4347
*/
4448
int compareCandidates(CompactionCandidate candidateA, CompactionCandidate candidateB);
@@ -47,10 +51,71 @@ public interface CompactionCandidateSearchPolicy
4751
* Checks if the given {@link CompactionCandidate} is eligible for compaction
4852
* in the current iteration. A policy may implement this method to skip
4953
* compacting intervals or segments that do not fulfil some required criteria.
54+
*
55+
* @return {@link Eligibility#OK} only if eligible.
5056
*/
51-
boolean isEligibleForCompaction(
57+
Eligibility checkEligibilityForCompaction(
5258
CompactionCandidate candidate,
53-
CompactionStatus currentCompactionStatus,
5459
CompactionTaskStatus latestTaskStatus
5560
);
61+
62+
/**
63+
* Describes the eligibility of an interval for compaction.
64+
*/
65+
class Eligibility
66+
{
67+
public static final Eligibility OK = new Eligibility(true, null);
68+
69+
private final boolean eligible;
70+
private final String reason;
71+
72+
private Eligibility(boolean eligible, String reason)
73+
{
74+
this.eligible = eligible;
75+
this.reason = reason;
76+
}
77+
78+
public boolean isEligible()
79+
{
80+
return eligible;
81+
}
82+
83+
public String getReason()
84+
{
85+
return reason;
86+
}
87+
88+
public static Eligibility fail(String messageFormat, Object... args)
89+
{
90+
return new Eligibility(false, StringUtils.format(messageFormat, args));
91+
}
92+
93+
@Override
94+
public boolean equals(Object object)
95+
{
96+
if (this == object) {
97+
return true;
98+
}
99+
if (object == null || getClass() != object.getClass()) {
100+
return false;
101+
}
102+
Eligibility that = (Eligibility) object;
103+
return eligible == that.eligible && Objects.equals(reason, that.reason);
104+
}
105+
106+
@Override
107+
public int hashCode()
108+
{
109+
return Objects.hash(eligible, reason);
110+
}
111+
112+
@Override
113+
public String toString()
114+
{
115+
return "Eligibility{" +
116+
"eligible=" + eligible +
117+
", reason='" + reason + '\'' +
118+
'}';
119+
}
120+
}
56121
}

server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,14 @@ public void decrement(CompactionStatistics other)
6565
numIntervals -= other.getNumIntervals();
6666
numSegments -= other.getNumSegments();
6767
}
68+
69+
@Override
70+
public String toString()
71+
{
72+
return "CompactionStatistics{" +
73+
"totalBytes=" + totalBytes +
74+
", numSegments=" + numSegments +
75+
", numIntervals=" + numIntervals +
76+
'}';
77+
}
6878
}

0 commit comments

Comments
 (0)