Skip to content

Conversation

@arvi18
Copy link

@arvi18 arvi18 commented Apr 27, 2025

This is a proposed fix for bazelbuild#24304

This speeds up a fully warm bazel query ... by 54%, reducing wall time from 1m49s to 50s

Current state:

$ time bazel query '...' --output=streamed_proto > queryoutput4.streamedproto

real    1m48.768s
user    0m27.410s
sys     0m19.646s

This PR:

$ time bazel query '...' --output=streamed_proto > queryoutput5.streamedproto

real    0m49.938s
user    0m22.897s
sys     0m16.161s

💁‍♂️ Note: when combined with bazelbuild#24298, total wall time is 37s, an overall reduction of 66%.

Summary by CodeRabbit

  • New Features
    • Improved performance for streaming large sets of proto targets by introducing asynchronous, chunked processing and streaming.
  • Bug Fixes
    • Enhanced reliability and responsiveness when handling large queries by managing concurrency and backpressure during output streaming.

@arvi18
Copy link
Author

arvi18 commented Apr 27, 2025

Do you think this is something we could add behind a flag to start off? I agree it's useful to have, but it also comes with misc subtleties that are hard to reason about all at once, so having an escape hatch in case we get it wrong the first time would be nice.

@arvi18
Copy link
Author

arvi18 commented Apr 27, 2025

I had actually been thinking of more of an opt-in that possibly makes it more clear it's experimental, depending on how solid we can get it on the first try, then eventually flip the default. The whole parallel-streaming-query thing gets tricky when we need to get exception handling involved, an opt-in will let us get code in and iterate with less risk of breakages.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @arvi18, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

This pull request addresses issue bazelbuild#24304 by speeding up the bazel query ... command when used with the --output=streamed_proto option. The changes introduce multi-threading to distribute the work of writing the proto output, resulting in a reported 54% reduction in wall time for a fully warm query. The original implementation was single-threaded, and this change leverages a ForkJoinPool to parallelize the writing of target proto buffers to the output stream.

Highlights

  • Performance Improvement: The primary goal of this PR is to improve the performance of bazel query with --output=streamed_proto by distributing the workload across multiple threads.
  • Multi-threading Implementation: A ForkJoinPool is used to parallelize the process of converting Target objects to proto buffers and writing them to the output stream. The number of threads defaults to the number of available processors.
  • Chunking and Queuing: The input Iterable<Target> is partitioned into chunks of 500 targets each. These chunks are then submitted to the ForkJoinPool via a LinkedBlockingQueue to be processed in parallel.
  • Error Handling: WrappedIOException and WrappedInterruptedException are introduced to handle exceptions thrown during the parallel processing and re-thrown as RuntimeExceptions.

Changelog

  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
    • Introduces multi-threading using ForkJoinPool to parallelize proto output writing.
    • Partitions the input targets into chunks for parallel processing.
    • Uses a LinkedBlockingQueue to manage the chunks to be processed by the thread pool.
    • Adds WrappedIOException and WrappedInterruptedException to handle exceptions during parallel processing.
    • Implements writeTargetsDelimitedToByteArrays to convert a list of targets to byte arrays in parallel.
    • Implements toProto to convert a target to a proto object.
    • Implements writeDelimited to write a proto object to a byte array in a delimited format.
    • Adds constants for MAX_CHUNKS_IN_QUEUE and TARGETS_PER_CHUNK to control the chunking and queuing behavior.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.


A query's slow pace,
Threads now race, time and space,
Swift proto takes flight.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link

coderabbitai bot commented Apr 27, 2025

Walkthrough

The StreamedProtoOutputFormatter class was refactored to change how it serializes and writes proto targets to an output stream. Instead of synchronously writing each target, the new implementation partitions targets into chunks, serializes each chunk asynchronously using a ForkJoinPool, and manages these operations with a bounded blocking queue. The serialized results are then written to the output stream in order by a dedicated consumer. Several helper methods and exception wrapper classes were introduced to support this concurrent, chunked, and streamed processing model.

Changes

Cohort / File(s) Change Summary
Core Formatter Refactor
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
Refactored processOutput to use chunked, concurrent serialization and streaming of proto targets; added private helper methods for chunk serialization and proto conversion; rewrote length-delimited proto serialization method; introduced private static runtime exception wrappers for IOException and InterruptedException; updated internal logic of createPostFactoStreamCallback to implement producer-consumer model with ForkJoinPool and bounded queue.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant OutputFormatterCallback
    participant ForkJoinPool
    participant BlockingQueue
    participant OutputStream

    Client->>OutputFormatterCallback: processOutput(targets)
    OutputFormatterCallback->>ForkJoinPool: Submit chunked serialization tasks
    ForkJoinPool-->>BlockingQueue: Place futures for each chunk
    loop For each chunk future in order
        BlockingQueue->>OutputFormatterCallback: Take next completed future
        OutputFormatterCallback->>OutputStream: Write serialized bytes
    end
    OutputFormatterCallback-->>Client: Completion
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~15 minutes

Poem

In a warren of code, the bytes now stream,
Chunks hop along, as in a dream.
Forks and joins, a queue in tow,
Proto targets in a rabbit row.
Streams are swift, exceptions wrapped tight—
The formatter’s future is running bright!
🐇✨

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-parallel-streamed-proto

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces multi-threading to the streamed proto output formatter, significantly improving query performance. The approach of using a ForkJoinPool and a blocking queue seems reasonable for distributing the work. However, there are a few areas that could be improved, particularly around error handling and resource management.

