Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,12 @@ public void test_streamLogs_ofCancelledTask() throws Exception
);

final Optional<InputStream> streamOptional =
overlord.bindings()
.getInstance(TaskLogStreamer.class)
.streamTaskLog(taskId, 0);
cluster.callApi().waitForResult(
() -> overlord.bindings()
.getInstance(TaskLogStreamer.class)
.streamTaskLog(taskId, 0),
Optional::isPresent
).go();

Assertions.assertTrue(streamOptional.isPresent());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -46,6 +45,7 @@
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskContextEnricher;
import org.apache.druid.indexing.common.task.TaskMetrics;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
Expand Down Expand Up @@ -451,7 +451,6 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture<TaskStat
}
final TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
notifyStatus(entry, taskStatus, taskStatus.getErrorMsg());
emitTaskCompletionLogsAndMetrics(task, taskStatus);
return;
}
if (taskIsReady) {
Expand Down Expand Up @@ -751,8 +750,8 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
}

shutdownTaskOnRunner(task.getId(), reasonFormat, args);

removeTaskLock(task);
emitTaskCompletionLogsAndMetrics(task, taskStatus);
requestManagement();

log.info("Completed notifyStatus for task[%s] with status[%s]", task.getId(), taskStatus);
Expand Down Expand Up @@ -813,9 +812,6 @@ private void handleStatus(final TaskStatus status)
task.getId(),
entry -> notifyStatus(entry, status, "notified status change from task")
);

// Emit event and log, if the task is done
emitTaskCompletionLogsAndMetrics(task, status);
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task status")
Expand Down Expand Up @@ -966,14 +962,14 @@ public Map<RowKey, Long> getWaitingTaskCount()
}

/**
* Gets the current status of this task either from the {@link TaskRunner}
* or from the {@link TaskStorage} (if not available with the TaskRunner).
* Gets the current status of this task either from {@link #activeTasks} and {@link #taskRunner}, if active,
* or otherwise from the {@link TaskStorage}.
*/
public Optional<TaskStatus> getTaskStatus(final String taskId)
{
RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
final TaskEntry activeTaskEntry = activeTasks.get(taskId);
if (activeTaskEntry != null) {
return Optional.of(activeTaskEntry.taskInfo.getStatus().withLocation(taskRunner.getTaskLocation(taskId)));
} else {
return taskStorage.getStatus(taskId);
}
Expand Down Expand Up @@ -1074,24 +1070,22 @@ public Map<String, Task> getActiveTasksForDatasource(String datasource)

private void emitTaskCompletionLogsAndMetrics(final Task task, final TaskStatus status)
{
if (status.isComplete()) {
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);

emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);

if (status.isSuccess()) {
Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
} else {
Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
}
emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, status.getDuration()));

log.info(
"Completed task[%s] with status[%s] in [%d]ms.",
task.getId(), status, status.getDuration()
);
if (status.isSuccess()) {
Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
} else {
Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
}

log.info(
"Completed task[%s] with status[%s] in [%d]ms.",
task.getId(), status, status.getDuration()
);
}

private void validateTaskPayload(Task task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Map<String, Object> getContext()
taskQueue.setActive(true);
}

@Test
@Test(timeout = 30_000)
public void testManageQueuedTasksReleaseLockWhenTaskIsNotReady() throws Exception
{
// task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting
Expand All @@ -164,16 +164,20 @@ public void testManageQueuedTasksReleaseLockWhenTaskIsNotReady() throws Exceptio
final TestTask task3 = new TestTask("t3", Intervals.of("2021-02-01/P1M"));
taskQueue.add(task3);
taskQueue.manageQueuedTasks();

// Wait for task3 to exit.
waitForTaskToExit(task3);

Assert.assertFalse(task2.isDone());
Assert.assertTrue(task3.isDone());
Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty());

// Shut down task1 and task3 and release their locks.
// Shut down task1 and release its locks.
shutdownTask(task1);
taskQueue.shutdown(task3.getId(), "Emulating shutdown of task3");

// Now task2 should run.
taskQueue.manageQueuedTasks();
waitForTaskToExit(task2);
Assert.assertTrue(task2.isDone());

// Sleep to allow all metrics to be emitted
Expand Down Expand Up @@ -490,7 +494,7 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedE
Thread.sleep(100);

