
Conversation

GWphua (Contributor) commented Jul 4, 2025

Fixes #18197

This problem was introduced through this PR in Druid 26.

Problem Statement

When using the druid-multi-stage-query extension with K8s (MM-less), running an INSERT or REPLACE query results in a failed with status code 404 message in the web console. The same query succeeds uneventfully on the MM architecture. (See the related issue for screenshots.)

Logs

Broker Logs

2025-06-26T09:08:09,757 INFO [qtp1786513714-155] org.apache.druid.msq.sql.resources.SqlStatementResource - Query details not found for queryId [query-1ed10341-bb20-4086-a1d4-060d2496266a]
org.apache.druid.rpc.HttpResponseException: Server error [404 Not Found]; body: No task reports were found for this task. The task may not exist, or it may not have completed yet.
        at org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:200) ~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at org.apache.druid.rpc.ServiceClientImpl$1.onSuccess(ServiceClientImpl.java:182) ~[druid-server-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1133) ~[guava-32.0.1-jre.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]

Overlord Logs

2025-07-01T03:18:16,150 WARN [qtp1510570627-108] org.apache.druid.indexing.overlord.http.OverlordResource - Failed to stream task reports for task query-be62b874-c86c-4079-8a71-a0867f6953c2
org.jboss.netty.channel.ChannelException: Faulty channel in resource pool
	at org.apache.druid.java.util.http.client.NettyHttpClient.go(NettyHttpClient.java:134) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.java.util.http.client.CredentialedHttpClient.go(CredentialedHttpClient.java:48) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.java.util.http.client.AbstractHttpClient.go(AbstractHttpClient.java:33) ~[druid-processing-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.k8s.overlord.KubernetesTaskRunner.streamTaskReports(KubernetesTaskRunner.java:298) ~[?:?]
	at org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer.streamTaskReports(TaskRunnerTaskLogStreamer.java:59) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer.streamTaskReports(SwitchingTaskLogStreamer.java:94) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at org.apache.druid.indexing.overlord.http.OverlordResource.doGetReports(OverlordResource.java:810) ~[druid-indexing-service-31.0.1-SNAPSHOT.jar:31.0.1-SNAPSHOT]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	...
Caused by: java.net.ConnectException: Connection refused: /XX.XX.XX.XX:8100
	...

Investigation

Root Cause Analysis

  1. The query itself succeeds, and the segments are successfully loaded onto the Historicals.
  2. On the dashboard, the execution details are queried using the API druid/v2/sql/statement/{queryId}.
  3. The following error is found in the browser console: {"error":"druidException","errorCode":"notFound","persona":"USER","category":"NOT_FOUND","errorMessage":"Query [query-a72b5075-0dae-48bf-b1b8-7f66c2e070e6] was not found. The query details are no longer present or might not be of the type [query_controller]. Verify that the id is correct.","context":{}}

Code Dive

Looking into Dashboard API Call

  1. The dashboard issues a GET request to druid/v2/sql/statement/{queryId}?detail=true.
  2. The API is served by SqlStatementResource#doGetStatus, which proceeds through the following steps:
    1. Authenticate and authorize the request.
    2. Get the statement status for the query through SqlStatementResource#getStatementStatus.
    3. Return the statement status as an OK response if it exists; otherwise build and return a non-OK response.

Get Statement Status

  1. Contact the Overlord for the task status with taskId = queryId at /druid/indexer/v1/task/{taskId}/status.

    1. Hits OverlordResource#getTaskStatus.
  2. Contact the Overlord for the task report with taskId = queryId at /druid/indexer/v1/task/{taskId}/reports.

    1. Calls OverlordResource#doGetReports, which calls SwitchingTaskLogStreamer#streamTaskReports.

      @GET
      @Path("/task/{taskid}/reports")
      @Produces(MediaType.APPLICATION_JSON)
      @ResourceFilters(TaskResourceFilter.class)
      public Response doGetReports(
          @PathParam("taskid") final String taskid
      )
      {
        try {
          final Optional<InputStream> stream = taskLogStreamer.streamTaskReports(taskid);
          if (stream.isPresent()) {
            return Response.ok(stream.get()).build();
          } else {
            return Response.status(Response.Status.NOT_FOUND)
                           .entity(
                               "No task reports were found for this task. "
                               + "The task may not exist, or it may not have completed yet."
                           )
                           .build();
          }
        }
        catch (Exception e) {
          log.warn(e, "Failed to stream task reports for task %s", taskid);
          return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        }
      }
    2. SwitchingTaskLogStreamer first tries to stream the task report from the task runner (the Kubernetes Peon pod, in the K8s context) before falling back to task reports in deep storage.

      @Override
      public Optional<InputStream> streamTaskReports(String taskid) throws IOException
      {
        IOException deferIOException = null;

        try {
          final Optional<InputStream> stream = taskRunnerTaskLogStreamer.streamTaskReports(taskid);
          if (stream.isPresent()) {
            return stream;
          }
        }
        catch (IOException e) {
          // defer first IO exception due to race in the way tasks update their exit status in the overlord
          // It may happen that the task sent the report to deep storage but the task is still running with http chat handlers unregistered
          // In such a case, catch and ignore the 1st IOException and try deepStorage for the report. If the report is still not found, return the caught exception
          deferIOException = e;
        }

        for (TaskLogStreamer provider : deepStorageStreamers) {
          try {
            final Optional<InputStream> stream = provider.streamTaskReports(taskid);
            if (stream.isPresent()) {
              return stream;
            }
          }
          catch (IOException e) {
            if (deferIOException != null) {
              e.addSuppressed(deferIOException);
            }
            throw e;
          }
        }
        // Could not find any InputStream. Throw deferred exception if exists
        if (deferIOException != null) {
          throw deferIOException;
        }
        return Optional.absent();
      }
    3. Looking at the Overlord logs above, a stack trace surfaces after the task completes. It shows that the Overlord is unable to contact the exited Peon pod for the task report, and then cannot find the task report in deep storage either.

Hypothesis

Given the trace above, there is reason to believe that the task report cannot be found in deep storage. The working hypothesis is that the Peon is unable to push the task report to HDFS (deep storage).

Kubernetes Task Reports Not Uploaded Into Deep Storage During Task Cleanup

  1. In the Kubernetes context, the Peon pushes the task report, followed by the task status, via AbstractTask when cleaning up the task.

    @Override
    public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
    {
      // clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
      Thread.interrupted();
      
      if (!toolbox.getConfig().isEncapsulatedTask()) {
        log.debug("Not pushing task logs and reports from task.");
        return;
      }
      // isEncapsulatedTask() effectively means "is K8s ingestion": non-K8s ingestions return above, so only K8s ingestions proceed past this line.
      
      // 7 lines of unrelated code block omitted here.
      
      if (reportsFile != null && reportsFile.exists()) {
        toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile);
        log.debug("Pushed task reports");
      } else {
        log.debug("No task reports file exists to push");
      }
    
      if (statusFile != null) {
        toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
        toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
        Files.deleteIfExists(statusFile.toPath());
        log.debug("Pushed task status");
      } else {
        log.debug("No task status file exists to push");
      }
    }
  2. Checking the logs, I found that only the task status is pushed, not the task report.

    2025-07-01T03:20:35,548 INFO [query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Fetching segment load status for datasource[HELLO_WORLD] from broker
    2025-07-01T03:20:40,962 INFO [query-be62b874-c86c-4079-8a71-a0867f6953c2-segment-load-waiter-0] org.apache.druid.msq.exec.SegmentLoadStatusFetcher - Segment loading completed for datasource[HELLO_WORLD]
    2025-07-01T03:20:40,978 INFO [task-runner-0-priority-0] org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Writing task status to: hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
    2025-07-01T03:20:43,850 INFO [task-runner-0-priority-0] org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs - Wrote task status to: hdfs://R2/user/druid/k8s/test-benchmarking-jdk21/indexing-logs/query-be62b874-c86c-4079-8a71-a0867f6953c2.status.json
    
  3. Entering the bash terminal of the Peon pod, I found that the report file exists at ./var/tmp/attempt/1/report.json.

  4. Through examination, we note that AbstractTask is trying to read from ./var/tmp/{queryId}/attempt/{attemptId}/report.json instead.

  5. The missing queryId in the path is likely the cause of the error.

  6. This problem does not happen on the MM architecture.

Why Task Report is Written to the Wrong Path

  1. Under CliPeon, TaskReportFileWriter is bound to SingleFileTaskReportFileWriter, which is configured to write to the given File.

    binder.bind(TaskReportFileWriter.class)
          .toInstance(
              new SingleFileTaskReportFileWriter(
                  Paths.get(taskDirPath, "attempt", attemptId, "report.json").toFile()
              ));
              
    // The Path given will translate to: {taskDirPath}/attempt/{attemptId}/report.json
  2. CliPeon is invoked with the arguments: Main internal peon taskDirPath attemptId.

    1. Under the K8s Model, we do not have the queryId in taskDirPath: org.apache.druid.cli.Main internal peon /opt/druid/var/tmp/ 1 --taskId query-5f496cd7-9c23-43c8-a666-76fa81825a2f

    2. Under the MM Model, we have the queryId in taskDirPath: org.apache.druid.cli.Main internal peon var/druid/task/slot0/query-5f496cd7-9c23-43c8-a666-76fa81825a2f 1 --loadBroadcastDatasourceMode NONE

  3. However, the paths of reportsFile and statusFile in AbstractTask#setup are constructed from the configured base task dir plus the queryId:

    @Nullable
    public String setup(TaskToolbox toolbox) throws Exception
    {
      if (toolbox.getConfig().isEncapsulatedTask()) {
        // taskDir = {baseTaskDir}/{taskId}: getTaskDir(getId()) appends the task (query) id to the base task dir
        File taskDir = toolbox.getConfig().getTaskDir(getId());
        FileUtils.mkdirp(taskDir);
        // attemptDir = {baseTaskDir}/{taskId}/attempt/{attemptId}
        File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile();
        FileUtils.mkdirp(attemptDir);
        // reportsFile = {baseTaskDir}/{taskId}/attempt/{attemptId}/report.json
        reportsFile = new File(attemptDir, "report.json");
        // statusFile = {baseTaskDir}/{taskId}/attempt/{attemptId}/status.json
        statusFile = new File(attemptDir, "status.json");
        // Excess code skipped...
    }
  4. Hence, there is a mismatch between the path the report is written to and the path it is looked up from (see the sketch after this list).

  5. The reason Druid is still able to push the statusFile is a last-minute write to the status file in AbstractTask#cleanUp:

    // No check for whether statusFile exists.
    if (statusFile != null) {
      // Writes value into the status file.
      toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
      toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
      Files.deleteIfExists(statusFile.toPath());
      log.debug("Pushed task status");
    } else {
      log.debug("No task status file exists to push");
    }
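
To make the mismatch concrete, here is a minimal standalone sketch of the two paths in the K8s case. The values are taken from the commands and logs above; the path construction only mirrors the Druid classes named in this section and is not the actual implementation.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class ReportPathMismatchSketch
{
  public static void main(String[] args)
  {
    // Values as passed to CliPeon in the K8s (MM-less) case: the base directory only, no task id.
    String taskDirPath = "/opt/druid/var/tmp";
    String attemptId = "1";
    String taskId = "query-5f496cd7-9c23-43c8-a666-76fa81825a2f";

    // Where SingleFileTaskReportFileWriter writes the report: built directly from the CLI argument.
    Path written = Paths.get(taskDirPath, "attempt", attemptId, "report.json");

    // Where AbstractTask looks for it: getTaskDir(getId()) appends the task id to the base task dir
    // (assumed here to be the same directory that was passed on the command line).
    Path expected = Paths.get(taskDirPath, taskId, "attempt", attemptId, "report.json");

    System.out.println("written:  " + written);  // /opt/druid/var/tmp/attempt/1/report.json
    System.out.println("expected: " + expected); // /opt/druid/var/tmp/query-5f49.../attempt/1/report.json
  }
}
```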

Comparison Between MM and MM-less

Task Creation

MM Architecture constructs the peon startup command from ForkingTaskRunner#run:

// taskDir: var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1
final File taskDir = new File(storageSlot.getDirectory(), task.getId());
final String attemptId = String.valueOf(getNextAttemptID(taskDir));
final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();
// Omitted Code

// Path of taskFile, statusFile, logFile, reportsFile configured properly...
final File taskFile = new File(taskDir, "task.json");
final File statusFile = new File(attemptDir, "status.json");
final File logFile = new File(taskDir, "log");
final File reportsFile = new File(attemptDir, "report.json");

// Omitted Code

command.add("org.apache.druid.cli.Main");
command.add("internal");
command.add("peon");
command.add(taskDir.toString());
command.add(attemptId);

// Final command: org.apache.druid.cli.Main internal peon var/druid/task/slot0/query-ae23e2ef-283a-4546-a295-68995bd90fe1 1

MM-less Architecture constructs the peon startup command from K8sTaskAdapter#generateCommand.

// peon.sh builds a command of the form: org.apache.druid.cli.Main internal peon {1} {2}
command.add("/peon.sh");
// taskDir: /opt/druid/var/tmp/
command.add(taskConfig.getBaseTaskDir().getAbsolutePath());
command.add("1");

// Final command: org.apache.druid.cli.Main internal peon /opt/druid/var/tmp/ 1

MM-less Architecture with PodTemplate constructs the peon startup command via the Helm chart, with environment variables built in PodTemplateTaskAdapter#getEnv.

Solution

Pass the TASK_DIR into peon.sh. This TASK_DIR will be of the form baseTaskDir/taskId, as sketched below.
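
As a rough illustration of that change (not the exact code merged in this PR; the class and method names below are hypothetical), the adapter appends the task id to the base task dir before handing the directory to /peon.sh:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: build the peon command so the directory passed to peon.sh
// already includes the task id, matching what AbstractTask expects.
public class PeonCommandSketch
{
  static List<String> buildCommand(File baseTaskDir, String taskId, String attemptId)
  {
    final File taskDir = new File(baseTaskDir, taskId); // TASK_DIR = baseTaskDir/taskId
    final List<String> command = new ArrayList<>();
    command.add("/peon.sh");
    command.add(taskDir.getAbsolutePath());
    command.add(attemptId);
    return command;
  }

  public static void main(String[] args)
  {
    System.out.println(buildCommand(new File("/opt/druid/var/tmp"), "query-5f496cd7-9c23-43c8-a666-76fa81825a2f", "1"));
    // [/peon.sh, /opt/druid/var/tmp/query-5f496cd7-..., 1]
  }
}
```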

Impact

  • With the correct path, the task report is persisted to deep storage successfully, and the query no longer appears to fail in the web console.
  • Additionally, we can view the task reports after the Kubernetes pods finish.

Release note

  • MSQ queries run without a 404 error in the web console.
  • You can now view K8s task reports in the web console after the tasks exit.
  • When running tasks with PodTemplate, you no longer need to specify the /peon.sh script arguments or configure baseTaskDir; the Overlord constructs these for you automatically.

Key changed/added classes in this PR
  • CliPeon
  • distribution/docker/peon.sh

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster (MM Architecture)
  • been tested in a test Druid cluster (K8s Architecture, no Pod Templates)
  • been tested in a test Druid cluster (K8s Architecture, with Pod Templates)

@GWphua GWphua marked this pull request as draft July 4, 2025 09:23
@FrankChen021 FrankChen021 added this to the 34.0.0 milestone Jul 5, 2025
GWphua (Contributor, Author) commented Jul 8, 2025

After making the appropriate changes, I find that tasks started by Pod Templates execute without reporting the 404 error to the frontend, but K8s tasks that do not use Pod Templates still fail.

Going into the job container shows a mismatch between the expected and the actual task report directory:

  1. PodTemplate:
    1. Actual: /opt/druid/var/tmp/persistent/task/query-dbf9c83f-da9c-4873-9c89-693392662770/attempt/1/report.json
    2. Expected: /opt/druid/var/tmp/query-cc8b4f9d-1079-4a59-9d67-8ecd7b9aeb20/attempt/1/report.json
  2. SingleContainer:
    1. Actual: /opt/druid/var/tmp/persistent/task/query-dbf9c83f-da9c-4873-9c89-693392662770/attempt/1/report.json
    2. Expected: /tmp/persistent/task/query-4903642a-6abe-4e29-9725-063c32650c45/attempt/1/report.json

Findings:

  • As recommended by the documentation, the PodTemplate deployment has baseTaskDir set, while the SingleContainer deployment does not.
  • If baseTaskDir is not set, Druid defaults to System.getProperty("java.io.tmpdir").
  • The absence of baseTaskDir causes the problem: without it, the task falls back to the default baseTaskDir, which is system-dependent and may differ from the arguments used to call peon.sh.

Current solution in mind:
When spinning up peon.sh, K8s mode should also pass baseTaskDir as an environment variable (a sketch follows below).
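
A hedged sketch of that idea using the fabric8 Kubernetes model classes that the MM-less extension already builds on; the helper below is hypothetical and only shows the environment variable being constructed. As the kubectl output later in this thread suggests, the Docker distribution's startup scripts translate environment variables of the form druid_a_b_c into the runtime property druid.a.b.c, which is what makes this approach work.

```java
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;

// Illustrative only: expose the base task dir to the peon as a Druid runtime property,
// so the peon resolves the same directory that peon.sh was launched with.
public class BaseTaskDirEnvSketch
{
  static EnvVar baseTaskDirEnv(String baseTaskDir)
  {
    return new EnvVarBuilder()
        .withName("druid_indexer_task_baseTaskDir") // maps to druid.indexer.task.baseTaskDir
        .withValue(baseTaskDir)
        .build();
  }

  public static void main(String[] args)
  {
    System.out.println(baseTaskDirEnv("/opt/druid/var/tmp"));
  }
}
```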

@capistrant capistrant added the Bug label Jul 9, 2025
@GWphua GWphua marked this pull request as ready for review July 9, 2025 09:11
FrankChen021 (Member) left a comment

LGTM

@FrankChen021 FrankChen021 requested a review from georgew5656 July 11, 2025 09:12
@GWphua GWphua marked this pull request as draft July 21, 2025 02:45
GWphua (Contributor, Author) commented Jul 21, 2025

Converting to Draft - Looking into case where baseTaskDir is not set for Pod Template deployment...

@GWphua GWphua marked this pull request as ready for review July 21, 2025 10:29
GWphua (Contributor, Author) commented Jul 21, 2025

For forward-compatibility purposes, I added the ability to override the baseTaskDir environment variable, since we are heading in a direction where Druid automatically generates the task directory for us.

How it's done

In Kubernetes, if an environment variable is defined multiple times, the last occurrence takes precedence. Running kubectl describe pod XXX shows this (a sketch of how the adapter appends its overrides follows the output):

# LOTS OF ENV VARIABLES
      druid_indexer_task_baseTaskDir:                                               /opt/druid/var/funnyTaskDirectory/
# LOTS OF ENV VARIABLES

      TASK_DIR:                                                                     var/tmp/index_parallel_wikipedia_phhompki_2025-07-21T10:34:38.511Z
      druid_indexer_task_baseTaskDir:                                               var/tmp       # OVERWRITTEN baseTaskDir
      TASK_ID:                                                                      index_parallel_wikipedia_phhompki_2025-07-21T10:34:38.511Z
      LOAD_BROADCAST_SEGMENTS:                                                      false
      TASK_JSON:                                                                     (v1:metadata.annotations['task'])
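
A hedged sketch of how an adapter can rely on that ordering (the helper names here are hypothetical, not the merged code): the overriding variables are appended after anything the pod template already defines, so the last definition of druid_indexer_task_baseTaskDir wins.

```java
import java.util.ArrayList;
import java.util.List;

import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;

// Illustrative only: append overrides last so they take precedence over any
// druid_indexer_task_baseTaskDir the user already set in the pod template.
public class EnvOverrideSketch
{
  static List<EnvVar> withOverrides(List<EnvVar> templateEnv, String taskDir, String baseTaskDir)
  {
    final List<EnvVar> env = new ArrayList<>(templateEnv); // may already contain a user-set baseTaskDir
    env.add(new EnvVarBuilder().withName("TASK_DIR").withValue(taskDir).build());
    env.add(new EnvVarBuilder().withName("druid_indexer_task_baseTaskDir").withValue(baseTaskDir).build());
    return env;
  }
}
```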

@cryptoe cryptoe removed this from the 34.0.0 milestone Jul 22, 2025
@GWphua GWphua requested a review from FrankChen021 July 23, 2025 07:33
GWphua (Contributor, Author) commented Jul 24, 2025

Made sure CI/CD passes again before merge. PTAL @FrankChen021

FrankChen021 (Member) commented

@cryptoe why is this removed from druid 34 release?

@FrankChen021 FrankChen021 merged commit c494dd1 into apache:master Jul 24, 2025
77 checks passed
cryptoe (Contributor) commented Aug 7, 2025

@FrankChen021 This PR did not make it in time for the Druid 34 code freeze.

adarshsanjeev (Contributor) commented

@GWphua, I have been observing some failures in Druid clusters after this PR was merged. The cause is the same as the error you described. From what I can understand, the fix is changing the configuration of the base directory, which might be causing this issue (failure to find reports to push). I have raised a PR with an alternate fix that does resolve the issues that I am seeing (#18379).

Could you please check if this also resolves the issue that you were initially seeing?

GWphua (Contributor, Author) commented Aug 8, 2025

Hello @adarshsanjeev, thanks for letting me know. Will try to get some clusters running with your changes next week.

Just want to understand the environment you are deploying on that caused the failures: are your clusters deployed using the Kubernetes extension (with or without PodTemplates?), and are there any notable configurations for your baseTaskDir (e.g. made specifically for Peon tasks)?

GWphua (Contributor, Author) commented Aug 12, 2025

Hello @adarshsanjeev, update on the issue: I created a cluster with your changes, and am able to run MSQ tasks + see the task reports. Thanks!

@GWphua GWphua deleted the workable-msq branch August 12, 2025 07:29
```
druid.indexer.task.encapsulatedTask=true
```

**Note**: Prior to Druid 35.0.0, you will need the `druid.indexer.task.baseTaskDir` runtime property, along with the `TASK_DIR` and `attemptId` arguments to `/peon.sh` to run your jobs. There is no need for that now as Druid will automatically configure the task directory. You can still choose to customize the target task directory by adjusting `druid.indexer.task.baseTaskDir` on the Overlord service.
A Contributor left a comment:

druid.indexer.task.baseTaskDir -> druid.indexer.task.baseDir, the old document is wrong already, 🤣

@cecemei cecemei added this to the 35.0.0 milestone Oct 21, 2025
Successfully merging this pull request may close these issues:

[MM-less extension] Task report retrieval error in Druid MSQ