Summary of Findings

  • Error Propagation from Executor: The code uses RuntimeException to wrap exceptions from the executor, losing valuable information about the original exception. It's crucial to propagate the original exception to provide better debugging insights.
  • Resource Management of ForkJoinPool: While the ForkJoinPool is created within a try-with-resources block, ensuring proper shutdown, the tasks submitted to the pool might still be running after the block completes if they are long-running or blocked. Consider explicitly shutting down the executor and awaiting termination to ensure all tasks are completed before proceeding.
  • Wrapped Exception Classes: The WrappedIOException and WrappedInterruptedException classes are simple wrappers around checked exceptions. Consider whether these wrappers are truly necessary, or if the exceptions can be handled directly.

Merge Readiness

The performance improvements introduced by this pull request are significant, making it a valuable contribution. However, the error handling and resource management aspects need to be addressed before merging. I recommend resolving the issues raised in the review comments, particularly regarding exception propagation and ForkJoinPool shutdown. I am unable to approve this pull request, and other reviewers should review and approve this code before merging.

Comment on lines +79 to +81
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This TODO indicates that the exception handling here is incomplete. It's important to propagate the original ExecutionException's cause to provide more context about the failure. Consider using e.getCause() to get the underlying exception and wrap it in a more specific exception type, or rethrow it if appropriate.

Also, consider logging the exception with sufficient context to aid debugging.

                      // TODO: figure out what might be in here and propagate
                      Throwable cause = e.getCause();
                      if (cause instanceof IOException) {
                        throw new WrappedIOException((IOException) cause);
                      } else if (cause instanceof InterruptedException) {
                        throw new WrappedInterruptedException((InterruptedException) cause);
                      } else {
                        throw new RuntimeException("Error during target processing", cause);
                      }

Comment on lines +94 to +96
// TODO: propagate
throw new RuntimeException(e);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the previous TODO, this one also requires proper exception propagation. The ExecutionException should be unwrapped to get the root cause and either rethrown as a more specific exception or wrapped in a RuntimeException with the cause attached. Logging the exception is also recommended.

        } catch (ExecutionException e) {
          // TODO: propagate
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw new WrappedIOException((IOException) cause);
          } else if (cause instanceof InterruptedException) {
            throw new WrappedInterruptedException((InterruptedException) cause);
          } else {
            throw new RuntimeException("Error during target processing", cause);
          }

Comment on lines +88 to +90
stillAddingTargetsToQueue.set(false);
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

After setting stillAddingTargetsToQueue to false, it's good practice to explicitly shutdown the executor and wait for its termination. This ensures that all submitted tasks are completed before proceeding. Otherwise, there is a risk that the executor will not shutdown cleanly, and resources may be leaked. Consider adding executor.shutdown() and executor.awaitTermination() calls within the finally block.

          } finally {
            stillAddingTargetsToQueue.set(false);
            executor.shutdown(); // Initiate shutdown
            try {
              if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
                System.err.println("ForkJoinPool did not terminate in the specified timeout.");
                // Optionally, log the state of the tasks that are still running.
              }
            } catch (InterruptedException ie) {
              // (Re-)Cancel if current thread also interrupted
              executor.shutdownNow();
              // Preserve interrupt status
              Thread.currentThread().interrupt();
            }
          }

