Skip to content

Commit 1daebee

Browse files
committed
Add retry policy to Wave http client
Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent f227f2e commit 1daebee

File tree

5 files changed

+144
-4
lines changed

5 files changed

+144
-4
lines changed

docs/wave.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,3 +201,15 @@ The following configuration options are available:
201201

202202
`wave.report.file` (preview)
203203
: The name of the containers report file (default: `containers-<timestamp>.config` requires version `23.06.0-edge` or later).
204+
205+
`wave.retry.delay`
206+
: The initial delay when a failing HTTP request is retried (default: `150ms`).
207+
208+
`wave.retry.maxDelay`
209+
: The max delay when a failing HTTP request is retried (default: `90 seconds`).
210+
211+
`wave.retry.maxAttempts`
212+
: The max number of attempts a failing HTTP request is retried (default: `5`).
213+
214+
`wave.retry.jitter`
215+
: Sets the jitterFactor to randomly vary retry delays by (default: `0.25`).

plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import java.nio.file.Path
2525
import java.time.Duration
2626
import java.time.Instant
2727
import java.time.OffsetDateTime
28+
import java.time.temporal.ChronoUnit
2829
import java.util.concurrent.Callable
2930
import java.util.concurrent.TimeUnit
31+
import java.util.function.Predicate
3032
import java.util.regex.Pattern
3133

