- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
Speed up streamed-proto query output by distributing work to multiple threads #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
6bd312b
              41fbacf
              0c9a1d2
              913d4a3
              9843a5e
              9a0efa0
              1852be0
              89e8b3b
              5fc8b13
              ec1bf6e
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  | @@ -13,18 +13,26 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
| // limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| package com.google.devtools.build.lib.query2.query.output; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.common.collect.Iterables; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.packages.LabelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.packages.Target; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.devtools.build.lib.query2.proto.proto2api.Build; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.google.protobuf.CodedOutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.IOException; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.OutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.*; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * An output formatter that outputs a protocol buffer representation of a query result and outputs | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()} | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * on a {@code Build.QueryResult} object the full result can be reconstructed. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public class StreamedProtoOutputFormatter extends ProtoOutputFormatter { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public String getName() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return "streamed_proto"; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | @@ -34,13 +42,107 @@ public String getName() { | |||||||||||||||||||||||||||||||||||||||||||||||||
| public OutputFormatterCallback<Target> createPostFactoStreamCallback( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return new OutputFormatterCallback<Target>() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Magic Numbers UsageThe code uses a magic number (500) for chunk size without explanation of why this value was chosen. This reduces maintainability as future developers won't understand the reasoning behind this specific value or know when it should be adjusted. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final int TARGETS_PER_CHUNK = 500; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Magic Number UsageMagic number 500 lacks explanation for its selection. This hinders maintainability as future developers cannot understand the rationale for this specific batch size value. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private final LabelPrinter ourLabelPrinter = labelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public void processOutput(Iterable<Target> partialResult) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throws IOException, InterruptedException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for (Target target : partialResult) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ForkJoinTask<?> writeAllTargetsFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Management Issue: The ForkJoinPool is created with parallelism equal to all available processors without any safeguards against resource exhaustion. On many-core systems, this could lead to excessive thread creation and CPU contention. Consider limiting the maximum parallelism and adding an exception handler: // Limit the number of threads to avoid resource exhaustion
int parallelism = Math.min(Runtime.getRuntime().availableProcessors(), 4);
try (ForkJoinPool executor = new ForkJoinPool(
    parallelism,
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    (thread, throwable) -> {
        // Log uncaught exceptions in worker threads
        System.err.println("Worker thread exception: " + throwable);
    },
    // we use asyncMode to ensure the queue is processed FIFO
    true)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try (ForkJoinPool executor = | ||||||||||||||||||||||||||||||||||||||||||||||||||
| new ForkJoinPool( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Runtime.getRuntime().availableProcessors(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ForkJoinPool.defaultForkJoinWorkerThreadFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| null, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // we use asyncMode to ensure the queue is processed FIFO, which maximizes | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // throughput | ||||||||||||||||||||||||||||||||||||||||||||||||||
| true)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +54
     to 
      +61
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unclosed ForkJoinPoolThe ForkJoinPool is created with try-with-resources but the Future's completion is awaited after the try block exits, risking premature pool shutdown before task completion. Standards
 
      Comment on lines
    
      +54
     to 
      +61
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Pool LifecycleCreating a new ForkJoinPool for each processOutput call is expensive. Each pool creation allocates thread resources that must be initialized and later destroyed. Under high query volume, this pattern creates significant thread churn and initialization overhead, potentially degrading overall system performance. Standards
 
      Comment on lines
    
      +54
     to 
      +61
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RiskThe ForkJoinTask created inside the try-with-resources block continues execution after the ForkJoinPool is closed. This creates a resource leak risk as tasks may attempt to use the shutdown executor, potentially causing RejectedExecutionException or task failures. Commitable Suggestion
        Suggested change
       
 Standards
 
      Comment on lines
    
      +54
     to 
      +61
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Leak RiskForkJoinPool is created with no exception handler (null parameter). Uncaught exceptions in worker threads may cause thread termination without proper cleanup, potentially reducing parallelism effectiveness under load and causing resource leaks. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resource Exhaustion RiskThe queue has a bounded size but no handling for queue full conditions. If producers outpace consumers, the put() operation will block indefinitely, potentially causing thread starvation or deadlock. Consider adding timeout mechanisms and graceful degradation when queue capacity is reached. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var stillAddingTargetsToQueue = new AtomicBoolean(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| writeAllTargetsFuture = | ||||||||||||||||||||||||||||||||||||||||||||||||||
| executor.submit( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| () -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Future<List<byte[]>> targets = targetQueue.take(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for (byte[] target : targets.get()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| out.write(target); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +55
     to 
      +72
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion ForkJoinPool worker is blocking on  The consumer task runs inside the same  
 Because  Recommended approaches: 
 This will eliminate the starvation risk and make behaviour more predictable. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +68
     to 
      +73
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deadlock PotentialThe consumer thread calls targetQueue.take() which blocks until an element is available. If an exception occurs during queue population and stillAddingTargetsToQueue remains true while the queue is empty, the consumer thread will deadlock waiting indefinitely for items that will never arrive. Commitable Suggestion
        Suggested change
       
 Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +74
     to 
      +76
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Interruption HandlingInterruptedException is caught but the thread's interrupted status is not restored. When wrapping InterruptedException, the current thread's interrupt status should be preserved with Thread.currentThread().interrupt() to ensure proper interruption propagation. Commitable Suggestion
        Suggested change
       
 Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread Safety Issue: The ExecutionException handling is inadequate, using a generic RuntimeException with a TODO comment. This can mask important exceptions during task execution, potentially leading to resource leaks or thread pool exhaustion. Consider unwrapping and properly handling the cause: catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw new WrappedIOException((IOException) cause);
  } else if (cause instanceof InterruptedException) {
    Thread.currentThread().interrupt(); // Preserve interrupt status
    throw new WrappedInterruptedException((InterruptedException) cause);
  } else if (cause instanceof RuntimeException) {
    throw (RuntimeException) cause;
  } else {
    throw new RuntimeException("Error processing targets", cause);
  }
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: figure out what might be in here and propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +78
     to 
      +80
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception Swallowing RiskGeneric exception handling wraps ExecutionException in RuntimeException without preserving cause details. Attackers could exploit swallowed exceptions to hide malicious activities. Proper exception handling needed for security visibility. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +79
     to 
      +81
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This  Also, consider logging the exception with sufficient context to aid debugging.                       // TODO: figure out what might be in here and propagate
                      Throwable cause = e.getCause();
                      if (cause instanceof IOException) {
                        throw new WrappedIOException((IOException) cause);
                      } else if (cause instanceof InterruptedException) {
                        throw new WrappedInterruptedException((InterruptedException) cause);
                      } else {
                        throw new RuntimeException("Error during target processing", cause);
                      }
      Comment on lines
    
      +78
     to 
      +81
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Error PropagationExecutionException caught with TODO comment and generic RuntimeException wrapping. Specific error information is lost, preventing proper error handling and recovery. Standards
 
      Comment on lines
    
      +78
     to 
      +81
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uncaught RuntimeException PropagationUnresolved TODO with generic RuntimeException wrapping can mask actual failure causes. The code throws a generic RuntimeException without proper cause analysis, potentially hiding important error information and making debugging difficult in production. Commitable Suggestion
        Suggested change
       
 Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Concurrency Issue: The blocking  Consider using a non-blocking approach with timeouts: // Use offer with timeout to prevent indefinite blocking
Future<List<byte[]>> task = executor.submit(() -> writeTargetsDelimitedToByteArrays(targets));
if (!targetQueue.offer(task, 5, TimeUnit.SECONDS)) {
  // If queue is full for too long, process this batch directly to make progress
  for (byte[] target : task.get(30, TimeUnit.SECONDS)) {
    out.write(target);
  }
} | ||||||||||||||||||||||||||||||||||||||||||||||||||
| targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets))); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| stillAddingTargetsToQueue.set(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +88
     to 
      +90
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After setting            } finally {
            stillAddingTargetsToQueue.set(false);
            executor.shutdown(); // Initiate shutdown
            try {
              if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
                System.err.println("ForkJoinPool did not terminate in the specified timeout.");
                // Optionally, log the state of the tasks that are still running.
              }
            } catch (InterruptedException ie) {
              // (Re-)Cancel if current thread also interrupted
              executor.shutdownNow();
              // Preserve interrupt status
              Thread.currentThread().interrupt();
            }
          } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| writeAllTargetsFuture.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +94
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the previous          } catch (ExecutionException e) {
          // TODO: propagate
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw new WrappedIOException((IOException) cause);
          } else if (cause instanceof InterruptedException) {
            throw new WrappedInterruptedException((InterruptedException) cause);
          } else {
            throw new RuntimeException("Error during target processing", cause);
          }
      Comment on lines
    
      +93
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exception Handling GapThe code catches ExecutionException but wraps it in a generic RuntimeException without preserving the original exception type. This loses important error context and makes debugging harder. 
        Suggested change
       
 Standards
 
      Comment on lines
    
      +92
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uncaught ExceptionsExecutionException is caught but wrapped in RuntimeException with a TODO comment. This loses specific error information and prevents proper error handling. Standards
 
      Comment on lines
    
      +93
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled Interrupted ExceptionExecutionException handling doesn't restore interrupted status when InterruptedException is the cause. Thread interruption signals could be lost, preventing proper thread termination. Security-sensitive operations might continue after intended shutdown. Standards
 
      Comment on lines
    
      +93
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled ExecutionException PropagationAnother unresolved TODO with generic RuntimeException wrapping. The code fails to properly analyze and handle the underlying cause of ExecutionException, which could contain important failure information needed for proper error handling and recovery. Commitable Suggestion
        Suggested change
       
 Standards
 
      Comment on lines
    
      +91
     to 
      +96
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled Interrupted ExceptionThe code catches ExecutionException but doesn't handle InterruptedException from Future.get(). Thread interruption signals are lost, potentially causing thread leaks and preventing proper application shutdown. This can lead to resource exhaustion in long-running applications. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return targets.stream().map(target -> writeDelimited(toProto(target))).toList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +99
     to 
      +100
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Memory Exhaustion RiskThe method processes all targets in memory without streaming or chunking large outputs. For large target lists, this could cause excessive memory consumption leading to OutOfMemoryError. Consider implementing true streaming with backpressure to prevent memory exhaustion. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private Build.Target toProto(Target target) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return toTargetProtoBuffer(target, ourLabelPrinter); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static byte[] writeDelimited(Build.Target targetProtoBuffer) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var serializedSize = targetProtoBuffer.getSerializedSize(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| targetProtoBuffer.writeTo(codedOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| codedOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return output; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|         
                  visz11 marked this conversation as resolved.
              Show resolved
            Hide resolved | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static class WrappedIOException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private WrappedIOException(IOException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public IOException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return (IOException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +145
     to 
      +147
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider whether wrapping the      @Override
    public IOException getCause() {
      return cause;
    } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static class WrappedInterruptedException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private WrappedInterruptedException(InterruptedException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public InterruptedException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return (InterruptedException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +157
     to 
      +159
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to      @Override
    public InterruptedException getCause() {
      return cause;
    } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| 
      Comment on lines
    
      +140
     to 
      +160
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrapper Classes DuplicationTwo nearly identical wrapper exception classes follow the same pattern with duplicated code structure. This violates DRY principle and increases maintenance burden. A generic wrapper with type parameter or unified exception handling strategy would be more maintainable. Standards
 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nested Anonymous Classes
Complex parallel processing logic is implemented within an anonymous inner class. This creates maintainability issues as the code is deeply nested and tightly coupled to its parent class. Extracting this to a named class would improve readability and testability.
Standards