Skip to content

Commit 6e66aaa

Browse files
adamrtalbotpditommasoclaude
authored
Handle Azure Batch ActiveJobAndScheduleQuotaReached with retry (#6874)
* Handle Azure Batch ActiveJobAndScheduleQuotaReached with retry (#5575) Add retry logic when Azure Batch returns HTTP 409 with ActiveJobAndScheduleQuotaReached error code during job creation, instead of failing immediately. Configurable via azure.batch.maxJobQuotaRetries (default 3) and azure.batch.jobQuotaRetryDelay (default 2 min). Generated by Claude Code Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> * Refactor job quota retry to use Failsafe RetryPolicy Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> --------- Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com> Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ab6be6d commit 6e66aaa

File tree

3 files changed

+211
-1
lines changed

3 files changed

+211
-1
lines changed

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,10 +451,59 @@ class AzBatchService implements Closeable {
451451
content.setConstraints(createJobConstraints(config.batch().jobMaxWallClockTime))
452452
}
453453

454-
apply(() -> client.createJob(content))
454+
applyCreateJob(content)
455455
return jobId
456456
}
457457

458+
protected void applyCreateJob(BatchJobCreateContent content) {
459+
final maxRetries = config.batch().maxJobQuotaRetries
460+
final retryDelay = config.batch().jobQuotaRetryDelay
461+
462+
// define retry condition for job quota errors
463+
final cond = new Predicate<? extends Throwable>() {
464+
@Override
465+
boolean test(Throwable t) {
466+
return t instanceof HttpResponseException && isJobQuotaError((HttpResponseException) t)
467+
}
468+
}
469+
470+
final listener = new EventListener<ExecutionAttemptedEvent<Object>>() {
471+
@Override
472+
void accept(ExecutionAttemptedEvent<Object> event) throws Throwable {
473+
log.warn "Azure Batch active job quota reached - waiting ${retryDelay} before retry (attempt ${event.attemptCount} of ${maxRetries})"
474+
}
475+
}
476+
477+
// create a retry policy with fixed delay for quota errors
478+
final policy = RetryPolicy.builder()
479+
.handleIf(cond)
480+
.withDelay(Duration.ofMillis(retryDelay.toMillis()))
481+
.withMaxAttempts(maxRetries + 1)
482+
.onRetry(listener)
483+
.build()
484+
485+
try {
486+
Failsafe.with(policy).get(() -> { createJobRequest(content); return null })
487+
}
488+
catch (HttpResponseException e) {
489+
if (isJobQuotaError(e))
490+
throw new IllegalStateException("Azure Batch active job quota reached - exceeded maximum number of retries ($maxRetries). Consider increasing 'azure.batch.maxJobQuotaRetries' or reducing the number of concurrent jobs", e)
491+
throw e
492+
}
493+
}
494+
495+
protected void createJobRequest(BatchJobCreateContent content) {
496+
apply(() -> client.createJob(content))
497+
}
498+
499+
protected boolean isJobQuotaError(HttpResponseException e) {
500+
if (e.response.statusCode != 409)
501+
return false
502+
if (e.message?.contains('ActiveJobAndScheduleQuotaReached'))
503+
return true
504+
return toString(e.response.body)?.contains('ActiveJobAndScheduleQuotaReached')
505+
}
506+
458507
String makeJobId(TaskRun task) {
459508
final name = task
460509
.processor

plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ class AzBatchOpts implements ConfigScope, CloudTransferOptions {
129129
""")
130130
final Boolean terminateJobsOnCompletion
131131

132+
@ConfigOption
133+
@Description("""
134+
The maximum number of attempts to create a job when the active job quota has been reached (default: `3`). Set to `0` to fail immediately without retrying.
135+
""")
136+
final int maxJobQuotaRetries
137+
138+
@ConfigOption
139+
@Description("""
140+
The delay between attempts to create a job when the active job quota has been reached (default: `'2 min'`).
141+
""")
142+
final Duration jobQuotaRetryDelay
143+
132144
AzBatchOpts(Map config, Map<String,String> env=null) {
133145
assert config!=null
134146
sysEnv = env==null ? new HashMap<String,String>(System.getenv()) : env
@@ -149,6 +161,8 @@ class AzBatchOpts implements ConfigScope, CloudTransferOptions {
149161
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS
150162
delayBetweenAttempts = config.delayBetweenAttempts ? config.delayBetweenAttempts as Duration : DEFAULT_DELAY_BETWEEN_ATTEMPTS
151163
copyToolInstallMode = config.copyToolInstallMode as CopyToolInstallMode
164+
maxJobQuotaRetries = config.maxJobQuotaRetries != null ? config.maxJobQuotaRetries as int : 3
165+
jobQuotaRetryDelay = config.jobQuotaRetryDelay ? config.jobQuotaRetryDelay as Duration : Duration.of('2 min')
152166
}
153167

154168
static Map<String,AzPoolOpts> parsePools(Map<String,Map> pools) {

plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import java.time.temporal.ChronoUnit
2323
import java.util.function.Predicate
2424

2525
import com.azure.compute.batch.models.BatchPool
26+
import com.azure.compute.batch.models.BatchJobCreateContent
2627
import com.azure.compute.batch.models.ElevationLevel
2728
import com.azure.compute.batch.models.EnvironmentSetting
2829
import com.azure.core.exception.HttpResponseException
@@ -1038,4 +1039,150 @@ class AzBatchServiceTest extends Specification {
10381039
]
10391040
}
10401041

1042+
// -- applyCreateJob tests --
1043+
1044+
def 'should create job successfully on first try'() {
1045+
given:
1046+
def config = new AzConfig([batch: [maxJobQuotaRetries: 2, jobQuotaRetryDelay: '1 sec']])
1047+
def exec = createExecutor(config)
1048+
int createCalls = 0
1049+
def service = new AzBatchService(exec) {
1050+
@Override
1051+
protected void createJobRequest(BatchJobCreateContent content) {
1052+
createCalls++
1053+
}
1054+
}
1055+
1056+
when:
1057+
service.applyCreateJob(null)
1058+
1059+
then:
1060+
noExceptionThrown()
1061+
createCalls == 1
1062+
}
1063+
1064+
def 'should retry on ActiveJobAndScheduleQuotaReached and then succeed'() {
1065+
given:
1066+
def config = new AzConfig([batch: [maxJobQuotaRetries: 2, jobQuotaRetryDelay: '1 sec']])
1067+
def exec = createExecutor(config)
1068+
int createCalls = 0
1069+
def service = new AzBatchService(exec) {
1070+
@Override
1071+
protected void createJobRequest(BatchJobCreateContent content) {
1072+
createCalls++
1073+
if (createCalls == 1)
1074+
throw new HttpResponseException('first call', null)
1075+
}
1076+
@Override
1077+
protected boolean isJobQuotaError(HttpResponseException e) {
1078+
return true
1079+
}
1080+
}
1081+
1082+
when:
1083+
service.applyCreateJob(null)
1084+
1085+
then:
1086+
noExceptionThrown()
1087+
createCalls == 2
1088+
}
1089+
1090+
def 'should throw after exceeding max job quota retries'() {
1091+
given:
1092+
def config = new AzConfig([batch: [maxJobQuotaRetries: 2, jobQuotaRetryDelay: '1 sec']])
1093+
def exec = createExecutor(config)
1094+
int createCalls = 0
1095+
def service = new AzBatchService(exec) {
1096+
@Override
1097+
protected void createJobRequest(BatchJobCreateContent content) {
1098+
createCalls++
1099+
throw new HttpResponseException('quota error', null)
1100+
}
1101+
@Override
1102+
protected boolean isJobQuotaError(HttpResponseException e) {
1103+
return true
1104+
}
1105+
}
1106+
1107+
when:
1108+
service.applyCreateJob(null)
1109+
1110+
then:
1111+
def e = thrown(IllegalStateException)
1112+
e.message.contains('exceeded maximum number of retries')
1113+
e.message.contains('2')
1114+
createCalls == 3 // initial + 2 retries
1115+
}
1116+
1117+
def 'should fail immediately when maxJobQuotaRetries is 0'() {
1118+
given:
1119+
def config = new AzConfig([batch: [maxJobQuotaRetries: 0, jobQuotaRetryDelay: '1 sec']])
1120+
def exec = createExecutor(config)
1121+
int createCalls = 0
1122+
def service = new AzBatchService(exec) {
1123+
@Override
1124+
protected void createJobRequest(BatchJobCreateContent content) {
1125+
createCalls++
1126+
throw new HttpResponseException('quota error', null)
1127+
}
1128+
@Override
1129+
protected boolean isJobQuotaError(HttpResponseException e) {
1130+
return true
1131+
}
1132+
}
1133+
1134+
when:
1135+
service.applyCreateJob(null)
1136+
1137+
then:
1138+
def e = thrown(IllegalStateException)
1139+
e.message.contains('exceeded maximum number of retries')
1140+
createCalls == 1
1141+
}
1142+
1143+
def 'should not retry on non-quota 409 error'() {
1144+
given:
1145+
def config = new AzConfig([batch: [maxJobQuotaRetries: 2, jobQuotaRetryDelay: '1 sec']])
1146+
def exec = createExecutor(config)
1147+
int createCalls = 0
1148+
def service = new AzBatchService(exec) {
1149+
@Override
1150+
protected void createJobRequest(BatchJobCreateContent content) {
1151+
createCalls++
1152+
throw new HttpResponseException('Job already exists', null)
1153+
}
1154+
@Override
1155+
protected boolean isJobQuotaError(HttpResponseException e) {
1156+
return false
1157+
}
1158+
}
1159+
1160+
when:
1161+
service.applyCreateJob(null)
1162+
1163+
then:
1164+
thrown(HttpResponseException)
1165+
createCalls == 1
1166+
}
1167+
1168+
def 'should not retry on non-HttpResponseException'() {
1169+
given:
1170+
def config = new AzConfig([batch: [maxJobQuotaRetries: 2, jobQuotaRetryDelay: '1 sec']])
1171+
def exec = createExecutor(config)
1172+
int createCalls = 0
1173+
def service = new AzBatchService(exec) {
1174+
@Override
1175+
protected void createJobRequest(BatchJobCreateContent content) {
1176+
createCalls++
1177+
throw new IllegalArgumentException('unexpected error')
1178+
}
1179+
}
1180+
1181+
when:
1182+
service.applyCreateJob(null)
1183+
1184+
then:
1185+
thrown(IllegalArgumentException)
1186+
createCalls == 1
1187+
}
10411188
}

0 commit comments

Comments
 (0)