Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import me.itzg.helpers.errors.GenericException;
import me.itzg.helpers.http.SharedFetch.Options;
import me.itzg.helpers.json.ObjectMappers;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
Expand Down Expand Up @@ -197,6 +199,25 @@ protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
}
}

@FunctionalInterface
protected interface HcAsyncClientUser<R> {
void use(CloseableHttpAsyncClient client, Runnable closer);
}

protected <R> void useHcAsyncClient(HcAsyncClientUser<R> user) {
if (state.sharedFetch != null) {
user.use(state.sharedFetch.getHcAsyncClient(), () -> {
}
);
}
else {
//noinspection resource since close callback will handle it
SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());

user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
}
}

protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
Logger log, String operation
) {
Expand Down Expand Up @@ -291,6 +312,25 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
state.requestHeaders.forEach(headers::set);
}

protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
final Set<String> contentTypes = getAcceptContentTypes();
if (contentTypes != null && !contentTypes.isEmpty()) {
contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
}

if (state.userInfo != null) {
requestBuilder.setHeader(
AUTHORIZATION.toString(),
"Basic " +
Base64.getEncoder().encodeToString(
state.userInfo.getBytes(StandardCharsets.UTF_8)
)
);
}

state.requestHeaders.forEach(requestBuilder::setHeader);
}

static String formatDuration(long millis) {
final StringBuilder sb = new StringBuilder();
final long minutes = millis / 60000;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package me.itzg.helpers.http;

import org.apache.hc.core5.concurrent.FutureCallback;
import reactor.core.publisher.MonoSink;

class MonoSinkFutureCallbackAdapter<T> implements FutureCallback<T> {

private final MonoSink<T> sink;

public MonoSinkFutureCallbackAdapter(MonoSink<T> sink) {
this.sink = sink;
}

@Override
public void completed(T result) {
sink.success(result);
}

@Override
public void failed(Exception ex) {
sink.error(ex);
}

@Override
public void cancelled() {
sink.success();
}
}
131 changes: 95 additions & 36 deletions src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package me.itzg.helpers.http;

import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
import static java.util.Objects.requireNonNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.files.ReactiveFileUtils;
import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -202,40 +210,27 @@ else if (skipUpToDate) {

return alreadyUpToDateMono
.filter(alreadyUpToDate -> !alreadyUpToDate)
.flatMap(notUsed -> client
.headers(this::applyHeaders)
.headersWhen(headers ->
skipUpToDate ?
fileLastModifiedMono
.map(outputLastModified -> headers.set(
IF_MODIFIED_SINCE,
httpDateTimeFormatter.format(outputLastModified)
))
.defaultIfEmpty(headers)
: Mono.just(headers)
)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file fetch"))
.doOnRequest(
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
.get()
.uri(resourceUrl)
.response((resp, byteBufFlux) -> {
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return Mono.just(outputFile);
}

if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
}

return copyBodyInputStreamToFile(byteBufFlux, outputFile);
})
.last()
.checkpoint("Fetching file into directory")
)
.flatMap(notUsed -> {
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
applyHeaders(reqBuilder);

return
fileLastModifiedMono
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
.then(
Mono.<Path>create(sink -> {
useHcAsyncClient((hcClient, close) -> {
sink.onDispose(close::run);

hcClient.execute(
SimpleRequestProducer.create(reqBuilder.build()),
new ResponseToFileConsumer(outputFile),
new MonoSinkFutureCallbackAdapter<>(sink)
);
});
})
);
})
.defaultIfEmpty(outputFile);
}

Expand All @@ -261,4 +256,68 @@ private String extractFilename(HttpClientResponse resp) {
return resp.path().substring(pos + 1);
}

private class ResponseToFileConsumer extends AbstractBinResponseConsumer<Path> {

private final Path outputFile;

private SeekableByteChannel channel;
private long amount;

public ResponseToFileConsumer(Path outputFile) {
this.outputFile = outputFile;
}

@Override
public void releaseResources() {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
protected int capacityIncrement() {
return 4096;
}

@Override
protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
if (channel != null) {
amount += channel.write(src);
if (endOfStream) {
channel.close();

statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
downloadedHandler.call(uri(), outputFile, amount);
}
}
}

@Override
protected void start(HttpResponse response,
ContentType contentType
) throws HttpException, IOException {
if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return;
}

statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);

