Skip to content

Commit 295bc1f

Browse files
committed
Add retry policy on Az blob operations
Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent 2a36fa7 commit 295bc1f

File tree

3 files changed

+65
-8
lines changed

3 files changed

+65
-8
lines changed

plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystem.groovy

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import java.nio.file.PathMatcher
2727
import java.nio.file.WatchService
2828
import java.nio.file.attribute.UserPrincipalLookupService
2929
import java.time.Duration
30+
import java.time.temporal.ChronoUnit
31+
import java.util.function.Predicate
3032

3133
import com.azure.core.util.polling.SyncPoller
3234
import com.azure.storage.blob.BlobServiceClient
@@ -35,9 +37,16 @@ import com.azure.storage.blob.models.BlobCopyInfo
3537
import com.azure.storage.blob.models.BlobItem
3638
import com.azure.storage.blob.models.BlobStorageException
3739
import com.azure.storage.blob.models.ListBlobsOptions
40+
import dev.failsafe.Failsafe
41+
import dev.failsafe.RetryPolicy
42+
import dev.failsafe.event.EventListener
43+
import dev.failsafe.event.ExecutionAttemptedEvent
44+
import dev.failsafe.function.CheckedSupplier
3845
import groovy.transform.CompileStatic
46+
import groovy.transform.Memoized
3947
import groovy.transform.PackageScope
4048
import groovy.util.logging.Slf4j
49+
import nextflow.cloud.azure.config.AzConfig
4150
/**
4251
* Implements a file system for Azure Blob Storage service
4352
*
@@ -69,7 +78,7 @@ class AzFileSystem extends FileSystem {
6978
@PackageScope AzFileSystem() {}
7079

7180
@PackageScope
72-
AzFileSystem(AzFileSystemProvider provider, BlobServiceClient storageClient, String bucket ) {
81+
AzFileSystem(AzFileSystemProvider provider, BlobServiceClient storageClient, String bucket) {
7382
this.provider = provider
7483
this.containerName = bucket
7584
this.storageClient = storageClient
@@ -109,6 +118,10 @@ class AzFileSystem extends FileSystem {
109118
}
110119

111120
private Iterable<? extends Path> listContainers() {
121+
return apply(()-> listContainers0())
122+
}
123+
124+
private Iterable<? extends Path> listContainers0() {
112125
final containers = new ArrayList()
113126
storageClient
114127
.listBlobContainers()
@@ -359,18 +372,18 @@ class AzFileSystem extends FileSystem {
359372
boolean exists = false
360373
boolean isDirectory = false
361374

362-
def opts = new ListBlobsOptions()
375+
final opts = new ListBlobsOptions()
363376
.setPrefix(path.blobName())
364377
.setMaxResultsPerPage(10)
365378
try {
366-
def values = path.containerClient().listBlobs(opts,null).iterator()
379+
final values = apply(()-> path.containerClient().listBlobs(opts,null).iterator())
367380

368381
final char SLASH = '/'
369382
final String name = path.blobName()
370383

371384
int count=0
372-
while( values.hasNext() ) {
373-
BlobItem blob = values.next()
385+
while( apply(()-> values.hasNext()) ) {
386+
BlobItem blob = apply(()-> values.next())
374387
if( blob.name == name )
375388
exists = true
376389
else if( blob.name.startsWith(name) && blob.name.charAt(name.length())==SLASH ) {
@@ -425,7 +438,7 @@ class AzFileSystem extends FileSystem {
425438
}
426439

427440
@PackageScope
428-
AzFileAttributes readAttributes(AzPath path) {
441+
AzFileAttributes readAttributes(AzPath path) {
429442
final cache = path.attributesCache()
430443
if( cache )
431444
return cache
@@ -496,5 +509,42 @@ class AzFileSystem extends FileSystem {
496509
return false
497510
}
498511
}
499-
512+
513+
514+
/**
515+
* Creates a retry policy using the configuration specified by {@link nextflow.cloud.azure.config.AzRetryConfig}
516+
*
517+
* @param cond A predicate that determines when a retry should be triggered
518+
* @return The {@link dev.failsafe.RetryPolicy} instance
519+
*/
520+
@Memoized
521+
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
522+
final cfg = AzConfig.getConfig().retryConfig()
523+
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
524+
@Override
525+
void accept(ExecutionAttemptedEvent<T> event) throws Throwable {
526+
log.debug("Azure I/O exception - attempt: ${event.attemptCount}; cause: ${event.lastFailure?.message}")
527+
}
528+
}
529+
return RetryPolicy.<T>builder()
530+
.handleIf(cond)
531+
.withBackoff(cfg.delay.toMillis(), cfg.maxDelay.toMillis(), ChronoUnit.MILLIS)
532+
.withMaxAttempts(cfg.maxAttempts)
533+
.withJitter(cfg.jitter)
534+
.onRetry(listener)
535+
.build()
536+
}
537+
538+
/**
539+
* Carry out the invocation of the specified action using a retry policy
540+
* when {@code TooManyRequests} Azure Batch error is returned
541+
*
542+
* @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
543+
* @return The result of the supplied action
544+
*/
545+
protected <T> T apply(CheckedSupplier<T> action) {
546+
final cond = (e -> e instanceof IOException) as Predicate<? extends Throwable>
547+
final policy = retryPolicy(cond)
548+
return Failsafe.with(policy).get(action)
549+
}
500550
}

plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileSystemProvider.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class AzFileSystemProvider extends FileSystemProvider {
8686
return this.accountKey
8787
}
8888

89-
static private AzPath asAzPath(Path path ) {
89+
static private AzPath asAzPath(Path path) {
9090
if( path !instanceof AzPath )
9191
throw new IllegalArgumentException("Not a valid Azure blob storage path object: `$path` [${path?.class?.name?:'-'}]" )
9292
return (AzPath)path

plugins/nf-azure/src/test/nextflow/cloud/azure/nio/AzNioTest.groovy

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import java.nio.file.attribute.BasicFileAttributes
1717
import com.azure.storage.blob.BlobServiceClient
1818
import com.azure.storage.blob.BlobServiceClientBuilder
1919
import com.azure.storage.common.StorageSharedKeyCredential
20+
import nextflow.Global
21+
import nextflow.Session
2022
import nextflow.exception.AbortOperationException
2123
import nextflow.trace.TraceHelper
2224
import spock.lang.IgnoreIf
@@ -43,8 +45,13 @@ class AzNioTest extends Specification implements AzBaseSpec {
4345
def credential = new StorageSharedKeyCredential(accountName, accountKey);
4446
def endpoint = String.format(Locale.ROOT, "https://%s.blob.core.windows.net", accountName);
4547
storageClient = new BlobServiceClientBuilder().endpoint(endpoint).credential(credential).buildClient();
48+
and:
49+
Global.session = Mock(Session) { getConfig()>>Map.of() }
4650
}
4751

52+
def cleanupSpec() {
53+
Global.session = null
54+
}
4855

4956
def 'should create a blob' () {
5057
given:

0 commit comments

Comments
 (0)