-
Notifications
You must be signed in to change notification settings - Fork 0
Clone parallel streamed proto #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6bd312b
41fbacf
0c9a1d2
913d4a3
9843a5e
9a0efa0
1852be0
89e8b3b
5fc8b13
ec1bf6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,18 +13,26 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| package com.google.devtools.build.lib.query2.query.output; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.common.collect.Iterables; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.packages.LabelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.packages.Target; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.query2.proto.proto2api.Build; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.protobuf.CodedOutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.IOException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.OutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.*; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * An output formatter that outputs a protocol buffer representation of a query result and outputs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * on a {@code Build.QueryResult} object the full result can be reconstructed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public class StreamedProtoOutputFormatter extends ProtoOutputFormatter { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public String getName() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "streamed_proto"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -34,13 +42,120 @@ public String getName() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public OutputFormatterCallback<Target> createPostFactoStreamCallback( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return new OutputFormatterCallback<Target>() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final int TARGETS_PER_CHUNK = 500; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final LabelPrinter ourLabelPrinter = labelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void processOutput(Iterable<Target> partialResult) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws IOException, InterruptedException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (Target target : partialResult) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ForkJoinTask<?> writeAllTargetsFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try (ForkJoinPool executor = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| new ForkJoinPool( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Runtime.getRuntime().availableProcessors(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ForkJoinPool.defaultForkJoinWorkerThreadFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| null, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // we use asyncMode to ensure the queue is processed FIFO, which maximizes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // throughput | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| true)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unbounded Queue RiskWhile the queue has a capacity limit (MAX_CHUNKS_IN_QUEUE), the implementation doesn't handle the scenario when the queue is full. If producers outpace consumers, the put() operation will block indefinitely, potentially causing thread starvation or deadlock. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var stillAddingTargetsToQueue = new AtomicBoolean(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeAllTargetsFuture = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| executor.submit( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| () -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Future<List<byte[]>> targets = targetQueue.take(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (byte[] target : targets.get()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| out.write(target); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+68
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blocking Operations in ForkJoinPoolBlocking operations (take(), get()) are used within a ForkJoinPool worker, which causes thread starvation. ForkJoinPool workers should never block as this prevents work-stealing and can lead to deadlocks. This violates the repository guideline on avoiding blocking operations in ForkJoinPool. Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+74
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preserve the interrupt status before rethrowing. Both the consumer loop and Please add Also applies to: 104-107 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: figure out what might be in here and propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+78
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The } catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
// This path should ideally not be taken if producers only throw RuntimeExceptions.
throw new RuntimeException(e);
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+68
to
+86
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix the consumer loop to avoid deadlock when the queue stays empty. If Please switch to a polling pattern (e.g., - while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
- Future<List<byte[]>> targets = targetQueue.take();
+ while (true) {
+ Future<List<byte[]>> targets =
+ targetQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (targets == null) {
+ if (!stillAddingTargetsToQueue.get() && targetQueue.isEmpty()) {
+ break;
+ }
+ continue;
+ }Remember to add the corresponding 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stillAddingTargetsToQueue.set(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeAllTargetsFuture.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The exception handling here is incomplete and breaks the The } catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WrappedIOException) {
throw ((WrappedIOException) cause).getCause();
} else if (cause instanceof WrappedInterruptedException) {
throw ((WrappedInterruptedException) cause).getCause();
} else if (cause instanceof RuntimeException) {
// Propagate other runtime exceptions from the workers
throw (RuntimeException) cause;
}
// For other Throwables, wrap in a RuntimeException
throw new RuntimeException(e);
}
Comment on lines
+93
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uncaught ExecutionException HandlingExecutionException is caught but improperly handled with a TODO comment and generic RuntimeException. This masks the actual cause of failure and prevents proper error handling. The root cause should be extracted and appropriate exception handling implemented. Commitable Suggestion
Suggested change
Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return targets.stream().map(target -> writeDelimited(toProto(target))).toList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private Build.Target toProto(Target target) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return toTargetProtoBuffer(target, ourLabelPrinter); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static byte[] writeDelimited(Build.Target targetProtoBuffer) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var serializedSize = targetProtoBuffer.getSerializedSize(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - var output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - targetProtoBuffer.writeTo(codedOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + byte[] output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + // 1. write the var-int length prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + headerOut.writeUInt32NoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + headerOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + // 2. write the message bytes immediately after the prefix | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + CodedOutputStream bodyOut = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + CodedOutputStream.newInstance(output, headerSize, serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + targetProtoBuffer.writeTo(bodyOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + bodyOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+123
to
+132
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation is correct, but it can be simplified. Instead of creating two // write the var-int length prefix and then the message bytes
CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
codedOut.writeUInt32NoTag(serializedSize);
targetProtoBuffer.writeTo(codedOut); |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return output; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+135
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improper Exception WrappingThe code wraps an IOException in a custom RuntimeException, but doesn't properly document when this wrapping is necessary. This violates the repository guideline that states to only wrap IOException in RuntimeException when necessary, always preserving the original exception. Standards
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static class WrappedIOException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private WrappedIOException(IOException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public IOException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return (IOException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static class WrappedInterruptedException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private WrappedInterruptedException(InterruptedException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public InterruptedException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return (InterruptedException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nested Thread Pool
Creating a new ForkJoinPool for each processOutput call is inefficient and can lead to resource exhaustion. Thread pools should be shared resources with controlled lifecycle, not created per operation. This approach creates unnecessary thread churn.
Standards