Skip to content

Commit 7062ff2

Browse files
jagednabhi18av
authored andcommitted
Implement an exception handler for api client
Signed-off-by: Jorge Aguilera <[email protected]>
1 parent b55dc70 commit 7062ff2

File tree

5 files changed

+175
-31
lines changed

5 files changed

+175
-31
lines changed

plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,19 @@ class NomadClientOpts{
3333

3434
final static protected API_VERSION = "v1"
3535

36-
private Map<String,String> sysEnv
37-
3836
final String address
3937
final String token
38+
final int connectionTimeout
39+
40+
final int readTimeout
41+
final int writeTimeout
42+
43+
final RetryConfig retryConfig
4044

4145
NomadClientOpts(Map nomadClientOpts, Map<String,String> env=null){
4246
assert nomadClientOpts!=null
4347

44-
sysEnv = env==null ? new HashMap<String,String>(System.getenv()) : env
48+
def sysEnv = env ?: new HashMap<String,String>(System.getenv())
4549

4650
def address = (nomadClientOpts.address?.toString() ?: sysEnv.get('NOMAD_ADDR'))
4751
assert address != null, "Nomad Address is required"
@@ -50,8 +54,17 @@ class NomadClientOpts{
5054
address +="/"
5155
this.address = address + API_VERSION
5256
this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN')
57+
this.connectionTimeout = (nomadClientOpts.connectionTimeout ?: 6000 ) as Integer
58+
this.readTimeout = (nomadClientOpts.readTimeout ?: 6000 ) as Integer
59+
this.writeTimeout = (nomadClientOpts.writeTimeout ?: 6000 ) as Integer
60+
61+
this.retryConfig = new RetryConfig(nomadClientOpts.retryConfig as Map ?: Collections.emptyMap())
5362

5463
//TODO: Add mTLS properties and env vars
5564
// https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables
5665
}
66+
67+
RetryConfig getRetryConfig() {
68+
return retryConfig
69+
}
5770
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package nextflow.nomad.config
2+
3+
import groovy.transform.CompileStatic
4+
import nextflow.util.Duration
5+
6+
7+
@CompileStatic
8+
class RetryConfig {
9+
10+
Duration delay = Duration.of('250ms')
11+
Duration maxDelay = Duration.of('90s')
12+
int maxAttempts = 10
13+
double jitter = 0.25
14+
15+
RetryConfig(){
16+
this(Collections.emptyMap())
17+
}
18+
19+
RetryConfig(Map config){
20+
if( config.delay )
21+
delay = config.delay as Duration
22+
if( config.maxDelay )
23+
maxDelay = config.maxDelay as Duration
24+
if( config.maxAttempts )
25+
maxAttempts = config.maxAttempts as int
26+
if( config.jitter )
27+
jitter = config.jitter as double
28+
}
29+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package nextflow.nomad.executor
2+
3+
import dev.failsafe.Failsafe
4+
import dev.failsafe.RetryPolicy
5+
import dev.failsafe.event.EventListener
6+
import dev.failsafe.event.ExecutionAttemptedEvent
7+
import dev.failsafe.function.CheckedSupplier
8+
import groovy.transform.CompileStatic
9+
import groovy.util.logging.Slf4j
10+
import io.nomadproject.client.ApiException
11+
import nextflow.nomad.config.RetryConfig
12+
13+
import java.time.temporal.ChronoUnit
14+
import java.util.concurrent.TimeoutException
15+
import java.util.function.Predicate
16+
17+
@Slf4j
18+
@CompileStatic
19+
class FailsafeExecutor {
20+
21+
private RetryConfig config
22+
23+
FailsafeExecutor(RetryConfig config){
24+
this.config = config
25+
}
26+
27+
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
28+
29+
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
30+
@Override
31+
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
32+
log.debug("Nomad TooManyRequests response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
33+
}
34+
}
35+
return RetryPolicy.<T>builder()
36+
.handleIf(cond)
37+
.withBackoff(config.delay.toMillis(), config.maxDelay.toMillis(), ChronoUnit.MILLIS)
38+
.withMaxAttempts(config.maxAttempts)
39+
.withJitter(config.jitter)
40+
.onRetry(listener)
41+
.build()
42+
}
43+
44+
final private static List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)
45+
46+
protected <T> T apply(CheckedSupplier<T> action) {
47+
// define the retry condition
48+
final cond = new Predicate<? extends Throwable>() {
49+
@Override
50+
boolean test(Throwable t) {
51+
if( t instanceof ApiException && t.code in RETRY_CODES )
52+
return true
53+
if( t instanceof IOException || t.cause instanceof IOException )
54+
return true
55+
if( t instanceof TimeoutException || t.cause instanceof TimeoutException )
56+
return true
57+
return false
58+
}
59+
}
60+
// create the retry policy object
61+
final policy = retryPolicy(cond)
62+
// apply the action with
63+
return Failsafe.with(policy).get(action)
64+
}
65+
66+
}

plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ class NomadService implements Closeable{
4545
ApiClient apiClient
4646
JobsApi jobsApi
4747
VariablesApi variablesApi
48+
FailsafeExecutor safeExecutor
4849

4950
NomadService(NomadConfig config) {
5051
this.config = config
5152

52-
//TODO: Accommodate these connection level options in clientOpts()
53-
final CONNECTION_TIMEOUT_MILLISECONDS = 60000
54-
final READ_TIMEOUT_MILLISECONDS = 60000
55-
final WRITE_TIMEOUT_MILLISECONDS = 60000
53+
final CONNECTION_TIMEOUT_MILLISECONDS = config.clientOpts().connectionTimeout
54+
final READ_TIMEOUT_MILLISECONDS = config.clientOpts().readTimeout
55+
final WRITE_TIMEOUT_MILLISECONDS = config.clientOpts().writeTimeout
5656

5757
apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS)
5858
apiClient.basePath = config.clientOpts().address
@@ -64,6 +64,8 @@ class NomadService implements Closeable{
6464
}
6565
this.jobsApi = new JobsApi(apiClient)
6666
this.variablesApi = new VariablesApi(apiClient)
67+
68+
this.safeExecutor = new FailsafeExecutor(config.clientOpts().retryConfig)
6769
}
6870

6971

@@ -96,8 +98,12 @@ class NomadService implements Closeable{
9698
}
9799

