@@ -408,21 +408,7 @@ private void setFailureFutureOutstandingMessages(Throwable t) {
408408 AckResponse ackResponse ;
409409
410410 if (getExactlyOnceDeliveryEnabled ()) {
411- if (!(t instanceof ApiException )) {
412- ackResponse = AckResponse .OTHER ;
413- }
414-
415- ApiException apiException = (ApiException ) t ;
416- switch (apiException .getStatusCode ().getCode ()) {
417- case FAILED_PRECONDITION :
418- ackResponse = AckResponse .FAILED_PRECONDITION ;
419- break ;
420- case PERMISSION_DENIED :
421- ackResponse = AckResponse .PERMISSION_DENIED ;
422- break ;
423- default :
424- ackResponse = AckResponse .OTHER ;
425- }
411+ ackResponse = StatusUtil .getFailedAckResponse (t );
426412 } else {
427413 // We should set success regardless if ExactlyOnceDelivery is not enabled
428414 ackResponse = AckResponse .SUCCESSFUL ;
@@ -504,7 +490,7 @@ private void sendModackOperations(
504490 modackRequestData .getIsReceiptModack ());
505491 ApiFutureCallback <Empty > callback =
506492 getCallback (
507- modackRequestData . getAckRequestData () ,
493+ ackRequestDataInRequestList ,
508494 deadlineExtensionSeconds ,
509495 true ,
510496 currentBackoffMillis ,
@@ -611,59 +597,91 @@ public void onFailure(Throwable t) {
611597 List <AckRequestData > ackRequestDataArrayRetryList = new ArrayList <>();
612598 try {
613599 Map <String , String > metadataMap = getMetadataMapFromThrowable (t );
614- ackRequestDataList .forEach (
615- ackRequestData -> {
616- String ackId = ackRequestData .getAckId ();
617- if (metadataMap .containsKey (ackId )) {
618- // An error occured
619- String errorMessage = metadataMap .get (ackId );
620- if (errorMessage .startsWith (TRANSIENT_FAILURE_METADATA_PREFIX )) {
621- // Retry all "TRANSIENT_*" error messages - do not set message future
622- logger .log (Level .INFO , "Transient error message, will resend" , errorMessage );
623- ackRequestDataArrayRetryList .add (ackRequestData );
624- } else if (errorMessage .equals (PERMANENT_FAILURE_INVALID_ACK_ID_METADATA )) {
625- // Permanent failure, send
626- logger .log (
627- Level .INFO ,
628- "Permanent error invalid ack id message, will not resend" ,
629- errorMessage );
630- ackRequestData .setResponse (AckResponse .INVALID , setResponseOnSuccess );
600+ if (metadataMap .isEmpty ()) {
601+ String operation = isModack ? "ModifyAckDeadline" : "Acknowledge" ;
602+ if (!StatusUtil .isRetryable (t )) {
603+ logger .log (Level .WARNING , "Un-retryable error on " + operation , t );
604+ ackRequestDataList .forEach (
605+ ackRequestData -> {
606+ AckResponse failedAckResponse = StatusUtil .getFailedAckResponse (t );
607+ ackRequestData .setResponse (failedAckResponse , setResponseOnSuccess );
631608 messageDispatcher .notifyAckFailed (ackRequestData );
632609 tracer .addEndRpcEvent (
633610 ackRequestData .getMessageWrapper (),
634611 rpcSpanSampled ,
635612 isModack ,
636613 deadlineExtensionSeconds );
637614 tracer .setSubscriberSpanException (
638- ackRequestData .getMessageWrapper (), t , "Invalid ack ID" );
615+ ackRequestData .getMessageWrapper (), t , "Error with no metadata map" );
616+ ackRequestData
617+ .getMessageWrapper ()
618+ .setSubscriberSpanException (t , "Error with no metadata map" );
619+ pendingRequests .remove (ackRequestData );
620+ });
621+ } else {
622+ logger .log (Level .INFO , "Retryable error on " + operation + ", will resend" , t );
623+ ackRequestDataArrayRetryList .addAll (ackRequestDataList );
624+ ackRequestDataList .forEach (
625+ ackRequestData -> {
626+ pendingRequests .remove (ackRequestData );
627+ });
628+ }
629+ } else {
630+ ackRequestDataList .forEach (
631+ ackRequestData -> {
632+ String ackId = ackRequestData .getAckId ();
633+ if (metadataMap .containsKey (ackId )) {
634+ // An error occured
635+ String errorMessage = metadataMap .get (ackId );
636+ if (errorMessage .startsWith (TRANSIENT_FAILURE_METADATA_PREFIX )) {
637+ // Retry all "TRANSIENT_*" error messages - do not set message future
638+ logger .log (Level .INFO , "Transient error message, will resend" , errorMessage );
639+ ackRequestDataArrayRetryList .add (ackRequestData );
640+ } else if (errorMessage .equals (PERMANENT_FAILURE_INVALID_ACK_ID_METADATA )) {
641+ // Permanent failure
642+ logger .log (
643+ Level .INFO ,
644+ "Permanent error invalid ack id message, will not resend" ,
645+ errorMessage );
646+ ackRequestData .setResponse (AckResponse .INVALID , setResponseOnSuccess );
647+ messageDispatcher .notifyAckFailed (ackRequestData );
648+ tracer .addEndRpcEvent (
649+ ackRequestData .getMessageWrapper (),
650+ rpcSpanSampled ,
651+ isModack ,
652+ deadlineExtensionSeconds );
653+ tracer .setSubscriberSpanException (
654+ ackRequestData .getMessageWrapper (), t , "Invalid ack ID" );
655+ } else {
656+ logger .log (
657+ Level .INFO , "Unknown error message, will not resend" , errorMessage );
658+ ackRequestData .setResponse (AckResponse .OTHER , setResponseOnSuccess );
659+ messageDispatcher .notifyAckFailed (ackRequestData );
660+ tracer .addEndRpcEvent (
661+ ackRequestData .getMessageWrapper (),
662+ rpcSpanSampled ,
663+ isModack ,
664+ deadlineExtensionSeconds );
665+ tracer .setSubscriberSpanException (
666+ ackRequestData .getMessageWrapper (), t , "Unknown error message" );
667+ ackRequestData
668+ .getMessageWrapper ()
669+ .setSubscriberSpanException (t , "Unknown error message" );
670+ }
639671 } else {
640- logger . log ( Level . INFO , "Unknown error message, will not resend" , errorMessage );
641- ackRequestData . setResponse ( AckResponse . OTHER , setResponseOnSuccess );
642- messageDispatcher . notifyAckFailed (ackRequestData );
672+ ackRequestData . setResponse ( AckResponse . SUCCESSFUL , setResponseOnSuccess );
673+ messageDispatcher . notifyAckSuccess ( ackRequestData );
674+ tracer . endSubscriberSpan (ackRequestData . getMessageWrapper () );
643675 tracer .addEndRpcEvent (
644676 ackRequestData .getMessageWrapper (),
645677 rpcSpanSampled ,
646678 isModack ,
647679 deadlineExtensionSeconds );
648- tracer .setSubscriberSpanException (
649- ackRequestData .getMessageWrapper (), t , "Unknown error message" );
650- ackRequestData
651- .getMessageWrapper ()
652- .setSubscriberSpanException (t , "Unknown error message" );
653680 }
654- } else {
655- ackRequestData .setResponse (AckResponse .SUCCESSFUL , setResponseOnSuccess );
656- messageDispatcher .notifyAckSuccess (ackRequestData );
657- tracer .endSubscriberSpan (ackRequestData .getMessageWrapper ());
658- tracer .addEndRpcEvent (
659- ackRequestData .getMessageWrapper (),
660- rpcSpanSampled ,
661- isModack ,
662- deadlineExtensionSeconds );
663- }
664- // Remove from our pending
665- pendingRequests .remove (ackRequestData );
666- });
681+ // Remove from our pending
682+ pendingRequests .remove (ackRequestData );
683+ });
684+ }
667685 } catch (InvalidProtocolBufferException e ) {
668686 // If we fail to parse out the errorInfo, we should retry all
669687 logger .log (
0 commit comments