diff --git a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java index 3b3a3136..63202484 100644 --- a/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java +++ b/src/main/java/me/itzg/helpers/http/FetchBuilderBase.java @@ -29,8 +29,6 @@ import me.itzg.helpers.errors.GenericException; import me.itzg.helpers.http.SharedFetch.Options; import me.itzg.helpers.json.ObjectMappers; -import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.slf4j.Logger; import reactor.core.publisher.Mono; import reactor.netty.ByteBufMono; @@ -199,25 +197,6 @@ protected R useReactiveClient(ReactiveClientUser user) { } } - @FunctionalInterface - protected interface HcAsyncClientUser { - void use(CloseableHttpAsyncClient client, Runnable closer); - } - - protected void useHcAsyncClient(HcAsyncClientUser user) { - if (state.sharedFetch != null) { - user.use(state.sharedFetch.getHcAsyncClient(), () -> { - } - ); - } - else { - //noinspection resource since close callback will handle it - SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build()); - - user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close); - } - } - protected static BiConsumer debugLogRequest( Logger log, String operation ) { @@ -312,25 +291,6 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) { state.requestHeaders.forEach(headers::set); } - protected void applyHeaders(SimpleRequestBuilder requestBuilder) { - final Set contentTypes = getAcceptContentTypes(); - if (contentTypes != null && !contentTypes.isEmpty()) { - contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s)); - } - - if (state.userInfo != null) { - requestBuilder.setHeader( - AUTHORIZATION.toString(), - "Basic " + - Base64.getEncoder().encodeToString( - state.userInfo.getBytes(StandardCharsets.UTF_8) - ) - ); - } - - state.requestHeaders.forEach(requestBuilder::setHeader); - } - static String formatDuration(long millis) { final StringBuilder sb = new StringBuilder(); final long minutes = millis / 60000; diff --git a/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java b/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java deleted file mode 100644 index 0696df6e..00000000 --- a/src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java +++ /dev/null @@ -1,28 +0,0 @@ -package me.itzg.helpers.http; - -import org.apache.hc.core5.concurrent.FutureCallback; -import reactor.core.publisher.MonoSink; - -class MonoSinkFutureCallbackAdapter implements FutureCallback { - - private final MonoSink sink; - - public MonoSinkFutureCallbackAdapter(MonoSink sink) { - this.sink = sink; - } - - @Override - public void completed(T result) { - sink.success(result); - } - - @Override - public void failed(Exception ex) { - sink.error(ex); - } - - @Override - public void cancelled() { - sink.success(); - } -} diff --git a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java index 01c754a9..76886d28 100644 --- a/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java +++ b/src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java @@ -1,27 +1,19 @@ package me.itzg.helpers.http; +import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE; import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED; import static java.util.Objects.requireNonNull; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.time.Instant; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import me.itzg.helpers.files.ReactiveFileUtils; -import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; -import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; -import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.HttpStatus; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -210,27 +202,40 @@ else if (skipUpToDate) { return alreadyUpToDateMono .filter(alreadyUpToDate -> !alreadyUpToDate) - .flatMap(notUsed -> { - final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl); - applyHeaders(reqBuilder); - - return - fileLastModifiedMono - .map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant))) - .then( - Mono.create(sink -> { - useHcAsyncClient((hcClient, close) -> { - sink.onDispose(close::run); - - hcClient.execute( - SimpleRequestProducer.create(reqBuilder.build()), - new ResponseToFileConsumer(outputFile), - new MonoSinkFutureCallbackAdapter<>(sink) - ); - }); - }) - ); - }) + .flatMap(notUsed -> client + .headers(this::applyHeaders) + .headersWhen(headers -> + skipUpToDate ? + fileLastModifiedMono + .map(outputLastModified -> headers.set( + IF_MODIFIED_SINCE, + httpDateTimeFormatter.format(outputLastModified) + )) + .defaultIfEmpty(headers) + : Mono.just(headers) + ) + .followRedirect(true) + .doOnRequest(debugLogRequest(log, "file fetch")) + .doOnRequest( + (httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile)) + .get() + .uri(resourceUrl) + .response((resp, byteBufFlux) -> { + if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) { + log.debug("The file {} is already up to date", outputFile); + statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile); + return Mono.just(outputFile); + } + + if (notSuccess(resp)) { + return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file"); + } + + return copyBodyInputStreamToFile(byteBufFlux, outputFile); + }) + .last() + .checkpoint("Fetching file into directory") + ) .defaultIfEmpty(outputFile); } @@ -256,68 +261,4 @@ private String extractFilename(HttpClientResponse resp) { return resp.path().substring(pos + 1); } - private class ResponseToFileConsumer extends AbstractBinResponseConsumer { - - private final Path outputFile; - - private SeekableByteChannel channel; - private long amount; - - public ResponseToFileConsumer(Path outputFile) { - this.outputFile = outputFile; - } - - @Override - public void releaseResources() { - if (channel != null) { - try { - channel.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - @Override - protected int capacityIncrement() { - return 4096; - } - - @Override - protected void data(ByteBuffer src, boolean endOfStream) throws IOException { - if (channel != null) { - amount += channel.write(src); - if (endOfStream) { - channel.close(); - - statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile); - downloadedHandler.call(uri(), outputFile, amount); - } - } - } - - @Override - protected void start(HttpResponse response, - ContentType contentType - ) throws HttpException, IOException { - if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) { - log.debug("The file {} is already up to date", outputFile); - statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile); - return; - } - - statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile); - - channel = Files.newByteChannel(outputFile, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING - ); - } - - @Override - protected Path buildResult() { - return outputFile; - } - } } diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java index ab5cd4b1..2ab46258 100644 --- a/src/main/java/me/itzg/helpers/http/SharedFetch.java +++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java @@ -1,7 +1,6 @@ package me.itzg.helpers.http; import io.netty.handler.codec.http.HttpHeaderNames; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.HashMap; @@ -13,8 +12,6 @@ import lombok.extern.slf4j.Slf4j; import me.itzg.helpers.McImageHelper; import me.itzg.helpers.errors.GenericException; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import reactor.netty.http.Http11SslContextSpec; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; @@ -35,8 +32,6 @@ public class SharedFetch implements AutoCloseable { final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor(); private final HttpClient reactiveClient; - private final CloseableHttpAsyncClient hcAsyncClient; - private boolean hcAsyncClientStarted = false; private final URI filesViaUrl; @@ -83,16 +78,6 @@ public SharedFetch(String forCommand, Options options) { headers.put("x-fetch-session", fetchSessionId); this.filesViaUrl = options.getFilesViaUrl(); - - hcAsyncClient = HttpAsyncClients.createSystem(); - } - - public synchronized CloseableHttpAsyncClient getHcAsyncClient() { - if (!hcAsyncClientStarted) { - hcAsyncClient.start(); - hcAsyncClientStarted = true; - } - return hcAsyncClient; } public FetchBuilderBase fetch(URI uri) { @@ -107,11 +92,6 @@ public SharedFetch addHeader(String name, String value) { @Override public void close() { - try { - hcAsyncClient.close(); - } catch (IOException e) { - log.warn("Failed to close async client for shared fetch", e); - } } @Builder diff --git a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java index cd1c5ed9..206685f4 100644 --- a/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java +++ b/src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java @@ -20,7 +20,6 @@ import me.itzg.helpers.http.FailedRequestException; import me.itzg.helpers.http.Fetch; import me.itzg.helpers.http.SharedFetch; -import me.itzg.helpers.http.SharedFetch.Options; import me.itzg.helpers.http.Uris; import org.jetbrains.annotations.Blocking; import org.reactivestreams.Publisher; @@ -83,15 +82,13 @@ public Integer call() throws Exception { Files.createDirectories(dest); - try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) { - Flux.fromIterable(sources) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .flatMap(source -> processSource(sharedFetch, source, fileIsListingOption)) - .collectList() - .flatMap(this::cleanupAndSaveManifest) - .block(); - } + Flux.fromIterable(sources) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .flatMap(source -> processSource(source, fileIsListingOption)) + .collectList() + .flatMap(this::cleanupAndSaveManifest) + .block(); return ExitCode.OK; } @@ -116,9 +113,9 @@ private Mono cleanupAndSaveManifest(List paths) { }); } - private Publisher processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) { + private Publisher processSource(String source, boolean fileIsListing) { if (Uris.isUri(source)) { - return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source); + return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source); } else { final Path path = Paths.get(source); if (!Files.exists(path)) { @@ -128,12 +125,12 @@ private Publisher processSource(SharedFetch sharedFetch, String source, bo if (Files.isDirectory(path)) { return processDirectory(path); } else { - return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path); + return fileIsListing ? processListingFile(path) : processFile(path); } } } - private Flux processListingFile(SharedFetch sharedFetch, Path listingFile) { + private Flux processListingFile(Path listingFile) { return Mono.just(listingFile) .publishOn(Schedulers.boundedElastic()) .flatMapMany(path -> { @@ -142,7 +139,7 @@ private Flux processListingFile(SharedFetch sharedFetch, Path listingFile) final List lines = Files.readAllLines(path); return Flux.fromIterable(lines) .filter(this::isListingLine) - .flatMap(src -> processSource(sharedFetch, src, + .flatMap(src -> processSource(src, // avoid recursive file-listing processing false)); } catch (IOException e) { @@ -233,8 +230,8 @@ private Flux processDirectory(Path srcDir) { }); } - private Mono processRemoteSource(SharedFetch sharedFetch, String source) { - return sharedFetch.fetch(URI.create(source)) + private Mono processRemoteSource(String source) { + return Fetch.fetch(URI.create(source)) .userAgentCommand("mcopy") .toDirectory(dest) .skipUpToDate(skipUpToDate) @@ -276,7 +273,7 @@ private Flux processRemoteListingFile(String source) { .flatMapMany(content -> Flux.just(content.split("\\r?\\n"))) .filter(this::isListingLine) ) - .flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing)) + .flatMap(this::processRemoteSource) .doOnTerminate(sharedFetch::close) .checkpoint("Processing remote listing at " + source, true); }