diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
index 708065c8a..4ebae9bb0 100644
--- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -91,6 +91,8 @@ public class GCPUtils {
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
+ /** Link to the Compute Engine troubleshooting guide for metadata server errors. */
+ public static final String GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL =
+ "https://cloud.google.com/compute/docs/troubleshooting/troubleshoot-metadata-server";
/**
* Load a service account from the local file system.
diff --git a/src/main/java/io/cdap/plugin/gcp/common/ServerErrorException.java b/src/main/java/io/cdap/plugin/gcp/common/ServerErrorException.java
new file mode 100644
index 000000000..4f56a75af
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/common/ServerErrorException.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.common;
+
+/**
+ * Signals that a remote server answered with an HTTP 5xx status code.
+ *
+ * <p>Responses in that range usually point at a failure or temporary unavailability on the server
+ * side, so this exception type can be registered with a retry framework (for example Failsafe) as
+ * the failure that should trigger another attempt.
+ */
+public class ServerErrorException extends RuntimeException {
+  private final int statusCode;
+
+  /**
+   * Creates the exception. The resulting detail message has the form
+   * {@code Server error [<statusCode>]: <message>}.
+   *
+   * @param statusCode the HTTP status code (expected to be in the 5xx range; not validated)
+   * @param message the detail message explaining the error
+   * @param cause the original cause of the error
+   */
+  public ServerErrorException(int statusCode, String message, Throwable cause) {
+    super(describe(statusCode, message), cause);
+    this.statusCode = statusCode;
+  }
+
+  /** Assembles the detail message handed to the superclass constructor. */
+  private static String describe(int statusCode, String message) {
+    return new StringBuilder("Server error [")
+      .append(statusCode)
+      .append("]: ")
+      .append(message)
+      .toString();
+  }
+
+  /**
+   * Returns the HTTP status code associated with this server error.
+   *
+   * @return the status code that was passed to the constructor
+   */
+  public int getStatusCode() {
+    return statusCode;
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
index 4cedc83c8..4e4fadb2d 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java
@@ -21,16 +21,24 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
+import com.google.common.annotations.VisibleForTesting;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.common.ServerErrorException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
import java.time.Instant;
import java.util.Date;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,19 +51,65 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider {
private Configuration conf;
private GoogleCredentials credentials;
private static final Gson GSON = new Gson();
+  private static final Logger LOG = LoggerFactory.getLogger(ServiceAccountAccessTokenProvider.class);
+  /** Initial backoff delay between retries, in seconds. */
+  public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5;
+  /** Maximum number of retries performed after the first attempt. */
+  public static final int DEFAULT_MAX_RETRY_COUNT = 5;
+  /** Upper bound of the backoff delay between retries, in seconds. */
+  public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80;
+  // Parameterized (not raw) so that Failsafe.with(RETRY_POLICY).get(...) keeps the supplier's result type.
+  private static final RetryPolicy<Object> RETRY_POLICY = createRetryPolicy();
+  /** Recognises the IOException message reported for a 5xx answer to a metadata-server token request. */
+  private static final Pattern SERVER_ERROR_PATTERN = Pattern.compile("Unexpected Error code 5\\d{2} trying to get " +
+    "security access token from Compute Engine metadata for the default service account.*");
+  /**
+   * Returns a usable access token, refreshing the credentials first when the current token is
+   * missing or already expired.
+   *
+   * <p>The lookup runs under {@code RETRY_POLICY}, which retries only on
+   * {@link ServerErrorException}. That exception is unchecked, so Failsafe rethrows it unchanged
+   * once the retries are exhausted; only checked failures (the {@link IOException} declared by
+   * {@link #retrieveAccessToken()}) arrive wrapped in a {@link FailsafeException}.
+   *
+   * @return the token value together with its expiration time in epoch milliseconds
+   * @throws ServerErrorException if server errors persisted until the retry policy gave up
+   */
@Override
public AccessToken getAccessToken() {
- try {
- com.google.auth.oauth2.AccessToken token = getCredentials().getAccessToken();
- if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
- refresh();
- token = getCredentials().getAccessToken();
+    try {
+      return Failsafe.with(RETRY_POLICY).get(() -> {
+        com.google.auth.oauth2.AccessToken token = retrieveAccessToken();
+        if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
+          refresh();
+          token = retrieveAccessToken();
+        }
+        return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
+      });
+    } catch (FailsafeException e) {
+      // Failsafe rethrows unchecked exceptions as-is and wraps only checked ones, so the cause
+      // here is never a ServerErrorException; the failure is reported as UNKNOWN.
+      // NOTE(review): to classify exhausted 5xx retries as ErrorType.SYSTEM, ServerErrorException
+      // has to be caught explicitly here (testRetryMechanismFailsAfterMaxRetries currently expects
+      // it to escape unwrapped and would need to change with it).
+      throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
+        e, "Unable to get service account access token after retries.", ErrorType.UNKNOWN, true,
+        GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL
+      );
}
- return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
+  }
+
+  /**
+   * Builds the retry policy applied to access token retrieval.
+   *
+   * <p>Only {@link ServerErrorException} failures are retried, at most
+   * {@link #DEFAULT_MAX_RETRY_COUNT} times, with a backoff that starts at
+   * {@link #DEFAULT_INITIAL_RETRY_DURATION_SECONDS} seconds and is capped at
+   * {@link #DEFAULT_MAX_RETRY_DURATION_SECONDS} seconds. Retries and successes are logged at debug
+   * level, exhausted retries at error level.
+   *
+   * @return the configured policy
+   */
+  private static RetryPolicy<Object> createRetryPolicy() {
+    return RetryPolicy.<Object>builder()
+      .handle(ServerErrorException.class)
+      .withBackoff(Duration.ofSeconds(DEFAULT_INITIAL_RETRY_DURATION_SECONDS),
+                   Duration.ofSeconds(DEFAULT_MAX_RETRY_DURATION_SECONDS))
+      .withMaxRetries(DEFAULT_MAX_RETRY_COUNT)
+      .onRetry(event -> LOG.debug("Retry attempt {} due to {}", event.getAttemptCount(),
+                                  event.getLastException().getMessage()))
+      .onSuccess(event -> LOG.debug("Access Token Fetched Successfully."))
+      // The attempt count includes the initial call, hence the "- 1" to report retries only.
+      .onRetriesExceeded(
+        event -> LOG.error("Unable to get service account access token after {} retries.",
+                           event.getAttemptCount() - 1))
+      .build();
+  }
+
+  /**
+   * Tells whether the given exception's message starts with the text described by
+   * {@code SERVER_ERROR_PATTERN}, i.e. an "Unexpected Error code 5xx ..." report for a token request
+   * to the Compute Engine metadata server.
+   *
+   * @param e the failure to inspect
+   * @return {@code true} if the message is non-null and begins with the 5xx error text
+   */
+  @VisibleForTesting
+  static boolean isServerError(IOException e) {
+    String msg = e.getMessage();
+    // lookingAt() anchors at the start only. matches() would have to consume the whole message and
+    // therefore rejects one whose trailing detail spans several lines (presumably a response body
+    // appended by the credentials library), because '.' does not match line terminators.
+    return msg != null && SERVER_ERROR_PATTERN.matcher(msg).lookingAt();
+  }
+
+ /**
+ * Reads the access token currently held by the credentials.
+ *
+ * Kept package-private: the unit tests stub this method on a Mockito spy.
+ *
+ * @return the token returned by the credentials' {@code getAccessToken()}
+ * @throws IOException if the lookup fails with an error that {@link #isServerError} does not recognise
+ * @throws ServerErrorException if the lookup fails with an error that {@link #isServerError} recognises
+ */
+ com.google.auth.oauth2.AccessToken retrieveAccessToken() throws IOException {
+ try {
+ return getCredentials().getAccessToken();
} catch (IOException e) {
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
- "Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
+ // Converted to the unchecked type that the retry policy handles.
+ // NOTE(review): the status is always reported as 503, whatever 5xx code the message carried.
+ if (isServerError(e)) {
+ throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error while fetching access token: "
+ + e.getMessage(), e);
+ }
+ throw e;
}
}
@@ -64,9 +118,13 @@ public void refresh() throws IOException {
try {
getCredentials().refresh();
} catch (IOException e) {
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
- "Unable to refresh service account access token.", e.getMessage(),
- ErrorType.UNKNOWN, true, e);
+ // Errors recognised by isServerError become ServerErrorException, the type the retry policy
+ // in this class handles. The status is always reported as 503.
+ if (isServerError(e)) {
+ throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error during refresh: " +
+ e.getMessage(), e);
+ }
+ // Any other I/O failure is reported with UNKNOWN error type and the metadata-server
+ // troubleshooting link.
+ throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
+ e, "Unable to refresh service account access token.", ErrorType.UNKNOWN, true,
+ GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL);
}
}
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
index 7a54a3bc2..bf7e4f58b 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.gcp.gcs;
import com.google.api.client.auth.oauth2.Credential;
+import com.google.auth.oauth2.AccessToken;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
@@ -24,11 +25,15 @@
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.common.collect.ImmutableList;
import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.common.ServerErrorException;
import org.apache.hadoop.conf.Configuration;
-import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+
import java.io.IOException;
+import java.time.Instant;
+import java.util.Date;
import java.util.Map;
/**
@@ -71,4 +76,68 @@ public void testServiceAccountAccessTokenProvider() throws IOException {
);
Assert.assertNotNull(credential);
}
+
+  /** Creates the metadata-server token error message for the given HTTP status code. */
+  private static IOException metadataError(int statusCode) {
+    return new IOException(
+      "Unexpected Error code " + statusCode + " trying to get security access token from Compute Engine metadata " +
+        "for the default service account.");
+  }
+
+  /** A 500 response is classified as a server error. */
+  @Test
+  public void testIsServerErrorWith5xx() {
+    boolean detected = ServiceAccountAccessTokenProvider.isServerError(metadataError(500));
+    Assert.assertTrue(detected);
+  }
+
+  /** A 400 response is not classified as a server error. */
+  @Test
+  public void testIsServerErrorWithNon5xxErrorCode400() {
+    boolean detected = ServiceAccountAccessTokenProvider.isServerError(metadataError(400));
+    Assert.assertFalse(detected);
+  }
+
+  /** A 403 response is not classified as a server error. */
+  @Test
+  public void testIsServerErrorWithNon5xxErrorCode403() {
+    boolean detected = ServiceAccountAccessTokenProvider.isServerError(metadataError(403));
+    Assert.assertFalse(detected);
+  }
+
+  /** A 503 response is classified as a server error. */
+  @Test
+  public void testIsServerErrorWith5xxErrorCode503() {
+    boolean detected = ServiceAccountAccessTokenProvider.isServerError(metadataError(503));
+    Assert.assertTrue(detected);
+  }
+
+  /**
+   * When every token lookup fails with a server error, the {@link ServerErrorException} must
+   * surface from {@code getAccessToken()} once the retry policy gives up.
+   */
+  // NOTE(review): the provider's static retry policy uses real backoff delays (5 retries starting
+  // at 5 seconds, capped at 80 seconds), so this test presumably waits for all of them.
+  @Test(expected = ServerErrorException.class)
+  public void testRetryMechanismFailsAfterMaxRetries() throws IOException {
+    ServerErrorException unavailable = new ServerErrorException(
+      503, "Unexpected Error code 503 trying to get security access token " +
+        "from Compute Engine metadata for the default service account.", null);
+    ServiceAccountAccessTokenProvider spiedProvider = Mockito.spy(new ServiceAccountAccessTokenProvider());
+    Mockito.doThrow(unavailable).when(spiedProvider).retrieveAccessToken();
+
+    spiedProvider.getAccessToken();
+  }
+
+  /**
+   * Two consecutive server errors followed by a valid token: the provider must return that token
+   * after exactly three lookups.
+   */
+  // NOTE(review): the two retries go through the provider's real backoff (starting at 5 seconds),
+  // so this test presumably sleeps between attempts.
+  @Test
+  public void testRetryMechanismSucceedsAfterFewRetries() throws IOException {
+    // Token that is still valid for an hour, so no refresh is triggered once it is returned.
+    AccessToken freshToken = new AccessToken("valid-token", Date.from(Instant.now().plusSeconds(3600)));
+    ServerErrorException firstFailure = new ServerErrorException(
+      503, "Unexpected Error code 503 trying to get security access token " +
+        "from Compute Engine metadata for the default service account.", null);
+    ServerErrorException secondFailure = new ServerErrorException(
+      500, "Unexpected Error code 500 trying to get security access token " +
+        "from Compute Engine metadata for the default service account.", null);
+
+    ServiceAccountAccessTokenProvider spiedProvider = Mockito.spy(new ServiceAccountAccessTokenProvider());
+    Mockito.doThrow(firstFailure)
+      .doThrow(secondFailure)
+      .doReturn(freshToken)
+      .when(spiedProvider).retrieveAccessToken();
+
+    AccessTokenProvider.AccessToken result = spiedProvider.getAccessToken();
+
+    Assert.assertNotNull(result);
+    Assert.assertEquals("valid-token", result.getToken());
+    // Two failed lookups plus the successful one.
+    Mockito.verify(spiedProvider, Mockito.times(3)).retrieveAccessToken();
+  }
}