// Verify that metrics are emitted on receiving announcement
serviceEmitter.verifyEmitted("task/run/time", Map.of(DruidMetrics.DESCRIPTION, "shutdown on runner"), 1);
serviceEmitter.verifyEmitted("task/run/time", Map.of(DruidMetrics.DESCRIPTION, "shutdown"), 1);
verifySuccessfulTaskCount(taskQueue, 0);
verifyFailedTaskCount(taskQueue, 1);

Expand All @@ -500,69 +504,53 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedE
}

@Test
public void testGetTaskStatus()
public void testGetTaskStatus_successfulTask()
{
final TaskRunner taskRunner = EasyMock.createMock(TaskRunner.class);
final TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);

final String newTask = "newTask";
EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
.andReturn(null);
EasyMock.expect(taskStorage.getStatus(newTask))
.andReturn(Optional.of(TaskStatus.running(newTask)));

final String waitingTask = "waitingTask";
EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
.andReturn(RunnerTaskState.WAITING);
EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
.andReturn(TaskLocation.unknown());

final String pendingTask = "pendingTask";
EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
.andReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
.andReturn(TaskLocation.unknown());

final String runningTask = "runningTask";
EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
.andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getTaskLocation(runningTask))
.andReturn(TaskLocation.create("host", 8100, 8100));

final String successfulTask = "successfulTask";
EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskStorage.getStatus(successfulTask))
.andReturn(Optional.of(TaskStatus.success(successfulTask)));

final String failedTask = "failedTask";
EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
.andReturn(RunnerTaskState.NONE);
EasyMock.expect(taskStorage.getStatus(failedTask))
.andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask)));

EasyMock.replay(taskRunner, taskStorage);
final TestTask task = new TestTask("successfulTask", Intervals.of("2021-01-01/P1D"));
taskQueue.add(task);

final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
new TaskQueueConfig(null, null, null, null, null, null),
new DefaultTaskConfig(),
taskStorage,
taskRunner,
actionClientFactory,
getLockbox(),
serviceEmitter,
getObjectMapper(),
new NoopTaskContextEnricher()
);
taskQueue.setActive(true);
// Active task: status should come from activeTasks map
final Optional<TaskStatus> activeStatus = taskQueue.getTaskStatus(task.getId());
Assert.assertTrue(activeStatus.isPresent());
Assert.assertEquals(TaskState.RUNNING, activeStatus.get().getStatusCode());
Assert.assertEquals(task.getId(), activeStatus.get().getId());

Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get());
Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get());
Assert.assertEquals(TaskStatus.running(pendingTask), taskQueue.getTaskStatus(pendingTask).get());
Assert.assertEquals(TaskStatus.running(runningTask), taskQueue.getTaskStatus(runningTask).get());
Assert.assertEquals(TaskStatus.success(successfulTask), taskQueue.getTaskStatus(successfulTask).get());
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get());
// Run the task so it completes
taskQueue.manageQueuedTasks();
waitForTaskToExit(task);

final Optional<TaskStatus> completedStatus = taskQueue.getTaskStatus(task.getId());
Assert.assertTrue(completedStatus.isPresent());
Assert.assertEquals(TaskState.SUCCESS, completedStatus.get().getStatusCode());
}

@Test
public void testGetTaskStatus_failedTask()
{
final TestTask task = new TestTask("failedTask", Intervals.of("2021-01-01/P1D"))
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
super.done = true;
return TaskStatus.failure(getId(), "intentional failure");
}
};
taskQueue.add(task);
taskQueue.manageQueuedTasks();
waitForTaskToExit(task);

final Optional<TaskStatus> failedStatus = taskQueue.getTaskStatus(task.getId());
Assert.assertTrue(failedStatus.isPresent());
Assert.assertEquals(TaskState.FAILED, failedStatus.get().getStatusCode());
Assert.assertEquals("intentional failure", failedStatus.get().getErrorMsg());
}

@Test
public void testGetTaskStatus_unknownTask()
{
final Optional<TaskStatus> unknownStatus = taskQueue.getTaskStatus("unknownTask");
Assert.assertFalse(unknownStatus.isPresent());
}

@Test
Expand Down Expand Up @@ -785,6 +773,18 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
);
}

private void waitForTaskToExit(final Task task)
{
while (taskQueue.getActiveTasksForDatasource(task.getDataSource()).containsKey(task.getId())) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private static void verifySuccessfulTaskCount(final TaskQueue taskQueue, int successCount)
{
Assert.assertEquals(
Expand Down
Loading
Loading