Skip to content

Commit ac1a931

Browse files
authored
Merge pull request #1567 from cloudsufi/cherry-pick/acd0f0d6ba4aa23763cd16252960bd108b909785
[🍒] [PLUGIN-1808]Retry policy for service account for 5xx errors
2 parents 9d776bf + 12e0ca3 commit ac1a931

File tree

4 files changed

+193
-15
lines changed

4 files changed

+193
-15
lines changed

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public class GCPUtils {
9191
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
9292
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
9393
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
94+
public static final String GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL =
95+
"https://cloud.google.com/compute/docs/troubleshooting/troubleshoot-metadata-server";
9496

9597
/**
9698
* Load a service account from the local file system.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.common;
18+
19+
/**
20+
* Exception indicating a server-side error (HTTP 5xx).
21+
* <p>
22+
* This exception is intended to be used when a server responds with an HTTP 5xx status code,
23+
* which typically indicates temporary unavailability or failure on the server's part.
24+
* It can be used to trigger retries in retry frameworks like Failsafe.
25+
*/
26+
public class ServerErrorException extends RuntimeException {
27+
private final int statusCode;
28+
29+
/**
30+
* Constructs a new {@code ServerErrorException} with the given status code and message.
31+
*
32+
* @param statusCode the HTTP status code (should be in the 5xx range)
33+
* @param message the detail message explaining the error
34+
* @param cause the original cause of the error
35+
*/
36+
public ServerErrorException(int statusCode, String message, Throwable cause) {
37+
super("Server error [" + statusCode + "]: " + message, cause);
38+
this.statusCode = statusCode;
39+
}
40+
41+
/**
42+
* Returns the HTTP status code associated with this server error.
43+
*
44+
* @return the 5xx HTTP status code that triggered this exception
45+
*/
46+
public int getStatusCode() {
47+
return statusCode;
48+
}
49+
}

src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,24 @@
2121
import com.google.bigtable.repackaged.com.google.gson.Gson;
2222
import com.google.cloud.hadoop.util.AccessTokenProvider;
2323
import com.google.cloud.hadoop.util.CredentialFactory;
24-
import io.cdap.cdap.api.exception.ErrorCategory;
25-
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
24+
import com.google.common.annotations.VisibleForTesting;
25+
import dev.failsafe.Failsafe;
26+
import dev.failsafe.FailsafeException;
27+
import dev.failsafe.RetryPolicy;
2628
import io.cdap.cdap.api.exception.ErrorType;
27-
import io.cdap.cdap.api.exception.ErrorUtils;
29+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
2830
import io.cdap.plugin.gcp.common.GCPUtils;
31+
import io.cdap.plugin.gcp.common.ServerErrorException;
2932
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.http.HttpStatus;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3036

3137
import java.io.IOException;
38+
import java.time.Duration;
3239
import java.time.Instant;
3340
import java.util.Date;
41+
import java.util.regex.Pattern;
3442
import java.util.stream.Collectors;
3543
import java.util.stream.Stream;
3644

@@ -43,19 +51,65 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider {
4351
private Configuration conf;
4452
private GoogleCredentials credentials;
4553
private static final Gson GSON = new Gson();
54+
private static final Logger LOG = LoggerFactory.getLogger(ServiceAccountAccessTokenProvider.class);
55+
public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5;
56+
public static final int DEFAULT_MAX_RETRY_COUNT = 5;
57+
public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80;
58+
private static final RetryPolicy<Object> RETRY_POLICY = createRetryPolicy();
59+
private static final Pattern SERVER_ERROR_PATTERN = Pattern.compile("Unexpected Error code 5\\d{2} trying to get " +
60+
"security access token from Compute Engine metadata for the default service account.*");
4661

62+
@VisibleForTesting
4763
@Override
4864
public AccessToken getAccessToken() {
49-
try {
50-
com.google.auth.oauth2.AccessToken token = getCredentials().getAccessToken();
51-
if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
52-
refresh();
53-
token = getCredentials().getAccessToken();
65+
try {
66+
return Failsafe.with(RETRY_POLICY).get(() -> {
67+
com.google.auth.oauth2.AccessToken token = retrieveAccessToken();
68+
if (token == null || token.getExpirationTime().before(Date.from(Instant.now()))) {
69+
refresh();
70+
token = retrieveAccessToken();
71+
}
72+
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
73+
});
74+
} catch (FailsafeException e) {
75+
Throwable t = e.getCause() != null ? e.getCause() : e;
76+
ErrorType errorType = (t instanceof ServerErrorException) ? ErrorType.SYSTEM : ErrorType.UNKNOWN;
77+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
78+
e, "Unable to get service account access token after retries.", errorType, true,
79+
GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL
80+
);
5481
}
55-
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
82+
}
83+
84+
private static RetryPolicy<Object> createRetryPolicy() {
85+
return RetryPolicy.builder()
86+
.handle(ServerErrorException.class)
87+
.withBackoff(Duration.ofSeconds(DEFAULT_INITIAL_RETRY_DURATION_SECONDS),
88+
Duration.ofSeconds(DEFAULT_MAX_RETRY_DURATION_SECONDS))
89+
.withMaxRetries(DEFAULT_MAX_RETRY_COUNT)
90+
.onRetry(event -> LOG.debug("Retry attempt {} due to {}", event.getAttemptCount(), event.getLastException().
91+
getMessage()))
92+
.onSuccess(event -> LOG.debug("Access Token Fetched Successfully."))
93+
.onRetriesExceeded(
94+
event -> LOG.error("Unable to get service account access token after {} retries.", event.getAttemptCount() - 1))
95+
.build();
96+
}
97+
98+
@VisibleForTesting
99+
static boolean isServerError(IOException e) {
100+
String msg = e.getMessage();
101+
return msg != null && SERVER_ERROR_PATTERN.matcher(msg).matches();
102+
}
103+
104+
com.google.auth.oauth2.AccessToken retrieveAccessToken() throws IOException {
105+
try {
106+
return getCredentials().getAccessToken();
56107
} catch (IOException e) {
57-
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
58-
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
108+
if (isServerError(e)) {
109+
throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error while fetching access token: "
110+
+ e.getMessage(), e);
111+
}
112+
throw e;
59113
}
60114
}
61115

@@ -64,9 +118,13 @@ public void refresh() throws IOException {
64118
try {
65119
getCredentials().refresh();
66120
} catch (IOException e) {
67-
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
68-
"Unable to refresh service account access token.", e.getMessage(),
69-
ErrorType.UNKNOWN, true, e);
121+
if (isServerError(e)) {
122+
throw new ServerErrorException(HttpStatus.SC_SERVICE_UNAVAILABLE, "Server error during refresh: " +
123+
e.getMessage(), e);
124+
}
125+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(
126+
e, "Unable to refresh service account access token.", ErrorType.UNKNOWN, true,
127+
GCPUtils.GCE_METADATA_SERVER_ERROR_SUPPORTED_DOC_URL);
70128
}
71129
}
72130

