
Conversation

@visz11 visz11 commented Oct 1, 2025

Summary by CodeRabbit

  • Refactor

    • Reworked streaming proto output to process items in parallel and write in order, improving throughput and responsiveness for large queries.
    • Introduced chunked processing to better manage memory and flow control during output.
  • Bug Fixes

    • Improved error handling during streaming to reduce the risk of partial outputs or hangs when I/O or interruptions occur, leading to more reliable runs.

refacto-test bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

coderabbitai bot commented Oct 1, 2025

Walkthrough

Replaces per-target synchronous writes with a parallel producer-consumer pipeline. Targets are processed in parallel via ForkJoinPool into length-delimited byte chunks, enqueued with bounded capacity, and written sequentially by a dedicated consumer. Adds chunk sizing constants, atomic completion signaling, custom delimited writes, and runtime wrappers for IO/interrupt exceptions.

Changes

Cohort / File(s) Summary
Parallel streaming formatter rewrite
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
Reworks streaming proto output to a ForkJoinPool-based producer-consumer model with bounded queue, chunking (MAX_CHUNKS_IN_QUEUE, TARGETS_PER_CHUNK), custom length-delimited serialization, ordered FIFO writing, and runtime exception wrappers for IO and interruptions.
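The pipeline described above can be sketched in miniature. This is a hypothetical, dependency-free illustration (class and method names are invented, not the PR's code): tasks run in parallel on a pool, but their Futures enter a bounded queue in submission order, so a single consumer writes results strictly FIFO. A poison-pill sentinel is used here for completion signaling in place of the PR's atomic flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class OrderedPipelineSketch {
  // Poison pill marking end of stream (an alternative to an atomic "done" flag).
  private static final Future<String> EOF = CompletableFuture.completedFuture(null);

  public static List<String> run(List<String> items) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Bounded, like MAX_CHUNKS_IN_QUEUE in the PR, to bound memory.
    BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>(8);
    List<String> written = new ArrayList<>();
    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          Future<String> f = queue.take();
          if (f == EOF) {
            break;
          }
          written.add(f.get()); // waits for this chunk only; output order is preserved
        }
      } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    try {
      for (String item : items) {
        // Futures enter the queue in submission order even if they finish out of order.
        queue.put(pool.submit(() -> "chunk(" + item + ")"));
      }
      queue.put(EOF);
      consumer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
    return written;
  }
}
```

The key property is that ordering comes from the queue, not from task completion: parallelism is free to reorder work, and the consumer's blocking `get()` per Future restores FIFO output.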

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Caller
  participant Formatter as StreamedProtoOutputFormatter
  participant FJP as ForkJoinPool
  participant Producers
  participant Queue as Bounded Queue
  participant Writer as Writer Worker
  participant OS as OutputStream

  Caller->>Formatter: format(stream, targets)
  Formatter->>FJP: submit parallel tasks
  rect rgba(220,240,255,0.5)
    note over Producers,Queue: Produce chunks (length-delimited bytes)
    FJP->>Producers: process TARGETS_PER_CHUNK
    Producers-->>Queue: enqueue byte[] chunk (FIFO)
  end
  rect rgba(235,255,235,0.5)
    note over Writer,OS: Sequential writing ensures order
    Formatter->>Writer: start consumer loop
    loop until completion signaled and queue drained
      Queue-->>Writer: take chunk
      Writer->>OS: write chunk
    end
  end
  Formatter-->>Caller: return

  alt Error (IO / Interrupted)
    Producers-->>Formatter: throw WrappedIOException / WrappedInterruptedException
    Formatter-->>Caller: rethrow runtime wrapper
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I thump a drum of bytes in queue, hop-hop, in tidy rows,
With chunks of proto carrot sticks, the writer calmly goes.
Forked paths race like meadow wind, then join in ordered flow—
A burrow of concurrency, where streams in sequence grow.
Nibble, write, and off I zoom! 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
  • Title Check — ⚠️ Warning. The title "Clone parallel streamed proto" does not accurately summarize the primary change: a rewrite of StreamedProtoOutputFormatter to process and write protos in parallel via a ForkJoinPool with a producer-consumer queue. "Clone" is misleading, and the title omits defining details such as chunking and exception handling. Resolution: rename the title to reflect the new parallel streaming implementation, for example "Rewrite StreamedProtoOutputFormatter to stream protos in parallel using ForkJoinPool."
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 0.00%, below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
  • Description Check — ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.


@gemini-code-assist

Summary of Changes

Hello @visz11, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the StreamedProtoOutputFormatter by introducing parallel processing capabilities for generating streamed protocol buffer output. The changes leverage Java's ForkJoinPool to concurrently convert and write Target objects, which is expected to improve performance and throughput for large query results. The implementation includes careful management of task queues and refined delimited proto message writing, alongside custom exception handling for concurrent operations.

Highlights

  • Parallel Proto Output: Implemented parallel processing for converting and writing Target objects to streamed protocol buffer output using a ForkJoinPool.
  • Chunking and Queuing: Introduced chunking of query results into smaller lists of targets and utilized a LinkedBlockingQueue to manage concurrent processing, ensuring ordered output while maximizing throughput.
  • Refined Delimited Proto Writing: The writeDelimited method was refactored to explicitly write the var-int length prefix and the message bytes separately, providing a more robust and clear mechanism for length-prefixed proto messages.
  • Concurrent Error Handling: Added custom WrappedIOException and WrappedInterruptedException classes to facilitate the propagation of checked exceptions as unchecked ones within the concurrent execution context.
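The wrapper-exception pattern named in the last bullet can be sketched as follows. This is an illustrative reconstruction, not the PR's actual classes: a RuntimeException subclass narrows `getCause()` to the checked type, so code at the boundary can unwrap and rethrow the original exception while inner lambdas stay free of checked throws.

```java
import java.io.IOException;

public class WrapperSketch {
  // Unchecked carrier for an IOException thrown inside a lambda or Future.
  public static final class WrappedIOException extends RuntimeException {
    public WrappedIOException(IOException cause) {
      super(cause);
    }

    // Covariant override: callers get the checked type back without a cast.
    @Override
    public synchronized IOException getCause() {
      return (IOException) super.getCause();
    }
  }

  // At the boundary, unwrap so the method's checked contract is honored.
  public static String describe(Runnable r) throws IOException {
    try {
      r.run();
      return "ok";
    } catch (WrappedIOException e) {
      throw e.getCause();
    }
  }
}
```

The same shape works for `InterruptedException`; the important part is that the boundary method rethrows the original checked exception rather than leaking the wrapper.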


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces parallel processing for serializing targets in the streamed_proto output formatter, which should improve performance. The implementation uses a ForkJoinPool and a producer-consumer pattern. While the overall approach is sound, there are critical issues with the exception handling, where TODO comments mark incomplete logic. This could lead to unhandled exceptions. I've provided suggestions to correctly propagate exceptions. Additionally, there's an opportunity to simplify the protobuf serialization logic for better readability and conciseness.

Comment on lines +93 to +96
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}


critical

The exception handling here is incomplete and breaks the processOutput method's contract, which is declared to throw IOException and InterruptedException. Throwing a generic RuntimeException can cause unhandled exceptions higher up the call stack.

The ExecutionException will contain a wrapped exception from the consumer task (e.g., WrappedIOException or WrappedInterruptedException). You should unwrap the cause and throw the original checked exception.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof WrappedIOException) {
    throw ((WrappedIOException) cause).getCause();
  } else if (cause instanceof WrappedInterruptedException) {
    throw ((WrappedInterruptedException) cause).getCause();
  } else if (cause instanceof RuntimeException) {
    // Propagate other runtime exceptions from the workers
    throw (RuntimeException) cause;
  }
  // For other Throwables, wrap in a RuntimeException
  throw new RuntimeException(e);
}

Comment on lines +78 to +81
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}


high

The TODO here indicates incomplete exception handling. The ExecutionException from targets.get() wraps an exception from a serialization task. This should be unwrapped and re-thrown to be handled by the consumer's caller. The serialization tasks wrap exceptions in WrappedIOException and WrappedInterruptedException, which are RuntimeExceptions. You can unwrap and rethrow the cause.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  }
  // This path should ideally not be taken if producers only throw RuntimeExceptions.
  throw new RuntimeException(e);
}

Comment on lines +123 to +132
// 1. write the var-int length prefix
CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
headerOut.writeUInt32NoTag(serializedSize);
headerOut.flush();

// 2. write the message bytes immediately after the prefix
CodedOutputStream bodyOut =
    CodedOutputStream.newInstance(output, headerSize, serializedSize);
targetProtoBuffer.writeTo(bodyOut);
bodyOut.flush();


medium

This implementation is correct, but it can be simplified. Instead of creating two CodedOutputStream instances, you can use a single one to write both the header (size prefix) and the message body sequentially into the byte array. The flush() calls are also not necessary as they are no-ops for array-backed output streams.

// write the var-int length prefix and then the message bytes
CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
codedOut.writeUInt32NoTag(serializedSize);
targetProtoBuffer.writeTo(codedOut);
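The length prefix discussed in this thread is a protobuf base-128 varint. The real code uses protobuf's CodedOutputStream; the dependency-free sketch below (names invented) reimplements just the varint encoding so the delimited layout — varint length, then message bytes — is easy to inspect.

```java
import java.io.ByteArrayOutputStream;

public class VarintSketch {
  // Encode a non-negative int as a protobuf base-128 varint.
  public static byte[] encodeUInt32(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
      value >>>= 7;
    }
    out.write(value); // final byte, continuation bit clear
    return out.toByteArray();
  }

  // Length-delimited framing: varint(length) followed by the raw message bytes.
  public static byte[] writeDelimited(byte[] message) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] prefix = encodeUInt32(message.length);
    out.write(prefix, 0, prefix.length);
    out.write(message, 0, message.length);
    return out.toByteArray();
  }
}
```

For example, a 300-byte message gets the two-byte prefix 0xAC 0x02, which is exactly what `writeUInt32NoTag(300)` would emit.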


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c0c98a2 and ec1bf6e.

📒 Files selected for processing (1)
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (2 hunks)

Comment on lines +68 to +86
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
});
try {
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
}

⚠️ Potential issue | 🔴 Critical

Fix the consumer loop to avoid deadlock when the queue stays empty.

If partialResult is empty (or every producer fails before enqueuing), stillAddingTargetsToQueue remains true while the queue is empty, so the first targetQueue.take() blocks forever. Nothing ever enqueues a sentinel, so the consumer never reaches the loop guard, your ForkJoinPool never quiesces, and writeAllTargetsFuture.get() never returns. LinkedBlockingQueue.take() intentionally waits indefinitely until a producer supplies an element, so this becomes an unrecoverable hang. (docs.oracle.com)

Please switch to a polling pattern (e.g., poll with timeout) and exit once the flag is cleared, or enqueue an explicit end-of-stream marker so the consumer can drain and terminate. For example:

-          while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
-            Future<List<byte[]>> targets = targetQueue.take();
+          while (true) {
+            Future<List<byte[]>> targets =
+                targetQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (targets == null) {
+              if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
+                break;
+              }
+              continue;
+            }

Remember to add the corresponding TimeUnit import. (docs.oracle.com)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change — replace the blocking consumer loop:

while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  Future<List<byte[]>> targets = targetQueue.take();
  for (byte[] target : targets.get()) {
    out.write(target);
  }
}

with a poll-based loop that exits explicitly once producers are done (the surrounding catch blocks and producer submission loop are unchanged; requires a java.util.concurrent.TimeUnit import):

while (true) {
  Future<List<byte[]>> targets =
      targetQueue.poll(100, TimeUnit.MILLISECONDS);
  if (targets == null) {
    if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
      break;
    }
    continue;
  }
  for (byte[] target : targets.get()) {
    out.write(target);
  }
}
🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 68 to 86, the consumer uses targetQueue.take() which can block
forever when stillAddingTargetsToQueue remains true and no producer ever
enqueues an element; change the consumer to use a polling pattern (e.g.,
targetQueue.poll(timeout, TimeUnit.MILLISECONDS)) in a loop and break out when
stillAddingTargetsToQueue is false and the queue is empty (or when poll returns
null after the timeout and stillAddingTargetsToQueue is cleared), and add the
corresponding java.util.concurrent.TimeUnit import; alternatively you may
enqueue a distinct end-of-stream sentinel object instead of switching to
poll—pick one approach and ensure the consumer drains remaining entries before
exiting so the ForkJoinPool can quiesce and writeAllTargetsFuture.get() can
complete.

Comment on lines +74 to +77
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);

⚠️ Potential issue | 🟠 Major

Preserve the interrupt status before rethrowing.

Both the consumer loop and toProto catch InterruptedException, wrap it, and immediately rethrow. Because throwing InterruptedException clears the thread’s interrupt flag, wrapping it without first re‑interrupting loses the cancellation signal for upstream code. Best practice is to call Thread.currentThread().interrupt(); before propagating so higher levels can still observe the interrupt. (stackoverflow.com)

Please add Thread.currentThread().interrupt(); inside each catch before throwing WrappedInterruptedException. (stackoverflow.com)

Also applies to: 104-107

🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 74-77 and 104-107, the catch blocks for InterruptedException
rethrow WrappedInterruptedException without preserving the thread interrupt
status; update each InterruptedException catch to call
Thread.currentThread().interrupt(); immediately before throwing new
WrappedInterruptedException(e) so the interrupt flag is preserved for upstream
code to observe.
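The re-interrupt idiom recommended above can be demonstrated in isolation. This is a hedged, self-contained sketch (names invented): a worker catches InterruptedException — which clears the thread's interrupt flag — and calls `Thread.currentThread().interrupt()` before doing anything else, so the flag is observable again by upstream code even after the checked exception has been consumed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ReinterruptSketch {
  public static boolean flagPreservedAfterWrap() {
    AtomicBoolean observed = new AtomicBoolean(false);
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(60_000); // an interrupt clears the flag and raises the exception
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore before propagating any wrapper
        observed.set(Thread.currentThread().isInterrupted());
      }
    });
    worker.start();
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return observed.get();
  }
}
```

Without the `interrupt()` call in the catch block, `isInterrupted()` would report false there, and the cancellation signal would be lost to any code above the wrapper.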

visz11 commented Oct 1, 2025

/refacto-visz

refacto-visz bot commented Oct 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

refacto-visz bot commented Oct 1, 2025

Code Review: Concurrency Implementation in StreamedProtoOutputFormatter

👍 Well Done
Parallel Processing

Added parallel processing to improve performance while maintaining data consistency.

📁 Selected files for review (1)
  • src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
🎯 Custom Instructions
✅ Applied Instructions
Repository Guidelines
  • Only wrap IOException in RuntimeException when necessary, always preserving the original exception via getCause().
  • Avoid blocking operations (take(), get(), put()) in ForkJoinPool workers as they cause thread starvation; use dedicated thread pools for blocking tasks, wrap with ForkJoinPool.managedBlock(), or switch to standard ExecutorService.

Scope: All files

❌ Unapplied Instructions
Organization Guidelines

Reason: Your set path patterns [] don't match any selected files for review; Your set extension patterns [] don't match any selected files for review

devd-client

Reason: Repository 'devd-client' does not match current PR repository

Comment on lines +68 to +73
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}

Blocking Operations in ForkJoinPool

Blocking operations (take(), get()) are used within a ForkJoinPool worker, which causes thread starvation. ForkJoinPool workers should never block as this prevents work-stealing and can lead to deadlocks. This violates the repository guideline on avoiding blocking operations in ForkJoinPool.

// requires import java.util.concurrent.atomic.AtomicReference
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  try {
    var holder = new AtomicReference<Future<List<byte[]>>>();
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override
      public boolean block() throws InterruptedException {
        holder.set(targetQueue.take());
        return true;
      }

      @Override
      public boolean isReleasable() {
        return holder.get() != null;
      }
    });
    for (byte[] target : holder.get().get()) {
      out.write(target);
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new WrappedInterruptedException(e);
  }
}
Committable suggestion
Suggested change

Replace:

while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  Future<List<byte[]>> targets = targetQueue.take();
  for (byte[] target : targets.get()) {
    out.write(target);
  }
}

with (requires a java.util.concurrent.atomic.AtomicReference import):

while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  try {
    var holder = new AtomicReference<Future<List<byte[]>>>();
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override
      public boolean block() throws InterruptedException {
        holder.set(targetQueue.take());
        return true;
      }

      @Override
      public boolean isReleasable() {
        return holder.get() != null;
      }
    });
    for (byte[] target : holder.get().get()) {
      out.write(target);
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new WrappedInterruptedException(e);
  }
}
Standards
  • Repo-Guideline-Avoid blocking operations (take(), get(), put()) in ForkJoinPool workers as they cause thread starvation; use dedicated thread pools for blocking tasks, wrap with ForkJoinPool.managedBlock(), or switch to standard ExecutorService.

Comment on lines +93 to +96
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}

Uncaught ExecutionException Handling

ExecutionException is caught but improperly handled with a TODO comment and generic RuntimeException. This masks the actual cause of failure and prevents proper error handling. The root cause should be extracted and appropriate exception handling implemented.

        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof WrappedIOException) {
            throw ((WrappedIOException) cause).getCause();
          } else if (cause instanceof WrappedInterruptedException) {
            throw ((WrappedInterruptedException) cause).getCause();
          } else if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else if (cause instanceof Error) {
            throw (Error) cause;
          } else {
            throw new RuntimeException("Error processing targets", cause);
          }
        }
Committable suggestion
Suggested change
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
throw ((WrappedInterruptedException) cause).getCause();
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw new RuntimeException("Error processing targets", cause);
}
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling

Comment on lines +135 to +137
} catch (IOException e) {
throw new WrappedIOException(e);
}

Improper Exception Wrapping

The code wraps an IOException in a custom RuntimeException, but doesn't properly document when this wrapping is necessary. This violates the repository guideline that states to only wrap IOException in RuntimeException when necessary, always preserving the original exception.

Standards
  • Repo-Guideline-Only wrap IOException in RuntimeException when necessary, always preserving the original exception via getCause().

// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);

Bounded Queue Backpressure Risk

The queue has a capacity limit (MAX_CHUNKS_IN_QUEUE), but the implementation doesn't handle the case where it stays full. If producers outpace the consumer, or the consumer fails without draining the queue, the put() operation will block indefinitely, potentially causing thread starvation or deadlock.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Resource-Management
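One mitigation for the blocking put() concern above is the timed `offer` variant, which surfaces backpressure as a boolean instead of an indefinite block, letting the producer retry, shed load, or fail fast. A minimal sketch (names invented, timeout chosen arbitrarily):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferSketch {
  public static boolean tryEnqueue(LinkedBlockingQueue<String> queue, String item) {
    try {
      // Returns false rather than waiting forever when the queue stays full.
      return queue.offer(item, 10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Whether to prefer this over a blocking put() depends on intent: put() gives natural flow control when the consumer is healthy, while offer(timeout) makes a stalled consumer detectable.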

Comment on lines +54 to +61
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {

Nested Thread Pool

Creating a new ForkJoinPool for each processOutput call is inefficient and can lead to resource exhaustion. Thread pools should be shared resources with controlled lifecycle, not created per operation. This approach creates unnecessary thread churn.

Standards
  • ISO-IEC-25010-Reliability-Resource-Utilization
  • SRE-Resource-Management
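The reviewer's point about per-call pools can be sketched as follows: hold one pool for the lifetime of the process (or component) and submit work to it from every invocation, instead of constructing and tearing down a ForkJoinPool inside each processOutput call. This is an illustrative pattern, not the PR's code; the workload is a stand-in.

```java
import java.util.concurrent.ForkJoinPool;

public class SharedPoolSketch {
  // One pool reused by every invocation, sized to the machine; created once,
  // not per call, so threads are not churned on each operation.
  private static final ForkJoinPool SHARED =
      new ForkJoinPool(Runtime.getRuntime().availableProcessors());

  public static int squareSum(int[] xs) {
    try {
      return SHARED.submit(() -> {
        int s = 0;
        for (int x : xs) {
          s += x * x;
        }
        return s;
      }).get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

`ForkJoinPool.commonPool()` is another ready-made shared option, with the caveat that blocking tasks on the common pool can starve unrelated users of it.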