Comment on lines +132 to +134
@Override
public IOException getCause() {
return (IOException) super.getCause();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider whether wrapping the IOException in a RuntimeException is truly necessary. If the calling code can handle IOException directly, it might be better to avoid the wrapping. If wrapping is necessary, ensure that the original exception is accessible via getCause().

    @Override
    public IOException getCause() {
      return cause;
    }

Comment on lines +144 to +146
public InterruptedException getCause() {
return (InterruptedException) super.getCause();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to WrappedIOException, evaluate the necessity of wrapping InterruptedException. If the calling code can handle it directly, avoid wrapping. If wrapping is necessary, ensure the original exception is accessible via getCause().

    @Override
    public InterruptedException getCause() {
      return cause;
    }

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (1)

92-96: Propagate checked exceptions instead of wrapping them in generic RuntimeException

writeAllTargetsFuture.get() can surface WrappedIOException / WrappedInterruptedException. By catching ExecutionException and blindly re-wrapping, callers lose the ability to handle IOException or honour the method’s declared InterruptedException.

Suggestion:

-        } catch (ExecutionException e) {
-          // TODO: propagate
-          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          Throwable cause = e.getCause();
+          if (cause instanceof WrappedIOException ioe) {
+            throw ioe.getCause();          // preserves declared throws IOException
+          }
+          if (cause instanceof WrappedInterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw ie.getCause();
+          }
+          throw new RuntimeException(cause);
         }

This keeps the public contract intact and avoids masking interruption.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0c98a2 and 5fc8b13.

📒 Files selected for processing (1)
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (2 hunks)
🔇 Additional comments (1)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (1)

63-90: Potential producer/consumer termination race

stillAddingTargetsToQueue is set to false only in the finally block after the producer loop. If partialResult is empty, the queue remains empty and the consumer immediately blocks on take() forever, preventing writeAllTargetsFuture from completing.

Consider signalling completion explicitly:

// after producing chunks
targetQueue.put(POISON_PILL_FUTURE); // Future that returns an empty list
...
while (true) {
  Future<List<byte[]>> f = targetQueue.take();
  if (f == POISON_PILL_FUTURE) break;
  ...
}

or a BlockingQueue with null sentinel, ensuring graceful shutdown even for empty inputs.

Comment on lines +55 to +72
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
var stillAddingTargetsToQueue = new AtomicBoolean(true);
writeAllTargetsFuture =
executor.submit(
() -> {
try {
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

ForkJoinPool worker is blocking on take()/get() – high risk of thread starvation

The consumer task runs inside the same ForkJoinPool that processes the producer tasks yet:

  • performs unmanaged blocking (targetQueue.take() and Future.get()),
  • uses a bounded queue that may fill up, causing the producer thread (caller of processOutput) to block on put().

Because ForkJoinPool counts blocked workers toward parallelism, the pool can dead-lock or under-utilise CPUs when many long-running producer tasks occupy the limited workers while the single consumer waits, or vice-versa.

Recommended approaches:

  1. Start the consumer on a dedicated thread (e.g. Executors.newSingleThreadExecutor) outside the FJP, or
  2. Wrap blocking calls with ForkJoinPool.managedBlock, or
  3. Replace the FJP entirely with a plain ExecutorService that tolerates blocking.

This will eliminate the starvation risk and make behaviour more predictable.

@visz11
Copy link
Collaborator

visz11 commented Jul 26, 2025

/refacto-test

@refacto-test
Copy link

refacto-test bot commented Jul 26, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);
} catch (ExecutionException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread Safety Issue: The ExecutionException handling is inadequate, using a generic RuntimeException with a TODO comment. This can mask important exceptions during task execution, potentially leading to resource leaks or thread pool exhaustion.

Consider unwrapping and properly handling the cause:

catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw new WrappedIOException((IOException) cause);
  } else if (cause instanceof InterruptedException) {
    Thread.currentThread().interrupt(); // Preserve interrupt status
    throw new WrappedInterruptedException((InterruptedException) cause);
  } else if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  } else {
    throw new RuntimeException("Error processing targets", cause);
  }
}

throws IOException, InterruptedException {
for (Target target : partialResult) {
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
ForkJoinTask<?> writeAllTargetsFuture;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Management Issue: The ForkJoinPool is created with parallelism equal to all available processors without any safeguards against resource exhaustion. On many-core systems, this could lead to excessive thread creation and CPU contention.

Consider limiting the maximum parallelism and adding an exception handler:

// Limit the number of threads to avoid resource exhaustion
int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 4);
try (ForkJoinPool executor = new ForkJoinPool(
    parallelism,
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    (thread, throwable) -> {
        // Log uncaught exceptions in worker threads
        System.err.println("Worker thread exception: " + throwable);
    },
    // we use asyncMode to ensure the queue is processed FIFO
    true)) {

}
});
try {
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrency Issue: The blocking targetQueue.put() operation could lead to thread starvation if the queue becomes full. There's no timeout or circuit-breaking mechanism to handle slow or stuck tasks.

Consider using a non-blocking approach with timeouts:

// Use offer with timeout to prevent indefinite blocking
Future<List<byte[]>> task = executor.submit(() -> writeTargetsDelimitedToByteArrays(targets));
if (!targetQueue.offer(task, 5, TimeUnit.SECONDS)) {
  // If queue is full for too long, process this batch directly to make progress
  for (byte[] target : task.get(30, TimeUnit.SECONDS)) {
    out.write(target);
  }
}

…t/StreamedProtoOutputFormatter.java

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@visz11
Copy link
Collaborator

visz11 commented Jul 29, 2025

/refacto-test

@refacto-test
Copy link

refacto-test bot commented Jul 29, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (5)

53-61: Address resource management and parallelism concerns from previous reviews

The ForkJoinPool creation has several issues that were previously identified but remain unaddressed:

  1. No upper bound on parallelism (could exhaust resources on many-core systems)
  2. No exception handler for uncaught worker thread exceptions

Apply this fix to address the resource management issues:

-try (ForkJoinPool executor =
-    new ForkJoinPool(
-        Runtime.getRuntime().availableProcessors(),
-        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
-        null,
-        // we use asyncMode to ensure the queue is processed FIFO, which maximizes
-        // throughput
-        true)) {
+int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+try (ForkJoinPool executor =
+    new ForkJoinPool(
+        parallelism,
+        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
+        (thread, throwable) -> {
+            System.err.println("Worker thread exception in query processing: " + throwable);
+        },
+        // we use asyncMode to ensure the queue is processed FIFO, which maximizes
+        // throughput
+        true)) {

64-82: Critical threading architecture issue - consumer runs in same ForkJoinPool

The consumer task is submitted to the same ForkJoinPool that processes the producer tasks, creating a high risk of deadlock and thread starvation as previously identified. The consumer performs blocking operations (take(), get()) which can starve the work-stealing threads.

Move the consumer to a dedicated thread outside the ForkJoinPool:

+ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
+try {
  var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
  var stillAddingTargetsToQueue = new AtomicBoolean(true);
- writeAllTargetsFuture =
-     executor.submit(
+ writeAllTargetsFuture =
+     consumerExecutor.submit(
          () -> {
            // consumer logic remains the same
          });
+} finally {
+  consumerExecutor.shutdown();
+  if (!consumerExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+    consumerExecutor.shutdownNow();
+  }
+}

78-81: Incomplete exception handling in consumer task

The TODO comment indicates incomplete exception propagation. This masks important failures and can lead to silent data loss or resource leaks.

Properly unwrap and propagate the ExecutionException cause:

-} catch (ExecutionException e) {
-  // TODO: figure out what might be in here and propagate
-  throw new RuntimeException(e);
+} catch (ExecutionException e) {
+  Throwable cause = e.getCause();
+  if (cause instanceof IOException) {
+    throw new WrappedIOException((IOException) cause);
+  } else if (cause instanceof InterruptedException) {
+    Thread.currentThread().interrupt();
+    throw new WrappedInterruptedException((InterruptedException) cause);
+  } else if (cause instanceof RuntimeException) {
+    throw (RuntimeException) cause;
+  } else {
+    throw new RuntimeException("Error processing targets", cause);
+  }

84-89: Potential blocking issue with bounded queue

The targetQueue.put() operation can block indefinitely if the consumer falls behind, as previously identified. This could lead to thread starvation.

Use a timeout-based approach to prevent indefinite blocking:

-for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
-  targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
-}
+for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
+  Future<List<byte[]>> task = executor.submit(() -> writeTargetsDelimitedToByteArrays(targets));
+  if (!targetQueue.offer(task, 30, TimeUnit.SECONDS)) {
+    throw new RuntimeException("Queue timeout - consumer may be stuck");
+  }
+}

94-96: Duplicate incomplete exception handling

Same issue as in the consumer task - ExecutionException is not properly unwrapped and propagated.

Apply the same fix as for the consumer task:

-} catch (ExecutionException e) {
-  // TODO: propagate
-  throw new RuntimeException(e);
+} catch (ExecutionException e) {
+  Throwable cause = e.getCause();
+  if (cause instanceof IOException) {
+    throw (IOException) cause;
+  } else if (cause instanceof InterruptedException) {
+    Thread.currentThread().interrupt();
+    throw (InterruptedException) cause;
+  } else if (cause instanceof RuntimeException) {
+    throw (RuntimeException) cause;
+  } else {
+    throw new RuntimeException("Error waiting for consumer completion", cause);
+  }
🧹 Nitpick comments (1)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (1)

45-47: Consider making chunk size and queue capacity configurable

The hardcoded constants may not be optimal for all environments:

  • TARGETS_PER_CHUNK = 500 might be too small/large depending on target complexity
  • MAX_CHUNKS_IN_QUEUE = processors * 2 could cause memory issues with large targets

Consider making these configurable via system properties or constructor parameters:

-private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
-private static final int TARGETS_PER_CHUNK = 500;
+private static final int MAX_CHUNKS_IN_QUEUE = Integer.getInteger("bazel.query.max_chunks", Runtime.getRuntime().availableProcessors() * 2);
+private static final int TARGETS_PER_CHUNK = Integer.getInteger("bazel.query.targets_per_chunk", 500);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5fc8b13 and ec1bf6e.

📒 Files selected for processing (1)
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (2 hunks)
🔇 Additional comments (6)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (6)

16-21: LGTM - Import additions support the concurrent implementation

The added imports (Iterables, Build, CodedOutputStream, List, concurrent utilities, and AtomicBoolean) are all necessary for the new multi-threaded implementation.

Also applies to: 25-27


99-101: LGTM - Clean helper method implementation

The writeTargetsDelimitedToByteArrays method correctly maps targets to delimited byte arrays using the fixed writeDelimited method.


103-109: LGTM - Proper exception wrapping for concurrent context

The toProto method correctly wraps InterruptedException in a runtime exception, which is necessary for use within the ForkJoinPool's lambda expressions.


113-138: Excellent fix for length-delimited proto serialization

The previous implementation had a critical bug where the length prefix was never actually written, making the output unparseable. The new implementation correctly:

  1. Writes the varint length prefix using a separate CodedOutputStream
  2. Writes the message bytes after the prefix
  3. Properly flushes both streams

This addresses the critical issue identified in previous reviews where mergeDelimitedFrom would fail.


140-149: LGTM - Exception wrapper provides proper cause access

The WrappedIOException class correctly extends RuntimeException and overrides getCause() to return the specific IOException type, addressing the concern from previous reviews.


151-160: LGTM - Consistent exception wrapper implementation

The WrappedInterruptedException class follows the same pattern as WrappedIOException, providing proper access to the underlying InterruptedException.

@visz11
Copy link
Collaborator

visz11 commented Jul 31, 2025

/refacto-test

1 similar comment
@visz11
Copy link
Collaborator

visz11 commented Aug 22, 2025

/refacto-test

@refacto-test
Copy link

refacto-test bot commented Aug 22, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Aug 22, 2025

Code Review: Parallel Processing Implementation

👍 Well Done
Concurrent Target Processing

Parallelizing target processing improves throughput for large query results

FIFO Queue Processing

Using asyncMode ensures ordered processing maximizing throughput

📌 Files Processed
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
📝 Additional Comments
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (2)
Missing Exception Class

The code references a WrappedExecutionException class in the fix but this class is not defined in the file. A corresponding wrapper class should be added for consistency with other exception handling.

  private static class WrappedExecutionException extends RuntimeException {
    private WrappedExecutionException(ExecutionException cause) {
      super(cause);
    }

    @Override
    public ExecutionException getCause() {
      return (ExecutionException) super.getCause();
    }
  }

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Error-Classification
Thread Interruption Handling

The code wraps InterruptedException without restoring the thread's interrupted status. This may prevent proper thread interruption propagation and cause threads to continue running when they should terminate.

                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw new WrappedInterruptedException(e);
                    } catch (IOException e) {

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Thread-Safety

Comment on lines +134 to +137
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclosed Resources Risk

The method has an extra closing brace that creates a syntax error. This will cause compilation failure and prevent the code from functioning properly.

Suggested change
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
}
Standards
  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness

Comment on lines +93 to +96
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception Handling Gap

The code catches ExecutionException but wraps it in a generic RuntimeException without preserving the original exception type. This loses important error context and makes debugging harder.

Suggested change
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
throw new WrappedExecutionException(e);
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Propagation

@refacto-test
Copy link

refacto-test bot commented Sep 4, 2025

Code Review: Parallel Processing Implementation

👍 Well Done
Parallel Processing

Effectively distributes work across multiple threads improving throughput.

Batch Processing

Partitioning targets into chunks optimizes workload distribution.

📌 Files Processed
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
📝 Additional Comments
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (5)
Blocking Queue Sizing

The queue size is directly tied to CPU count, which may not be optimal for I/O-bound operations. A fixed multiplier of processor count doesn't account for actual throughput needs or memory constraints. This can lead to suboptimal resource utilization when processing large result sets.

private static final int MAX_CHUNKS_IN_QUEUE = Math.max(
    16, // minimum queue size for small systems
    Runtime.getRuntime().availableProcessors() * 4 // scale with available parallelism
);

Standards:

  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Performance-Architecture
  • Google-Performance-Best-Practices
Memory Buffer Allocation

Each target serialization creates a new byte array, causing frequent memory allocations and potential GC pressure. For high-throughput scenarios with many targets, these allocations can impact performance and cause memory fragmentation.

// Consider using a buffer pool for high-throughput scenarios
private static final ThreadLocal<ByteArrayPool> BUFFER_POOL = 
    ThreadLocal.withInitial(() -> new ByteArrayPool(10, 1024 * 1024));

private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
  try {
    var serializedSize = targetProtoBuffer.getSerializedSize();
    int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    int totalSize = headerSize + serializedSize;
    
    // Get buffer from pool or create new one if needed
    byte[] output = BUFFER_POOL.get().getBuffer(totalSize);
    
    // Rest of serialization code...
    
    // Return a copy to avoid buffer reuse issues
    byte[] result = Arrays.copyOf(output, totalSize);
    BUFFER_POOL.get().returnBuffer(output);
    return result;
  } catch (IOException e) {
    throw new WrappedIOException(e);
  }
}

