Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,15 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
try {
taskLogPusher.pushTaskLog(task.getId(), logFile);
}
catch (IOException e) {
catch (Exception e) {
LOGGER.error("Task[%s] failed to push task logs to [%s]: Exception[%s]",
task.getId(), logFile.getName(), e.getMessage());
}
if (reportsFile.exists()) {
try {
taskLogPusher.pushTaskReports(task.getId(), reportsFile);
}
catch (IOException e) {
catch (Exception e) {
LOGGER.error("Task[%s] failed to push task reports to [%s]: Exception[%s]",
task.getId(), reportsFile.getName(), e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,62 @@ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation ta
Assert.assertTrue(forkingTaskRunner.restore().isEmpty());
}

@Test
public void testTaskStatusWhenTaskLogUploadFails() throws Exception
{
class ExceptionTaskLogs extends NoopTaskLogs
{
@Override
public void pushTaskLog(String taskid, File logFile)
{
throw new RuntimeException("Exception occurred while pushing task logs");
}
}
TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
.build();
final WorkerConfig workerConfig = new WorkerConfig();
ExceptionTaskLogs exceptionTaskLogs = new ExceptionTaskLogs();
ObjectMapper mapper = new DefaultObjectMapper();
Task task = NoopTask.create();
ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
new ForkingTaskRunnerConfig(),
taskConfig,
workerConfig,
new Properties(),
exceptionTaskLogs,
mapper,
new DruidNode("middleManager", "host", false, 8091, null, true, false),
new StartupLoggingConfig(),
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
)
{
@Override
ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
{
for (String param : command) {
if (param.endsWith(task.getId())) {
final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile();
mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
break;
}
}
MockTestProcess mockTestProcess = new MockTestProcess()
{
@Override
public int waitFor()
{
return 0;
}
};
return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile, taskLocation);
}
};
forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(task).get();
assertEquals(TaskState.SUCCESS, status.getStatusCode());
}

public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
{
return new TaskConfigBuilder()
Expand Down
Loading