introduce clear separation between proposed compaction, eligibility status, and compaction mode, along with improved task state tracking#18968
Conversation
kfaraz
left a comment
There was a problem hiding this comment.
Thanks for the PR, @cecemei !
I like the idea of separating the CompactionStatus (i.e. the current degree of compaction of an interval) from the Eligibility (i.e. whether an interval should be picked for compaction or not).
I have left some suggestions to aid with the separation. Let me know if they make sense.
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
Outdated
Show resolved
Hide resolved
| @JsonCreator | ||
| public ClientCompactionIntervalSpec( | ||
| @JsonProperty("interval") Interval interval, | ||
| @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> uncompactedSegments, |
There was a problem hiding this comment.
It is a little confusing to add segments here since there is already a SpecificSegmentsSpec which specifies the set of segments that should be compacted.
I feel we should add a new type of CompactionInputSpec which will be used for incremental compaction only. It would have both a non-null interval as well as a non-empty set of segment IDs to compact incrementally. The CompactionTask would handle the different input specs accordingly.
There was a problem hiding this comment.
SpecificSegmentsSpec is not supported by msq, and it somewhat felt a bit deprecated to me, maybe because of the segment lock stuff. i kinda like to specify an interval since it gives some certainty, also wonder maybe we should check non-null for interval here? i didnt see any instance with null interval.
There was a problem hiding this comment.
i kinda like to specify an interval since it gives some certainty
Yes, I agree, in the new scheme of things, it is always nice to specify the interval.
That's why I advise adding a new implementation of CompactionInputSpec which will be used only for incremental compaction so that it is easy to distinguish the two on the task side as well. We need not piggyback on the existing CompactionIntervalSpec.
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
Outdated
Show resolved
Hide resolved
| * | ||
| * @return Pair of eligibility status and compaction status with reason for first failed check | ||
| */ | ||
| Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate() |
There was a problem hiding this comment.
Since the CompactionStatus and eligibility are going to be two separate things now, this class should not deal with eligibility at all. We can retain the old code in this class.
Once the CompactionStatus has been computed, that should be passed to CompactionStatusTracker.computeEligiblity() to determine the eligibility.
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
Outdated
Show resolved
Hide resolved
@cecemei , just so that we are on the same page, the It would be nice to get rid of the SKIPPED state too since it is more of an eligibility thing, but I guess we can leave it for now. |
...r/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java
Outdated
Show resolved
Hide resolved
...r/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java
Fixed
Show fixed
Hide fixed
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java
Fixed
Show fixed
Hide fixed
server/src/test/java/org/apache/druid/server/compaction/CompactionEligibilityEvaluateTest.java
Fixed
Show fixed
Hide fixed
I addressed most comments except for the coordinator based compaction and incremental compaction feature change. The major change is i moved a lot of CompactionStatus stuff to CompactionEligibility, and update the CHECK to return String (null for previous CompactionStatus.COMPLETE, and non-null for previous CompactionStatus.pending...), it seems simpler since after all we're just looking for a reason to do compaction. |
| * <p> | ||
| * This method performs a two-stage evaluation: | ||
| * <ol> | ||
| * <li>First, uses {@link Evaluator} to check if the candidate needs compaction |
There was a problem hiding this comment.
did we make any changes to the Evaluator logic in moving it to this file or is it just lift and place in new location?
...r/src/main/java/org/apache/druid/server/compaction/DataSourceCompactibleSegmentIterator.java
Outdated
Show resolved
Hide resolved
|
Had a brief discussion with @cecemei on this. I think there has been some confusion regarding the roles of In short, we shouldn't need to make any major modification to any class in this PR except I have shared some details offline. We should be able to get this resolved by Monday. |
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Fixed
Show fixed
Hide fixed
|
Hi @kfaraz @capistrant , I made some major changes to this pr, and hopefully it's in a much better state, please take a look when you're available, thanks! The incremental compaction related change has been removed from this pr, i'm going to make another one for that. |
server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
Show resolved
Hide resolved
kfaraz
left a comment
There was a problem hiding this comment.
@cecemei , I feel a lot of changes here are really not needed and probably make the definitions of various compaction entities confusing.
- The only change that is really needed here is adding a
CompactionModeenum toEligibility. - The existing terminology
CompactionCandidate,CompactionStatus,Eligibility,CompactionTaskStatuscaptures the respective purposes nicely and also provides a clear distinction. Let's stick with the existing terminology unless there is some real gap in the current nomenclature/functionality. - If you feel it needs some explanation, we may add/update javadocs.
- You may either remove all the refactors from this PR (so that we can quickly unblock incremental compaction in #18996) OR update this PR to only make suggested modifications that do not break the existing definitions.
P.S. Existing definitions
| Class | Definition |
|---|---|
CompactionCandidate |
A potential candidate for compaction, identified by an interval and some segment IDs |
CompactionStatus |
Current status of a CompactionCandidate indicated by number of compacted/uncompacted segments/bytes in the interval. |
Eligibility |
Given a CompactionCandidate and CompactionStatus, does the policy think it is eligible for compaction? |
CompactionTaskStatus |
Status of past/ongoing compaction tasks for a CompactionCandidate |
All of these are distinct from each other and should preferably remain so.
There is room for improvement in these classes, but we should do them in a manner that does not completely change the existing semantics.
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
Outdated
Show resolved
Hide resolved
| return createCandidate(proposedCompaction, eligibility, null); | ||
| } | ||
|
|
||
| public CompactionCandidate createCandidate( |
There was a problem hiding this comment.
We shouldn't really have this method here. The CompactionMode is an attribute that we associate to a CompactionCandidate. One should not be responsible for creating the other.
| '}'; | ||
| } | ||
| } | ||
| CompactionCandidate createCandidate(CompactionCandidate.ProposedCompaction candidate, CompactionStatus eligibility); |
There was a problem hiding this comment.
It doesn't make sense for the policy to be creating candidates since the candidates are identified much earlier by the DatasourceCompactibleSegmentIterator itself. The policy can only check the eligibility of a candidate for compaction. (I suppose the name CompactionCandidateSearchPolicy might be misleading here, since it is really the CompactionCandidatePickAndPriorizePolicy but it can be a mouthful, so we can stick with what we have.)
We should retain the original method here. The caller (CompactionJobQueue) should just check the eligibility and then decide whether to launch a job or not. If yes, then whether full or incremental.
There was a problem hiding this comment.
gotcha - can just return CompactionMode here and let callsite handle creation of candidate.
There was a problem hiding this comment.
No we should continue to use the existing method in CompactionCandidateSearchPolicy i.e. checkEligibility().
Just add a new enum inside the Eligibility class.
| * Used by {@link CompactionStatusTracker#computeCompactionTaskState(CompactionCandidate)}. | ||
| * The callsite then determines whether to launch compaction task or not. | ||
| */ | ||
| public enum TaskState |
There was a problem hiding this comment.
Please do not add a new TaskState, use CompactionTaskStatus instead. If it doesn't contain any info, let's add it there.
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
Show resolved
Hide resolved
| CompactionCandidate candidatesWithStatus = CompactionCandidate | ||
| .from(partialEternitySegments, null) | ||
| .withCurrentStatus(CompactionStatus.skipped("Segments have partial-eternity intervals")); | ||
| CompactionCandidate candidatesWithStatus = |
There was a problem hiding this comment.
This class probably doesn't need to change in this PR.
The Eligibility should be computed and used only by the CompactionJobQueue (and maybe CompactSegments for backward compatibility).
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Show resolved
Hide resolved
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
Fixed
Show fixed
Hide fixed
| return false; | ||
| } | ||
| // Check if the job is already running or succeeded | ||
| final TaskState candidateState = statusTracker.computeCompactionTaskState(job.getCandidate().getCandidate()); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note
| } else { | ||
| // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics | ||
| snapshotBuilder.addToComplete(entry); | ||
| final TaskState compactionTaskState = statusTracker.computeCompactionTaskState(candidate); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note
| = statusTracker.computeCompactionStatus(candidateSegments, policy); | ||
| Assert.assertEquals(CompactionStatus.State.PENDING, status.getState()); | ||
| Assert.assertEquals("Not compacted yet", status.getReason()); | ||
| TaskState status = statusTracker.computeCompactionTaskState(candidateSegments.getCandidate()); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note test
| "Segment timeline not updated since last compaction task succeeded", | ||
| status.getReason() | ||
| ); | ||
| status = statusTracker.computeCompactionTaskState(candidateSegments.getCandidate()); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note test
| statusTracker.onSegmentTimelineUpdated(DateTimes.nowUtc()); | ||
| status = statusTracker.computeCompactionStatus(candidateSegments, policy); | ||
| Assert.assertEquals(CompactionStatus.State.PENDING, status.getState()); | ||
| status = statusTracker.computeCompactionTaskState(candidateSegments.getCandidate()); |
Check notice
Code scanning / CodeQL
Deprecated method or constructor invocation Note test
This PR refactors compaction eligibility evaluation by introducing clear separation between proposed compaction (what segments to compact), eligibility status (whether compaction is needed), and compaction mode (how to compact), along with improved task state tracking.
Motivation
The previous
CompactionStatusimplementation mixed multiple concerns:This made it difficult to reason about compaction decisions and track the state of compaction candidates through the system.
Key Changes
1. Introduced
CompactionModeEnumNew enum to represent different compaction strategies:
FULL_COMPACTION: Compact all segments in the intervalNOT_APPLICABLE: No compaction should be performedThe mode determines whether a candidate will be queued for compaction or skipped.
2. Refactored
CompactionCandidateNew structure:
ProposedCompaction: Nested class containing the segment list, intervals, and statisticsCompactionStatus: Eligibility evaluation result (COMPLETE, ELIGIBLE, NOT_ELIGIBLE)CompactionMode: How to perform compaction (FULL_COMPACTION or NOT_APPLICABLE)policyNote: Optional note from the policy explaining why this candidate was chosen/skippedBenefits:
3. Updated
CompactionStatusNew responsibilities:
COMPLETE,ELIGIBLE, orNOT_ELIGIBLESeparation from policy:
CompactionStatuschecks config-based requirements (granularity, rollup, etc.)4. Simplified
CompactionCandidateSearchPolicyChanged interface:
checkEligibilityForCompaction(candidate, taskStatus) → EligibilitycreateCandidate(proposedCompaction, eligibility) → CompactionCandidateBenefits:
5. Improved Task State Tracking
New
TaskStateenum inCompactionCandidate:READY: No task running, can start compactionTASK_IN_PROGRESS: Compaction task already runningRECENTLY_COMPLETED: Task recently finished, segments not yet updatedBenefits:
Code Flow Example
Testing
CompactionCandidateTestfor new candidate structureCompactionStatusBuilderTestfor eligibility evaluationThis PR has: