1616
1717package nextflow.datasource
1818
19- import dev.failsafe.Failsafe
20- import dev.failsafe.RetryPolicy
21- import dev.failsafe.event.EventListener
22- import dev.failsafe.event.ExecutionAttemptedEvent
23- import dev.failsafe.function.CheckedSupplier
24-
19+ import java.net.http.HttpClient
20+ import java.net.http.HttpRequest
2521import java.nio.file.NoSuchFileException
2622import java.nio.file.Path
2723
@@ -32,6 +28,8 @@ import groovy.util.logging.Slf4j
3228import groovy.xml.XmlParser
3329import groovyx.gpars.dataflow.DataflowQueue
3430import groovyx.gpars.dataflow.DataflowWriteChannel
31+ import io.seqera.http.HxClient
32+ import io.seqera.http.HxConfig
3533import nextflow.Channel
3634import nextflow.Const
3735import nextflow.Global
@@ -40,11 +38,6 @@ import nextflow.extension.FilesEx
4038import nextflow.file.FileHelper
4139import nextflow.util.CacheHelper
4240import nextflow.util.Duration
43-
44- import java.time.temporal.ChronoUnit
45- import java.util.function.Predicate
46- import java.util.regex.Pattern
47-
4841/**
4942 * Query NCBI SRA database and returns the retrieved FASTQs to the specified
5043 * target channel. Inspired to SRA-Explorer by Phil Ewels -- https://ewels.github.io/sra-explorer/
@@ -55,8 +48,7 @@ import java.util.regex.Pattern
5548class SraExplorer {
5649
5750 static public Map PARAMS = [apiKey :[String ,GString ], cache : Boolean , max : Integer , protocol : [' ftp' ,' http' ,' https' ], retryPolicy : Map ]
58- final static public List<Integer > RETRY_CODES = List . of(408 , 429 , 500 , 502 , 503 , 504 )
59- final static private Pattern ERROR_PATTERN = ~/ Server returned HTTP response code: (\d +) for URL.*/
51+ final static private Set<Integer > RETRY_CODES = Set . of(408 , 429 , 500 , 502 , 503 , 504 )
6052
6153 @ToString
6254 static class SearchRecord {
@@ -80,7 +72,8 @@ class SraExplorer {
8072 private List<String > missing = new ArrayList<> ()
8173 private Path cacheFolder
8274 private String protocol = ' ftp'
83- private SraRetryConfig retryConfig = new SraRetryConfig ()
75+ private SraRetryConfig retryConfig
76+ private HxClient httpClient
8477
8578 String apiKey
8679 boolean useCache = true
@@ -171,7 +164,6 @@ class SraExplorer {
171164 }
172165
173166 protected void query1 (String query ) {
174-
175167 def url = getSearchUrl(query)
176168 def result = makeSearch(url)
177169 int index = result. retstart ?: 0
@@ -184,7 +176,6 @@ class SraExplorer {
184176 parseDataResponse(data)
185177 index + = entriesPerChunk
186178 }
187-
188179 }
189180
190181 protected String getSearchUrl (String term ) {
@@ -197,7 +188,7 @@ class SraExplorer {
197188
198189 protected Map makeDataRequest (String url ) {
199190 log. debug " SRA data request url=$url "
200- final text = runWithRetry(() -> getTextFormUrl(url) )
191+ final text = getTextFormUrl(url)
201192
202193 log. trace " SRA data result:\n ${ pretty(text)?.indent()} "
203194 def response = jsonSlurper. parseText(text)
@@ -236,15 +227,15 @@ class SraExplorer {
236227
237228 protected SearchRecord makeSearch (String url ) {
238229 log. debug " SRA search url=$url "
239- final text = runWithRetry(() -> getTextFormUrl(url) )
230+ final text = getTextFormUrl(url)
240231
241232 log. trace " SRA search result:\n ${ pretty(text)?.indent()} "
242233 final response = jsonSlurper. parseText(text)
243234
244235 if ( response instanceof Map && response. esearchresult instanceof Map ) {
245236 def search = (Map )response. esearchresult
246237 def result = new SearchRecord ()
247- result. count = search. count as Integer
238+ result. count = search. count as Integer
248239 result. retmax = search. retmax as Integer
249240 result. retstart = search. retstart as Integer
250241 result. querykey = search. querykey
@@ -281,14 +272,42 @@ class SraExplorer {
281272 return result
282273 }
283274
284- protected static String getTextFormUrl (String url ) {
285- new URI (url). toURL(). getText()
275+ protected HxClient getHttpClient () {
276+ if ( httpClient!= null )
277+ return httpClient
278+
279+ if ( retryConfig== null )
280+ retryConfig = new SraRetryConfig ()
281+
282+ final config = HxConfig . newBuilder()
283+ .retryConfig(retryConfig)
284+ .retryStatusCodes(RETRY_CODES )
285+ .build()
286+
287+ httpClient = HxClient . newBuilder()
288+ .httpClient(HttpClient . newBuilder()
289+ .version(HttpClient.Version . HTTP_1_1 )
290+ .followRedirects(HttpClient.Redirect . NORMAL )
291+ .build())
292+ .retryConfig(config)
293+ .build()
294+
295+ return httpClient
296+ }
297+
298+ protected String getTextFormUrl (String url ) {
299+ final request = HttpRequest . newBuilder()
300+ .uri(new URI (url))
301+ .GET ()
302+ .build()
303+ final response = getHttpClient(). sendAsString(request)
304+ return response. body()
286305 }
287306
288307 protected String readRunUrl (String acc ) {
289308 final url = " https://www.ebi.ac.uk/ena/portal/api/filereport?result=read_run&fields=fastq_ftp&accession=$acc "
290309 log. debug " SRA fetch ftp fastq url=$url "
291- String result = runWithRetry(() -> getTextFormUrl(url) ). trim()
310+ String result = getTextFormUrl(url). trim()
292311 log. trace " SRA fetch ftp fastq url result:\n ${ result?.indent()} "
293312
294313 if ( result. indexOf(' \n ' )== -1 ) {
@@ -326,7 +345,6 @@ class SraExplorer {
326345 return result. size()== 1 ? result[0 ] : result
327346 }
328347
329-
330348 protected parseXml (String str ) {
331349 if ( ! str )
332350 return null
@@ -337,79 +355,18 @@ class SraExplorer {
337355
338356 /**
339357 * NCBI search https://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.ESummary
340- *
358+ *
341359 * @param result
342360 * @return
343361 */
344362 protected String getFetchUrl (String key , String webenv , int retstart , int retmax ) {
345-
346363 def url = " https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?db=sra&retmode=json&query_key=${ key} &WebEnv=${ webenv} &retstart=$retstart &retmax=$retmax "
347364 if ( apiKey )
348365 url + = " &api_key=$apiKey "
349366
350367 return url
351368 }
352369
353- /**
354- * Creates a retry policy using the SRA retry configuration
355- *
356- * @param cond A predicate that determines when a retry should be triggered
357- * @return The {@link dev.failsafe.RetryPolicy} instance
358- */
359- protected <T> RetryPolicy<T> retryPolicy (Predicate<? extends Throwable > cond ) {
360- final EventListener<ExecutionAttemptedEvent > listener = new EventListener<ExecutionAttemptedEvent > () {
361- @Override
362- void accept (ExecutionAttemptedEvent event ) throws Throwable {
363- log. debug(" Retryable response error - attempt: ${ event.attemptCount} ; reason: ${ event.lastFailure.message} " )
364- }
365- }
366- return RetryPolicy . < T> builder()
367- .handleIf(cond)
368- .withBackoff(retryConfig. delay. toMillis(), retryConfig. maxDelay. toMillis(), ChronoUnit . MILLIS )
369- .withMaxAttempts(retryConfig. maxAttempts)
370- .withJitter(retryConfig. jitter)
371- .onRetry(listener)
372- .build()
373- }
374-
375- /**
376- * Carry out the invocation of the specified action using a retry policy
377- * when {@link java.io.IOException} is returned containing an error code.
378- *
379- * @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
380- * @return The result of the supplied action
381- */
382- protected <T> T runWithRetry (CheckedSupplier<T> action ) {
383- // define listener
384- final listener = new EventListener<ExecutionAttemptedEvent > () {
385- @Override
386- void accept (ExecutionAttemptedEvent event ) throws Throwable {
387- log. debug(" Retryable response error - attempt: ${ event.attemptCount} ; reason: ${ event.lastFailure.message} " )
388- }
389- }
390- // define the retry condition
391- final cond = new Predicate<? extends Throwable > () {
392- @Override
393- boolean test (Throwable t ) {
394- if ( t instanceof IOException && containsErrorCodes(t. message, RETRY_CODES ))
395- return true
396- if (t. cause instanceof IOException && containsErrorCodes(t. cause. message, RETRY_CODES ))
397- return true
398- return false
399- }
400- }
401- // create the retry policy
402- def policy = retryPolicy(cond)
403- // apply the action with
404- return Failsafe . with(policy). get(action)
405- }
406-
407- static boolean containsErrorCodes (String message , List<Integer > codes ){
408- def matcher = (message =~ ERROR_PATTERN )
409- def httpCode = matcher ? matcher[0 ][1 ] as Integer : null
410- return httpCode != null && codes. contains(httpCode)
411- }
412-
413370}
414371
415372
0 commit comments