Clone parallel streamed proto #29
base: master
Conversation
…t/StreamedProtoOutputFormatter.java Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Refacto is reviewing this PR. Please wait for the review comments to be posted.
Walkthrough
Replaces per-target synchronous writes with a parallel producer-consumer pipeline. Targets are processed in parallel via ForkJoinPool into length-delimited byte chunks, enqueued with bounded capacity, and written sequentially by a dedicated consumer. Adds chunk sizing constants, atomic completion signaling, custom delimited writes, and runtime wrappers for IO/interrupt exceptions.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller
participant Formatter as StreamedProtoOutputFormatter
participant FJP as ForkJoinPool
participant Producers
participant Queue as Bounded Queue
participant Writer as Writer Worker
participant OS as OutputStream
Caller->>Formatter: format(stream, targets)
Formatter->>FJP: submit parallel tasks
rect rgba(220,240,255,0.5)
note over Producers,Queue: Produce chunks (length-delimited bytes)
FJP->>Producers: process TARGETS_PER_CHUNK
Producers-->>Queue: enqueue byte[] chunk (FIFO)
end
rect rgba(235,255,235,0.5)
note over Writer,OS: Sequential writing ensures order
Formatter->>Writer: start consumer loop
loop until completion signaled and queue drained
Queue-->>Writer: take chunk
Writer->>OS: write chunk
end
end
Formatter-->>Caller: return
alt Error (IO / Interrupted)
Producers-->>Formatter: throw WrappedIOException / WrappedInterruptedException
Formatter-->>Caller: rethrow runtime wrapper
end
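As a rough illustration of the pipeline described in the walkthrough and diagram above, here is a minimal, self-contained Java sketch of the same producer/consumer shape. All names, constants, and the toy one-byte length prefix are illustrative stand-ins, not the PR's actual code.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ChunkedWritePipelineSketch {
  private static final int ITEMS_PER_CHUNK = 100;    // stand-in for TARGETS_PER_CHUNK
  private static final int MAX_CHUNKS_IN_QUEUE = 16; // stand-in for the bounded capacity

  static void writeAll(List<String> items, OutputStream out) throws Exception {
    ExecutorService producers = Executors.newWorkStealingPool();
    ExecutorService writer = Executors.newSingleThreadExecutor();
    BlockingQueue<Future<byte[]>> queue = new LinkedBlockingQueue<>(MAX_CHUNKS_IN_QUEUE);
    AtomicBoolean stillProducing = new AtomicBoolean(true);

    // Consumer: drains chunk futures in FIFO order and writes them sequentially,
    // preserving output order even though serialization happens in parallel.
    Future<?> writeDone = writer.submit(() -> {
      while (stillProducing.get() || !queue.isEmpty()) {
        Future<byte[]> chunk = queue.poll(100, TimeUnit.MILLISECONDS);
        if (chunk != null) {
          out.write(chunk.get());
        }
      }
      return null;
    });

    try {
      // Producers: each chunk of items is serialized off-thread into one byte[].
      for (int i = 0; i < items.size(); i += ITEMS_PER_CHUNK) {
        List<String> chunk = items.subList(i, Math.min(items.size(), i + ITEMS_PER_CHUNK));
        queue.put(producers.submit(() -> serializeChunk(chunk)));
      }
    } finally {
      stillProducing.set(false);
    }
    writeDone.get(); // surfaces producer or writer failures as ExecutionException
    producers.shutdown();
    writer.shutdown();
  }

  // Length-prefix each item's UTF-8 bytes, mimicking the delimited chunk layout.
  private static byte[] serializeChunk(List<String> chunk) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    for (String s : chunk) {
      byte[] body = s.getBytes(StandardCharsets.UTF_8);
      buf.write(body.length); // toy one-byte prefix; the PR writes a protobuf varint
      buf.write(body);
    }
    return buf.toByteArray();
  }
}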
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Summary of Changes
Hello @visz11, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the streamed_proto output formatter by parallelizing target serialization.
Highlights
Code Review
This pull request introduces parallel processing for serializing targets in the streamed_proto output formatter, which should improve performance. The implementation uses a ForkJoinPool and a producer-consumer pattern. While the overall approach is sound, there are critical issues with the exception handling, where TODO comments mark incomplete logic. This could lead to unhandled exceptions. I've provided suggestions to correctly propagate exceptions. Additionally, there's an opportunity to simplify the protobuf serialization logic for better readability and conciseness.
} catch (ExecutionException e) {
  // TODO: propagate
  throw new RuntimeException(e);
}
The exception handling here is incomplete and breaks the processOutput method's contract, which is declared to throw IOException and InterruptedException. Throwing a generic RuntimeException can cause unhandled exceptions higher up the call stack.
The ExecutionException will contain a wrapped exception from the consumer task (e.g., WrappedIOException or WrappedInterruptedException). You should unwrap the cause and throw the original checked exception.
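Note that the unwrapping in the suggested catch block below assumes wrapper types whose getCause() is narrowed to the wrapped checked exception. One possible shape for such wrappers (a sketch, not necessarily the PR's exact classes):

import java.io.IOException;

// Hypothetical shapes for the runtime wrappers referenced above; the PR's actual
// classes may differ. The covariant getCause() overrides are what allow callers to
// rethrow the original checked exception directly after an instanceof check.
final class WrappedIOException extends RuntimeException {
  WrappedIOException(IOException cause) {
    super(cause);
  }

  @Override
  public synchronized IOException getCause() {
    return (IOException) super.getCause();
  }
}

final class WrappedInterruptedException extends RuntimeException {
  WrappedInterruptedException(InterruptedException cause) {
    super(cause);
  }

  @Override
  public synchronized InterruptedException getCause() {
    return (InterruptedException) super.getCause();
  }
}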
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
throw ((WrappedInterruptedException) cause).getCause();
} else if (cause instanceof RuntimeException) {
// Propagate other runtime exceptions from the workers
throw (RuntimeException) cause;
}
// For other Throwables, wrap in a RuntimeException
throw new RuntimeException(e);
}

} catch (ExecutionException e) {
  // TODO: figure out what might be in here and propagate
  throw new RuntimeException(e);
}
The TODO here indicates incomplete exception handling. The ExecutionException from targets.get() wraps an exception from a serialization task. This should be unwrapped and re-thrown to be handled by the consumer's caller. The serialization tasks wrap exceptions in WrappedIOException and WrappedInterruptedException, which are RuntimeExceptions. You can unwrap and rethrow the cause.
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
// This path should ideally not be taken if producers only throw RuntimeExceptions.
throw new RuntimeException(e);
}

+ // 1. write the var-int length prefix
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+     CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
This implementation is correct, but it can be simplified. Instead of creating two CodedOutputStream instances, you can use a single one to write both the header (size prefix) and the message body sequentially into the byte array. The flush() calls are also not necessary as they are no-ops for array-backed output streams.
// write the var-int length prefix and then the message bytes
CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
codedOut.writeUInt32NoTag(serializedSize);
targetProtoBuffer.writeTo(codedOut);
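For illustration, the simplification could be folded into one helper along these lines; the class and method names here are assumptions, not the PR's code.

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.IOException;

final class DelimitedBytesSketch {
  // Serializes a message the same way writeDelimitedTo(...) would, but into a
  // byte[] that can be handed to the writer thread as one chunk element.
  static byte[] toDelimitedByteArray(Message message) throws IOException {
    int serializedSize = message.getSerializedSize();
    int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    byte[] output = new byte[headerSize + serializedSize];

    CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
    codedOut.writeUInt32NoTag(serializedSize); // var-int length prefix
    message.writeTo(codedOut);                 // message body right after it
    codedOut.flush();                          // a no-op for array-backed streams
    return output;
  }
}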
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (2 hunks)
      while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
        Future<List<byte[]>> targets = targetQueue.take();
        for (byte[] target : targets.get()) {
          out.write(target);
        }
      }
    } catch (InterruptedException e) {
      throw new WrappedInterruptedException(e);
    } catch (IOException e) {
      throw new WrappedIOException(e);
    } catch (ExecutionException e) {
      // TODO: figure out what might be in here and propagate
      throw new RuntimeException(e);
    }
  });
  try {
    for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
      targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
    }
Fix the consumer loop to avoid deadlock when the queue stays empty.
If partialResult is empty (or every producer fails before enqueuing), stillAddingTargetsToQueue remains true while the queue is empty, so the first targetQueue.take() blocks forever. Nothing ever enqueues a sentinel, so the consumer never reaches the loop guard, your ForkJoinPool never quiesces, and writeAllTargetsFuture.get() never returns. LinkedBlockingQueue.take() intentionally waits indefinitely until a producer supplies an element, so this becomes an unrecoverable hang. (docs.oracle.com)
Please switch to a polling pattern (e.g., poll with timeout) and exit once the flag is cleared, or enqueue an explicit end-of-stream marker so the consumer can drain and terminate. For example:
- while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
- Future<List<byte[]>> targets = targetQueue.take();
+ while (true) {
+ Future<List<byte[]>> targets =
+ targetQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (targets == null) {
+ if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
+ break;
+ }
+ continue;
+ }

Remember to add the corresponding TimeUnit import. (docs.oracle.com)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
      while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
        Future<List<byte[]>> targets = targetQueue.take();
        for (byte[] target : targets.get()) {
          out.write(target);
        }
      }
    } catch (InterruptedException e) {
      throw new WrappedInterruptedException(e);
    } catch (IOException e) {
      throw new WrappedIOException(e);
    } catch (ExecutionException e) {
      // TODO: figure out what might be in here and propagate
      throw new RuntimeException(e);
    }
  });
  try {
    for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
      targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
    }

      // Replace blocking take() with poll(timeout) and explicit exit
-     while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
      while (true) {
        Future<List<byte[]>> targets =
            targetQueue.poll(100, TimeUnit.MILLISECONDS);
        if (targets == null) {
          if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
            break;
          }
          continue;
        }
        for (byte[] target : targets.get()) {
          out.write(target);
        }
      }
🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 68 to 86, the consumer uses targetQueue.take() which can block
forever when stillAddingTargetsToQueue remains true and no producer ever
enqueues an element; change the consumer to use a polling pattern (e.g.,
targetQueue.poll(timeout, TimeUnit.MILLISECONDS)) in a loop and break out when
stillAddingTargetsToQueue is false and the queue is empty (or when poll returns
null after the timeout and stillAddingTargetsToQueue is cleared), and add the
corresponding java.util.concurrent.TimeUnit import; alternatively you may
enqueue a distinct end-of-stream sentinel object instead of switching to
poll—pick one approach and ensure the consumer drains remaining entries before
exiting so the ForkJoinPool can quiesce and writeAllTargetsFuture.get() can
complete.
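For completeness, the end-of-stream sentinel alternative mentioned in that prompt could look roughly like this sketch; the names and the CompletableFuture-based marker are illustrative choices, not a committable patch.

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SentinelConsumerSketch {
  // A distinct end-of-stream marker: only the coordinating thread enqueues it,
  // after every real chunk has been submitted (and on the error path, e.g. in a
  // finally block), so the consumer always terminates.
  static final Future<List<byte[]>> END_OF_STREAM =
      CompletableFuture.completedFuture(List.of());

  // With the sentinel in place, a blocking take() terminates deterministically,
  // so no completion flag or polling timeout is needed.
  static void drain(BlockingQueue<Future<List<byte[]>>> queue, OutputStream out)
      throws IOException, InterruptedException, ExecutionException {
    Future<List<byte[]>> chunk;
    while ((chunk = queue.take()) != END_OF_STREAM) {
      for (byte[] target : chunk.get()) {
        out.write(target);
      }
    }
  }
}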
} catch (InterruptedException e) {
  throw new WrappedInterruptedException(e);
} catch (IOException e) {
  throw new WrappedIOException(e);
Preserve the interrupt status before rethrowing.
Both the consumer loop and toProto catch InterruptedException, wrap it, and immediately rethrow. Because blocking methods clear the thread’s interrupt flag when they throw InterruptedException, wrapping and rethrowing without first re‑interrupting loses the cancellation signal for upstream code. Best practice is to call Thread.currentThread().interrupt(); before propagating so higher levels can still observe the interrupt. (stackoverflow.com)
Please add Thread.currentThread().interrupt(); inside each catch before throwing WrappedInterruptedException. (stackoverflow.com)
Also applies to: 104-107
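A minimal illustration of the suggested pattern, assuming the PR's WrappedInterruptedException wrapper and a made-up helper name:

import java.util.concurrent.BlockingQueue;

final class InterruptPreservingTakeSketch {
  // WrappedInterruptedException is assumed to be the PR's runtime wrapper shown in
  // the diff above; takeNext is a made-up stand-in for the blocking call sites.
  static <T> T takeNext(BlockingQueue<T> queue) {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      // Re-assert the interrupt flag before converting to an unchecked exception so
      // upstream code that checks Thread.isInterrupted() still sees the cancellation.
      Thread.currentThread().interrupt();
      throw new WrappedInterruptedException(e);
    }
  }
}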
🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 74-77 and 104-107, the catch blocks for InterruptedException
rethrow WrappedInterruptedException without preserving the thread interrupt
status; update each InterruptedException catch to call
Thread.currentThread().interrupt(); immediately before throwing new
WrappedInterruptedException(e) so the interrupt flag is preserved for upstream
code to observe.
/refacto-visz

Refacto is reviewing this PR. Please wait for the review comments to be posted.
Code Review: Concurrency Implementation in StreamedProtoOutputFormatter
👍 Well Done
📁 Selected files for review (1)
🎯 Custom Instructions
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  Future<List<byte[]>> targets = targetQueue.take();
  for (byte[] target : targets.get()) {
    out.write(target);
  }
}
Blocking Operations in ForkJoinPool
Blocking operations (take(), get()) are used within a ForkJoinPool worker, which causes thread starvation. ForkJoinPool workers should never block as this prevents work-stealing and can lead to deadlocks. This violates the repository guideline on avoiding blocking operations in ForkJoinPool.
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  try {
    // Hold the taken future in a reference that stays visible after managedBlock
    // returns (requires a java.util.concurrent.atomic.AtomicReference import).
    AtomicReference<Future<List<byte[]>>> taken = new AtomicReference<>();
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override
      public boolean block() throws InterruptedException {
        taken.set(targetQueue.take());
        return true;
      }
      @Override
      public boolean isReleasable() {
        return taken.get() != null;
      }
    });
    for (byte[] target : taken.get().get()) {
      out.write(target);
    }
  } catch (InterruptedException e) {
    throw new WrappedInterruptedException(e);
  }
}
Committable Suggestion
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  Future<List<byte[]>> targets = targetQueue.take();
  for (byte[] target : targets.get()) {
    out.write(target);
  }
}

while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
  try {
    AtomicReference<Future<List<byte[]>>> taken = new AtomicReference<>();
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
      @Override
      public boolean block() throws InterruptedException {
        taken.set(targetQueue.take());
        return true;
      }
      @Override
      public boolean isReleasable() {
        return taken.get() != null;
      }
    });
    for (byte[] target : taken.get().get()) {
      out.write(target);
    }
  } catch (InterruptedException e) {
    throw new WrappedInterruptedException(e);
  }
}
Standards
- Repo-Guideline-Avoid blocking operations (take(), get(), put()) in ForkJoinPool workers as they cause thread starvation; use dedicated thread pools for blocking tasks, wrap with ForkJoinPool.managedBlock(), or switch to standard ExecutorService.
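As an illustration of the "dedicated thread pool for blocking tasks" option in that guideline, the blocking writer loop could run on its own single-thread executor while the ForkJoinPool stays compute-only; this is only a sketch, and all names here are assumptions.

import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DedicatedWriterSketch {
  // The blocking consumer lives on a plain single-thread executor, so its
  // poll()/get() calls never park a ForkJoinPool worker.
  static Future<?> startWriter(
      BlockingQueue<Future<List<byte[]>>> queue,
      AtomicBoolean stillAddingTargetsToQueue,
      OutputStream out) {
    ExecutorService writerExecutor = Executors.newSingleThreadExecutor();
    Future<?> done = writerExecutor.submit(() -> {
      while (stillAddingTargetsToQueue.get() || !queue.isEmpty()) {
        Future<List<byte[]>> chunk = queue.poll(100, TimeUnit.MILLISECONDS);
        if (chunk == null) {
          continue;
        }
        for (byte[] target : chunk.get()) {
          out.write(target);
        }
      }
      return null;
    });
    writerExecutor.shutdown(); // the already-submitted task still runs to completion
    return done;
  }
}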
} catch (ExecutionException e) {
  // TODO: propagate
  throw new RuntimeException(e);
}
Uncaught ExecutionException Handling
ExecutionException is caught but improperly handled with a TODO comment and generic RuntimeException. This masks the actual cause of failure and prevents proper error handling. The root cause should be extracted and appropriate exception handling implemented.
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
throw ((WrappedInterruptedException) cause).getCause();
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw new RuntimeException("Error processing targets", cause);
}
}
Committable Suggestion
} catch (ExecutionException e) {
  // TODO: propagate
  throw new RuntimeException(e);
}

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof WrappedIOException) {
    throw ((WrappedIOException) cause).getCause();
  } else if (cause instanceof WrappedInterruptedException) {
    throw ((WrappedInterruptedException) cause).getCause();
  } else if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  } else if (cause instanceof Error) {
    throw (Error) cause;
  } else {
    throw new RuntimeException("Error processing targets", cause);
  }
}
Standards
- ISO-IEC-25010-Reliability-Fault-Tolerance
- SRE-Error-Handling
} catch (IOException e) {
  throw new WrappedIOException(e);
}
Improper Exception Wrapping
The code wraps an IOException in a custom RuntimeException, but doesn't properly document when this wrapping is necessary. This violates the repository guideline that states to only wrap IOException in RuntimeException when necessary, always preserving the original exception.
Standards
- Repo-Guideline-Only wrap IOException in RuntimeException when necessary, always preserving the original exception via getCause().
        // we use asyncMode to ensure the queue is processed FIFO, which maximizes
        // throughput
        true)) {
  var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
Unbounded Queue Risk
While the queue has a capacity limit (MAX_CHUNKS_IN_QUEUE), the implementation doesn't handle the scenario when the queue is full. If producers outpace consumers, the put() operation will block indefinitely, potentially causing thread starvation or deadlock.
Standards
- ISO-IEC-25010-Reliability-Maturity
- SRE-Resource-Management
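One way to make the full-queue case explicit rather than blocking indefinitely is a timed offer on the producer side; the timeout value, exception type, and helper name below are arbitrary illustrations, not the PR's code.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BoundedEnqueueSketch {
  // put(...) blocks without bound when the queue is full; a timed offer makes the
  // backpressure case observable instead. The 30-second limit is an arbitrary choice.
  static void enqueueOrFail(
      BlockingQueue<Future<List<byte[]>>> targetQueue, Future<List<byte[]>> chunkFuture)
      throws IOException, InterruptedException {
    if (!targetQueue.offer(chunkFuture, 30, TimeUnit.SECONDS)) {
      chunkFuture.cancel(true);
      throw new IOException("streamed_proto writer fell behind; chunk queue stayed full for 30s");
    }
  }
}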
try (ForkJoinPool executor =
    new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        // we use asyncMode to ensure the queue is processed FIFO, which maximizes
        // throughput
        true)) {
Nested Thread Pool
Creating a new ForkJoinPool for each processOutput call is inefficient and can lead to resource exhaustion. Thread pools should be shared resources with controlled lifecycle, not created per operation. This approach creates unnecessary thread churn.
Standards
- ISO-IEC-25010-Reliability-Resource-Utilization
- SRE-Resource-Management
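One possible direction, sketched here with assumed names and sizing, is to share a single pool across processOutput calls instead of constructing a ForkJoinPool per invocation:

import java.util.concurrent.ForkJoinPool;

final class SharedPoolSketch {
  // Created once and reused by every processOutput call; asyncMode=true keeps
  // FIFO submission order, matching the per-call pool's current configuration.
  private static final ForkJoinPool SHARED_EXECUTOR =
      new ForkJoinPool(
          Runtime.getRuntime().availableProcessors(),
          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          /* handler= */ null,
          /* asyncMode= */ true);

  static ForkJoinPool executor() {
    return SHARED_EXECUTOR;
  }
}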
Summary by CodeRabbit
Refactor
Bug Fixes