channel = Files.newByteChannel(outputFile,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
}

@Override
protected Path buildResult() {
return outputFile;
}
}
}
20 changes: 20 additions & 0 deletions src/main/java/me/itzg/helpers/http/SharedFetch.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package me.itzg.helpers.http;

import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
Expand All @@ -12,6 +13,8 @@
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.McImageHelper;
import me.itzg.helpers.errors.GenericException;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -32,6 +35,8 @@ public class SharedFetch implements AutoCloseable {
final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor();

private final HttpClient reactiveClient;
private final CloseableHttpAsyncClient hcAsyncClient;
private boolean hcAsyncClientStarted = false;

private final URI filesViaUrl;

Expand Down Expand Up @@ -78,6 +83,16 @@ public SharedFetch(String forCommand, Options options) {
headers.put("x-fetch-session", fetchSessionId);

this.filesViaUrl = options.getFilesViaUrl();

hcAsyncClient = HttpAsyncClients.createSystem();
}

public synchronized CloseableHttpAsyncClient getHcAsyncClient() {
if (!hcAsyncClientStarted) {
hcAsyncClient.start();
hcAsyncClientStarted = true;
}
return hcAsyncClient;
}

public FetchBuilderBase<?> fetch(URI uri) {
Expand All @@ -92,6 +107,11 @@ public SharedFetch addHeader(String name, String value) {

@Override
public void close() {
try {
hcAsyncClient.close();
} catch (IOException e) {
log.warn("Failed to close async client for shared fetch", e);
}
}

@Builder
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import me.itzg.helpers.http.FailedRequestException;
import me.itzg.helpers.http.Fetch;
import me.itzg.helpers.http.SharedFetch;
import me.itzg.helpers.http.SharedFetch.Options;
import me.itzg.helpers.http.Uris;
import org.jetbrains.annotations.Blocking;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -82,13 +83,15 @@ public Integer call() throws Exception {

Files.createDirectories(dest);

Flux.fromIterable(sources)
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(source -> processSource(source, fileIsListingOption))
.collectList()
.flatMap(this::cleanupAndSaveManifest)
.block();
try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
Flux.fromIterable(sources)
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
.collectList()
.flatMap(this::cleanupAndSaveManifest)
.block();
}

return ExitCode.OK;
}
Expand All @@ -113,9 +116,9 @@ private Mono<?> cleanupAndSaveManifest(List<Path> paths) {
});
}

private Publisher<Path> processSource(String source, boolean fileIsListing) {
private Publisher<Path> processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
if (Uris.isUri(source)) {
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
} else {
final Path path = Paths.get(source);
if (!Files.exists(path)) {
Expand All @@ -125,12 +128,12 @@ private Publisher<Path> processSource(String source, boolean fileIsListing) {
if (Files.isDirectory(path)) {
return processDirectory(path);
} else {
return fileIsListing ? processListingFile(path) : processFile(path);
return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
}
}
}

private Flux<Path> processListingFile(Path listingFile) {
private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile) {
return Mono.just(listingFile)
.publishOn(Schedulers.boundedElastic())
.flatMapMany(path -> {
Expand All @@ -139,7 +142,7 @@ private Flux<Path> processListingFile(Path listingFile) {
final List<String> lines = Files.readAllLines(path);
return Flux.fromIterable(lines)
.filter(this::isListingLine)
.flatMap(src -> processSource(src,
.flatMap(src -> processSource(sharedFetch, src,
// avoid recursive file-listing processing
false));
} catch (IOException e) {
Expand Down Expand Up @@ -230,8 +233,8 @@ private Flux<Path> processDirectory(Path srcDir) {
});
}

private Mono<Path> processRemoteSource(String source) {
return Fetch.fetch(URI.create(source))
private Mono<Path> processRemoteSource(SharedFetch sharedFetch, String source) {
return sharedFetch.fetch(URI.create(source))
.userAgentCommand("mcopy")
.toDirectory(dest)
.skipUpToDate(skipUpToDate)
Expand Down Expand Up @@ -273,7 +276,7 @@ private Flux<Path> processRemoteListingFile(String source) {
.flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
.filter(this::isListingLine)
)
.flatMap(this::processRemoteSource)
.flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
.doOnTerminate(sharedFetch::close)
.checkpoint("Processing remote listing at " + source, true);
}
Expand Down