55import ml .comet .experiment .exception .CometApiException ;
66import ml .comet .experiment .exception .CometGeneralException ;
77import ml .comet .experiment .impl .constants .QueryParamName ;
8- import ml .comet .experiment .impl .utils .JsonUtils ;
98import org .asynchttpclient .AsyncCompletionHandler ;
109import org .asynchttpclient .AsyncHttpClient ;
1110import org .asynchttpclient .AsyncHttpClientConfig ;
2120import java .time .Duration ;
2221import java .util .Map ;
2322import java .util .Optional ;
24- import java .util .concurrent .CompletableFuture ;
2523import java .util .concurrent .TimeUnit ;
2624import java .util .concurrent .TimeoutException ;
2725import java .util .concurrent .atomic .AtomicInteger ;
28- import java .util .function .Function ;
2926
3027import static org .asynchttpclient .Dsl .asyncHttpClient ;
3128
@@ -46,11 +43,14 @@ public class Connection implements Closeable {
4643
4744 private static final String RESPONSE_NO_BODY = "NO BODY" ;
4845
46+ AsyncHttpClient asyncHttpClient ;
4947 String cometBaseUrl ;
5048 String apiKey ;
5149 Logger logger ;
50+ /**
51+ * The maximum number of retries when contacting server.
52+ */
5253 int maxAuthRetries ;
53- AsyncHttpClient asyncHttpClient ;
5454 /**
5555 * This is inventory tracker to maintain remaining list of scheduled asynchronous request posts. It will be used
5656 * to properly close this connection only after all scheduled requests are processed.
@@ -83,47 +83,31 @@ public Connection(@NonNull String cometBaseUrl, @NonNull String apiKey,
8383 *
8484 * @param endpoint the request path of the endpoint
8585 * @param params the map with request parameters.
86- * @return the Optional response body.
86+ * @return the {@link Optional} of response body.
8787 */
88- public Optional <String > sendGet (@ NonNull String endpoint , @ NonNull Map <QueryParamName , String > params ) {
89- return executeRequestWithAuth (
88+ public Optional <String > sendGetWithRetries (@ NonNull String endpoint , @ NonNull Map <QueryParamName , String > params ) {
89+ return executeRequestSyncWithRetries (
9090 ConnectionUtils .createGetRequest (this .buildCometUrl (endpoint ), params ), false );
9191 }
9292
9393 /**
94- * Allows sending POST to the specified endpoint with body as JSON string.
94+ * Allows sending POST to the specified endpoint with body as JSON string. This method will retry request using
95+ * {@link #maxAuthRetries} attempts. If failed empty {@link Optional} will be returned or {@link CometApiException}
96+ * will be thrown if {@link #maxAuthRetries} attempts exceeded.
9597 *
9698 * @param json the JSON string to be posted.
9799 * @param endpoint the relative path to the endpoint
98100 * @param throwOnFailure the flag to indicate if exception should be thrown on failure of request execution.
99- * @return the Optional response body.
101+ * @return the {@link Optional} of response body.
102+ * @throws CometApiException if throwOnFailure set to {@code true} and request was failed.
100103 */
101- public Optional <String > sendPost (@ NonNull String json , @ NonNull String endpoint , boolean throwOnFailure ) {
104+ public Optional <String > sendPostWithRetries (
105+ @ NonNull String json , @ NonNull String endpoint , boolean throwOnFailure ) throws CometApiException {
102106 String url = this .buildCometUrl (endpoint );
103107 if (logger .isDebugEnabled ()) {
104108 logger .debug ("sending JSON {} to {}" , json , url );
105109 }
106- return executeRequestWithAuth (ConnectionUtils .createPostJsonRequest (json , url ), throwOnFailure );
107- }
108-
109- /**
110- * Allows asynchronous sending given object as JSON encoded body of the POST request.
111- *
112- * @param payload the payload object to be sent.
113- * @param endpoint the relative path to the endpoint.
114- */
115- public void sendPostAsync (@ NonNull Object payload , @ NonNull String endpoint ) {
116- CompletableFuture <Response > future = sendPostAsync (JsonUtils .toJson (payload ), endpoint )
117- .toCompletableFuture ()
118- .exceptionally (t -> {
119- logger .error ("failed to execute asynchronous request to endpoint {} with payload {}" ,
120- endpoint , payload , t );
121- return null ;
122- }
123- );
124- if (logger .isDebugEnabled ()) {
125- future .thenApply (getDebugLogResponse (endpoint ));
126- }
110+ return executeRequestSyncWithRetries (ConnectionUtils .createPostJsonRequest (json , url ), throwOnFailure );
127111 }
128112
129113 /**
@@ -135,7 +119,7 @@ public void sendPostAsync(@NonNull Object payload, @NonNull String endpoint) {
135119 * the request execution.
136120 */
137121 public ListenableFuture <Response > sendPostAsync (@ NonNull String json , @ NonNull String endpoint ) {
138- return executeRequestWithAuthAsync (
122+ return executeRequestAsync (
139123 ConnectionUtils .createPostJsonRequest (json , this .buildCometUrl (endpoint )));
140124 }
141125
@@ -150,7 +134,7 @@ public ListenableFuture<Response> sendPostAsync(@NonNull String json, @NonNull S
150134 */
151135 public ListenableFuture <Response > sendPostAsync (@ NonNull File file , @ NonNull String endpoint ,
152136 @ NonNull Map <QueryParamName , String > params ) {
153- return executeRequestWithAuthAsync (
137+ return executeRequestAsync (
154138 ConnectionUtils .createPostFileRequest (file , this .buildCometUrl (endpoint ), params ));
155139 }
156140
@@ -170,7 +154,7 @@ public ListenableFuture<Response> sendPostAsync(byte[] bytes, @NonNull String en
170154 logger .debug ("sending POST bytearray with length {} to {}" , bytes .length , url );
171155 }
172156
173- return executeRequestWithAuthAsync (ConnectionUtils .createPostByteArrayRequest (bytes , url , params ));
157+ return executeRequestAsync (ConnectionUtils .createPostByteArrayRequest (bytes , url , params ));
174158 }
175159
176160 /**
@@ -225,9 +209,9 @@ public void waitAndClose(@NonNull Duration timeout) throws IOException, Interrup
225209 * Executes provided request asynchronously.
226210 *
227211 * @param request the request to be executed.
228- * @return the <code> ListenableFuture</code> which can be used to check request status.
212+ * @return the {@link ListenableFuture} which can be used to check request status.
229213 */
230- ListenableFuture <Response > executeRequestWithAuthAsync (@ NonNull Request request ) {
214+ ListenableFuture <Response > executeRequestAsync (@ NonNull Request request ) {
231215 // check that client is not closed
232216 if (this .asyncHttpClient .isClosed ()) {
233217 String msg = String .format ("failed to execute request %s connection to the server already closed" , request );
@@ -245,49 +229,59 @@ ListenableFuture<Response> executeRequestWithAuthAsync(@NonNull Request request)
245229 }
246230
247231 /**
248- * Synchronously executes provided request. It will attempt to execute request <code> maxAuthRetries</code> in
249- * case of failure. If all attempts failed the empty optional will be returned or <code>CometGeneralException</code>
250- * will be thrown in case of < code> throwOnFailure</code> is < code> true</code> .
232+ * Synchronously executes provided request. It will attempt to execute request {@link # maxAuthRetries} in
233+ * case of failure. If all attempts failed the empty optional will be returned or {@link CometApiException}
234+ * will be thrown in case of {@ code throwOnFailure} is {@ code true} .
251235 *
252236 * @param request the request to be executed
253- * @param throwOnFailure if < code> true</code> throws exception on failure. Otherwise, empty Optional will be
237+ * @param throwOnFailure if {@ code true} throws exception on failure. Otherwise, empty {@link Optional} will be
254238 * returned.
255- * @return the response body or empty Optional.
239+ * @return the response body or empty {@link Optional}.
240+ * @throws CometApiException if throwOnFailure set to {@code true} and request was failed.
256241 */
257- Optional <String > executeRequestWithAuth (@ NonNull Request request , boolean throwOnFailure ) {
242+ Optional <String > executeRequestSyncWithRetries (
243+ @ NonNull Request request , boolean throwOnFailure ) throws CometApiException {
258244 request .getHeaders ().add (COMET_SDK_API_HEADER , apiKey );
259245 String endpoint = request .getUrl ();
260246 try {
261247 org .asynchttpclient .Response response = null ;
262- for (int i = 1 ; i < maxAuthRetries ; i ++) {
263- // execute request and wait for completion until default REQUEST_TIMEOUT_MS exceeded
264- if (!this .asyncHttpClient .isClosed ()) {
265- response = this .asyncHttpClient .executeRequest (request ).get ();
266- } else {
267- logger .warn ("failed to execute request {}, the connection already closed." , request );
248+ for (int i = 1 ; i < this .maxAuthRetries ; i ++) {
249+ if (this .asyncHttpClient .isClosed ()) {
250+ this .logger .warn ("failed to execute request {}, the connection already closed." , request );
268251 if (throwOnFailure ) {
269- throw new CometGeneralException ("failed to execute request, the connection already closed." );
252+ throw new CometApiException ("failed to execute request, the connection already closed." );
270253 }
271254 return Optional .empty ();
272255 }
273256
257+ // execute request and wait for completion until default REQUEST_TIMEOUT_MS exceeded
258+ response = this .asyncHttpClient
259+ .executeRequest (request )
260+ .get ();
261+
274262 if (!ConnectionUtils .isResponseSuccessful (response .getStatusCode ())) {
275- // request attempt failed
276- if (i < maxAuthRetries - 1 ) {
277- logger .debug ("for endpoint {} response {}, retrying\n " , endpoint , response .getStatusText ());
263+ // attempt failed - check if to retry
264+ if (i < this .maxAuthRetries - 1 ) {
265+ // sleep for a while and repeat
266+ this .logger .debug ("for endpoint {} response {}, retrying\n " , endpoint , response .getStatusText ());
278267 Thread .sleep ((2 ^ i ) * 1000L );
279268 } else {
280- logger .error ("for endpoint {} response {}, last retry failed\n " ,
281- endpoint , response .getStatusText ());
269+ // maximal number of attempts exceeded - throw or return
270+ this .logger .error (
271+ "for endpoint {} got the response '{}', the last retry failed from {} attempts" ,
272+ endpoint , response .getStatusText (), this .maxAuthRetries );
282273 if (throwOnFailure ) {
283274 String body = response .hasResponseBody () ? response .getResponseBody () : RESPONSE_NO_BODY ;
284- throw new CometGeneralException ("failed to call: " + endpoint + ", response status: "
285- + response .getStatusCode () + ", body: " + body );
275+ throw new CometApiException (
276+ "failed to call endpoint: %s, response status: %s, body: %s, failed attempts: %d" ,
277+ endpoint , response .getStatusCode (), body , this .maxAuthRetries );
286278 }
279+ return Optional .empty ();
287280 }
288281 } else {
289- if (logger .isDebugEnabled ()) {
290- logger .debug ("for endpoint {} got response {}\n " , endpoint , response .getResponseBody ());
282+ // success - log debug and stop trying
283+ if (this .logger .isDebugEnabled ()) {
284+ this .logger .debug ("for endpoint {} got response {}\n " , endpoint , response .getResponseBody ());
291285 }
292286 break ;
293287 }
@@ -298,18 +292,14 @@ Optional<String> executeRequestWithAuth(@NonNull Request request, boolean throwO
298292 }
299293 return Optional .of (response .getResponseBody ());
300294 } catch (Throwable e ) {
301- logger .error ("Failed to execute request: " + request , e );
295+ this . logger .error ("Failed to execute request: " + request , e );
302296 if (throwOnFailure ) {
303- throw new CometGeneralException ("failed to execute request, unknown error" , e );
297+ throw new CometApiException ("failed to execute request, unknown error" , e );
304298 }
305299 return Optional .empty ();
306300 }
307301 }
308302
309- private Function <Response , Response > getDebugLogResponse (@ NonNull String endpoint ) {
310- return new ConnectionUtils .DebugLogResponse (this .logger , endpoint );
311- }
312-
313303 private String buildCometUrl (String endpoint ) {
314304 return this .cometBaseUrl + endpoint ;
315305 }
0 commit comments