Skip to content

Commit af52750

Browse files
kfarazcecemei
andauthored
Add CompactionMode to compaction Eligibility (split from #18968 by @cecemei) (#19054)
* Add CompactionMode to compaction Eligibility * Deprecate old stuff * Clean up comment * Javadocs update * Fix test * revert job queue change * Eligibility --------- Co-authored-by: cecemei <yingqian.mei@gmail.com>
1 parent a09d6f8 commit af52750

File tree

5 files changed

+64
-15
lines changed

5 files changed

+64
-15
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ private boolean startJobIfPendingAndReady(
282282
}
283283

284284
// Check if the job is already running, completed or skipped
285-
final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy);
285+
final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy);
286286
switch (compactionStatus.getState()) {
287287
case RUNNING:
288288
return false;
@@ -378,14 +378,6 @@ private void persistPendingIndexingState(CompactionJob job)
378378
}
379379
}
380380

381-
public CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy)
382-
{
383-
final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy);
384-
final CompactionCandidate candidatesWithStatus = job.getCandidate().withCurrentStatus(null);
385-
statusTracker.onCompactionStatusComputed(candidatesWithStatus, null);
386-
return compactionStatus;
387-
}
388-
389381
public static CompactionConfigValidationResult validateCompactionJob(BatchIndexingJob job)
390382
{
391383
// For MSQ jobs, do not perform any validation

server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.druid.java.util.common.StringUtils;
2525
import org.apache.druid.server.coordinator.duty.CompactSegments;
2626

27+
import javax.annotation.Nullable;
2728
import java.util.Objects;
2829

2930
/**
@@ -64,15 +65,18 @@ Eligibility checkEligibilityForCompaction(
6465
*/
6566
class Eligibility
6667
{
67-
public static final Eligibility OK = new Eligibility(true, null);
68+
public static final Eligibility OK = new Eligibility(true, null, CompactionMode.FULL_COMPACTION);
6869

6970
private final boolean eligible;
7071
private final String reason;
72+
@Nullable
73+
private final CompactionMode mode;
7174

72-
private Eligibility(boolean eligible, String reason)
75+
private Eligibility(boolean eligible, String reason, @Nullable CompactionMode mode)
7376
{
7477
this.eligible = eligible;
7578
this.reason = reason;
79+
this.mode = mode;
7680
}
7781

7882
public boolean isEligible()
@@ -85,9 +89,19 @@ public String getReason()
8589
return reason;
8690
}
8791

92+
/**
93+
* The mode of compaction (full or minor). This is non-null only when the
94+
* candidate is considered to be eligible for compaction by the policy.
95+
*/
96+
@Nullable
97+
public CompactionMode getMode()
98+
{
99+
return mode;
100+
}
101+
88102
public static Eligibility fail(String messageFormat, Object... args)
89103
{
90-
return new Eligibility(false, StringUtils.format(messageFormat, args));
104+
return new Eligibility(false, StringUtils.format(messageFormat, args), null);
91105
}
92106

93107
@Override
@@ -100,21 +114,22 @@ public boolean equals(Object object)
100114
return false;
101115
}
102116
Eligibility that = (Eligibility) object;
103-
return eligible == that.eligible && Objects.equals(reason, that.reason);
117+
return eligible == that.eligible && Objects.equals(reason, that.reason) && Objects.equals(mode, that.mode);
104118
}
105119

106120
@Override
107121
public int hashCode()
108122
{
109-
return Objects.hash(eligible, reason);
123+
return Objects.hash(eligible, reason, mode);
110124
}
111125

112126
@Override
113127
public String toString()
114128
{
115129
return "Eligibility{" +
116130
"eligible=" + eligible +
117-
", reason='" + reason + '\'' +
131+
", reason='" + reason +
132+
", mode='" + mode + '\'' +
118133
'}';
119134
}
120135
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.compaction;
21+
22+
/**
23+
* Represents the mode of compaction for segment intervals.
24+
*/
25+
public enum CompactionMode
26+
{
27+
/**
28+
* Indicates that all existing segments of the interval will be picked for compaction.
29+
*/
30+
FULL_COMPACTION;
31+
}

server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@
4949
/**
5050
* Simulates runs of auto-compaction duty to obtain the expected list of
5151
* compaction tasks that would be submitted by the actual compaction duty.
52+
*
53+
* @deprecated The simulator does not support the Overlord-based CompactionJobQueue
54+
* or the new reindexing templates. It will either be fully replaced or undergo
55+
* a major overhaul in the upcoming releases.
5256
*/
57+
@Deprecated
5358
public class CompactionRunSimulator
5459
{
5560
private final CompactionStatusTracker statusTracker;

server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.druid.indexer.TaskStatus;
2424
import org.apache.druid.java.util.common.DateTimes;
2525
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
26+
import org.checkerframework.checker.nullness.qual.Nullable;
2627
import org.joda.time.DateTime;
2728
import org.joda.time.Duration;
2829
import org.joda.time.Interval;
@@ -58,6 +59,7 @@ public void removeDatasource(String datasource)
5859
datasourceStatuses.remove(datasource);
5960
}
6061

62+
@Nullable
6163
public CompactionTaskStatus getLatestTaskStatus(CompactionCandidate candidates)
6264
{
6365
return datasourceStatuses
@@ -79,7 +81,11 @@ public Set<String> getSubmittedTaskIds()
7981
* Checks if compaction can be started for the given {@link CompactionCandidate}.
8082
* This method assumes that the given candidate is eligible for compaction
8183
* based on the current compaction config/supervisor of the datasource.
84+
*
85+
* @deprecated This method is used only by Coordinator-based CompactSegments
86+
* duty and will be removed in the future.
8287
*/
88+
@Deprecated
8389
public CompactionStatus computeCompactionStatus(
8490
CompactionCandidate candidate,
8591
CompactionCandidateSearchPolicy searchPolicy

0 commit comments

Comments
 (0)