Standards:

  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Memory-Management
  • Google-Performance-Best-Practices
Unbounded Queue Growth

No backpressure mechanism when partialResult is very large. While the queue has a capacity limit (MAX_CHUNKS_IN_QUEUE), the put() operation will block indefinitely, potentially causing memory issues with large datasets.

// Add backpressure mechanism to prevent unbounded memory growth
int submittedChunks = 0;
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
    // Submit work and track how many chunks we've submitted
    targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
    submittedChunks++;
    
    // Every N chunks, wait for some results to be processed to avoid memory pressure
    if (submittedChunks % (MAX_CHUNKS_IN_QUEUE / 2) == 0) {
        // Wait for queue to have some capacity before continuing
        while (targetQueue.size() > MAX_CHUNKS_IN_QUEUE / 4) {
            Thread.sleep(10);  // Short sleep to avoid busy waiting
        }
    }
}

Standards:

  • Algorithm-Correctness-Memory-Management
  • Logic-Verification-Backpressure
  • Business-Rule-Resource-Constraints
Missing Thread Interruption

InterruptedException caught without restoring thread interrupt status. Thread interruption state lost when exception wrapped. Could prevent proper shutdown of security-sensitive operations.

} catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // Restore interrupted status
  throw new WrappedInterruptedException(e);
}

Standards:

  • CWE-391
  • CWE-755
Hardcoded Thread Configuration

Hardcoded multiplier (2) for queue size lacks justification and configurability. This limits maintainability as system tuning requires code changes rather than configuration adjustments.

      // Queue size balances memory usage with keeping worker threads busy
      // Default multiplier can be overridden via system property
      private static final int QUEUE_SIZE_MULTIPLIER = 
          Integer.getInteger("streamed.proto.queue.multiplier", 2);
      private static final int MAX_CHUNKS_IN_QUEUE = 
          Runtime.getRuntime().availableProcessors() * QUEUE_SIZE_MULTIPLIER;

Standards:

  • Clean-Code-Configuration
  • Clean-Code-No-Magic-Numbers

Comment on lines +54 to +61
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclosed ForkJoinPool

The ForkJoinPool is created with try-with-resources but the Future's completion is awaited after the try block exits, risking premature pool shutdown before task completion.

ForkJoinPool executor = new ForkJoinPool(
    Runtime.getRuntime().availableProcessors(),
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,
    // we use asyncMode to ensure the queue is processed FIFO, which maximizes
    // throughput
    true);
try {
  var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
  var stillAddingTargetsToQueue = new AtomicBoolean(true);
  // Rest of the code...
  // After all processing is complete:
  writeAllTargetsFuture.get();
} finally {
  executor.shutdown();
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +92 to +96
writeAllTargetsFuture.get();
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncaught Exceptions

ExecutionException is caught but wrapped in RuntimeException with a TODO comment. This loses specific error information and prevents proper error handling.

try {
  writeAllTargetsFuture.get();
} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof WrappedIOException) {
    throw ((WrappedIOException) cause).getCause();
  } else if (cause instanceof WrappedInterruptedException) {
    throw ((WrappedInterruptedException) cause).getCause();
  } else {
    throw new IOException("Error processing targets", e);
  }
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +78 to +81
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Error Propagation

ExecutionException caught with TODO comment and generic RuntimeException wrapping. Specific error information is lost, preventing proper error handling and recovery.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof WrappedIOException) {
    throw ((WrappedIOException) cause).getCause();
  } else if (cause instanceof WrappedInterruptedException) {
    throw ((WrappedInterruptedException) cause).getCause();
  } else {
    throw new IOException("Failed to process target chunk", e);
  }
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +134 to +137
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Syntax Error

Syntax error with mismatched curly braces. The extra closing brace on line 135 breaks the method structure, causing compilation failure.

      return output;
    } catch (IOException e) {
      throw new WrappedIOException(e);
    }
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +54 to +61
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread Pool Lifecycle

Creating a new ForkJoinPool for each processOutput call is expensive. Each pool creation allocates thread resources that must be initialized and later destroyed. Under high query volume, this pattern creates significant thread churn and initialization overhead, potentially degrading overall system performance.

// Use a shared thread pool to avoid creation/destruction overhead
private static final ForkJoinPool SHARED_EXECUTOR = new ForkJoinPool(
    Runtime.getRuntime().availableProcessors(),
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,
    // we use asyncMode to ensure the queue is processed FIFO, which maximizes throughput
    true);

@Override
public void processOutput(Iterable<Target> partialResult)
    throws IOException, InterruptedException {
  ForkJoinTask<?> writeAllTargetsFuture;
  var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
  var stillAddingTargetsToQueue = new AtomicBoolean(true);
  writeAllTargetsFuture = SHARED_EXECUTOR.submit(
      () -> {
Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Netflix-Resource-Pooling
  • Algorithm-Opt-Thread-Pool

Comment on lines +131 to +132
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronization Missing

Multiple threads write to the same output stream without synchronization. Concurrent writes to the output stream can cause data corruption as bytes from different threads interleave unpredictably.

// Add synchronization when writing to the output stream
synchronized(out) {
    targetProtoBuffer.writeTo(bodyOut);
    bodyOut.flush();
}
Standards
  • Algorithm-Correctness-Concurrency
  • Logic-Verification-Thread-Safety
  • Business-Rule-Data-Integrity

Comment on lines +124 to +132
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+ CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclosed Resource Leak

CodedOutputStream instances are not properly closed. The streams implement Closeable interface and should be closed to prevent resource leaks, especially in high-throughput concurrent processing.

// Use try-with-resources to ensure streams are closed properly
try (CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize)) {
    headerOut.writeUInt32NoTag(serializedSize);
    headerOut.flush();
}

// 2. write the message bytes immediately after the prefix
try (CodedOutputStream bodyOut = CodedOutputStream.newInstance(output, headerSize, serializedSize)) {
    targetProtoBuffer.writeTo(bodyOut);
    bodyOut.flush();
}
Standards
  • Logic-Verification-Resource-Management
  • Algorithm-Correctness-Resource-Handling
  • Business-Rule-Resource-Cleanup

Comment on lines +78 to +80
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception Swallowing Risk

Generic exception handling wraps ExecutionException in RuntimeException without preserving cause details. Attackers could exploit swallowed exceptions to hide malicious activities. Proper exception handling needed for security visibility.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw new WrappedIOException((IOException) cause);
  } else if (cause instanceof InterruptedException) {
    throw new WrappedInterruptedException((InterruptedException) cause);
  } else {
    throw new RuntimeException("Unexpected error during target processing", e);
  }
Standards
  • CWE-390
  • CWE-755

Comment on lines +93 to +96
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandled Interrupted Exception

ExecutionException handling doesn't restore interrupted status when InterruptedException is the cause. Thread interruption signals could be lost, preventing proper thread termination. Security-sensitive operations might continue after intended shutdown.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof InterruptedException) {
    Thread.currentThread().interrupt(); // Restore interrupted status
    throw new WrappedInterruptedException((InterruptedException) cause);
  } else if (cause instanceof IOException) {
    throw new WrappedIOException((IOException) cause);
  } else {
    throw new RuntimeException("Error processing targets", e);
  }
Standards
  • CWE-391
  • CWE-755

final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
private static final int TARGETS_PER_CHUNK = 500;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic Number Usage

Magic number 500 lacks explanation for its selection. This hinders maintainability as future developers cannot understand the rationale for this specific batch size value.

      // Balance between memory usage and parallelization efficiency
      // Experimentally determined optimal value for typical target sizes
      private static final int TARGETS_PER_CHUNK = 500;
Standards
  • Clean-Code-No-Magic-Numbers
  • Clean-Code-Comments

@visz11
Copy link
Collaborator

visz11 commented Sep 18, 2025

