Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class GCPUtils {
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
public static final String GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL =
"https://cloud.google.com/compute/docs/troubleshooting/troubleshoot-metadata-server";

/**
* Load a service account from the local file system.
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/ServerErrorException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;

/**
* Exception indicating a server-side error (HTTP 5xx).
* <p>
* This exception is intended to be used when a server responds with an HTTP 5xx status code,
* which typically indicates temporary unavailability or failure on the server's part.
* It can be used to trigger retries in retry frameworks like Failsafe.
*/
public class ServerErrorException extends RuntimeException {
private final int statusCode;

/**
* Constructs a new {@code ServerErrorException} with the given status code and message.
*
* @param statusCode the HTTP status code (should be in the 5xx range)
* @param message the detail message explaining the error
* @param cause the original cause of the error
*/
public ServerErrorException(int statusCode, String message, Throwable cause) {
super("Server error [" + statusCode + "]: " + message, cause);
this.statusCode = statusCode;
}

/**
* Returns the HTTP status code associated with this server error.
*
* @return the 5xx HTTP status code that triggered this exception
*/
public int getStatusCode() {
return statusCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import com.google.common.annotations.VisibleForTesting;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.common.ServerErrorException;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -43,19 +51,65 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider {
private Configuration conf;
private GoogleCredentials credentials;
private static final Gson GSON = new Gson();
private static final Logger LOG = LoggerFactory.getLogger(ServiceAccountAccessTokenProvider.class);
public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5;
public static final int DEFAULT_MAX_RETRY_COUNT = 5;
public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80;
private static final RetryPolicy<Object> RETRY_POLICY = createRetryPolicy();
private static final Pattern SERVER_ERROR_PATTERN = Pattern.compile("Unexpected Error code 5\\d{2} trying to get " +
"security access token from Compute Engine metadata for the default service account.*");

@VisibleForTesting
@Override
public AccessToken getAccessToken() {
try {
com.google.auth.oauth2.AccessToken token = getCredentials().getAccessToken();
if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
refresh();
token = getCredentials().getAccessToken();
try {
return Failsafe.with(RETRY_POLICY).get(() -> {
com.google.auth.oauth2.AccessToken token = retrieveAccessToken();
if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
refresh();
token = retrieveAccessToken();
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
});
} catch (FailsafeException e) {
Throwable t = e.getCause() != null ? e.getCause() : e;
ErrorType errorType = (t instanceof ServerErrorException) ? ErrorType.SYSTEM : ErrorType.UNKNOWN;
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
e, "Unable to get service account access token after retries.", errorType, true,
GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL
);
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
}

private static RetryPolicy<Object> createRetryPolicy() {
return RetryPolicy.builder()
.handle(ServerErrorException.class)
.withBackoff(Duration.ofSeconds(DEFAULT_INITIAL_RETRY_DURATION_SECONDS),
Duration.ofSeconds(DEFAULT_MAX_RETRY_DURATION_SECONDS))
.withMaxRetries(DEFAULT_MAX_RETRY_COUNT)
.onRetry(event -> LOG.debug("Retry attempt {} due to {}", event.getAttemptCount(), event.getLastException().
getMessage()))
.onSuccess(event -> LOG.debug("Access Token Fetched Successfully."))
.onRetriesExceeded(
event -> LOG.error("Unable to get service account access token after {} retries.", event.getAttemptCount() - 1))
.build();
}

@VisibleForTesting
static boolean isServerError(IOException e) {
String msg = e.getMessage();
return msg != null && SERVER_ERROR_PATTERN.matcher(msg).matches();
}

com.google.auth.oauth2.AccessToken retrieveAccessToken() throws IOException {
try {
return getCredentials().getAccessToken();
} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
if (isServerError(e)) {
throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error while fetching access token: "
+ e.getMessage(), e);
}
throw e;
}
}

Expand All @@ -64,9 +118,13 @@ public void refresh() throws IOException {
try {
getCredentials().refresh();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add retries on getCredentials().refresh()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to refresh service account access token.", e.getMessage(),
ErrorType.UNKNOWN, true, e);
if (isServerError(e)) {
throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error during refresh: " +
e.getMessage(), e);
}
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
e, "Unable to refresh service account access token.", ErrorType.UNKNOWN, true,
GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
package io.cdap.plugin.gcp.gcs;

import com.google.api.client.auth.oauth2.Credential;
import com.google.auth.oauth2.AccessToken;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.common.collect.ImmutableList;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.common.ServerErrorException;
import org.apache.hadoop.conf.Configuration;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.Map;

/**
Expand Down Expand Up @@ -71,4 +76,68 @@ public void testServiceAccountAccessTokenProvider() throws IOException {
);
Assert.assertNotNull(credential);
}

@Test
public void testIsServerErrorWith5xx() {
IOException serverError = new IOException(
"Unexpected Error code 500 trying to get security access token from Compute Engine metadata for the default " +
"service account.");
Assert.assertTrue(ServiceAccountAccessTokenProvider.isServerError(serverError));
}

@Test
public void testIsServerErrorWithNon5xxErrorCode400() {
IOException clientError = new IOException(
"Unexpected Error code 400 trying to get security access token from Compute Engine metadata for the default " +
"service account.");
Assert.assertFalse(ServiceAccountAccessTokenProvider.isServerError(clientError));
}

@Test
public void testIsServerErrorWithNon5xxErrorCode403() {
IOException forbiddenError = new IOException(
"Unexpected Error code 403 trying to get security access token from Compute Engine metadata for the default " +
"service account.");
Assert.assertFalse(ServiceAccountAccessTokenProvider.isServerError(forbiddenError));
}

@Test
public void testIsServerErrorWith5xxErrorCode503() {
IOException serverError = new IOException(
"Unexpected Error code 503 trying to get security access token from Compute Engine metadata for the default " +
"service account.");
Assert.assertTrue(ServiceAccountAccessTokenProvider.isServerError(serverError));
}

@Test(expected = ServerErrorException.class)
public void testRetryMechanismFailsAfterMaxRetries() throws IOException {
ServiceAccountAccessTokenProvider provider = Mockito.spy(new ServiceAccountAccessTokenProvider());
Mockito.doThrow(new ServerErrorException(503, "Unexpected Error code 503 trying to get security access token " +
"from Compute Engine metadata for the default service account.", null))
.when(provider).retrieveAccessToken();
provider.getAccessToken();
}

@Test
public void testRetryMechanismSucceedsAfterFewRetries() throws IOException {
ServiceAccountAccessTokenProvider provider = Mockito.spy(new ServiceAccountAccessTokenProvider());

// Create a valid token with future expiration
AccessToken validToken = new AccessToken("valid-token", Date.from(Instant.now().plusSeconds(3600)));

// Fail first 2 attempts, then succeed
Mockito.doThrow(new ServerErrorException(503, "Unexpected Error code 503 trying to get security access token " +
"from Compute Engine metadata for the default service account.", null))
.doThrow(new ServerErrorException(500, "Unexpected Error code 500 trying to get security access token " +
"from Compute Engine metadata for the default service account.", null))
.doReturn(validToken)
.when(provider).retrieveAccessToken();
AccessTokenProvider.AccessToken accessToken = provider.getAccessToken();

Assert.assertNotNull(accessToken);
Assert.assertEquals("valid-token", accessToken.getToken());

// Verify that retrieveAccessToken was called 3 times (2 failures + 1 success)
Mockito.verify(provider, Mockito.times(3)).retrieveAccessToken();
}
}
Loading