@@ -113,9 +113,7 @@ public class AviatorGrpcClient implements AutoCloseable {
113113 private final CountDownLatch initLatch = new CountDownLatch (1 );
114114 private final Semaphore requestSemaphore ;
115115 private final AtomicInteger outstandingRequests = new AtomicInteger (0 );
116- private volatile StreamObserver <UserPromptRequest > requestObserver ;
117- private final AtomicBoolean streamCompleted = new AtomicBoolean (false );
118- private volatile boolean isStreamActive = false ;
116+ private RequestHandler <UserPromptRequest > requestHandler ;
119117
120118 private final ScheduledExecutorService pingScheduler ;
121119 private ScheduledFuture <?> pingTask ;
@@ -231,7 +229,7 @@ private void stopPingPong() {
231229 // Send ping message
232230 private void sendPing () {
233231 try {
234- if (requestObserver != null && ! streamCompleted . get () && isStreamActive ) {
232+ if (requestHandler != null && requestHandler . isReady () ) {
235233 PingRequest pingRequest = PingRequest .newBuilder ()
236234 .setStreamId (streamId )
237235 .setTimestamp (System .currentTimeMillis ())
@@ -242,23 +240,23 @@ private void sendPing() {
242240 .build ();
243241
244242 LOG .info ("Sending ping streamId: {}" , streamId );
245- requestObserver . onNext (pingMsg );
243+ requestHandler . sendRequest (pingMsg );
246244 }
247245 } catch (Exception e ) {
248- if (! streamCompleted . get ()) {
246+ if (requestHandler != null && ! requestHandler . isCompleted ()) {
249247 LOG .warn ("Failed to send ping: {}" , e .getMessage ());
250248 }
251249 }
252250 }
253251
254252 public CompletableFuture <Map <String , AuditResponse >> processBatchRequests (
255253 Queue <UserPrompt > requests , String projectName , String token ) {
256- isStreamActive = true ;
257254 if (requests == null || requests .isEmpty ()) {
258255 LOG .info ("No issues to process" );
259256 return CompletableFuture .completedFuture (new HashMap <>());
260257 }
261258
259+ requestHandler = new RequestHandler <>(streamId );
262260 logger .info ("Starting processing - Total Issues: " + requests .size ());
263261 CompletableFuture <Map <String , AuditResponse >> resultFuture = new CompletableFuture <>();
264262 Map <String , AuditResponse > responses = new ConcurrentHashMap <>();
@@ -272,7 +270,7 @@ public CompletableFuture<Map<String, AuditResponse>> processBatchRequests(
272270
273271 @ Override
274272 public void beforeStart (ClientCallStreamObserver <UserPromptRequest > requestStream ) {
275- requestObserver = requestStream ;
273+ requestHandler . initialize ( requestStream ) ;
276274 }
277275
278276 @ Override
@@ -303,10 +301,9 @@ public void onNext(AuditorResponse response) {
303301 String cliMessage = "Internal server error occurred" ;
304302 logger .error (cliMessage );
305303 resultFuture .completeExceptionally (new AviatorTechnicalException (cliMessage ));
306- if (requestObserver != null ) {
307- requestObserver . onCompleted ();
304+ if (requestHandler != null ) {
305+ requestHandler . complete ();
308306 }
309- streamCompleted .set (true );
310307 latch .countDown ();
311308 return ;
312309 }
@@ -315,7 +312,6 @@ public void onNext(AuditorResponse response) {
315312 handleBackpressureWarning ();
316313 } else if ("BACKPRESSURE_VIOLATION" .equals (response .getStatus ())) {
317314 logger .error ("Server terminated stream due to backpressure violations: {}" , response .getStatusMessage ());
318- streamCompleted .set (true );
319315 if (!resultFuture .isDone ()) {
320316 resultFuture .completeExceptionally (new AviatorTechnicalException ("Stream terminated by server: " + response .getStatusMessage ()));
321317 }
@@ -341,8 +337,8 @@ public void onNext(AuditorResponse response) {
341337 } else {
342338 logger .progress ("Stream initialization failed: " + response .getStatusMessage ());
343339 resultFuture .completeExceptionally (new AviatorTechnicalException ("Stream initialization failed: " + response .getStatusMessage ()));
344- if (requestObserver != null ) {
345- requestObserver . onCompleted ();
340+ if (requestHandler != null ) {
341+ requestHandler . complete ();
346342 }
347343 latch .countDown ();
348344 }
@@ -357,8 +353,8 @@ public void onNext(AuditorResponse response) {
357353
358354 if (completed + errorRequests .get () >= totalRequests ) {
359355 logger .info ("All requests processed, completing stream" );
360- if (streamCompleted . compareAndSet ( false , true ) && requestObserver != null ) {
361- requestObserver . onCompleted ();
356+ if (requestHandler != null && ! requestHandler . isCompleted () ) {
357+ requestHandler . complete ();
362358 }
363359 if (!resultFuture .isDone ()) {
364360 resultFuture .complete (responses );
@@ -410,29 +406,27 @@ public void onCompleted() {
410406 .build ())
411407 .build ();
412408
413- requestObserver . onNext (initRequest );
409+ requestHandler . sendRequest (initRequest );
414410 LOG .info ("Client Id for stream initialization {}" ,streamId );
415411
416412 processingExecutor .submit (() -> {
417413 try {
418414 if (!initLatch .await (30 , TimeUnit .SECONDS )) {
419415 throw new AviatorTechnicalException ("Stream initialization timed out" );
420416 }
421- processRequests (requests , requestObserver );
417+ processRequests (requests );
422418 } catch (InterruptedException e ) {
423419 Thread .currentThread ().interrupt ();
424420 throw new AviatorTechnicalException ("Interrupted during request processing" , e );
425421 } catch (Exception e ) {
426- if (!streamCompleted . get ()) {
422+ if (!requestHandler . isCompleted ()) {
427423 throw new AviatorTechnicalException ("Error during request processing execution" , e );
428424 }
429425 LOG .warn ("Exception caught after stream completion during processing execution" , e );
430426 }
431427 });
432428 } catch (Exception e ) {
433- if (requestObserver != null ) {
434- requestObserver .onError (e );
435- }
429+ requestHandler .sendError (e );
436430 throw new AviatorTechnicalException ("Error initiating batch processing" , e );
437431 }
438432
@@ -445,12 +439,12 @@ public void onCompleted() {
445439 });
446440 }
447441
448- private void processRequests (Queue <UserPrompt > requests , StreamObserver < UserPromptRequest > observer ) {
442+ private void processRequests (Queue <UserPrompt > requests ) {
449443 logger .progress ("Starting to process issues..." );
450444 AtomicInteger failedRequests = new AtomicInteger (0 );
451445 AtomicInteger pendingRequests = new AtomicInteger (0 );
452446
453- while (!requests .isEmpty () && !isShutdown .get () && !streamCompleted . get ()) {
447+ while (!requests .isEmpty () && !isShutdown .get () && !requestHandler . isCompleted ()) {
454448 boolean acquiredSemaphore = false ;
455449 try {
456450 requestSemaphore .acquire ();
@@ -461,11 +455,11 @@ private void processRequests(Queue<UserPrompt> requests, StreamObserver<UserProm
461455 Thread .sleep (backoffMs );
462456 }
463457
464- while (pendingRequests .get () >= serverWindowSize .get () * 0.9 && !isShutdown .get () && !streamCompleted . get ()) {
458+ while (pendingRequests .get () >= serverWindowSize .get () * 0.9 && !isShutdown .get () && !requestHandler . isCompleted ()) {
465459 Thread .sleep (50 );
466460 }
467461
468- if (streamCompleted . get ()) {
462+ if (requestHandler . isCompleted ()) {
469463 break ;
470464 }
471465
@@ -505,15 +499,15 @@ private void processRequests(Queue<UserPrompt> requests, StreamObserver<UserProm
505499
506500 pendingRequests .decrementAndGet ();
507501 } catch (InterruptedException ie ) {
508- if (!streamCompleted . get ()) {
502+ if (!requestHandler . isCompleted ()) {
509503 Thread .currentThread ().interrupt ();
510504 throw new AviatorTechnicalException ("Thread interrupted while processing requests" , ie );
511505 }
512506 break ;
513507 } catch (AviatorSimpleException e ) {
514508 throw e ;
515509 } catch (Exception e ) {
516- if (!streamCompleted . get ()) {
510+ if (!requestHandler . isCompleted ()) {
517511 LOG .error ("Error processing request: {}" , e .getMessage (), e );
518512 throw new AviatorTechnicalException ("Error processing request" , e );
519513 }
@@ -528,12 +522,12 @@ private void processRequests(Queue<UserPrompt> requests, StreamObserver<UserProm
528522 private boolean sendRequestWithRetry (UserPromptRequest request , int maxRetries ) {
529523 for (int attempt = 0 ; attempt < maxRetries ; attempt ++) {
530524 try {
531- if (requestObserver == null ) {
532- LOG .debug ("Request observer is null , aborting send" );
525+ if (requestHandler == null || ! requestHandler . isReady () ) {
526+ LOG .debug ("Request Handler is not ready , aborting send" );
533527 return false ;
534528 }
535529
536- if (streamCompleted . get ()) {
530+ if (requestHandler . isCompleted ()) {
537531 return false ;
538532 }
539533
@@ -543,12 +537,12 @@ private boolean sendRequestWithRetry(UserPromptRequest request, int maxRetries)
543537 throw new AviatorSimpleException ("Message size exceeds maximum allowed limit" );
544538 }
545539
546- requestObserver . onNext (request );
540+ CompletableFuture < Boolean > result = requestHandler . sendRequest (request );
547541 return true ;
548542 } catch (AviatorSimpleException e ) {
549543 throw e ; // Propagate user-facing errors
550544 } catch (Exception e ) {
551- if (!streamCompleted . get ()) {
545+ if (!requestHandler . isCompleted ()) {
552546 LOG .error ("Error sending request (attempt {}): {}" , attempt + 1 , e .getMessage ());
553547 }
554548 if (attempt == maxRetries - 1 ) {
@@ -603,27 +597,16 @@ public void close() {
603597 isShutdown .set (true );
604598 stopPingPong ();
605599 try {
606- if (isStreamActive && !latch .await (10 , TimeUnit .SECONDS )) {
600+ if (requestHandler != null && requestHandler . isReady () && !latch .await (10 , TimeUnit .SECONDS )) {
607601 LOG .warn ("Timed out waiting for stream completion" );
608602 }
609603 } catch (InterruptedException e ) {
610604 Thread .currentThread ().interrupt ();
611605 LOG .warn ("Interrupted while waiting for stream completion" );
612606 }
613- if (requestObserver != null && !streamCompleted . get ()) {
607+ if (requestHandler != null && !requestHandler . isCompleted ()) {
614608 try {
615- if (requestObserver instanceof ClientCallStreamObserver ) {
616- ClientCallStreamObserver <?> clientObserver = (ClientCallStreamObserver <?>) requestObserver ;
617- if (clientObserver .isReady ()) {
618- streamCompleted .set (true );
619- requestObserver .onCompleted ();
620- LOG .debug ("Request observer completed" );
621- } else {
622- LOG .debug ("Request observer not ready, skipping onCompleted" );
623- }
624- } else {
625- LOG .debug ("Request observer is not a ClientCallStreamObserver, skipping onCompleted" );
626- }
609+ requestHandler .complete ().get (5 , TimeUnit .SECONDS );
627610 } catch (Exception e ) {
628611 LOG .debug ("Exception during request observer completion, likely already closed: {}" , e .getMessage ());
629612 }
@@ -636,16 +619,18 @@ public void close() {
636619 Thread .currentThread ().interrupt ();
637620 LOG .warn ("Interrupted during channel shutdown" );
638621 } finally {
639- processingExecutor .shutdown ();
640- try {
641- if (!processingExecutor .awaitTermination (5 , TimeUnit .SECONDS )) {
622+ if (processingExecutor != null ){
623+ processingExecutor .shutdown ();
624+ try {
625+ if (!processingExecutor .awaitTermination (5 , TimeUnit .SECONDS )) {
626+ processingExecutor .shutdownNow ();
627+ LOG .debug ("Processing executor forcibly shut down" );
628+ }
629+ } catch (InterruptedException e ) {
642630 processingExecutor .shutdownNow ();
643- LOG .debug ("Processing executor forcibly shut down" );
631+ Thread .currentThread ().interrupt ();
632+ LOG .warn ("Interrupted during executor shutdown" );
644633 }
645- } catch (InterruptedException e ) {
646- processingExecutor .shutdownNow ();
647- Thread .currentThread ().interrupt ();
648- LOG .warn ("Interrupted during executor shutdown" );
649634 }
650635 }
651636 }
0 commit comments