diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java
new file mode 100644
index 00000000..cd0fee98
--- /dev/null
+++ b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java
@@ -0,0 +1,16 @@
+package me.itzg.helpers.http;
+
+import reactor.core.publisher.Mono;
+
+@FunctionalInterface
+public interface ConcurrencyLimiter {
+
+    ConcurrencyLimiter NOOP_LIMITER = new ConcurrencyLimiter() {
+        @Override
+        public <T> Mono<T> limit(Mono<T> source) {
+            return source;
+        }
+    };
+
+    <T> Mono<T> limit(Mono<T> source);
+}
diff --git a/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java
new file mode 100644
index 00000000..ea44ff4b
--- /dev/null
+++ b/src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java
@@ -0,0 +1,27 @@
+package me.itzg.helpers.http;
+
+import java.util.concurrent.Semaphore;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class ConcurrencyLimiterImpl implements ConcurrencyLimiter {
+
+    private final Semaphore semaphore;
+
+    public ConcurrencyLimiterImpl(int concurrency) {
+        this.semaphore = new Semaphore(concurrency);
+    }
+
+    @Override
+    public <T> Mono<T> limit(Mono<T> source) {
+        return Mono.using(
+                () -> {
+                    semaphore.acquire();
+                    return Boolean.TRUE;
+                },
+                r -> source,
+                r -> semaphore.release()
+            )
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+}
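Note (not part of the patch): ConcurrencyLimiterImpl gates subscriptions with a plain java.util.concurrent.Semaphore. The blocking acquire() runs inside Mono.using's resource supplier, which is safe only because limit() shifts subscription onto Schedulers.boundedElastic(); the permit is released by the cleanup callback when the wrapped Mono terminates or is cancelled. The sketch below is illustrative only; the class name, permit count, and delays are assumptions made for the example.

    package me.itzg.helpers.http;

    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    // Illustrative usage of the limiter on its own: with 2 permits, at most two
    // of the five delayed Monos are subscribed (in flight) at any moment.
    public class ConcurrencyLimiterSketch {
        public static void main(String[] args) {
            final ConcurrencyLimiter limiter = new ConcurrencyLimiterImpl(2);

            Flux.range(1, 5)
                .flatMap(i -> limiter.limit(
                    Mono.delay(Duration.ofMillis(200))
                        .doOnSubscribe(subscription -> System.out.println("start " + i))
                        .doOnTerminate(() -> System.out.println("done " + i))
                        .thenReturn(i)
                ))
                .blockLast();
        }
    }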
diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
index 543d5ec0..218a2655 100644
--- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
+++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
@@ -142,6 +142,11 @@ protected URI uri() {
         return state.uri;
     }
 
+    protected ConcurrencyLimiter getConcurrencyLimiter() {
+        return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter()
+            : ConcurrencyLimiter.NOOP_LIMITER;
+    }
+
     public Set<String> getAcceptContentTypes() {
         return state.acceptContentTypes;
     }
diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java
index 72c36c93..6d1ae723 100644
--- a/src/main/java/me/itzg/helpers/http/SharedFetch.java
+++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java
@@ -33,6 +33,8 @@ public class SharedFetch implements AutoCloseable {
 
     private final HttpClient reactiveClient;
 
+    private final ConcurrencyLimiter concurrencyLimiter;
+
     public SharedFetch(String forCommand, Options options) {
         final String userAgent = String.format("%s/%s/%s (cmd=%s)",
             "itzg",
@@ -74,6 +76,8 @@ public SharedFetch(String forCommand, Options options) {
 
         );
         headers.put("x-fetch-session", fetchSessionId);
+
+        concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads());
     }
 
     public FetchBuilderBase<?> fetch(URI uri) {
@@ -116,6 +120,9 @@ public static class Options {
         @Default
         private final Duration pendingAcquireTimeout = Duration.ofSeconds(120);
 
+        @Default
+        private final int concurrentFileDownloads = 10;
+
         private final Map<String, String> extraHeaders;
 
         public Options withHeader(String key, String value) {
@@ -124,7 +131,7 @@ public Options withHeader(String key, String value) {
             newHeaders.put(key, value);
 
             return new Options(
-                responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
+                responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads,
                 newHeaders
             );
         }
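Note (not part of the patch): SharedFetch now owns a single ConcurrencyLimiterImpl sized by Options.concurrentFileDownloads (default 10), and FetchBuilderBase falls back to ConcurrencyLimiter.NOOP_LIMITER when a builder is used without a SharedFetch, so standalone fetches stay unlimited. A minimal wiring sketch follows; SharedFetch.Options.builder() and the "example-cmd" command name are assumptions for illustration (the builder is implied by the @Default annotations and the optionsBuilder usage in SharedFetchArgs, but is not shown in this patch).

    import me.itzg.helpers.http.SharedFetch;

    // Assumes the Lombok-generated SharedFetch.Options.builder(); not shown in this patch.
    public class SharedFetchWiringSketch {
        public static void main(String[] args) throws Exception {
            final SharedFetch.Options options = SharedFetch.Options.builder()
                .concurrentFileDownloads(4)   // override the default of 10
                .build();

            // SharedFetch implements AutoCloseable, so try-with-resources applies;
            // every fetch built from this instance shares one ConcurrencyLimiterImpl(4).
            try (SharedFetch sharedFetch = new SharedFetch("example-cmd", options)) {
                // build and subscribe fetches here
            }
        }
    }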
diff --git a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
index 6ca4d75b..e64461b3 100644
--- a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
+++ b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
@@ -48,6 +48,14 @@ public void setPendingAcquireTimeout(Duration timeout) {
         optionsBuilder.pendingAcquireTimeout(timeout);
     }
 
+    @Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}",
+        paramLabel = "COUNT",
+        description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}"
+    )
+    public void setConcurrentFileDownloads(int concurrentFileDownloads) {
+        optionsBuilder.concurrentFileDownloads(concurrentFileDownloads);
+    }
+
     public Options options() {
         return optionsBuilder.build();
     }
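Note (not part of the patch): the new picocli option is --concurrent-file-downloads COUNT, defaulting to the FETCH_CONCURRENT_FILE_DOWNLOADS environment variable or 10. The parsing sketch below is illustrative only; registering SharedFetchArgs directly with CommandLine is a simplification (in the CLI it is normally mixed into individual commands).

    import me.itzg.helpers.http.SharedFetch;
    import me.itzg.helpers.http.SharedFetchArgs;
    import picocli.CommandLine;

    // Illustrative parse of the new option into Options via SharedFetchArgs.
    public class ConcurrentDownloadsOptionSketch {
        public static void main(String[] args) {
            final SharedFetchArgs fetchArgs = new SharedFetchArgs();
            new CommandLine(fetchArgs).parseArgs("--concurrent-file-downloads", "4");

            final SharedFetch.Options options = fetchArgs.options();
            // options.getConcurrentFileDownloads() is now 4; without the flag,
            // FETCH_CONCURRENT_FILE_DOWNLOADS or the default of 10 applies.
        }
    }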
diff --git a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java
index 000aa411..58ce3e5e 100644
--- a/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java
+++ b/src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java
@@ -5,6 +5,7 @@
 import static java.lang.System.currentTimeMillis;
 import static java.util.Objects.requireNonNull;
 
+import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import java.io.IOException;
 import java.net.URI;
@@ -17,6 +18,8 @@
 import me.itzg.helpers.errors.GenericException;
 import me.itzg.helpers.files.ReactiveFileUtils;
 import reactor.core.publisher.Mono;
+import reactor.netty.ByteBufFlux;
+import reactor.netty.http.client.HttpClientResponse;
 
 @Slf4j
 @Accessors(fluent = true)
@@ -65,69 +68,78 @@ public Mono<Path> assemble() {
         final boolean useIfModifiedSince = skipUpToDate && Files.exists(file);
 
         return useReactiveClient(client ->
-            client
-                .doOnRequest((httpClientRequest, connection) ->
-                    statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
-                )
-                .headers(headers -> {
-                    if (useIfModifiedSince) {
-                        try {
-                            final FileTime lastModifiedTime;
-                            lastModifiedTime = Files.getLastModifiedTime(file);
-                            headers.set(
-                                IF_MODIFIED_SINCE,
-                                httpDateTimeFormatter.format(lastModifiedTime.toInstant())
-                            );
-                        } catch (IOException e) {
-                            throw new GenericException("Unable to get last modified time of " + file, e);
-                        }
+            getConcurrencyLimiter().limit(
+                client
+                    .doOnRequest((httpClientRequest, connection) ->
+                        statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
+                    )
+                    .headers(headers ->
+                        setupHeaders(headers, useIfModifiedSince)
+                    )
+                    .followRedirect(true)
+                    .doOnRequest(debugLogRequest(log, "file fetch"))
+                    .get()
+                    .uri(uri)
+                    .response((resp, byteBufFlux) ->
+                        processResponse(resp, byteBufFlux, useIfModifiedSince, uri)
+                    )
+                    .last()
+                    .contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
+            )
+        );
+    }
+
+    private Mono<Path> processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) {
+        final HttpResponseStatus status = resp.status();
+
+        if (useIfModifiedSince && status == NOT_MODIFIED) {
+            log.debug("The file {} is already up to date", file);
+            statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
+            return Mono.just(file);
+        }
+
+        if (notSuccess(resp)) {
+            return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
+        }
+
+        if (notExpectedContentType(resp)) {
+            return failedContentTypeMono(resp);
+        }
+
+        return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
+            .flatMap(fileSize -> {
+                statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
+                downloadedHandler.call(uri, file, fileSize);
+                return Mono
+                    .deferContextual(contextView -> {
+                        if (log.isDebugEnabled()) {
+                            final long durationMillis =
+                                currentTimeMillis() - contextView.get("downloadStart");
+                            log.debug("Download of {} took {} at {}",
+                                uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
+                            );
+                        }
+                        return Mono.just(file);
+                    });
+            });
+    }
+
+    private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) {
+        if (useIfModifiedSince) {
+            try {
+                final FileTime lastModifiedTime;
+                lastModifiedTime = Files.getLastModifiedTime(file);
+                headers.set(
+                    IF_MODIFIED_SINCE,
+                    httpDateTimeFormatter.format(lastModifiedTime.toInstant())
+                );
+            } catch (IOException e) {
+                throw new GenericException("Unable to get last modified time of " + file, e);
            }
 
-                    }
+        }
 
-                    applyHeaders(headers);
-                })
-                .followRedirect(true)
-                .doOnRequest(debugLogRequest(log, "file fetch"))
-                .get()
-                .uri(uri)
-                .response((resp, byteBufFlux) -> {
-                    final HttpResponseStatus status = resp.status();
-
-                    if (useIfModifiedSince && status == NOT_MODIFIED) {
-                        log.debug("The file {} is already up to date", file);
-                        statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
-                        return Mono.just(file);
-                    }
-
-                    if (notSuccess(resp)) {
-                        return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
-                    }
-
-                    if (notExpectedContentType(resp)) {
-                        return failedContentTypeMono(resp);
-                    }
-
-                    return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
-                        .flatMap(fileSize -> {
-                            statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
-                            downloadedHandler.call(uri, file, fileSize);
-                            return Mono
-                                .deferContextual(contextView -> {
-                                    if (log.isDebugEnabled()) {
-                                        final long durationMillis =
-                                            currentTimeMillis() - contextView.get("downloadStart");
-                                        log.debug("Download of {} took {} at {}",
-                                            uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
-                                        );
-                                    }
-                                    return Mono.just(file);
-                                });
-                        });
-
-                })
-                .last()
-                .contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
-        );
+        applyHeaders(headers);
     }
 
 }
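Note (not part of the patch): assemble() now routes the whole request/response/copy pipeline through getConcurrencyLimiter().limit(...), so a permit is held from before the request is sent until the response body has been written to disk, and the former inline lambdas are extracted into processResponse(...) and setupHeaders(...) without changing behavior. The sketch below shows several downloads sharing one SharedFetch; toFile(...) and Options.builder() are assumed from the surrounding codebase rather than shown in this patch, and the URIs, paths, and command name are placeholders.

    import java.net.URI;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;
    import me.itzg.helpers.http.SharedFetch;
    import reactor.core.publisher.Flux;

    // Illustrative only: many downloads are queued at once, but at most
    // Options.concurrentFileDownloads of them hold a limiter permit (and
    // therefore an in-flight HTTP exchange) at any given time.
    public class LimitedDownloadsSketch {
        public static void main(String[] args) throws Exception {
            final List<URI> uris = List.of(
                URI.create("https://example.com/files/a.jar"),
                URI.create("https://example.com/files/b.jar")
            );

            try (SharedFetch sharedFetch = new SharedFetch("example-cmd", SharedFetch.Options.builder().build())) {
                final List<Path> results = Flux.fromIterable(uris)
                    .flatMap(uri -> sharedFetch.fetch(uri)
                        .toFile(Paths.get("downloads", Paths.get(uri.getPath()).getFileName().toString()))
                        .assemble()
                    )
                    .collectList()
                    .block();
            }
        }
    }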