/refacto-test

@refacto-test
Copy link

refacto-test bot commented Sep 18, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-test
Copy link

refacto-test bot commented Sep 18, 2025

Code Review: Concurrency Implementation in StreamedProtoOutputFormatter

👍 Well Done
Parallel Processing Design

Effective use of ForkJoinPool for concurrent processing improves throughput while maintaining FIFO order.

Exception Handling Approach

Consistent wrapping of checked exceptions preserves original exception types for better diagnostics.

📌 Files Processed
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
📝 Additional Comments
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (6)
Queue Size Tuning

Queue size is statically tied to processor count which may not be optimal for I/O-bound serialization tasks. Under high load, this could lead to thread starvation or underutilization. Consider making this configurable or based on memory constraints.

Standards:

  • ISO-IEC-25010-Performance-Capacity
  • Concurrency-Pattern-Queue-Sizing
  • Resource-Utilization-Optimization
Chunk Size Optimization

Fixed chunk size of 500 targets may not be optimal for all workloads. Performance could vary significantly based on target size distribution. Has this value been benchmarked against different target sizes and workloads to ensure optimal parallelization?

Standards:

  • ISO-IEC-25010-Performance-Efficiency
  • Batch-Processing-Optimization
  • Workload-Balancing-Pattern
Exception Wrapping Overhead

Custom exception wrappers add overhead with minimal benefit. Standard exception handling patterns would be more efficient. Consider using UncheckedIOException for IOException and properly restoring interrupt status for InterruptedException instead of custom wrappers.

Standards:

  • ISO-IEC-25010-Performance-Efficiency
  • Exception-Handling-Pattern
  • Code-Simplification
Bounded Queue Backpressure

The bounded queue provides some backpressure, but there's no handling for queue full scenarios. If producers outpace consumers significantly, put() operations will block indefinitely. Consider adding timeout or monitoring to prevent thread starvation.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Resource-Management
  • ISO-IEC-25010-Reliability-Fault-Tolerance
Inconsistent Variable Naming

The code uses inconsistent variable naming patterns, mixing var and explicit types. Lines 116-117 use explicit types (int, byte[]) while earlier code uses var. This inconsistency creates logical confusion and makes the code harder to maintain as the codebase evolves.

Standards:

  • Logic-Verification-Consistency
  • Algorithm-Correctness-Readability
Hardcoded Thread Configuration

Thread pool sizing is hardcoded with a fixed multiplier (2x processors). This limits configurability and adaptability to different workloads. Making this configurable through a parameter would improve maintainability by allowing tuning without code changes.

Standards:

  • Maintainability-Quality-Configuration
  • Clean-Code-Functions
  • Design-Pattern-Strategy

Comment on lines +134 to +135
}
} catch (IOException e) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Syntax Error

Syntax error in the writeDelimited method. The code has an extra closing bracket after 'return output;' but before the catch block, which will prevent compilation and cause build failures. This breaks the try-catch structure.

      return output;
Commitable Suggestion
Suggested change
}
} catch (IOException e) {
return output;
Standards
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • ISO-IEC-25010-Reliability-Maturity

Comment on lines +68 to +73
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deadlock Potential

The consumer thread calls targetQueue.take() which blocks until an element is available. If an exception occurs during queue population and stillAddingTargetsToQueue remains true while the queue is empty, the consumer thread will deadlock waiting indefinitely for items that will never arrive.

                      while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
                        Future<List<byte[]>> targets;
                        try {
                          targets = targetQueue.poll(100, TimeUnit.MILLISECONDS);
                          if (targets == null) {
                            continue;
                          }
                        } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                          throw new WrappedInterruptedException(e);
                        }
                        for (byte[] target : targets.get()) {
                          out.write(target);
                        }
Commitable Suggestion
Suggested change
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets;
try {
targets = targetQueue.poll(100, TimeUnit.MILLISECONDS);
if (targets == null) {
continue;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WrappedInterruptedException(e);
}
for (byte[] target : targets.get()) {
out.write(target);
}
Standards
  • Algorithm-Correctness-Concurrency
  • Logic-Verification-Deadlock-Prevention

Comment on lines +54 to +61
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

The ForkJoinTask created inside the try-with-resources block continues execution after the ForkJoinPool is closed. This creates a resource leak risk as tasks may attempt to use the shutdown executor, potentially causing RejectedExecutionException or task failures.

        ForkJoinPool executor = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors(),
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null,
            // we use asyncMode to ensure the queue is processed FIFO, which maximizes
            // throughput
            true);
        try {
Commitable Suggestion
Suggested change
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
ForkJoinPool executor = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true);
try {
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Resource-Management

Comment on lines +78 to +81
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncaught RuntimeException Propagation

Unresolved TODO with generic RuntimeException wrapping can mask actual failure causes. The code throws a generic RuntimeException without proper cause analysis, potentially hiding important error information and making debugging difficult in production.

                    } catch (ExecutionException e) {
                      Throwable cause = e.getCause();
                      if (cause instanceof IOException) {
                        throw new WrappedIOException((IOException) cause);
                      } else if (cause instanceof InterruptedException) {
                        throw new WrappedInterruptedException((InterruptedException) cause);
                      } else {
                        throw new RuntimeException("Error processing target data", e);
                      }
                    }
