1616package io .cdap .plugin .gcp .publisher .source ;
1717
1818import com .google .api .gax .core .FixedCredentialsProvider ;
19+ import com .google .api .gax .rpc .StatusCode .Code ;
1920import com .google .auth .Credentials ;
2021import com .google .cloud .pubsub .v1 .stub .GrpcSubscriberStub ;
2122import com .google .cloud .pubsub .v1 .stub .SubscriberStub ;
3536import java .util .Collections ;
3637import java .util .List ;
3738import java .util .Queue ;
39+ import java .util .Set ;
3840import java .util .concurrent .ConcurrentLinkedDeque ;
3941import java .util .concurrent .TimeUnit ;
4042import java .util .stream .Collectors ;
43+ import java .util .stream .Stream ;
4144
4245/**
4346 * Iterator for PubSub RDD.
@@ -51,6 +54,12 @@ public class PubSubRDDIterator implements Iterator<PubSubMessage> {
5154 private static final int MAX_MESSAGES = 1000 ;
5255 private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024 ; //20 MB. Max size for "data" field of a message is 10MB.
5356 private static final int RETRY_DELAY = 100 ;
57+ // Used to set the retryable code settings for pubsub clients
58+ // The default codes are Code.ABORTED, Code.UNAVAILABLE, Code.UNKNOWN
59+ // for current client version 1.108.1
60+ private static final Set <Code > RETRYABLE_CLIENT_CODES =
61+ Stream .of (Code .RESOURCE_EXHAUSTED , Code .ABORTED , Code .CANCELLED , Code .INTERNAL , Code .UNKNOWN ,
62+ Code .UNAVAILABLE , Code .DEADLINE_EXCEEDED ).collect (Collectors .toSet ());
5463
5564 private final long startTime ;
5665 private final PubSubSubscriberConfig config ;
@@ -126,7 +135,8 @@ private SubscriberStub buildSubscriberClient() throws IOException {
126135 builder .setTransportChannelProvider (
127136 SubscriberStubSettings .defaultGrpcTransportProviderBuilder ()
128137 .setMaxInboundMessageSize (MAX_MESSAGE_SIZE ).build ());
129- builder .getSubscriptionSettings ().setRetrySettings (PubSubSubscriberUtil .getRetrySettings ());
138+ builder .getSubscriptionSettings ().setRetrySettings (PubSubSubscriberUtil .getRetrySettings ())
139+ .setRetryableCodes (RETRYABLE_CLIENT_CODES );
130140 return GrpcSubscriberStub .create (builder .build ());
131141 }
132142
0 commit comments