Skip to content

Commit fb1477e

Browse files
committed
PLUGIN-1769: Add more retry codes to pubsub client
1 parent edf9962 commit fb1477e

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

src/main/java/io/cdap/plugin/gcp/publisher/source/PubSubRDDIterator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.cdap.plugin.gcp.publisher.source;
1717

1818
import com.google.api.gax.core.FixedCredentialsProvider;
19+
import com.google.api.gax.rpc.StatusCode.Code;
1920
import com.google.auth.Credentials;
2021
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
2122
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
@@ -35,9 +36,11 @@
3536
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.Queue;
39+
import java.util.Set;
3840
import java.util.concurrent.ConcurrentLinkedDeque;
3941
import java.util.concurrent.TimeUnit;
4042
import java.util.stream.Collectors;
43+
import java.util.stream.Stream;
4144

4245
/**
4346
* Iterator for PubSub RDD.
@@ -51,6 +54,12 @@ public class PubSubRDDIterator implements Iterator<PubSubMessage> {
5154
private static final int MAX_MESSAGES = 1000;
5255
private static final int MAX_MESSAGE_SIZE = 20 * 1024 * 1024; //20 MB. Max size for "data" field of a message is 10MB.
5356
private static final int RETRY_DELAY = 100;
57+
// Used to set the retryable code settings for pubsub clients
58+
// The default codes are Code.ABORTED, Code.UNAVAILABLE, Code.UNKNOWN
59+
// for current client version 1.108.1
60+
private static final Set<Code> RETRYABLE_CLIENT_CODES =
61+
Stream.of(Code.RESOURCE_EXHAUSTED, Code.ABORTED, Code.CANCELLED, Code.INTERNAL, Code.UNKNOWN,
62+
Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED).collect(Collectors.toSet());
5463

5564
private final long startTime;
5665
private final PubSubSubscriberConfig config;
@@ -126,7 +135,8 @@ private SubscriberStub buildSubscriberClient() throws IOException {
126135
builder.setTransportChannelProvider(
127136
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
128137
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE).build());
129-
builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings());
138+
builder.getSubscriptionSettings().setRetrySettings(PubSubSubscriberUtil.getRetrySettings())
139+
.setRetryableCodes(RETRYABLE_CLIENT_CODES);
130140
return GrpcSubscriberStub.create(builder.build());
131141
}
132142

0 commit comments

Comments
 (0)