Commitable Suggestion
Suggested change
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
throw new WrappedInterruptedException((InterruptedException) cause);
} else {
throw new RuntimeException("Error processing target data", e);
}
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +93 to +96
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandled ExecutionException Propagation

Another unresolved TODO with generic RuntimeException wrapping. The code fails to properly analyze and handle the underlying cause of ExecutionException, which could contain important failure information needed for proper error handling and recovery.

        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof WrappedIOException) {
            throw ((WrappedIOException) cause).getCause();
          } else if (cause instanceof WrappedInterruptedException) {
            Thread.currentThread().interrupt();
            throw ((WrappedInterruptedException) cause).getCause();
          } else {
            throw new IOException("Error processing query results", e);
          }
        } finally {
          executor.shutdown();
        }
Commitable Suggestion
Suggested change
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
Thread.currentThread().interrupt();
throw ((WrappedInterruptedException) cause).getCause();
} else {
throw new IOException("Error processing query results", e);
}
} finally {
executor.shutdown();
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling

Comment on lines +74 to +76
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
Copy link

@refacto-test refacto-test bot Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread Interruption Handling

InterruptedException is caught but the thread's interrupted status is not restored. When wrapping InterruptedException, the current thread's interrupt status should be preserved with Thread.currentThread().interrupt() to ensure proper interruption propagation.

                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw new WrappedInterruptedException(e);
Commitable Suggestion
Suggested change
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new WrappedInterruptedException(e);
} catch (IOException e) {
Standards
  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Error-Handling
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines +54 to +61
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak Risk

ForkJoinPool is created with no exception handler (null parameter). Uncaught exceptions in worker threads may cause thread termination without proper cleanup, potentially reducing parallelism effectiveness under load and causing resource leaks.

Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Concurrency-Pattern-Exception-Handling
  • Thread-Pool-Management

Comment on lines +124 to +132
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+ CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant CodedOutputStream Creation

Creating separate CodedOutputStream instances for header and body adds overhead. A single CodedOutputStream with appropriate positioning would be more efficient, reducing object creation and improving serialization performance under high throughput.

Standards
  • ISO-IEC-25010-Performance-Resource-Utilization
  • Optimization-Pattern-Object-Reuse
  • Memory-Allocation-Efficiency

Comment on lines +91 to +96
try {
writeAllTargetsFuture.get();
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandled Interrupted Exception

The code catches ExecutionException but doesn't handle InterruptedException from Future.get(). Thread interruption signals are lost, potentially causing thread leaks and preventing proper application shutdown. This can lead to resource exhaustion in long-running applications.

Standards
  • CWE-391
  • OWASP-A06
  • NIST-SSDF-PW.1

// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Exhaustion Risk

The queue has a bounded size but no handling for queue full conditions. If producers outpace consumers, the put() operation will block indefinitely, potentially causing thread starvation or deadlock. Consider adding timeout mechanisms and graceful degradation when queue capacity is reached.

Standards
  • CWE-400
  • OWASP-A05
  • NIST-SSDF-PW.8

Comment on lines +140 to +160
private static class WrappedIOException extends RuntimeException {
private WrappedIOException(IOException cause) {
super(cause);
}

@Override
public IOException getCause() {
return (IOException) super.getCause();
}
}

private static class WrappedInterruptedException extends RuntimeException {
private WrappedInterruptedException(InterruptedException cause) {
super(cause);
}

@Override
public InterruptedException getCause() {
return (InterruptedException) super.getCause();
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapper Classes Duplication

Two nearly identical wrapper exception classes follow the same pattern with duplicated code structure. This violates DRY principle and increases maintenance burden. A generic wrapper with type parameter or unified exception handling strategy would be more maintainable.

Standards
  • Clean-Code-DRY
  • SOLID-DRY
  • Maintainability-Quality-Duplication

Comment on lines 43 to +45
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested Anonymous Classes

Complex parallel processing logic is implemented within an anonymous inner class. This creates maintainability issues as the code is deeply nested and tightly coupled to its parent class. Extracting this to a named class would improve readability and testability.

Standards
  • Clean-Code-Class-Organization
  • Maintainability-Quality-Nesting
  • Clean-Code-Functions

@@ -34,13 +42,120 @@ public String getName() {
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic Numbers Usage

The code uses a magic number (500) for chunk size without explanation of why this value was chosen. This reduces maintainability as future developers won't understand the reasoning behind this specific value or know when it should be adjusted.

Standards
  • Clean-Code-Naming
  • Maintainability-Quality-Constants
  • Clean-Code-Comments

Comment on lines +134 to +137
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broken Exception Chain

The writeDelimited method has mismatched braces causing a logical syntax error. The extra closing brace at line 135 will prevent compilation, and the exception handling block at lines 136-137 is improperly structured, breaking the intended error propagation flow.

Standards
  • Algorithm-Correctness-Syntax
  • Logic-Verification-Error-Handling

Comment on lines +99 to +100
private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) {
return targets.stream().map(target -> writeDelimited(toProto(target))).toList();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory Exhaustion Risk

The method processes all targets in memory without streaming or chunking large outputs. For large target lists, this could cause excessive memory consumption leading to OutOfMemoryError. Consider implementing true streaming with backpressure to prevent memory exhaustion.

Standards
  • CWE-400
  • OWASP-A05
  • NIST-SSDF-PW.8

@visz11 visz11 closed this Oct 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants