diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java index 77886af1a017..ea8c2995fd42 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java @@ -248,6 +248,39 @@ public Map getSnapshots() return snapshotBuilder.build(); } + /** + * List of all jobs currently in queue. The order of the jobs is not guaranteed. + */ + public List getQueuedJobs() + { + return List.copyOf(queue); + } + + /** + * Intervals for the given datasource that are already fully compacted. + */ + public List getFullyCompactedIntervals(String dataSource) + { + return snapshotBuilder.getFullyCompactedIntervals(dataSource); + } + + /** + * Intervals that were skipped from compaction. Some of the segments in these + * intervals may already be compacted. + */ + public List getSkippedIntervals(String dataSource) + { + return snapshotBuilder.getSkippedIntervals(dataSource); + } + + /** + * Search policy used to build this queue. + */ + public CompactionCandidateSearchPolicy getSearchPolicy() + { + return searchPolicy; + } + /** * Starts a job if it is ready and is not already in progress. * @@ -348,12 +381,9 @@ private String startTaskIfReady(CompactionJob job) } } - public CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy) + private CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy) { - final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy); - final CompactionCandidate candidatesWithStatus = job.getCandidate().withCurrentStatus(null); - statusTracker.onCompactionStatusComputed(candidatesWithStatus, null); - return compactionStatus; + return statusTracker.computeCompactionStatus(job.getCandidate(), policy); } public static CompactionConfigValidationResult validateCompactionJob(BatchIndexingJob job) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobStatus.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobStatus.java new file mode 100644 index 000000000000..33e107c42c19 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobStatus.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.server.compaction.CompactionStatus; + +/** + * Latest status of a compaction job. + */ +public class CompactionJobStatus +{ + private final CompactionJob job; + private final CompactionStatus currentStatus; + private final int positionInQueue; + + public CompactionJobStatus(CompactionJob job, CompactionStatus currentStatus, int positionInQueue) + { + this.job = job; + this.currentStatus = currentStatus; + this.positionInQueue = positionInQueue; + } + + public CompactionJob getJob() + { + return job; + } + + public CompactionStatus getCurrentStatus() + { + return currentStatus; + } + + public int getPositionInQueue() + { + return positionInQueue; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobTable.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobTable.java new file mode 100644 index 000000000000..b164722d26d4 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobTable.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionStatistics; +import org.apache.druid.server.compaction.Table; + +import java.util.List; + +public class CompactionJobTable extends Table +{ + public static CompactionJobTable create(List jobs, int maxRows) + { + final CompactionJobTable table = new CompactionJobTable( + List.of("Interval", "Task Id", "Compacted segments", "Compacted bytes", "Position in job queue") + ); + + for (int i = 0; i < maxRows && i < jobs.size(); ++i) { + table.addJobRow(jobs.get(i)); + } + return table; + } + + private CompactionJobTable(List columnNames) + { + super(columnNames, null); + } + + private void addJobRow(CompactionJobStatus jobStatus) + { + final CompactionJob job = jobStatus.getJob(); + final CompactionCandidate candidate = jobStatus.getJob().getCandidate(); + final CompactionStatistics compactedStats = candidate.getCompactedStats(); + + final List values = List.of( + candidate.getCompactionInterval(), + job.isMsq() ? "" : job.getNonNullTask().getId(), + compactedStats == null + ? "" + : StringUtils.format("%d/%d", compactedStats.getNumSegments(), candidate.numSegments()), + compactedStats == null + ? "" + : StringUtils.format("%d/%d", compactedStats.getTotalBytes(), candidate.getTotalBytes()), + jobStatus.getPositionInQueue() + ); + + addRow(values.toArray(new Object[0])); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java index 6f5ed1a7a6ef..503744290145 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java @@ -20,12 +20,14 @@ package org.apache.druid.indexing.compact; import org.apache.druid.server.compaction.CompactionSimulateResult; +import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCompactionConfig; +import java.util.List; import java.util.Map; /** @@ -79,6 +81,11 @@ public interface CompactionScheduler */ AutoCompactionSnapshot getCompactionSnapshot(String dataSource); + /** + * Gets the current status of all the known compaction jobs for this datasource. + */ + Map> getJobsByStatus(String dataSource); + /** * Simulates a compaction run with the given cluster config. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java index f9ba0eee6db7..c385ef0bc536 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java @@ -25,12 +25,15 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.compaction.CompactionStatus; +import org.apache.druid.server.compaction.Table; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.List; +import java.util.Map; /** * Supervisor for compaction of a single datasource. @@ -105,25 +108,28 @@ public void stop(boolean stopGracefully) } @Override - public SupervisorReport getStatus() + public SupervisorReport> getStatus() { final AutoCompactionSnapshot snapshot; + String detailedState = getState().toString(); if (supervisorSpec.isSuspended()) { snapshot = AutoCompactionSnapshot.builder(dataSource) .withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED) .build(); } else if (!supervisorSpec.getValidationResult().isValid()) { - snapshot = AutoCompactionSnapshot.builder(dataSource) - .withMessage(StringUtils.format( - "Compaction supervisor spec is invalid. Reason[%s].", - supervisorSpec.getValidationResult().getReason() - )) - .build(); + snapshot = AutoCompactionSnapshot.builder(dataSource).build(); + detailedState = "Invalid: " + supervisorSpec.getValidationResult().getReason(); } else { snapshot = scheduler.getCompactionSnapshot(dataSource); } - return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot); + final Map statusMap = Map.of( + "state", getState(), + "detailedState", detailedState, + "stats", snapshot, + "jobs", getCompactionJobsMap() + ); + return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), statusMap); } @Override @@ -146,6 +152,14 @@ public void reset(@Nullable DataSourceMetadata dataSourceMetadata) // do nothing } + private Map getCompactionJobsMap() + { + return CollectionUtils.mapValues( + scheduler.getJobsByStatus(dataSource), + jobs -> CompactionJobTable.create(jobs, 100) + ); + } + public enum State implements SupervisorStateManager.State { SCHEDULER_STOPPED(true), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index c3bce6a09de1..d9b227934b82 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -44,9 +46,13 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import org.apache.druid.server.compaction.CompactionRunSimulator; import org.apache.druid.server.compaction.CompactionSimulateResult; +import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.compaction.CompactionTaskStatus; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; @@ -58,9 +64,12 @@ import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -105,11 +114,13 @@ public class OverlordCompactionScheduler implements CompactionScheduler private final ConcurrentHashMap activeSupervisors; private final AtomicReference> datasourceToCompactionSnapshot; - private final AtomicBoolean shouldRecomputeJobsForAnyDatasource = new AtomicBoolean(false); + private final AtomicBoolean isJobRefreshPending = new AtomicBoolean(false); /** * Compaction job queue built in the last invocation of {@link #resetCompactionJobQueue()}. + * This is guarded to ensure that the operations performed on the queue are thread-safe. */ + @GuardedBy("this") private final AtomicReference latestJobQueue; /** @@ -159,7 +170,7 @@ public OverlordCompactionScheduler( { final long segmentPollPeriodMillis = segmentManagerConfig.getPollDuration().toStandardDuration().getMillis(); - this.schedulePeriodMillis = Math.min(DEFAULT_SCHEDULE_PERIOD_MILLIS, segmentPollPeriodMillis); + this.schedulePeriodMillis = Math.min(DEFAULT_SCHEDULE_PERIOD_MILLIS, 10 * segmentPollPeriodMillis); this.segmentManager = segmentManager; this.emitter = emitter; @@ -175,7 +186,7 @@ public OverlordCompactionScheduler( this.overlordClient = new LocalOverlordClient(taskMaster, taskQueryTool, objectMapper); this.brokerClient = brokerClient; this.activeSupervisors = new ConcurrentHashMap<>(); - this.datasourceToCompactionSnapshot = new AtomicReference<>(); + this.datasourceToCompactionSnapshot = new AtomicReference<>(Map.of()); this.latestJobQueue = new AtomicReference<>(); this.taskActionClientFactory = taskActionClientFactory; @@ -198,8 +209,7 @@ public void locationChanged(String taskId, TaskLocation newLocation) public void statusChanged(String taskId, TaskStatus status) { if (status.isComplete()) { - onTaskFinished(taskId, status); - launchPendingJobs(); + scheduleOnExecutor(() -> onTaskFinished(taskId, status), 0L); } } }; @@ -254,15 +264,13 @@ public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompa @Override public void startCompaction(String dataSourceName, CompactionSupervisor supervisor) { - // Track active supervisors even if scheduler has not started yet because - // SupervisorManager is started before the scheduler - if (isEnabled()) { - activeSupervisors.put(dataSourceName, supervisor); + // Track active supervisors even if scheduler is disabled or stopped, + // so that the supervisors may be used if the scheduler is enabled later. + activeSupervisors.put(dataSourceName, supervisor); - if (started.get()) { - shouldRecomputeJobsForAnyDatasource.set(true); - scheduleOnExecutor(() -> recreateJobs(dataSourceName, supervisor), 0L); - } + if (isEnabled() && started.get()) { + isJobRefreshPending.set(true); + scheduleOnExecutor(() -> refreshJobs(dataSourceName, supervisor), 0L); } } @@ -274,6 +282,91 @@ public void stopCompaction(String dataSourceName) statusTracker.removeDatasource(dataSourceName); } + @Override + public Map> getJobsByStatus(String dataSource) + { + if (!activeSupervisors.containsKey(dataSource)) { + return Map.of(); + } + + // Get the state of all the jobs from the queue + final List allQueuedJobs = new ArrayList<>(); + final List fullyCompactedIntervals = new ArrayList<>(); + final List skippedIntervals = new ArrayList<>(); + + final AtomicReference searchPolicy = new AtomicReference<>(); + updateQueueIfComputed(queue -> { + allQueuedJobs.addAll(queue.getQueuedJobs()); + fullyCompactedIntervals.addAll(queue.getFullyCompactedIntervals(dataSource)); + skippedIntervals.addAll(queue.getSkippedIntervals(dataSource)); + + searchPolicy.set(queue.getSearchPolicy()); + }); + + // Search policy would be null only if queue has not been computed yet + if (searchPolicy.get() == null) { + return Map.of(); + } + + // Sort and filter out the jobs for the required datasource + final TreeSet sortedJobs = new TreeSet<>( + (o1, o2) -> searchPolicy.get().compareCandidates(o1.getCandidate(), o2.getCandidate()) + ); + sortedJobs.addAll(allQueuedJobs); + + final Map> jobsByStatus = new HashMap<>(); + + int jobPositionInQueue = 0; + for (CompactionJob job : sortedJobs) { + if (job.getDataSource().equals(dataSource)) { + final CompactionStatus currentStatus = + statusTracker.computeCompactionStatus(job.getCandidate(), searchPolicy.get()); + jobsByStatus.computeIfAbsent(currentStatus.getState(), s -> new ArrayList<>()) + .add(new CompactionJobStatus(job, currentStatus, jobPositionInQueue)); + } + ++jobPositionInQueue; + } + + // Add skipped jobs + for (CompactionCandidate candidate : skippedIntervals) { + final CompactionJob dummyJob = new CompactionJob(createDummyTask("", candidate), candidate, -1); + final CompactionStatus currentStatus = + statusTracker.computeCompactionStatus(candidate, searchPolicy.get()); + jobsByStatus.computeIfAbsent(currentStatus.getState(), s -> new ArrayList<>()) + .add(new CompactionJobStatus(dummyJob, currentStatus, jobPositionInQueue)); + } + + // Add recently completed jobs + for (CompactionCandidate candidate : fullyCompactedIntervals) { + final CompactionTaskStatus taskStatus = statusTracker.getLatestTaskStatus(candidate); + final String taskId = taskStatus == null ? "" : taskStatus.getTaskId(); + final CompactionJob dummyJob = new CompactionJob(createDummyTask(taskId, candidate), candidate, -1); + + final CompactionStatus currentStatus = CompactionStatus.COMPLETE; + jobsByStatus.computeIfAbsent(currentStatus.getState(), s -> new ArrayList<>()) + .add(new CompactionJobStatus(dummyJob, currentStatus, jobPositionInQueue)); + } + + return jobsByStatus; + } + + private static ClientCompactionTaskQuery createDummyTask(String taskId, CompactionCandidate candidate) + { + return new ClientCompactionTaskQuery( + taskId, + candidate.getDataSource(), + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + /** * Initializes scheduler state if required. */ @@ -343,6 +436,7 @@ private synchronized void scheduledRun() } scheduleOnExecutor(this::scheduledRun, schedulePeriodMillis); } else { + // Schedule run again in case compaction supervisors get enabled cleanupState(); scheduleOnExecutor(this::scheduledRun, schedulePeriodMillis); } @@ -375,7 +469,7 @@ private synchronized void resetCompactionJobQueue() // Jobs for all active supervisors are being freshly created // recomputation will not be needed - shouldRecomputeJobsForAnyDatasource.set(false); + isJobRefreshPending.set(false); activeSupervisors.forEach(this::createAndEnqueueJobs); launchPendingJobs(); @@ -387,22 +481,25 @@ private synchronized void resetCompactionJobQueue() * Launches pending compaction jobs if compaction task slots become available. * This method uses the jobs created by the last invocation of {@link #resetCompactionJobQueue()}. */ - private synchronized void launchPendingJobs() + private void launchPendingJobs() { updateQueueIfComputed(queue -> { queue.runReadyJobs(); - updateCompactionSnapshots(queue); + datasourceToCompactionSnapshot.set(queue.getSnapshots()); }); } - private synchronized void recreateJobs(String dataSource, CompactionSupervisor supervisor) + /** + * Refreshes compaction jobs for the given datasource if required. + */ + private void refreshJobs(String dataSource, CompactionSupervisor supervisor) { - if (shouldRecomputeJobsForAnyDatasource.get()) { + if (isJobRefreshPending.get()) { createAndEnqueueJobs(dataSource, supervisor); } } - private synchronized void createAndEnqueueJobs(String dataSource, CompactionSupervisor supervisor) + private void createAndEnqueueJobs(String dataSource, CompactionSupervisor supervisor) { updateQueueIfComputed( queue -> queue.createAndEnqueueJobs( @@ -413,10 +510,10 @@ private synchronized void createAndEnqueueJobs(String dataSource, CompactionSupe } /** - * Performs an operation on the {@link #latestJobQueue} if it has been already - * computed. + * Performs a thread-safe read or write operation on the {@link #latestJobQueue} + * if it has already been computed. */ - private void updateQueueIfComputed(Consumer operation) + private synchronized void updateQueueIfComputed(Consumer operation) { final CompactionJobQueue queue = latestJobQueue.get(); if (queue != null) { @@ -430,13 +527,10 @@ private void onTaskFinished(String taskId, TaskStatus taskStatus) updateQueueIfComputed(queue -> { queue.onTaskFinished(taskId, taskStatus); - updateCompactionSnapshots(queue); + datasourceToCompactionSnapshot.set(queue.getSnapshots()); }); - } - private void updateCompactionSnapshots(CompactionJobQueue queue) - { - datasourceToCompactionSnapshot.set(queue.getSnapshots()); + launchPendingJobs(); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java index b008d74829ae..4042541a94ff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java @@ -36,6 +36,7 @@ import org.mockito.Mockito; import java.util.Collections; +import java.util.Map; public class CompactionSupervisorSpecTest { @@ -87,13 +88,17 @@ public void testGetStatusWithInvalidSpec() { Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) .thenReturn(CompactionConfigValidationResult.failure("bad spec")); - Assert.assertEquals( - "Compaction supervisor spec is invalid. Reason[bad spec].", new CompactionSupervisorSpec( - new InlineSchemaDataSourceCompactionConfig.Builder().forDataSource("datasource").build(), - false, - scheduler - ).createSupervisor().getStatus().getPayload().getMessage() - ); + Mockito.when(scheduler.isRunning()).thenReturn(true); + + final CompactionSupervisor supervisor = new CompactionSupervisorSpec( + new InlineSchemaDataSourceCompactionConfig.Builder().forDataSource("datasource").build(), + false, + scheduler + ).createSupervisor(); + + final Map report = supervisor.getStatus().getPayload(); + Assert.assertEquals(CompactionSupervisor.State.INVALID_SPEC, report.get("state")); + Assert.assertEquals("Invalid: bad spec", report.get("detailedState")); } @Test diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionSnapshotBuilder.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionSnapshotBuilder.java index d186587cb870..33040f944be8 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionSnapshotBuilder.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionSnapshotBuilder.java @@ -25,7 +25,9 @@ import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -35,42 +37,59 @@ public class CompactionSnapshotBuilder { private final CoordinatorRunStats stats; - private final Map datasourceToBuilder = new HashMap<>(); + private final Map datasourceToBuilder = new HashMap<>(); public CompactionSnapshotBuilder(CoordinatorRunStats runStats) { this.stats = runStats; } + public List getFullyCompactedIntervals(String dataSource) + { + return datasourceToBuilder.getOrDefault(dataSource, DatasourceSnapshotBuilder.EMPTY).completed; + } + + public List getSkippedIntervals(String dataSource) + { + return datasourceToBuilder.getOrDefault(dataSource, DatasourceSnapshotBuilder.EMPTY).skipped; + } + public void addToComplete(CompactionCandidate candidate) { - getBuilderForDatasource(candidate.getDataSource()) - .incrementCompactedStats(candidate.getStats()); + final DatasourceSnapshotBuilder builder = getBuilderForDatasource(candidate.getDataSource()); + builder.stats.incrementCompactedStats(candidate.getStats()); + builder.completed.add(candidate); } public void addToPending(CompactionCandidate candidate) { - getBuilderForDatasource(candidate.getDataSource()) - .incrementWaitingStats(candidate.getStats()); + final DatasourceSnapshotBuilder builder = getBuilderForDatasource(candidate.getDataSource()); + builder.stats.incrementWaitingStats(getUncompactedStats(candidate)); + + final CompactionStatistics compactedStats = candidate.getCompactedStats(); + if (compactedStats != null) { + builder.stats.incrementCompactedStats(compactedStats); + } } public void addToSkipped(CompactionCandidate candidate) { - getBuilderForDatasource(candidate.getDataSource()) - .incrementSkippedStats(candidate.getStats()); + final DatasourceSnapshotBuilder builder = getBuilderForDatasource(candidate.getDataSource()); + builder.stats.incrementSkippedStats(getUncompactedStats(candidate)); + builder.skipped.add(candidate); } public void moveFromPendingToSkipped(CompactionCandidate candidate) { - getBuilderForDatasource(candidate.getDataSource()) - .decrementWaitingStats(candidate.getStats()); + final DatasourceSnapshotBuilder builder = getBuilderForDatasource(candidate.getDataSource()); + builder.stats.decrementWaitingStats(getUncompactedStats(candidate)); addToSkipped(candidate); } public void moveFromPendingToCompleted(CompactionCandidate candidate) { - getBuilderForDatasource(candidate.getDataSource()) - .decrementWaitingStats(candidate.getStats()); + final DatasourceSnapshotBuilder builder = getBuilderForDatasource(candidate.getDataSource()); + builder.stats.decrementWaitingStats(getUncompactedStats(candidate)); addToComplete(candidate); } @@ -78,7 +97,7 @@ public Map build() { final Map datasourceToSnapshot = new HashMap<>(); datasourceToBuilder.forEach((dataSource, builder) -> { - final AutoCompactionSnapshot autoCompactionSnapshot = builder.build(); + final AutoCompactionSnapshot autoCompactionSnapshot = builder.stats.build(); datasourceToSnapshot.put(dataSource, autoCompactionSnapshot); collectSnapshotStats(autoCompactionSnapshot); }); @@ -86,9 +105,20 @@ public Map build() return datasourceToSnapshot; } - private AutoCompactionSnapshot.Builder getBuilderForDatasource(String dataSource) + /** + * Gets the stats for uncompacted segments in the given candidate. + * If details of uncompacted segments is not available, all segments within the + * candidate are considered to be uncompacted. + */ + private CompactionStatistics getUncompactedStats(CompactionCandidate candidate) { - return datasourceToBuilder.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder); + final CompactionStatistics uncompacted = candidate.getUncompactedStats(); + return uncompacted == null ? candidate.getStats() : uncompacted; + } + + private DatasourceSnapshotBuilder getBuilderForDatasource(String dataSource) + { + return datasourceToBuilder.computeIfAbsent(dataSource, DatasourceSnapshotBuilder::new); } private void collectSnapshotStats(AutoCompactionSnapshot autoCompactionSnapshot) @@ -105,4 +135,22 @@ private void collectSnapshotStats(AutoCompactionSnapshot autoCompactionSnapshot) stats.add(Stats.Compaction.SKIPPED_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountSkipped()); stats.add(Stats.Compaction.SKIPPED_INTERVALS, rowKey, autoCompactionSnapshot.getIntervalCountSkipped()); } + + /** + * Wrapper around AutoCompactionSnapshot.Builder to track list of completed + * and skipped candidates. + */ + private static class DatasourceSnapshotBuilder + { + static final DatasourceSnapshotBuilder EMPTY = new DatasourceSnapshotBuilder("."); + + final AutoCompactionSnapshot.Builder stats; + final List completed = new ArrayList<>(); + final List skipped = new ArrayList<>(); + + DatasourceSnapshotBuilder(String dataSource) + { + this.stats = AutoCompactionSnapshot.builder(dataSource); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index cc52513b16c5..eeb1788f6bc4 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -55,7 +55,7 @@ */ public class CompactionStatus { - private static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null, null, null); + public static final CompactionStatus COMPLETE = new CompactionStatus(State.COMPLETE, null, null, null); public enum State { diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index 1dc409e7361e..dc42faf77e92 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -196,12 +196,13 @@ void handleCompletedTask(Interval compactionInterval, TaskStatus taskStatus) final CompactionTaskStatus updatedStatus; if (taskStatus.isSuccess()) { - updatedStatus = new CompactionTaskStatus(TaskState.SUCCESS, now, 0); + updatedStatus = new CompactionTaskStatus(taskStatus.getId(), TaskState.SUCCESS, now, 0); } else if (lastKnownStatus == null || lastKnownStatus.getState().isSuccess()) { // This is the first failure - updatedStatus = new CompactionTaskStatus(TaskState.FAILED, now, 1); + updatedStatus = new CompactionTaskStatus(taskStatus.getId(), TaskState.FAILED, now, 1); } else { updatedStatus = new CompactionTaskStatus( + taskStatus.getId(), TaskState.FAILED, now, lastKnownStatus.getNumConsecutiveFailures() + 1 @@ -217,11 +218,11 @@ void handleSubmittedTask(CompactionCandidate candidateSegments) final DateTime now = DateTimes.nowUtc(); if (lastStatus == null || !lastStatus.getState().isFailure()) { - intervalToTaskStatus.put(interval, new CompactionTaskStatus(TaskState.RUNNING, now, 0)); + intervalToTaskStatus.put(interval, new CompactionTaskStatus("", TaskState.RUNNING, now, 0)); } else { intervalToTaskStatus.put( interval, - new CompactionTaskStatus(TaskState.RUNNING, now, lastStatus.getNumConsecutiveFailures()) + new CompactionTaskStatus("", TaskState.RUNNING, now, lastStatus.getNumConsecutiveFailures()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java index 3431de706f69..f09993f96573 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionTaskStatus.java @@ -24,21 +24,29 @@ public class CompactionTaskStatus { + private final String taskId; private final TaskState state; private final DateTime updatedTime; private final int numConsecutiveFailures; public CompactionTaskStatus( + String taskId, TaskState state, DateTime updatedTime, int numConsecutiveFailures ) { + this.taskId = taskId; this.state = state; this.updatedTime = updatedTime; this.numConsecutiveFailures = numConsecutiveFailures; } + public String getTaskId() + { + return taskId; + } + public TaskState getState() { return state; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index b450dd5a5042..e9b2640ca1a3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.server.compaction.CompactionStatistics; -import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.util.Objects; @@ -43,8 +42,6 @@ public enum ScheduleStatus @JsonProperty private final ScheduleStatus scheduleStatus; @JsonProperty - private final String message; - @JsonProperty private final long bytesAwaitingCompaction; @JsonProperty private final long bytesCompacted; @@ -72,7 +69,6 @@ public static Builder builder(String dataSource) public AutoCompactionSnapshot( @JsonProperty("dataSource") @NotNull String dataSource, @JsonProperty("scheduleStatus") @NotNull AutoCompactionSnapshot.ScheduleStatus scheduleStatus, - @JsonProperty("message") @Nullable String message, @JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction, @JsonProperty("bytesCompacted") long bytesCompacted, @JsonProperty("bytesSkipped") long bytesSkipped, @@ -86,7 +82,6 @@ public AutoCompactionSnapshot( { this.dataSource = dataSource; this.scheduleStatus = scheduleStatus; - this.message = message; this.bytesAwaitingCompaction = bytesAwaitingCompaction; this.bytesCompacted = bytesCompacted; this.bytesSkipped = bytesSkipped; @@ -110,12 +105,6 @@ public AutoCompactionSnapshot.ScheduleStatus getScheduleStatus() return scheduleStatus; } - @Nullable - public String getMessage() - { - return message; - } - public long getBytesAwaitingCompaction() { return bytesAwaitingCompaction; @@ -181,8 +170,7 @@ public boolean equals(Object o) intervalCountCompacted == that.intervalCountCompacted && intervalCountSkipped == that.intervalCountSkipped && dataSource.equals(that.dataSource) && - scheduleStatus == that.scheduleStatus && - Objects.equals(message, that.message); + scheduleStatus == that.scheduleStatus; } @Override @@ -191,7 +179,6 @@ public int hashCode() return Objects.hash( dataSource, scheduleStatus, - message, bytesAwaitingCompaction, bytesCompacted, bytesSkipped, @@ -210,7 +197,6 @@ public String toString() return "AutoCompactionSnapshot{" + "dataSource='" + dataSource + '\'' + ", scheduleStatus=" + scheduleStatus + - ", message='" + message + '\'' + ", bytesAwaitingCompaction=" + bytesAwaitingCompaction + ", bytesCompacted=" + bytesCompacted + ", bytesSkipped=" + bytesSkipped + @@ -227,7 +213,6 @@ public static class Builder { private final String dataSource; private ScheduleStatus scheduleStatus; - private String message; private final CompactionStatistics compactedStats = new CompactionStatistics(); private final CompactionStatistics skippedStats = new CompactionStatistics(); @@ -249,12 +234,6 @@ public Builder withStatus(ScheduleStatus status) return this; } - public Builder withMessage(String message) - { - this.message = message; - return this; - } - public void incrementWaitingStats(CompactionStatistics entry) { waitingStats.increment(entry); @@ -280,7 +259,6 @@ public AutoCompactionSnapshot build() return new AutoCompactionSnapshot( dataSource, scheduleStatus, - message, waitingStats.getTotalBytes(), compactedStats.getTotalBytes(), skippedStats.getTotalBytes(), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index a6eb127f854c..7dd062362816 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -29,7 +29,6 @@ public class AutoCompactionSnapshotTest public void testAutoCompactionSnapshotBuilder() { final String expectedDataSource = "data"; - final String expectedMessage = "message"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); // Increment every stat twice @@ -39,7 +38,7 @@ public void testAutoCompactionSnapshotBuilder() builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } - final AutoCompactionSnapshot actual = builder.withMessage(expectedMessage).build(); + final AutoCompactionSnapshot actual = builder.build(); Assert.assertNotNull(actual); Assert.assertEquals(26, actual.getSegmentCountSkipped()); @@ -53,12 +52,10 @@ public void testAutoCompactionSnapshotBuilder() Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction()); Assert.assertEquals(AutoCompactionSnapshot.ScheduleStatus.RUNNING, actual.getScheduleStatus()); Assert.assertEquals(expectedDataSource, actual.getDataSource()); - Assert.assertEquals(expectedMessage, actual.getMessage()); AutoCompactionSnapshot expected = new AutoCompactionSnapshot( expectedDataSource, AutoCompactionSnapshot.ScheduleStatus.RUNNING, - expectedMessage, 26, 26, 26, diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index 943f9a6ccf7d..00fb308a48c8 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -44,7 +44,6 @@ public class CoordinatorCompactionResourceTest private final AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot( dataSourceName, AutoCompactionSnapshot.ScheduleStatus.RUNNING, - null, 1, 1, 1,