diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 9a7f2905b110..5d87ab602fba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -524,7 +524,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo try { taskLogPusher.pushTaskLog(task.getId(), logFile); } - catch (IOException e) { + catch (Exception e) { LOGGER.error("Task[%s] failed to push task logs to [%s]: Exception[%s]", task.getId(), logFile.getName(), e.getMessage()); } @@ -532,7 +532,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo try { taskLogPusher.pushTaskReports(task.getId(), reportsFile); } - catch (IOException e) { + catch (Exception e) { LOGGER.error("Task[%s] failed to push task reports to [%s]: Exception[%s]", task.getId(), reportsFile.getName(), e.getMessage()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 3664af6fd82c..4b722be02e5f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -535,6 +535,62 @@ ProcessHolder runTaskProcess(List command, File logFile, TaskLocation ta Assert.assertTrue(forkingTaskRunner.restore().isEmpty()); } + @Test + public void testTaskStatusWhenTaskLogUploadFails() throws Exception + { + class ExceptionTaskLogs extends NoopTaskLogs + { + @Override + public void pushTaskLog(String taskid, File logFile) + { + throw new RuntimeException("Exception occurred while pushing task logs"); + } + } + TaskConfig taskConfig = makeDefaultTaskConfigBuilder() + .build(); + final WorkerConfig workerConfig = new WorkerConfig(); + ExceptionTaskLogs exceptionTaskLogs = new ExceptionTaskLogs(); + ObjectMapper mapper = new DefaultObjectMapper(); + Task task = NoopTask.create(); + ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner( + new ForkingTaskRunnerConfig(), + taskConfig, + workerConfig, + new Properties(), + exceptionTaskLogs, + mapper, + new DruidNode("middleManager", "host", false, 8091, null, true, false), + new StartupLoggingConfig(), + TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig) + ) + { + @Override + ProcessHolder runTaskProcess(List command, File logFile, TaskLocation taskLocation) throws IOException + { + for (String param : command) { + if (param.endsWith(task.getId())) { + final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath(); + File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile(); + mapper.writeValue(resultFile, TaskStatus.success(task.getId())); + break; + } + } + MockTestProcess mockTestProcess = new MockTestProcess() + { + @Override + public int waitFor() + { + return 0; + } + }; + return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile, taskLocation); + } + }; + forkingTaskRunner.setNumProcessorsPerTask(); + final TaskStatus status = forkingTaskRunner.run(task).get(); + assertEquals(TaskState.SUCCESS, status.getStatusCode()); + } + public static TaskConfigBuilder makeDefaultTaskConfigBuilder() { return new TaskConfigBuilder()