@@ -13,18 +13,26 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.query.output;

import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.packages.LabelPrinter;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.proto.proto2api.Build;
import com.google.protobuf.CodedOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An output formatter that outputs a protocol buffer representation of a query result and outputs
* the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()}
* on a {@code Build.QueryResult} object the full result can be reconstructed.
*/
public class StreamedProtoOutputFormatter extends ProtoOutputFormatter {

@Override
public String getName() {
return "streamed_proto";
@@ -34,13 +42,120 @@ public String getName() {
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
private static final int TARGETS_PER_CHUNK = 500;

private final LabelPrinter ourLabelPrinter = labelPrinter;

@Override
public void processOutput(Iterable<Target> partialResult)
throws IOException, InterruptedException {
for (Target target : partialResult) {
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
ForkJoinTask<?> writeAllTargetsFuture;
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
Comment on lines +54 to +61

Nested Thread Pool

Creating a new ForkJoinPool for each processOutput call is inefficient and can lead to resource exhaustion. Thread pools should be shared resources with a controlled lifecycle, not created per operation; creating one per call causes unnecessary thread churn.

Standards
  • ISO-IEC-25010-Reliability-Resource-Utilization
  • SRE-Resource-Management
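
A minimal sketch of the shared-executor alternative this comment points at, assuming the pool is owned by the formatter rather than created inside each processOutput call; the field and method names are illustrative, not from the PR (ExecutorService and Executors come from java.util.concurrent):

// Hypothetical: created once per formatter instance and reused across processOutput calls.
private final ExecutorService sharedExecutor =
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// Hypothetical cleanup hook so the pool's lifecycle is controlled explicitly.
private void shutdownSharedExecutor() {
  sharedExecutor.shutdown();
}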

var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);

Unbounded Queue Risk

While the queue has a capacity limit (MAX_CHUNKS_IN_QUEUE), the implementation doesn't handle the case where the queue stays full. If the producer outpaces the consumer, put() blocks until space frees up; if the consumer has stalled or failed, that block never ends, leading to thread starvation or deadlock.

Standards
  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Resource-Management
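
One way to make that backpressure explicit, sketched under the assumption that failing fast is acceptable when the consumer stalls; the 30-second timeout is illustrative and TimeUnit would need to be imported:

// Instead of a blocking targetQueue.put(...), bound how long the producer may wait for space.
Future<List<byte[]>> chunk =
    executor.submit(() -> writeTargetsDelimitedToByteArrays(targets));
if (!targetQueue.offer(chunk, 30, TimeUnit.SECONDS)) {
  chunk.cancel(true);
  throw new IOException("streamed_proto consumer did not drain the queue in time");
}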

var stillAddingTargetsToQueue = new AtomicBoolean(true);
writeAllTargetsFuture =
executor.submit(
() -> {
try {
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
Comment on lines +68 to +73

Blocking Operations in ForkJoinPool

Blocking operations (take(), get()) are used within a ForkJoinPool worker, which causes thread starvation. ForkJoinPool workers should never block as this prevents work-stealing and can lead to deadlocks. This violates the repository guideline on avoiding blocking operations in ForkJoinPool.

                      while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
                        try {
                          // Declare the blocker with var so the anonymous class's
                          // result field stays accessible after managedBlock returns.
                          var blocker = new ForkJoinPool.ManagedBlocker() {
                            Future<List<byte[]>> result;

                            @Override
                            public boolean block() throws InterruptedException {
                              result = targetQueue.take();
                              return true;
                            }

                            @Override
                            public boolean isReleasable() {
                              return result != null;
                            }
                          };
                          ForkJoinPool.managedBlock(blocker);
                          for (byte[] target : blocker.result.get()) {
                            out.write(target);
                          }
                        } catch (InterruptedException e) {
                          throw new WrappedInterruptedException(e);
                        }
                      }
Committable suggestion
Suggested change
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
try {
// Declare the blocker with var so the anonymous class's result field stays accessible.
var blocker = new ForkJoinPool.ManagedBlocker() {
Future<List<byte[]>> result;
@Override
public boolean block() throws InterruptedException {
result = targetQueue.take();
return true;
}
@Override
public boolean isReleasable() {
return result != null;
}
};
ForkJoinPool.managedBlock(blocker);
for (byte[] target : blocker.result.get()) {
out.write(target);
}
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
}
}
Standards
  • Repo-Guideline-Avoid blocking operations (take(), get(), put()) in ForkJoinPool workers as they cause thread starvation; use dedicated thread pools for blocking tasks, wrap with ForkJoinPool.managedBlock(), or switch to standard ExecutorService.

} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);
Comment on lines +74 to +77

⚠️ Potential issue | 🟠 Major

Preserve the interrupt status before rethrowing.

Both the consumer loop and toProto catch InterruptedException, wrap it, and immediately rethrow. Because a method that throws InterruptedException clears the thread's interrupt flag before throwing, wrapping the exception without re-interrupting loses the cancellation signal for upstream code. Best practice is to call Thread.currentThread().interrupt(); before propagating so higher levels can still observe the interrupt. (stackoverflow.com)

Please add Thread.currentThread().interrupt(); inside each catch before throwing WrappedInterruptedException. (stackoverflow.com)

Also applies to: 104-107

🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 74-77 and 104-107, the catch blocks for InterruptedException
rethrow WrappedInterruptedException without preserving the thread interrupt
status; update each InterruptedException catch to call
Thread.currentThread().interrupt(); immediately before throwing new
WrappedInterruptedException(e) so the interrupt flag is preserved for upstream
code to observe.
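
For concreteness, the requested shape of each catch block would be roughly:

} catch (InterruptedException e) {
  // Restore the interrupt flag so callers above the wrapper can still observe it.
  Thread.currentThread().interrupt();
  throw new WrappedInterruptedException(e);
}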

} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
Comment on lines +78 to +81


high

The TODO here indicates incomplete exception handling. The ExecutionException from targets.get() wraps an exception from a serialization task. This should be unwrapped and re-thrown to be handled by the consumer's caller. The serialization tasks wrap exceptions in WrappedIOException and WrappedInterruptedException, which are RuntimeExceptions. You can unwrap and rethrow the cause.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  }
  // This path should ideally not be taken if producers only throw RuntimeExceptions.
  throw new RuntimeException(e);
}

});
try {
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
}
Comment on lines +68 to +86

⚠️ Potential issue | 🔴 Critical

Fix the consumer loop to avoid deadlock when the queue stays empty.

If partialResult is empty (or every producer fails before enqueuing anything), the consumer can observe stillAddingTargetsToQueue == true while the queue is empty, so its first targetQueue.take() blocks before the finally block ever clears the flag. Nothing ever enqueues a sentinel, so the consumer never re-checks the loop guard, the ForkJoinPool never quiesces, and writeAllTargetsFuture.get() never returns. LinkedBlockingQueue.take() intentionally waits indefinitely until a producer supplies an element, so this becomes an unrecoverable hang. (docs.oracle.com)

Please switch to a polling pattern (e.g., poll with timeout) and exit once the flag is cleared, or enqueue an explicit end-of-stream marker so the consumer can drain and terminate. For example:

-          while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
-            Future<List<byte[]>> targets = targetQueue.take();
+          while (true) {
+            Future<List<byte[]>> targets =
+                targetQueue.poll(100, TimeUnit.MILLISECONDS);
+            if (targets == null) {
+              if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
+                break;
+              }
+              continue;
+            }

Remember to add the corresponding TimeUnit import. (docs.oracle.com)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
}
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
});
try {
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
}
// Replace blocking take() with poll(timeout) and explicit exit
while (true) {
Future<List<byte[]>> targets =
targetQueue.poll(100, TimeUnit.MILLISECONDS);
if (targets == null) {
if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
break;
}
continue;
}
for (byte[] target : targets.get()) {
out.write(target);
}
}
🤖 Prompt for AI Agents
In
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
around lines 68 to 86, the consumer uses targetQueue.take() which can block
forever when stillAddingTargetsToQueue remains true and no producer ever
enqueues an element; change the consumer to use a polling pattern (e.g.,
targetQueue.poll(timeout, TimeUnit.MILLISECONDS)) in a loop and break out when
stillAddingTargetsToQueue is false and the queue is empty (or when poll returns
null after the timeout and stillAddingTargetsToQueue is cleared), and add the
corresponding java.util.concurrent.TimeUnit import; alternatively you may
enqueue a distinct end-of-stream sentinel object instead of switching to
poll—pick one approach and ensure the consumer drains remaining entries before
exiting so the ForkJoinPool can quiesce and writeAllTargetsFuture.get() can
complete.

} finally {
stillAddingTargetsToQueue.set(false);
}
}
try {
writeAllTargetsFuture.get();
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Comment on lines +93 to +96


critical

The exception handling here is incomplete and breaks the processOutput method's contract, which is declared to throw IOException and InterruptedException. Throwing a generic RuntimeException can cause unhandled exceptions higher up the call stack.

The ExecutionException will contain a wrapped exception from the consumer task (e.g., WrappedIOException or WrappedInterruptedException). You should unwrap the cause and throw the original checked exception.

} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof WrappedIOException) {
    throw ((WrappedIOException) cause).getCause();
  } else if (cause instanceof WrappedInterruptedException) {
    throw ((WrappedInterruptedException) cause).getCause();
  } else if (cause instanceof RuntimeException) {
    // Propagate other runtime exceptions from the workers
    throw (RuntimeException) cause;
  }
  // For other Throwables, wrap in a RuntimeException
  throw new RuntimeException(e);
}

Comment on lines +93 to +96

Uncaught ExecutionException Handling

ExecutionException is caught but improperly handled with a TODO comment and generic RuntimeException. This masks the actual cause of failure and prevents proper error handling. The root cause should be extracted and appropriate exception handling implemented.

        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof WrappedIOException) {
            throw ((WrappedIOException) cause).getCause();
          } else if (cause instanceof WrappedInterruptedException) {
            throw ((WrappedInterruptedException) cause).getCause();
          } else if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else if (cause instanceof Error) {
            throw (Error) cause;
          } else {
            throw new RuntimeException("Error processing targets", cause);
          }
        }
Committable suggestion
Suggested change
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
throw ((WrappedInterruptedException) cause).getCause();
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else {
throw new RuntimeException("Error processing targets", cause);
}
}
Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling

}

private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) {
return targets.stream().map(target -> writeDelimited(toProto(target))).toList();
}

private Build.Target toProto(Target target) {
try {
return toTargetProtoBuffer(target, ourLabelPrinter);
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
}
}
};
}

private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
try {
var serializedSize = targetProtoBuffer.getSerializedSize();
- var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
- var output = new byte[headerSize + serializedSize];
- var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
- targetProtoBuffer.writeTo(codedOut);
+ int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+ byte[] output = new byte[headerSize + serializedSize];
+
+ // 1. write the var-int length prefix
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+ CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
Comment on lines +123 to +132


medium

This implementation is correct, but it can be simplified. Instead of creating two CodedOutputStream instances, you can use a single one to write both the header (size prefix) and the message body sequentially into the byte array. The flush() calls are also not necessary as they are no-ops for array-backed output streams.

// write the var-int length prefix and then the message bytes
CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
codedOut.writeUInt32NoTag(serializedSize);
targetProtoBuffer.writeTo(codedOut);

return output;
}
} catch (IOException e) {
throw new WrappedIOException(e);
}
Comment on lines +135 to +137

Improper Exception Wrapping

The code wraps an IOException in a custom RuntimeException, but doesn't properly document when this wrapping is necessary. This violates the repository guideline that states to only wrap IOException in RuntimeException when necessary, always preserving the original exception.

Standards
  • Repo-Guideline-Only wrap IOException in RuntimeException when necessary, always preserving the original exception via getCause().
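
A small sketch of the kind of justification the guideline asks for, written as a comment at the wrap site (wording is illustrative):

} catch (IOException e) {
  // Wrapping is unavoidable here: this runs inside a Stream.map lambda, which cannot
  // throw checked exceptions. Callers are expected to unwrap the original via getCause().
  throw new WrappedIOException(e);
}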

}

private static class WrappedIOException extends RuntimeException {
private WrappedIOException(IOException cause) {
super(cause);
}

@Override
public IOException getCause() {
return (IOException) super.getCause();
}
}

private static class WrappedInterruptedException extends RuntimeException {
private WrappedInterruptedException(InterruptedException cause) {
super(cause);
}

@Override
public InterruptedException getCause() {
return (InterruptedException) super.getCause();
}
}
}
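
For context on the wire format the class javadoc describes: this diff writes one length-delimited Build.Target per target (via writeDelimitedTo or the writeDelimited helper), so a reader can walk the stream back with parseDelimitedFrom. A minimal sketch, assuming the bytes were saved to a file named results.pb (the path is illustrative):

import com.google.devtools.build.lib.query2.proto.proto2api.Build;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

class StreamedProtoReader {
  public static void main(String[] args) throws IOException {
    int count = 0;
    try (InputStream in = new FileInputStream("results.pb")) {
      Build.Target target;
      // parseDelimitedFrom returns null at end of stream.
      while ((target = Build.Target.parseDelimitedFrom(in)) != null) {
        count++;
      }
    }
    System.out.println("Read " + count + " targets");
  }
}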