@@ -55,6 +55,7 @@ public class PubSubDirectDStream<T> extends InputDStream<T> implements Streaming
5555
5656 private static final Logger LOG = LoggerFactory .getLogger (PubSubDirectDStream .class );
5757 private static final String CDAP_PIPELINE = "cdap_pipeline" ;
58+ private static final int MAX_SNAPSHOT_ATTEMPTS = 3 ;
5859
5960 private final Credentials credentials ;
6061 private final PubSubSubscriberConfig config ;
@@ -64,6 +65,7 @@ public class PubSubDirectDStream<T> extends InputDStream<T> implements Streaming
6465 private final SerializableFunction <PubSubMessage , T > mappingFn ;
6566 private final StreamingContext streamingContext ;
6667 private final String pipeline ;
68+ private final BackoffConfig backoffConfig ;
6769
6870 private SubscriptionAdminClient subscriptionAdminClient ;
6971 private ProjectSnapshotName currentSnapshotName ;
@@ -82,6 +84,7 @@ public PubSubDirectDStream(io.cdap.cdap.etl.api.streaming.StreamingContext conte
8284 this .pipeline = context .getPipelineName ();
8385 this .credentials = PubSubSubscriberUtil .createCredentials (config .getServiceAccount (),
8486 config .isServiceAccountFilePath ());
87+ backoffConfig = BackoffConfig .defaultInstance ();
8588 }
8689
8790 @ Override
@@ -195,7 +198,7 @@ private String generateName(String subscription) {
195198 }
196199
197200 private void createSubscriptionIfNotPresent () throws IOException , InterruptedException {
198- PubSubSubscriberUtil .createSubscription (() -> true , BackoffConfig . defaultInstance () ,
201+ PubSubSubscriberUtil .createSubscription (() -> true , backoffConfig ,
199202 ProjectSubscriptionName .format (config .getProject (),
200203 config .getSubscription ()),
201204 TopicName .format (config .getProject (), config .getTopic ()),
@@ -243,15 +246,21 @@ private ProjectSnapshotName fetchSnapShot(String subscriptionId,
243246
244247 private Snapshot createSnapshot (ProjectSnapshotName projectSnapshotName ,
245248 ProjectSubscriptionName projectSubscriptionName ) {
246- // Creation takes around 3.5 s .
247249 LOG .debug ("Creating snapshot {} for subscription {} in Pub/Sub ." , projectSnapshotName .toString (),
248250 projectSubscriptionName .toString ());
249- CreateSnapshotRequest request = CreateSnapshotRequest .newBuilder ()
250- .setName (projectSnapshotName .toString ())
251- .setSubscription (projectSubscriptionName .toString ())
252- .putAllLabels (Collections .singletonMap (CDAP_PIPELINE , getLabelValue (pipeline )))
253- .build ();
254- return subscriptionAdminClient .createSnapshot (request );
251+ try {
252+ return PubSubSubscriberUtil .callWithRetry (() -> {
253+ // Creation takes around 3.5 s .
254+ CreateSnapshotRequest request = CreateSnapshotRequest .newBuilder ()
255+ .setName (projectSnapshotName .toString ())
256+ .setSubscription (projectSubscriptionName .toString ())
257+ .putAllLabels (Collections .singletonMap (CDAP_PIPELINE , getLabelValue (pipeline )))
258+ .build ();
259+ return subscriptionAdminClient .createSnapshot (request );
260+ }, backoffConfig , MAX_SNAPSHOT_ATTEMPTS );
261+ } catch (Exception e ) {
262+ throw new RuntimeException (e );
263+ }
255264 }
256265
257266 @ VisibleForTesting
@@ -268,9 +277,16 @@ static String getLabelValue(String pipeline) {
268277 }
269278
270279 private void deleteSnapshot (ProjectSnapshotName projectSnapshotName ) {
271- // Deletion takes around 2.5 s .
272- // TODO - Consider making this asynchronous
273- subscriptionAdminClient .deleteSnapshot (projectSnapshotName );
280+ try {
281+ PubSubSubscriberUtil .callWithRetry (() -> {
282+ // Deletion takes around 2.5 s .
283+ // TODO - Consider making this asynchronous
284+ subscriptionAdminClient .deleteSnapshot (projectSnapshotName );
285+ return null ;
286+ }, backoffConfig , MAX_SNAPSHOT_ATTEMPTS );
287+ } catch (Exception e ) {
288+ throw new RuntimeException (e );
289+ }
274290 }
275291
276292 private void saveSnapshotAsState (Snapshot snapshot , String subscription ,
0 commit comments