@@ -248,6 +248,39 @@ public Map<String, AutoCompactionSnapshot> getSnapshots()
return snapshotBuilder.build();
}

/**
* List of all jobs currently in the queue. The order of the jobs is not guaranteed.
*/
public List<CompactionJob> getQueuedJobs()
{
return List.copyOf(queue);
}

/**
* Intervals for the given datasource that are already fully compacted.
*/
public List<CompactionCandidate> getFullyCompactedIntervals(String dataSource)
{
return snapshotBuilder.getFullyCompactedIntervals(dataSource);
}

/**
* Intervals that were skipped by compaction. Some of the segments in these
* intervals may already be compacted.
*/
public List<CompactionCandidate> getSkippedIntervals(String dataSource)
{
return snapshotBuilder.getSkippedIntervals(dataSource);
}

/**
* Search policy used to build this queue.
*/
public CompactionCandidateSearchPolicy getSearchPolicy()
{
return searchPolicy;
}

/**
* Starts a job if it is ready and is not already in progress.
*
@@ -348,12 +381,9 @@ private String startTaskIfReady(CompactionJob job)
}
}

public CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy)
private CompactionStatus getCurrentStatusForJob(CompactionJob job, CompactionCandidateSearchPolicy policy)
{
final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy);
final CompactionCandidate candidatesWithStatus = job.getCandidate().withCurrentStatus(null);
statusTracker.onCompactionStatusComputed(candidatesWithStatus, null);
return compactionStatus;
return statusTracker.computeCompactionStatus(job.getCandidate(), policy);
}

public static CompactionConfigValidationResult validateCompactionJob(BatchIndexingJob job)
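For illustration, a caller holding a reference to this queue could inspect its state through the new read-only accessors roughly as follows. This is only a sketch: the jobQueue reference and the "wikipedia" datasource name are placeholders, and only the accessors added in this change are assumed to exist.

// Hypothetical caller-side usage of the accessors added above; jobQueue is a placeholder reference.
List<CompactionJob> pending = jobQueue.getQueuedJobs();                                  // order not guaranteed
List<CompactionCandidate> compacted = jobQueue.getFullyCompactedIntervals("wikipedia");
List<CompactionCandidate> skipped = jobQueue.getSkippedIntervals("wikipedia");
CompactionCandidateSearchPolicy policy = jobQueue.getSearchPolicy();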
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import org.apache.druid.server.compaction.CompactionStatus;

/**
* Latest status of a compaction job.
*/
public class CompactionJobStatus
{
private final CompactionJob job;
private final CompactionStatus currentStatus;
private final int positionInQueue;

public CompactionJobStatus(CompactionJob job, CompactionStatus currentStatus, int positionInQueue)
{
this.job = job;
this.currentStatus = currentStatus;
this.positionInQueue = positionInQueue;
}

public CompactionJob getJob()
{
return job;
}

public CompactionStatus getCurrentStatus()
{
return currentStatus;
}

public int getPositionInQueue()
{
return positionInQueue;
}
}
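A minimal sketch of how this holder might be populated on the scheduler side, pairing a queued job with its computed status and queue position (job, statusTracker, searchPolicy and positionInQueue are illustrative names, not part of this class):

// Illustrative construction; computeCompactionStatus is the same call used by getCurrentStatusForJob above.
CompactionStatus status = statusTracker.computeCompactionStatus(job.getCandidate(), searchPolicy);
CompactionJobStatus jobStatus = new CompactionJobStatus(job, status, positionInQueue);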
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.Table;

import java.util.List;

public class CompactionJobTable extends Table
{
public static CompactionJobTable create(List<CompactionJobStatus> jobs, int maxRows)
{
final CompactionJobTable table = new CompactionJobTable(
List.of("Interval", "Task Id", "Compacted segments", "Compacted bytes", "Position in job queue")
);

for (int i = 0; i < maxRows && i < jobs.size(); ++i) {
table.addJobRow(jobs.get(i));
}
return table;
}

private CompactionJobTable(List<String> columnNames)
{
super(columnNames, null);
}

private void addJobRow(CompactionJobStatus jobStatus)
{
final CompactionJob job = jobStatus.getJob();
final CompactionCandidate candidate = jobStatus.getJob().getCandidate();
final CompactionStatistics compactedStats = candidate.getCompactedStats();

final List<Object> values = List.of(
candidate.getCompactionInterval(),
job.isMsq() ? "" : job.getNonNullTask().getId(),
compactedStats == null
? ""
: StringUtils.format("%d/%d", compactedStats.getNumSegments(), candidate.numSegments()),
compactedStats == null
? ""
: StringUtils.format("%d/%d", compactedStats.getTotalBytes(), candidate.getTotalBytes()),
jobStatus.getPositionInQueue()
);

addRow(values.toArray(new Object[0]));
}
}
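As a usage sketch, the table is built from a list of job statuses and capped at a fixed number of rows; the 100-row cap below mirrors the one applied by CompactionSupervisor later in this patch, and jobStatuses is a placeholder list:

// Hypothetical rendering of queued jobs; jobStatuses would normally come from the scheduler.
List<CompactionJobStatus> jobStatuses = List.of();
Table table = CompactionJobTable.create(jobStatuses, 100);
// Columns: Interval, Task Id, Compacted segments, Compacted bytes, Position in job queue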
@@ -20,12 +20,14 @@
package org.apache.druid.indexing.compact;

import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;

import java.util.List;
import java.util.Map;

/**
@@ -79,6 +81,11 @@ public interface CompactionScheduler
*/
AutoCompactionSnapshot getCompactionSnapshot(String dataSource);

/**
* Gets the current status of all known compaction jobs for the given datasource.
*/
Map<CompactionStatus.State, List<CompactionJobStatus>> getJobsByStatus(String dataSource);

/**
* Simulates a compaction run with the given cluster config.
*
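A hedged sketch of how a caller might consume the new method, summarising jobs per state (the datasource name and log reference are placeholders; no specific CompactionStatus.State constants are assumed):

// Hypothetical consumer of getJobsByStatus().
Map<CompactionStatus.State, List<CompactionJobStatus>> jobsByState = scheduler.getJobsByStatus("wikipedia");
jobsByState.forEach(
    (state, jobs) -> log.info("Datasource[wikipedia] has [%d] jobs in state[%s].", jobs.size(), state)
);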
@@ -25,12 +25,15 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.Table;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;

/**
* Supervisor for compaction of a single datasource.
@@ -105,25 +108,28 @@ public void stop(boolean stopGracefully)
}

@Override
public SupervisorReport<AutoCompactionSnapshot> getStatus()
public SupervisorReport<Map<String, Object>> getStatus()
{
final AutoCompactionSnapshot snapshot;
String detailedState = getState().toString();
if (supervisorSpec.isSuspended()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
.build();
} else if (!supervisorSpec.getValidationResult().isValid()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withMessage(StringUtils.format(
"Compaction supervisor spec is invalid. Reason[%s].",
supervisorSpec.getValidationResult().getReason()
))
.build();
snapshot = AutoCompactionSnapshot.builder(dataSource).build();
detailedState = "Invalid: " + supervisorSpec.getValidationResult().getReason();
} else {
snapshot = scheduler.getCompactionSnapshot(dataSource);
}

return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), snapshot);
final Map<String, Object> statusMap = Map.of(
"state", getState(),
"detailedState", detailedState,
"stats", snapshot,
"jobs", getCompactionJobsMap()
);
return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), statusMap);
}

@Override
@@ -146,6 +152,14 @@ public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
// do nothing
}

private Map<CompactionStatus.State, Table> getCompactionJobsMap()
{
return CollectionUtils.mapValues(
scheduler.getJobsByStatus(dataSource),
jobs -> CompactionJobTable.create(jobs, 100)
);
}

public enum State implements SupervisorStateManager.State
{
SCHEDULER_STOPPED(true),
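Taken together, the supervisor report payload changes from a bare AutoCompactionSnapshot to a small map. Assuming SupervisorReport exposes its payload via getPayload() (an assumption, not shown in this diff) and with supervisor as a placeholder reference, a reader of the new report would see roughly:

// Keys as built in getStatus() above; casts are illustrative.
Map<String, Object> payload = supervisor.getStatus().getPayload();
Object state = payload.get("state");                                              // supervisor State enum
String detailedState = (String) payload.get("detailedState");                     // may carry a validation failure reason
AutoCompactionSnapshot stats = (AutoCompactionSnapshot) payload.get("stats");
Map<CompactionStatus.State, Table> jobs =
    (Map<CompactionStatus.State, Table>) payload.get("jobs");                     // up to 100 rows per state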