Commit fa9914b

Fix bug in coordinator-based compaction duty and simulator (#18812)
Changes:

- Fix bug in `CompactionRunSimulator` so that it honors the compaction policy being verified.
- Fix bug in `CompactSegments` so that the duty does not launch tasks for an interval that has already been skipped by the policy.
  - This bug does not occur in Overlord-based compaction supervisors.
  - It does not affect any real scenarios currently, since the only existing production policy, `newestSegmentFirst`, never skips intervals anyway.

1 parent: 36f3413

File tree

4 files changed: +88, -4 lines

indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
(2 additions, 0 deletions)

@@ -118,6 +118,8 @@ DataSourceCompactibleSegmentIterator getCompactibleCandidates(
         config,
         timeline,
         Intervals.complementOf(searchInterval),
+        // This policy is used only while creating jobs
+        // The actual order of jobs is determined by the policy used in CompactionJobQueue
         new NewestSegmentFirstPolicy(null)
     );

server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java
(8 additions, 3 deletions)

@@ -132,13 +132,18 @@ public void onTaskSubmitted(String taskId, CompactionCandidate candidateSegments
     };

     // Unlimited task slots to ensure that simulator does not skip any interval
-    final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig(
-        new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)
+    final ClusterCompactionConfig clusterConfig = compactionConfig.clusterConfig();
+    final ClusterCompactionConfig configWithUnlimitedTaskSlots = new ClusterCompactionConfig(
+        1.0,
+        Integer.MAX_VALUE,
+        clusterConfig.getCompactionPolicy(),
+        clusterConfig.isUseSupervisors(),
+        clusterConfig.getEngine()
     );

     final CoordinatorRunStats stats = new CoordinatorRunStats();
     new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run(
-        configWithUnlimitedTaskSlots,
+        compactionConfig.withClusterConfig(configWithUnlimitedTaskSlots),
         dataSourcesSnapshot,
         defaultEngine,
         stats
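
Why this mattered: the old code rebuilt the cluster config as `new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null)`, which nulled out the compaction policy (and the supervisor/engine settings), so the simulator always fell back to default behavior instead of honoring the policy being verified. Below is a minimal sketch of the copy-with-overrides pattern the fix follows; the `SimClusterConfig` record and its fields are hypothetical stand-ins, not the actual Druid API.

    // Hypothetical config type for illustration only; not the actual Druid API.
    public class CopyWithOverridesSketch
    {
      record SimClusterConfig(double taskSlotRatio, int maxTaskSlots, String policyName)
      {
        // Override only the slot settings; every other field carries over,
        // so a policy under test is never silently dropped.
        SimClusterConfig withUnlimitedTaskSlots()
        {
          return new SimClusterConfig(1.0, Integer.MAX_VALUE, policyName);
        }
      }

      public static void main(String[] args)
      {
        SimClusterConfig original = new SimClusterConfig(0.1, 4, "fixedIntervalOrder");
        SimClusterConfig unlimited = original.withUnlimitedTaskSlots();
        // The policy survives the override.
        System.out.println(unlimited.policyName()); // prints "fixedIntervalOrder"
      }
    }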

server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
(2 additions, 0 deletions)

@@ -245,8 +245,10 @@ private int submitCompactionTasks(

       if (compactionStatus.isComplete()) {
         snapshotBuilder.addToComplete(candidatesWithStatus);
+        continue;
       } else if (compactionStatus.isSkipped()) {
         snapshotBuilder.addToSkipped(candidatesWithStatus);
+        continue;
       } else {
         // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
         snapshotBuilder.addToComplete(entry);
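
Without the `continue` statements, control fell out of the `if`/`else` chain and reached the task-submission code later in the same loop body, so the duty launched tasks even for intervals the policy had already marked complete or skipped. A stripped-down sketch of the fixed control flow; all names here are hypothetical, not the real loop:

    import java.util.List;

    // Stripped-down sketch of the loop structure; names are hypothetical.
    public class SkipFallthroughSketch
    {
      enum State { COMPLETE, SKIPPED, PENDING }

      public static void main(String[] args)
      {
        for (State status : List.of(State.COMPLETE, State.SKIPPED, State.PENDING)) {
          if (status == State.COMPLETE) {
            System.out.println("recorded as complete");
            continue; // the fix: without this, the submission below still ran
          } else if (status == State.SKIPPED) {
            System.out.println("recorded as skipped");
            continue; // likewise
          }
          // Only candidates that actually need compaction reach this point now.
          System.out.println("submitting compaction task for " + status);
        }
      }
    }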

server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java
(76 additions, 1 deletion)

@@ -31,6 +31,8 @@
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.DruidCompactionConfig;
 import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;

@@ -111,7 +113,80 @@ public void testSimulateClusterCompactionConfigUpdate()
     );
     Assert.assertEquals(
         Collections.singletonList(
-            Arrays.asList("wiki", Intervals.of("2013-01-10/P1D"), 10, 1_000_000_000L, 1, "skip offset from latest[P1D]")
+            List.of("wiki", Intervals.of("2013-01-10/P1D"), 10, 1_000_000_000L, 1, "skip offset from latest[P1D]")
+        ),
+        skippedTable.getRows()
+    );
+  }
+
+  @Test
+  public void testSimulate_withFixedIntervalOrderPolicy()
+  {
+    final TestSegmentsMetadataManager segmentsMetadataManager = new TestSegmentsMetadataManager();
+
+    // Add some segments to the timeline
+    final String dataSource = TestDataSource.WIKI;
+    final List<DataSegment> wikiSegments
+        = CreateDataSegments.ofDatasource(dataSource)
+                            .forIntervals(10, Granularities.DAY)
+                            .withNumPartitions(10)
+                            .startingAt("2013-01-01")
+                            .eachOfSizeInMb(100);
+    wikiSegments.forEach(segmentsMetadataManager::addSegment);
+
+    final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy(
+        List.of(
+            new FixedIntervalOrderPolicy.Candidate(dataSource, Intervals.of("2013-01-08/P1D")),
+            new FixedIntervalOrderPolicy.Candidate(dataSource, Intervals.of("2013-01-04/P1D"))
+        )
+    );
+    final CompactionSimulateResult simulateResult = simulator.simulateRunWithConfig(
+        DruidCompactionConfig
+            .empty()
+            .withClusterConfig(new ClusterCompactionConfig(null, null, policy, null, null))
+            .withDatasourceConfig(
+                InlineSchemaDataSourceCompactionConfig.builder().forDataSource(dataSource).build()
+            ),
+        segmentsMetadataManager.getRecentDataSourcesSnapshot(),
+        CompactionEngine.NATIVE
+    );
+
+    Assert.assertNotNull(simulateResult);
+
+    final Map<CompactionStatus.State, Table> compactionStates = simulateResult.getCompactionStates();
+    Assert.assertNotNull(compactionStates);
+
+    Assert.assertNull(compactionStates.get(CompactionStatus.State.COMPLETE));
+    Assert.assertNull(compactionStates.get(CompactionStatus.State.RUNNING));
+
+    final Table pendingTable = compactionStates.get(CompactionStatus.State.PENDING);
+    Assert.assertEquals(
+        List.of("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact"),
+        pendingTable.getColumnNames()
+    );
+    Assert.assertEquals(
+        List.of(
+            List.of("wiki", Intervals.of("2013-01-08/P1D"), 10, 1_000_000_000L, 1, "not compacted yet"),
+            List.of("wiki", Intervals.of("2013-01-04/P1D"), 10, 1_000_000_000L, 1, "not compacted yet")
+        ),
+        pendingTable.getRows()
+    );
+
+    final Table skippedTable = compactionStates.get(CompactionStatus.State.SKIPPED);
+    Assert.assertEquals(
+        List.of("dataSource", "interval", "numSegments", "bytes", "reasonToSkip"),
+        skippedTable.getColumnNames()
+    );
+    Assert.assertEquals(
+        List.of(
+            List.of("wiki", Intervals.of("2013-01-02/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-03/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-07/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-05/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-06/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-01/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-09/P1D"), 10, 1_000_000_000L, 1, "Rejected by search policy"),
+            List.of("wiki", Intervals.of("2013-01-10/P1D"), 10, 1_000_000_000L, 1, "skip offset from latest[P1D]")
         ),
         skippedTable.getRows()
     );
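
The new test pins the search order with a `FixedIntervalOrderPolicy` that admits only 2013-01-08 and then 2013-01-04: those two intervals must appear as PENDING in exactly that order, while every other eligible interval lands in the SKIPPED table with reason "Rejected by search policy". That skipped set is precisely the case the new `continue` statements in `CompactSegments` keep from turning into real task submissions.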
