Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture<TaskStat
}
final TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
notifyStatus(entry, taskStatus, taskStatus.getErrorMsg());
emitTaskCompletionLogsAndMetrics(task, taskStatus);
return;
}
if (taskIsReady) {
Expand Down Expand Up @@ -749,8 +748,8 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
}

shutdownTaskOnRunner(task.getId(), reasonFormat, args);

removeTaskLock(task);
emitTaskCompletionLogsAndMetrics(task, taskStatus);
requestManagement();

log.info("Completed notifyStatus for task[%s] with status[%s]", task.getId(), taskStatus);
Expand Down Expand Up @@ -811,9 +810,6 @@ private void handleStatus(final TaskStatus status)
task.getId(),
entry -> notifyStatus(entry, status, "notified status change from task")
);

// Emit event and log, if the task is done
emitTaskCompletionLogsAndMetrics(task, status);
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task status")
Expand Down Expand Up @@ -1073,24 +1069,22 @@ public Map<String, Task> getActiveTasksForDatasource(String datasource)

private void emitTaskCompletionLogsAndMetrics(final Task task, final TaskStatus status)
{
if (status.isComplete()) {
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);

emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);

if (status.isSuccess()) {
Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
} else {
Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
}
emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));

log.info(
"Completed task[%s] with status[%s] in [%d]ms.",
task.getId(), status, status.getDuration()
);
if (status.isSuccess()) {
Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
} else {
Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
}

log.info(
"Completed task[%s] with status[%s] in [%d]ms.",
task.getId(), status, status.getDuration()
);
}

private void validateTaskPayload(Task task)
Expand Down
Loading