1717
1818package org .apache .gobblin .service .modules .orchestration ;
1919
20- import com .github .rholder .retry .AttemptTimeLimiters ;
21- import com .github .rholder .retry .RetryException ;
22- import com .github .rholder .retry .Retryer ;
23- import com .github .rholder .retry .RetryerBuilder ;
24- import com .github .rholder .retry .StopStrategies ;
25- import com .github .rholder .retry .WaitStrategies ;
26- import com .google .common .base .Preconditions ;
27- import com .google .common .base .Throwables ;
28- import com .google .common .io .Closer ;
29- import com .google .gson .Gson ;
30- import com .google .gson .JsonElement ;
31- import com .google .gson .JsonObject ;
32- import com .google .gson .JsonParser ;
3320import java .io .Closeable ;
3421import java .io .File ;
3522import java .io .IOException ;
4229import java .util .concurrent .ExecutorService ;
4330import java .util .concurrent .Executors ;
4431import java .util .concurrent .TimeUnit ;
45- import lombok . Builder ;
32+
4633import org .apache .commons .io .IOUtils ;
4734import org .apache .commons .lang3 .ObjectUtils ;
4835import org .apache .commons .lang3 .StringUtils ;
6148import org .slf4j .Logger ;
6249import org .slf4j .LoggerFactory ;
6350
51+ import com .github .rholder .retry .Attempt ;
52+ import com .github .rholder .retry .AttemptTimeLimiters ;
53+ import com .github .rholder .retry .RetryException ;
54+ import com .github .rholder .retry .RetryListener ;
55+ import com .github .rholder .retry .Retryer ;
56+ import com .github .rholder .retry .RetryerBuilder ;
57+ import com .github .rholder .retry .StopStrategies ;
58+ import com .github .rholder .retry .WaitStrategies ;
59+ import com .google .common .base .Preconditions ;
60+ import com .google .common .base .Throwables ;
61+ import com .google .common .io .Closer ;
62+ import com .google .gson .Gson ;
63+ import com .google .gson .JsonElement ;
64+ import com .google .gson .JsonObject ;
65+ import com .google .gson .JsonParser ;
66+
67+ import lombok .Builder ;
68+
6469
6570/**
6671 * A simple http based client that uses Ajax API to communicate with Azkaban server.
@@ -80,9 +85,9 @@ public class AzkabanClient implements Closeable {
8085 protected CloseableHttpClient httpClient ;
8186 private ExecutorService executorService ;
8287 private Closer closer = Closer .create ();
83- private Retryer <AzkabanClientStatus > retryer ;
84- private static Logger log = LoggerFactory .getLogger (AzkabanClient .class );
85- private Duration requestTimeout ;
88+ private Retryer <AzkabanClientStatus <?> > retryer ;
89+ private static final Logger log = LoggerFactory .getLogger (AzkabanClient .class );
90+ private final Duration requestTimeout ;
8691
8792 /**
8893 * Child class should have a different builderMethodName.
@@ -109,13 +114,25 @@ protected AzkabanClient(String username,
109114 this .initializeClient ();
110115 this .initializeSessionManager ();
111116 this .intializeExecutorService ();
117+ RetryListener retryListener = new RetryListener () {
118+ @ Override
119+ public <V > void onRetry (Attempt <V > attempt ) {
120+ if (attempt .hasException ()) {
121+ String msg = String .format ("(Likely retryable) failure running Azkaban API [attempt: %d; %s after start]" ,
122+ attempt .getAttemptNumber (), Duration .ofMillis (attempt .getDelaySinceFirstAttempt ()).toString ());
123+ log .warn (msg , attempt .getExceptionCause ());
124+ }
125+ }
126+ };
127+
112128
113- this .retryer = RetryerBuilder .<AzkabanClientStatus >newBuilder ()
129+ this .retryer = RetryerBuilder .<AzkabanClientStatus <?> >newBuilder ()
114130 .retryIfExceptionOfType (InvalidSessionException .class )
115131 .withAttemptTimeLimiter (AttemptTimeLimiters .fixedTimeLimit (this .requestTimeout .toMillis (), TimeUnit .MILLISECONDS ,
116132 this .executorService ))
117133 .withWaitStrategy (WaitStrategies .exponentialWait (60 , TimeUnit .SECONDS ))
118134 .withStopStrategy (StopStrategies .stopAfterAttempt (3 ))
135+ .withRetryListener (retryListener )
119136 .build ();
120137 try {
121138 this .sessionId = this .sessionManager .fetchSession ();
0 commit comments