3234
import com.google.common.cache.Cache
@@ -35,6 +37,11 @@ import com.google.common.util.concurrent.UncheckedExecutionException
3537
import com.google.gson.Gson
3638
import com.google.gson.GsonBuilder
3739
import com.google.gson.reflect.TypeToken
40+
import dev.failsafe.Failsafe
41+
import dev.failsafe.RetryPolicy
42+
import dev.failsafe.event.EventListener
43+
import dev.failsafe.event.ExecutionAttemptedEvent
44+
import dev.failsafe.function.CheckedSupplier
3845
import groovy.json.JsonOutput
3946
import groovy.transform.CompileStatic
4047
import groovy.transform.Memoized
@@ -214,7 +221,7 @@ class WaveClient {
214221
.build()
215222

216223
try {
217-
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
224+
final resp = httpSend(req)
218225
log.debug "Wave response: statusCode=${resp.statusCode()}; body=${resp.body()}"
219226
if( resp.statusCode()==200 )
220227
return jsonToSubmitResponse(resp.body())
@@ -232,7 +239,7 @@ class WaveClient {
232239
else
233240
throw new BadResponseException("Wave invalid response: [${resp.statusCode()}] ${resp.body()}")
234241
}
235-
catch (ConnectException e) {
242+
catch (IOException e) {
236243
throw new IllegalStateException("Unable to connect Wave service: $endpoint")
237244
}
238245
}
@@ -279,7 +286,7 @@ class WaveClient {
279286
.GET()
280287
.build()
281288

282-
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
289+
final resp = httpSend(req)
283290
final code = resp.statusCode()
284291
if( code>=200 && code<400 ) {
285292
log.debug "Wave container config response: [$code] ${resp.body()}"
@@ -580,7 +587,7 @@ class WaveClient {
580587
.POST(HttpRequest.BodyPublishers.ofString("grant_type=refresh_token&refresh_token=${URLEncoder.encode(refresh, 'UTF-8')}"))
581588
.build()
582589

583-
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
590+
final resp = httpSend(req)
584591
log.debug "Refresh cookie response: [${resp.statusCode()}] ${resp.body()}"
585592
if( resp.statusCode() != 200 )
586593
return false
@@ -674,4 +681,31 @@ class WaveClient {
674681
final type = new TypeToken<DescribeContainerResponse>(){}.getType()
675682
return gson.fromJson(json, type)
676683
}
684+
685+
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
686+
final cfg = config.retryOpts()
687+
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
688+
@Override
689+
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
690+
log.debug("Azure TooManyRequests reponse error - attempt: ${event.attemptCount}", event.lastFailure)
691+
}
692+
}
693+
return RetryPolicy.<T>builder()
694+
.handleIf(cond)
695+
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
696+
.withMaxAttempts(cfg.maxAttempts)
697+
.withJitter(cfg.jitter)
698+
.onRetry(listener)
699+
.build()
700+
}
701+
702+
protected <T> T safeApply(CheckedSupplier<T> action) {
703+
final cond = (e -> e instanceof IOException) as Predicate<? extends Throwable>
704+
final policy = retryPolicy(cond)
705+
return Failsafe.with(policy).get(action)
706+
}
707+
708+
protected HttpResponse<String> httpSend(HttpRequest req) {
709+
return safeApply(() -> httpClient.send(req, HttpResponse.BodyHandlers.ofString()))
710+
}
677711
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2020-2022, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.wave.plugin.config
19+
20+
import nextflow.util.Duration
21+
22+
/**
23+
* Model retry options for Wave http requests
24+
*/
25+
class RetryOpts {
26+
Duration delay = Duration.of('150ms')
27+
Duration maxDelay = Duration.of('90s')
28+
int maxAttempts = 5
29+
double jitter = 0.25
30+
31+
RetryOpts() {
32+
this(Collections.emptyMap())
33+
}
34+
35+
RetryOpts(Map config) {
36+
if( config.delay )
37+
delay = config.delay as Duration
38+
if( config.maxDelay )
39+
maxDelay = config.maxDelay as Duration
40+
if( config.maxAttempts )
41+
maxAttempts = config.maxAttempts as int
42+
if( config.jitter )
43+
jitter = config.jitter as double
44+
}
45+
}

plugins/nf-wave/src/main/io/seqera/wave/plugin/config/WaveConfig.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class WaveConfig {
4141
final private String buildRepository
4242
final private String cacheRepository
4343
final private ReportOpts reportOpts
44+
final private RetryOpts retryOpts
4445

4546
WaveConfig(Map opts, Map<String,String> env=System.getenv()) {
4647
this.enabled = opts.enabled
@@ -54,6 +55,7 @@ class WaveConfig {
5455
this.strategy = parseStrategy(opts.strategy)
5556
this.bundleProjectResources = opts.bundleProjectResources
5657
this.reportOpts = new ReportOpts(opts.report as Map ?: Map.of())
58+
this.retryOpts = new RetryOpts(opts.retry as Map ?: Map.of())
5759
if( !endpoint.startsWith('http://') && !endpoint.startsWith('https://') )
5860
throw new IllegalArgumentException("Endpoint URL should start with 'http:' or 'https:' protocol prefix - offending value: $endpoint")
5961
}
@@ -66,6 +68,8 @@ class WaveConfig {
6668

6769
SpackOpts spackOpts() { this.spackOpts }
6870

71+
RetryOpts retryOpts() { this.retryOpts }
72+
6973
List<String> strategy() { this.strategy }
7074

7175
boolean bundleProjectResources() { bundleProjectResources }
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2020-2022, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.wave.plugin.config
19+
20+
import nextflow.util.Duration
21+
import spock.lang.Specification
22+
23+
/**
24+
*
25+
* @author Paolo Di Tommaso <[email protected]>
26+
*/
27+
class RetryOptsTest extends Specification {
28+
29+
def 'should create retry config' () {
30+
31+
expect:
32+
new RetryOpts().delay == Duration.of('150ms')
33+
new RetryOpts().maxDelay == Duration.of('90s')
34+
new RetryOpts().maxAttempts == 5
35+
new RetryOpts().jitter == 0.25d
36+
37+
and:
38+
new RetryOpts([maxAttempts: 20]).maxAttempts == 20
39+
new RetryOpts([delay: '1s']).delay == Duration.of('1s')
40+
new RetryOpts([maxDelay: '1m']).maxDelay == Duration.of('1m')
41+
new RetryOpts([jitter: '0.5']).jitter == 0.5d
42+
43+
}
44+
45+
}

0 commit comments

Comments
 (0)