Download files into a directory with the Apache HttpClient 5 async client.

Notes on this revision of the patch:
- each hunk is laid out as one diff line per source line; hunk headers are
  recomputed for the added lines
- MonoSinkFutureCallbackAdapter declares its type parameter
- ResponseToFileConsumer fails non-2xx responses before opening the output
  file and drains the buffer fully on every write
- If-Modified-Since is only sent when skipUpToDate is set
- index hashes are carried over unchanged and do not reflect these edits

diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
index 63202484..3b3a3136 100644
--- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
+++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
@@ -29,6 +29,8 @@
 import me.itzg.helpers.errors.GenericException;
 import me.itzg.helpers.http.SharedFetch.Options;
 import me.itzg.helpers.json.ObjectMappers;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
 import org.slf4j.Logger;
 import reactor.core.publisher.Mono;
 import reactor.netty.ByteBufMono;
@@ -197,6 +199,34 @@ protected R useReactiveClient(ReactiveClientUser user) {
         }
     }
 
+    /**
+     * Callback that receives an Apache HttpClient 5 async client together with a closer
+     * that is intended to be run when the callback is done with the client.
+     */
+    @FunctionalInterface
+    protected interface HcAsyncClientUser {
+        void use(CloseableHttpAsyncClient client, Runnable closer);
+    }
+
+    /**
+     * Passes an async client to the given user: the shared fetch's client paired with a
+     * no-op closer when a shared fetch is configured, otherwise the client of a newly
+     * created {@link SharedFetch} whose {@code close} is handed over as the closer.
+     */
+    protected void useHcAsyncClient(HcAsyncClientUser user) {
+        if (state.sharedFetch != null) {
+            user.use(state.sharedFetch.getHcAsyncClient(), () -> {
+                }
+            );
+        }
+        else {
+            //noinspection resource since close callback will handle it
+            SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());
+
+            user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
+        }
+    }
+
     protected static BiConsumer debugLogRequest(
         Logger log, String operation
     ) {
@@ -291,6 +321,29 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
         state.requestHeaders.forEach(headers::set);
     }
 
