2121import com .google .bigtable .repackaged .com .google .gson .Gson ;
2222import com .google .cloud .hadoop .util .AccessTokenProvider ;
2323import com .google .cloud .hadoop .util .CredentialFactory ;
24+ import dev .failsafe .Failsafe ;
25+ import dev .failsafe .RetryPolicy ;
2426import io .cdap .cdap .api .exception .ErrorCategory ;
2527import io .cdap .cdap .api .exception .ErrorCategory .ErrorCategoryEnum ;
2628import io .cdap .cdap .api .exception .ErrorType ;
2729import io .cdap .cdap .api .exception .ErrorUtils ;
30+ import io .cdap .plugin .gcp .bigquery .source .BigQuerySourceConfig ;
31+ import io .cdap .plugin .gcp .bigquery .util .BigQueryConstants ;
2832import io .cdap .plugin .gcp .common .GCPUtils ;
33+ import io .cdap .plugin .gcp .common .ServerErrorException ;
2934import org .apache .hadoop .conf .Configuration ;
35+ import org .apache .http .HttpStatus ;
36+ import org .slf4j .Logger ;
37+ import org .slf4j .LoggerFactory ;
3038
3139import java .io .IOException ;
40+ import java .time .Duration ;
3241import java .time .Instant ;
3342import java .util .Date ;
3443import java .util .stream .Collectors ;
@@ -43,19 +52,70 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider {
4352 private Configuration conf ;
4453 private GoogleCredentials credentials ;
4554 private static final Gson GSON = new Gson ();
55+ private static final Logger logger = LoggerFactory .getLogger (ServiceAccountAccessTokenProvider .class );
56+ public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5 ;
57+ public static final int DEFAULT_MAX_RETRY_COUNT = 5 ;
58+ public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80 ;
59+
4660
4761 @ Override
4862 public AccessToken getAccessToken () {
49- try {
50- com .google .auth .oauth2 .AccessToken token = getCredentials ().getAccessToken ();
51- if (token == null || token .getExpirationTime ().before (Date .from (Instant .now ()))) {
52- refresh ();
53- token = getCredentials ().getAccessToken ();
63+ int initialRetryDuration = DEFAULT_INITIAL_RETRY_DURATION_SECONDS ;
64+ int maxRetryCount = DEFAULT_MAX_RETRY_COUNT ;
65+ int maxRetryDuration = DEFAULT_MAX_RETRY_DURATION_SECONDS ;
66+ logger .debug (
67+ "Initializing RetryPolicy with the following configuration: MaxRetryCount: {}, InitialRetryDuration: {}s, " +
68+ "MaxRetryDuration: {}s" , maxRetryCount , initialRetryDuration , maxRetryDuration );
69+ try {
70+ return Failsafe .with (getRetryPolicy (initialRetryDuration , maxRetryDuration , maxRetryCount ))
71+ .get (() -> {
72+ com .google .auth .oauth2 .AccessToken token = safeGetAccessToken ();
73+ if (token == null || token .getExpirationTime ().before (Date .from (Instant .now ()))) {
74+ refresh ();
75+ token = safeGetAccessToken ();
76+ }
77+ return new AccessToken (token .getTokenValue (), token .getExpirationTime ().getTime ());
78+ });
79+ } catch (Exception e ) {
80+ throw ErrorUtils .getProgramFailureException (
81+ new ErrorCategory (ErrorCategoryEnum .PLUGIN ),
82+ "Unable to get service account access token after retries." ,
83+ e .getMessage (),
84+ ErrorType .UNKNOWN ,
85+ true ,
86+ e
87+ );
5488 }
55- return new AccessToken (token .getTokenValue (), token .getExpirationTime ().getTime ());
89+ }
90+
91+
92+ private RetryPolicy <Object > getRetryPolicy (int initialRetryDuration , int maxRetryDuration ,
93+ int maxRetryCount ) {
94+ return RetryPolicy .builder ()
95+ .handle (ServerErrorException .class )
96+ .withBackoff (Duration .ofSeconds (initialRetryDuration ), Duration .ofSeconds (maxRetryDuration ))
97+ .withMaxRetries (maxRetryCount )
98+ .onRetry (event -> logger .debug ("Retry attempt {} due to {}" , event .getAttemptCount (), event .getLastException ().
99+ getMessage ()))
100+ .onSuccess (event -> logger .debug ("Access Token Fetched Successfully ." ))
101+ .onRetriesExceeded (event -> logger .error ("Retry limit reached for Service account." ))
102+ .build ();
103+ }
104+
105+ private boolean isServerError (IOException e ) {
106+ String msg = e .getMessage ();
107+ return msg != null && msg .matches ("^5\\ d{2}$" ); // crude check for 5xx codes
108+ }
109+
110+ private com .google .auth .oauth2 .AccessToken safeGetAccessToken () throws IOException {
111+ try {
112+ return getCredentials ().getAccessToken ();
56113 } catch (IOException e ) {
57- throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategoryEnum .PLUGIN ),
58- "Unable to get service account access token." , e .getMessage (), ErrorType .UNKNOWN , true , e );
114+ if (isServerError (e )) {
115+ throw new ServerErrorException (HttpStatus .SC_SERVICE_UNAVAILABLE , "Server error while fetching access token: "
116+ + e .getMessage ());
117+ }
118+ throw e ;
59119 }
60120 }
61121
@@ -64,6 +124,10 @@ public void refresh() throws IOException {
64124 try {
65125 getCredentials ().refresh ();
66126 } catch (IOException e ) {
127+ if (isServerError (e )) {
128+ throw new ServerErrorException (HttpStatus .SC_SERVICE_UNAVAILABLE , "Server error during refresh: " +
129+ e .getMessage ());
130+ }
67131 throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategoryEnum .PLUGIN ),
68132 "Unable to refresh service account access token." , e .getMessage (),
69133 ErrorType .UNKNOWN , true , e );
0 commit comments