98100
try {
99-
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest, config.jobOpts().region, config.jobOpts().namespace, null, null)
100-
return jobRegisterResponse.evalID
101+
safeExecutor.apply {
102+
JobRegisterResponse jobRegisterResponse = jobsApi.registerJob(jobRegisterRequest,
103+
config.jobOpts().region, config.jobOpts().namespace,
104+
null, null)
105+
jobRegisterResponse.evalID
106+
}
101107
} catch (ApiException apiException) {
102108
log.debug("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
103109
throw new ProcessSubmitException("[NOMAD] Failed to submit ${job.name} -- Cause: ${apiException.responseBody ?: apiException}", apiException)
@@ -110,7 +116,11 @@ class NomadService implements Closeable{
110116

111117
String getJobState(String jobId){
112118
try {
113-
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
119+
List<AllocationListStub> allocations = safeExecutor.apply {
120+
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
121+
null, null, null, null, null, null,
122+
null, null)
123+
}
114124
AllocationListStub last = allocations?.sort {
115125
it.modifyIndex
116126
}?.last()
@@ -127,7 +137,10 @@ class NomadService implements Closeable{
127137

128138
boolean checkIfRunning(String jobId){
129139
try {
130-
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
140+
Job job = safeExecutor.apply {
141+
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
142+
null, null, null, null, null, null, null)
143+
}
131144
log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status"
132145
job.status == "running"
133146
}catch (Exception e){
@@ -138,7 +151,10 @@ class NomadService implements Closeable{
138151

139152
boolean checkIfDead(String jobId){
140153
try{
141-
Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null)
154+
Job job = safeExecutor.apply {
155+
jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
156+
null, null, null, null, null, null, null)
157+
}
142158
log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status"
143159
job.status == "dead"
144160
}catch (Exception e){
@@ -158,15 +174,22 @@ class NomadService implements Closeable{
158174
protected void purgeJob(String jobId, boolean purge){
159175
log.debug "[NOMAD] purgeJob with jobId=${jobId}"
160176
try {
161-
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true)
177+
safeExecutor.apply {
178+
jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace,
179+
null, null, purge, true)
180+
}
162181
}catch(Exception e){
163182
log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e)
164183
}
165184
}
166185

167186
String getClientOfJob(String jobId) {
168187
try{
169-
List<AllocationListStub> allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null)
188+
List<AllocationListStub> allocations = safeExecutor.apply {
189+
jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace,
190+
null, null, null, null, null, null,
191+
null, null)
192+
}
170193
if( !allocations ){
171194
return null
172195
}
@@ -183,10 +206,12 @@ class NomadService implements Closeable{
183206
}
184207

185208
String getVariableValue(String path, String key){
186-
var variable = variablesApi.getVariableQuery("$path/$key",
187-
config.jobOpts().region,
188-
config.jobOpts().namespace,
189-
null, null, null, null, null, null, null)
209+
var variable = safeExecutor.apply {
210+
variablesApi.getVariableQuery("$path/$key",
211+
config.jobOpts().region,
212+
config.jobOpts().namespace,
213+
null, null, null, null, null, null, null)
214+
}
190215
variable?.items?.find{ it.key == key }?.value
191216
}
192217

@@ -197,17 +222,22 @@ class NomadService implements Closeable{
197222
void setVariableValue(String path, String key, String value){
198223
var content = Map.of(key,value)
199224
var variable = new Variable(path: path, items: content)
200-
variablesApi.postVariable("$path/$key", variable,
201-
config.jobOpts().region,
202-
config.jobOpts().namespace,
203-
null, null, null)
225+
safeExecutor.apply {
226+
variablesApi.postVariable("$path/$key", variable,
227+
config.jobOpts().region,
228+
config.jobOpts().namespace,
229+
null, null, null)
230+
}
204231
}
205232

206233
List<String> getVariablesList(){
207-
var listRequest = variablesApi.getVariablesListRequest(
208-
config.jobOpts().region,
209-
config.jobOpts().namespace,
210-
null, null, null, null, null, null, null)
234+
var listRequest = safeExecutor.apply {
235+
variablesApi.getVariablesListRequest(
236+
config.jobOpts().region,
237+
config.jobOpts().namespace,
238+
null, null, null, null,
239+
null, null, null)
240+
}
211241
String path = (config.jobOpts().secretOpts?.path ?: '')+"/"
212242
listRequest.collect{ it.path - path}
213243
}
@@ -218,9 +248,11 @@ class NomadService implements Closeable{
218248

219249
void deleteVariable(String path, String key){
220250
var variable = new Variable( items: Map.of(key, ""))
221-
variablesApi.deleteVariable("$path/$key", variable,
222-
config.jobOpts().region,
223-
config.jobOpts().namespace,
224-
null, null, null)
251+
safeExecutor.apply {
252+
variablesApi.deleteVariable("$path/$key", variable,
253+
config.jobOpts().region,
254+
config.jobOpts().namespace,
255+
null, null, null)
256+
}
225257
}
226258
}

plugins/nf-nomad/src/test/nextflow/nomad/NomadDSLSpec.groovy

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,11 @@ class NomadDSLSpec extends Dsl2Spec{
195195
[
196196
client:
197197
[
198-
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
198+
address : "http://${mockWebServer.hostName}:${mockWebServer.port}",
199+
retryConfig:[
200+
maxAttempts: 1,
201+
delay: '1ms'
202+
]
199203
]
200204
]
201205
]).setScript(SCRIPT).execute()

0 commit comments

Comments
 (0)