+    /**
+     * Applies the accepted content types, the basic authorization derived from the user
+     * info, and the configured request headers to an Apache HttpClient 5 request builder.
+     */
+    protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
+        final Set<String> contentTypes = getAcceptContentTypes();
+        if (contentTypes != null && !contentTypes.isEmpty()) {
+            contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
+        }
+
+        if (state.userInfo != null) {
+            requestBuilder.setHeader(
+                AUTHORIZATION.toString(),
+                "Basic " +
+                    Base64.getEncoder().encodeToString(
+                        state.userInfo.getBytes(StandardCharsets.UTF_8)
+                    )
+            );
+        }
+
+        state.requestHeaders.forEach(requestBuilder::setHeader);
+    }
+
     static String formatDuration(long millis) {
         final StringBuilder sb = new StringBuilder();
         final long minutes = millis / 60000;
diff --git a/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java b/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java
new file mode 100644
index 00000000..0696df6e
--- /dev/null
+++ b/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java
@@ -0,0 +1,34 @@
+package me.itzg.helpers.http;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import reactor.core.publisher.MonoSink;
+
+/**
+ * Bridges an Apache HttpClient 5 {@link FutureCallback} onto a Reactor {@link MonoSink}.
+ *
+ * @param <T> the type of result delivered to the sink
+ */
+class MonoSinkFutureCallbackAdapter<T> implements FutureCallback<T> {
+
+    private final MonoSink<T> sink;
+
+    public MonoSinkFutureCallbackAdapter(MonoSink<T> sink) {
+        this.sink = sink;
+    }
+
+    @Override
+    public void completed(T result) {
+        sink.success(result);
+    }
+
+    @Override
+    public void failed(Exception ex) {
+        sink.error(ex);
+    }
+
+    @Override
+    public void cancelled() {
+        // a cancelled exchange has no result, so the sink is completed empty
+        sink.success();
+    }
+}
diff --git a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
index 76886d28..01c754a9 100644
--- a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
+++ b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
@@ -1,19 +1,27 @@
 package me.itzg.helpers.http;
 
-import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
 import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
 import static java.util.Objects.requireNonNull;
 
 import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.time.Instant;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
 import me.itzg.helpers.files.ReactiveFileUtils;
+import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
 import org.jetbrains.annotations.NotNull;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -202,40 +210,29 @@ else if (skipUpToDate) {
 
         return alreadyUpToDateMono
             .filter(alreadyUpToDate -> !alreadyUpToDate)
-            .flatMap(notUsed -> client
-                .headers(this::applyHeaders)
-                .headersWhen(headers ->
-                    skipUpToDate ?
-                        fileLastModifiedMono
-                            .map(outputLastModified -> headers.set(
-                                IF_MODIFIED_SINCE,
-                                httpDateTimeFormatter.format(outputLastModified)
-                            ))
-                            .defaultIfEmpty(headers)
-                        : Mono.just(headers)
-                )
-                .followRedirect(true)
-                .doOnRequest(debugLogRequest(log, "file fetch"))
-                .doOnRequest(
-                    (httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
-                .get()
-                .uri(resourceUrl)
-                .response((resp, byteBufFlux) -> {
-                    if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
-                        log.debug("The file {} is already up to date", outputFile);
-                        statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
-                        return Mono.just(outputFile);
-                    }
-
-                    if (notSuccess(resp)) {
-                        return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
-                    }
-
-                    return copyBodyInputStreamToFile(byteBufFlux, outputFile);
-                })
-                .last()
-                .checkpoint("Fetching file into directory")
-            )
+            .flatMap(notUsed -> {
+                final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
+                applyHeaders(reqBuilder);
+
+                return
+                    fileLastModifiedMono
+                        // only make the request conditional when skipping up-to-date files
+                        .filter(instant -> skipUpToDate)
+                        .map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
+                        .then(
+                            Mono.create(sink -> {
+                                useHcAsyncClient((hcClient, close) -> {
+                                    sink.onDispose(close::run);
+
+                                    hcClient.execute(
+                                        SimpleRequestProducer.create(reqBuilder.build()),
+                                        new ResponseToFileConsumer(outputFile),
+                                        new MonoSinkFutureCallbackAdapter<>(sink)
+                                    );
+                                });
+                            })
+                        );
+            })
             .defaultIfEmpty(outputFile);
     }
 
@@ -261,4 +258,81 @@ private String extractFilename(HttpClientResponse resp) {
         return resp.path().substring(pos + 1);
     }
 
+    /**
+     * Streams a successful response body into the output file and reports progress through
+     * the status and downloaded handlers of the enclosing builder.
+     */
+    private class ResponseToFileConsumer extends AbstractBinResponseConsumer<Path> {
+
+        private final Path outputFile;
+
+        private SeekableByteChannel channel;
+        private long amount;
+
+        public ResponseToFileConsumer(Path outputFile) {
+            this.outputFile = outputFile;
+        }
+
+        @Override
+        public void releaseResources() {
+            if (channel != null) {
+                try {
+                    channel.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        @Override
+        protected int capacityIncrement() {
+            return 4096;
+        }
+
+        @Override
+        protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
+            if (channel != null) {
+                // a single write is not guaranteed to drain the buffer
+                while (src.hasRemaining()) {
+                    amount += channel.write(src);
+                }
+                if (endOfStream) {
+                    channel.close();
+
+                    statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
+                    downloadedHandler.call(uri(), outputFile, amount);
+                }
+            }
+        }
+
+        @Override
+        protected void start(HttpResponse response,
+            ContentType contentType
+        ) throws HttpException, IOException {
+            final int code = response.getCode();
+            if (skipUpToDate && code == HttpStatus.SC_NOT_MODIFIED) {
+                log.debug("The file {} is already up to date", outputFile);
+                statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
+                return;
+            }
+
+            if (code < HttpStatus.SC_SUCCESS || code >= HttpStatus.SC_REDIRECTION) {
+                // fail before the file is opened so an error body never replaces its content
+                throw new HttpException("Downloading file from " + uri() + " failed with status " + code);
+            }
+
+            statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);
+
+            channel = Files.newByteChannel(outputFile,
+                StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE,
+                StandardOpenOption.TRUNCATE_EXISTING
+            );
+        }
+
+        @Override
+        protected Path buildResult() {
+            return outputFile;
+        }
+    }
 }
diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java
index 2ab46258..ab5cd4b1 100644
--- a/src/main/java/me/itzg/helpers/http/SharedFetch.java
+++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java
@@ -1,6 +1,7 @@
 package me.itzg.helpers.http;
 
 import io.netty.handler.codec.http.HttpHeaderNames;
+import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
 import java.util.HashMap;
@@ -12,6 +13,8 @@
 import lombok.extern.slf4j.Slf4j;
 import me.itzg.helpers.McImageHelper;
 import me.itzg.helpers.errors.GenericException;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
 import reactor.netty.http.Http11SslContextSpec;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.resources.ConnectionProvider;
@@ -32,6 +35,8 @@ public class SharedFetch implements AutoCloseable {
     final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor();
 
     private final HttpClient reactiveClient;
+    private final CloseableHttpAsyncClient hcAsyncClient;
+    private boolean hcAsyncClientStarted = false;
 
     private final URI filesViaUrl;
 
@@ -78,6 +83,19 @@ public SharedFetch(String forCommand, Options options) {
         headers.put("x-fetch-session", fetchSessionId);
 
         this.filesViaUrl = options.getFilesViaUrl();
+
+        hcAsyncClient = HttpAsyncClients.createSystem();
+    }
+
+    /**
+     * Provides the Apache HttpClient 5 async client of this instance, starting it on first use.
+     */
+    public synchronized CloseableHttpAsyncClient getHcAsyncClient() {
+        if (!hcAsyncClientStarted) {
+            hcAsyncClient.start();
+            hcAsyncClientStarted = true;
+        }
+        return hcAsyncClient;
     }
 
     public FetchBuilderBase fetch(URI uri) {
@@ -92,6 +110,11 @@ public SharedFetch addHeader(String name, String value) {
 
     @Override
     public void close() {
+        try {
+            hcAsyncClient.close();
+        } catch (IOException e) {
+            log.warn("Failed to close async client for shared fetch", e);
+        }
     }
 
     @Builder
diff --git a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
index 206685f4..cd1c5ed9 100644
--- a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
+++ b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
@@ -20,6 +20,7 @@
 import me.itzg.helpers.http.FailedRequestException;
 import me.itzg.helpers.http.Fetch;
 import me.itzg.helpers.http.SharedFetch;
+import me.itzg.helpers.http.SharedFetch.Options;
 import me.itzg.helpers.http.Uris;
 import org.jetbrains.annotations.Blocking;
 import org.reactivestreams.Publisher;
@@ -82,13 +83,15 @@ public Integer call() throws Exception {
 
         Files.createDirectories(dest);
 
-        Flux.fromIterable(sources)
-            .map(String::trim)
-            .filter(s -> !s.isEmpty())
-            .flatMap(source -> processSource(source, fileIsListingOption))
-            .collectList()
-            .flatMap(this::cleanupAndSaveManifest)
-            .block();
+        try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
+            Flux.fromIterable(sources)
+                .map(String::trim)
+                .filter(s -> !s.isEmpty())
+                .flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
+                .collectList()
+                .flatMap(this::cleanupAndSaveManifest)
+                .block();
+        }
 
         return ExitCode.OK;
     }
@@ -113,9 +116,9 @@ private Mono cleanupAndSaveManifest(List paths) {
             });
     }
 
-    private Publisher processSource(String source, boolean fileIsListing) {
+    private Publisher processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
         if (Uris.isUri(source)) {
-            return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
+            return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
         } else {
             final Path path = Paths.get(source);
             if (!Files.exists(path)) {
@@ -125,12 +128,12 @@ private Publisher processSource(String source, boolean fileIsListing) {
             if (Files.isDirectory(path)) {
                 return processDirectory(path);
             } else {
-                return fileIsListing ? processListingFile(path) : processFile(path);
+                return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
             }
         }
     }
 
-    private Flux processListingFile(Path listingFile) {
+    private Flux processListingFile(SharedFetch sharedFetch, Path listingFile) {
         return Mono.just(listingFile)
             .publishOn(Schedulers.boundedElastic())
             .flatMapMany(path -> {
@@ -139,7 +142,7 @@ private Flux processListingFile(Path listingFile) {
                     final List lines = Files.readAllLines(path);
                     return Flux.fromIterable(lines)
                         .filter(this::isListingLine)
-                        .flatMap(src -> processSource(src,
+                        .flatMap(src -> processSource(sharedFetch, src,
                             // avoid recursive file-listing processing
                             false));
                 } catch (IOException e) {
@@ -230,8 +233,8 @@ private Flux processDirectory(Path srcDir) {
             });
     }
 
-    private Mono processRemoteSource(String source) {
-        return Fetch.fetch(URI.create(source))
+    private Mono processRemoteSource(SharedFetch sharedFetch, String source) {
+        return sharedFetch.fetch(URI.create(source))
             .userAgentCommand("mcopy")
             .toDirectory(dest)
             .skipUpToDate(skipUpToDate)
@@ -273,7 +276,7 @@ private Flux processRemoteListingFile(String source) {
                     .flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
                     .filter(this::isListingLine)
             )
-            .flatMap(this::processRemoteSource)
+            .flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
             .doOnTerminate(sharedFetch::close)
             .checkpoint("Processing remote listing at " + source, true);
     }