src/test/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProviderTest.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
package io.cdap.plugin.gcp.gcs;
1818

1919
import com.google.api.client.auth.oauth2.Credential;
20+
import com.google.auth.oauth2.AccessToken;
2021
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
2122
import com.google.cloud.hadoop.util.AccessTokenProvider;
2223
import com.google.cloud.hadoop.util.CredentialFactory;
2324
import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
2425
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
2526
import com.google.common.collect.ImmutableList;
2627
import io.cdap.plugin.gcp.common.GCPUtils;
28+
import io.cdap.plugin.gcp.common.ServerErrorException;
2729
import org.apache.hadoop.conf.Configuration;
28-
import org.hamcrest.CoreMatchers;
2930
import org.junit.Assert;
3031
import org.junit.Test;
32+
import org.mockito.Mockito;
33+
3134
import java.io.IOException;
35+
import java.time.Instant;
36+
import java.util.Date;
3237
import java.util.Map;
3338

3439
/**
@@ -71,4 +76,68 @@ public void testServiceAccountAccessTokenProvider() throws IOException {
7176
);
7277
Assert.assertNotNull(credential);
7378
}
79+
80+
@Test
81+
public void testIsServerErrorWith5xx() {
82+
IOException serverError = new IOException(
83+
"Unexpected Error code 500 trying to get security access token from Compute Engine metadata for the default " +
84+
"service account.");
85+
Assert.assertTrue(ServiceAccountAccessTokenProvider.isServerError(serverError));
86+
}
87+
88+
@Test
89+
public void testIsServerErrorWithNon5xxErrorCode400() {
90+
IOException clientError = new IOException(
91+
"Unexpected Error code 400 trying to get security access token from Compute Engine metadata for the default " +
92+
"service account.");
93+
Assert.assertFalse(ServiceAccountAccessTokenProvider.isServerError(clientError));
94+
}
95+
96+
@Test
97+
public void testIsServerErrorWithNon5xxErrorCode403() {
98+
IOException forbiddenError = new IOException(
99+
"Unexpected Error code 403 trying to get security access token from Compute Engine metadata for the default " +
100+
"service account.");
101+
Assert.assertFalse(ServiceAccountAccessTokenProvider.isServerError(forbiddenError));
102+
}
103+
104+
@Test
105+
public void testIsServerErrorWith5xxErrorCode503() {
106+
IOException serverError = new IOException(
107+
"Unexpected Error code 503 trying to get security access token from Compute Engine metadata for the default " +
108+
"service account.");
109+
Assert.assertTrue(ServiceAccountAccessTokenProvider.isServerError(serverError));
110+
}
111+
112+
@Test(expected = ServerErrorException.class)
113+
public void testRetryMechanismFailsAfterMaxRetries() throws IOException {
114+
ServiceAccountAccessTokenProvider provider = Mockito.spy(new ServiceAccountAccessTokenProvider());
115+
Mockito.doThrow(new ServerErrorException(503, "Unexpected Error code 503 trying to get security access token " +
116+
"from Compute Engine metadata for the default service account.", null))
117+
.when(provider).retrieveAccessToken();
118+
provider.getAccessToken();
119+
}
120+
121+
@Test
122+
public void testRetryMechanismSucceedsAfterFewRetries() throws IOException {
123+
ServiceAccountAccessTokenProvider provider = Mockito.spy(new ServiceAccountAccessTokenProvider());
124+
125+
// Create a valid token with future expiration
126+
AccessToken validToken = new AccessToken("valid-token", Date.from(Instant.now().plusSeconds(3600)));
127+
128+
// Fail first 2 attempts, then succeed
129+
Mockito.doThrow(new ServerErrorException(503, "Unexpected Error code 503 trying to get security access token " +
130+
"from Compute Engine metadata for the default service account.", null))
131+
.doThrow(new ServerErrorException(500, "Unexpected Error code 500 trying to get security access token " +
132+
"from Compute Engine metadata for the default service account.", null))
133+
.doReturn(validToken)
134+
.when(provider).retrieveAccessToken();
135+
AccessTokenProvider.AccessToken accessToken = provider.getAccessToken();
136+
137+
Assert.assertNotNull(accessToken);
138+
Assert.assertEquals("valid-token", accessToken.getToken());
139+
140+
// Verify that retrieveAccessToken was called 3 times (2 failures + 1 success)
141+
Mockito.verify(provider, Mockito.times(3)).retrieveAccessToken();
142+
}
74143
}

0 commit comments

Comments
 (0)