2222import com .google .cloud .hadoop .util .AccessTokenProvider ;
2323import com .google .cloud .hadoop .util .CredentialFactory ;
2424import dev .failsafe .Failsafe ;
25+ import dev .failsafe .FailsafeException ;
2526import dev .failsafe .RetryPolicy ;
2627import io .cdap .cdap .api .exception .ErrorCategory ;
2728import io .cdap .cdap .api .exception .ErrorCategory .ErrorCategoryEnum ;
2829import io .cdap .cdap .api .exception .ErrorType ;
2930import io .cdap .cdap .api .exception .ErrorUtils ;
30- import io .cdap .plugin .gcp .bigquery .source .BigQuerySourceConfig ;
31- import io .cdap .plugin .gcp .bigquery .util .BigQueryConstants ;
3231import io .cdap .plugin .gcp .common .GCPUtils ;
3332import io .cdap .plugin .gcp .common .ServerErrorException ;
3433import org .apache .hadoop .conf .Configuration ;
@@ -52,53 +51,50 @@ public class ServiceAccountAccessTokenProvider implements AccessTokenProvider {
5251 private Configuration conf ;
5352 private GoogleCredentials credentials ;
5453 private static final Gson GSON = new Gson ();
55- private static final Logger logger = LoggerFactory .getLogger (ServiceAccountAccessTokenProvider .class );
54+ private static final Logger LOG = LoggerFactory .getLogger (ServiceAccountAccessTokenProvider .class );
5655 public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5 ;
5756 public static final int DEFAULT_MAX_RETRY_COUNT = 5 ;
5857 public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80 ;
59-
58+ private static final RetryPolicy < Object > RETRY_POLICY = createRetryPolicy ();
6059
6160 @ Override
6261 public AccessToken getAccessToken () {
63- int initialRetryDuration = DEFAULT_INITIAL_RETRY_DURATION_SECONDS ;
64- int maxRetryCount = DEFAULT_MAX_RETRY_COUNT ;
65- int maxRetryDuration = DEFAULT_MAX_RETRY_DURATION_SECONDS ;
66- logger .debug (
62+ LOG .debug (
6763 "Initializing RetryPolicy with the following configuration: MaxRetryCount: {}, InitialRetryDuration: {}s, " +
68- "MaxRetryDuration: {}s" , maxRetryCount , initialRetryDuration , maxRetryDuration );
64+ "MaxRetryDuration: {}s" , DEFAULT_MAX_RETRY_COUNT , DEFAULT_INITIAL_RETRY_DURATION_SECONDS ,
65+ DEFAULT_MAX_RETRY_DURATION_SECONDS );
6966 try {
70- return Failsafe .with (getRetryPolicy (initialRetryDuration , maxRetryDuration , maxRetryCount ))
71- .get (() -> {
67+ return Failsafe .with (RETRY_POLICY ).get (() -> {
7268 com .google .auth .oauth2 .AccessToken token = safeGetAccessToken ();
7369 if (token == null || token .getExpirationTime ().before (Date .from (Instant .now ()))) {
7470 refresh ();
7571 token = safeGetAccessToken ();
7672 }
7773 return new AccessToken (token .getTokenValue (), token .getExpirationTime ().getTime ());
7874 });
79- } catch (Exception e ) {
75+ } catch (FailsafeException e ) {
76+ Throwable t = e .getCause () != null ? e .getCause () : e ;
8077 throw ErrorUtils .getProgramFailureException (
8178 new ErrorCategory (ErrorCategoryEnum .PLUGIN ),
8279 "Unable to get service account access token after retries." ,
83- e .getMessage (),
84- ErrorType .UNKNOWN ,
80+ t .getMessage (),
81+ ErrorType .SYSTEM ,
8582 true ,
86- e
83+ t
8784 );
8885 }
8986 }
9087
91-
92- private RetryPolicy <Object > getRetryPolicy (int initialRetryDuration , int maxRetryDuration ,
93- int maxRetryCount ) {
88+ private static RetryPolicy <Object > createRetryPolicy () {
9489 return RetryPolicy .builder ()
9590 .handle (ServerErrorException .class )
96- .withBackoff (Duration .ofSeconds (initialRetryDuration ), Duration .ofSeconds (maxRetryDuration ))
97- .withMaxRetries (maxRetryCount )
98- .onRetry (event -> logger .debug ("Retry attempt {} due to {}" , event .getAttemptCount (), event .getLastException ().
91+ .withBackoff (Duration .ofSeconds (DEFAULT_INITIAL_RETRY_DURATION_SECONDS ),
92+ Duration .ofSeconds (DEFAULT_MAX_RETRY_DURATION_SECONDS ))
93+ .withMaxRetries (DEFAULT_MAX_RETRY_COUNT )
94+ .onRetry (event -> LOG .debug ("Retry attempt {} due to {}" , event .getAttemptCount (), event .getLastException ().
9995 getMessage ()))
100- .onSuccess (event -> logger .debug ("Access Token Fetched Successfully." ))
101- .onRetriesExceeded (event -> logger .error ("Retry limit reached for Service account." ))
96+ .onSuccess (event -> LOG .debug ("Access Token Fetched Successfully." ))
97+ .onRetriesExceeded (event -> LOG .error ("Retry limit reached for Service account." ))
10298 .build ();
10399 }
104100
@@ -113,7 +109,7 @@ private com.google.auth.oauth2.AccessToken safeGetAccessToken() throws IOExcepti
113109 } catch (IOException e ) {
114110 if (isServerError (e )) {
115111 throw new ServerErrorException (HttpStatus .SC_SERVICE_UNAVAILABLE , "Server error while fetching access token: "
116- + e .getMessage ());
112+ + e .getMessage (), e );
117113 }
118114 throw e ;
119115 }
@@ -126,7 +122,7 @@ public void refresh() throws IOException {
126122 } catch (IOException e ) {
127123 if (isServerError (e )) {
128124 throw new ServerErrorException (HttpStatus .SC_SERVICE_UNAVAILABLE , "Server error during refresh: " +
129- e .getMessage ());
125+ e .getMessage (), e );
130126 }
131127 throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategoryEnum .PLUGIN ),
132128 "Unable to refresh service account access token." , e .getMessage (),
0 commit comments