Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions src/main/java/me/itzg/helpers/http/FetchBuilderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import me.itzg.helpers.errors.GenericException;
import me.itzg.helpers.http.SharedFetch.Options;
import me.itzg.helpers.json.ObjectMappers;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
Expand Down Expand Up @@ -199,25 +197,6 @@ protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
}
}

@FunctionalInterface
protected interface HcAsyncClientUser<R> {
void use(CloseableHttpAsyncClient client, Runnable closer);
}

protected <R> void useHcAsyncClient(HcAsyncClientUser<R> user) {
if (state.sharedFetch != null) {
user.use(state.sharedFetch.getHcAsyncClient(), () -> {
}
);
}
else {
//noinspection resource since close callback will handle it
SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());

user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
}
}

protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
Logger log, String operation
) {
Expand Down Expand Up @@ -312,25 +291,6 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
state.requestHeaders.forEach(headers::set);
}

protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
final Set<String> contentTypes = getAcceptContentTypes();
if (contentTypes != null && !contentTypes.isEmpty()) {
contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
}

if (state.userInfo != null) {
requestBuilder.setHeader(
AUTHORIZATION.toString(),
"Basic " +
Base64.getEncoder().encodeToString(
state.userInfo.getBytes(StandardCharsets.UTF_8)
)
);
}

state.requestHeaders.forEach(requestBuilder::setHeader);
}

static String formatDuration(long millis) {
final StringBuilder sb = new StringBuilder();
final long minutes = millis / 60000;
Expand Down

This file was deleted.

131 changes: 36 additions & 95 deletions src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
package me.itzg.helpers.http;

import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
import static java.util.Objects.requireNonNull;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.files.ReactiveFileUtils;
import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -210,27 +202,40 @@ else if (skipUpToDate) {

return alreadyUpToDateMono
.filter(alreadyUpToDate -> !alreadyUpToDate)
.flatMap(notUsed -> {
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
applyHeaders(reqBuilder);

return
fileLastModifiedMono
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
.then(
Mono.<Path>create(sink -> {
useHcAsyncClient((hcClient, close) -> {
sink.onDispose(close::run);

hcClient.execute(
SimpleRequestProducer.create(reqBuilder.build()),
new ResponseToFileConsumer(outputFile),
new MonoSinkFutureCallbackAdapter<>(sink)
);
});
})
);
})
.flatMap(notUsed -> client
.headers(this::applyHeaders)
.headersWhen(headers ->
skipUpToDate ?
fileLastModifiedMono
.map(outputLastModified -> headers.set(
IF_MODIFIED_SINCE,
httpDateTimeFormatter.format(outputLastModified)
))
.defaultIfEmpty(headers)
: Mono.just(headers)
)
.followRedirect(true)
.doOnRequest(debugLogRequest(log, "file fetch"))
.doOnRequest(
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
.get()
.uri(resourceUrl)
.response((resp, byteBufFlux) -> {
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return Mono.just(outputFile);
}

if (notSuccess(resp)) {
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
}

return copyBodyInputStreamToFile(byteBufFlux, outputFile);
})
.last()
.checkpoint("Fetching file into directory")
)
.defaultIfEmpty(outputFile);
}

Expand All @@ -256,68 +261,4 @@ private String extractFilename(HttpClientResponse resp) {
return resp.path().substring(pos + 1);
}

private class ResponseToFileConsumer extends AbstractBinResponseConsumer<Path> {

private final Path outputFile;

private SeekableByteChannel channel;
private long amount;

public ResponseToFileConsumer(Path outputFile) {
this.outputFile = outputFile;
}

@Override
public void releaseResources() {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@Override
protected int capacityIncrement() {
return 4096;
}

@Override
protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
if (channel != null) {
amount += channel.write(src);
if (endOfStream) {
channel.close();

statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
downloadedHandler.call(uri(), outputFile, amount);
}
}
}

@Override
protected void start(HttpResponse response,
ContentType contentType
) throws HttpException, IOException {
if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
log.debug("The file {} is already up to date", outputFile);
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
return;
}

statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);

channel = Files.newByteChannel(outputFile,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
);
}

@Override
protected Path buildResult() {
return outputFile;
}
}
}
20 changes: 0 additions & 20 deletions src/main/java/me/itzg/helpers/http/SharedFetch.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package me.itzg.helpers.http;

import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.HashMap;
Expand All @@ -13,8 +12,6 @@
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.McImageHelper;
import me.itzg.helpers.errors.GenericException;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
Expand All @@ -35,8 +32,6 @@ public class SharedFetch implements AutoCloseable {
final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor();

private final HttpClient reactiveClient;
private final CloseableHttpAsyncClient hcAsyncClient;
private boolean hcAsyncClientStarted = false;

private final URI filesViaUrl;

Expand Down Expand Up @@ -83,16 +78,6 @@ public SharedFetch(String forCommand, Options options) {
headers.put("x-fetch-session", fetchSessionId);

this.filesViaUrl = options.getFilesViaUrl();

hcAsyncClient = HttpAsyncClients.createSystem();
}

public synchronized CloseableHttpAsyncClient getHcAsyncClient() {
if (!hcAsyncClientStarted) {
hcAsyncClient.start();
hcAsyncClientStarted = true;
}
return hcAsyncClient;
}

public FetchBuilderBase<?> fetch(URI uri) {
Expand All @@ -107,11 +92,6 @@ public SharedFetch addHeader(String name, String value) {

@Override
public void close() {
try {
hcAsyncClient.close();
} catch (IOException e) {
log.warn("Failed to close async client for shared fetch", e);
}
}

@Builder
Expand Down
33 changes: 15 additions & 18 deletions src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import me.itzg.helpers.http.FailedRequestException;
import me.itzg.helpers.http.Fetch;
import me.itzg.helpers.http.SharedFetch;
import me.itzg.helpers.http.SharedFetch.Options;
import me.itzg.helpers.http.Uris;
import org.jetbrains.annotations.Blocking;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -83,15 +82,13 @@ public Integer call() throws Exception {

Files.createDirectories(dest);

try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
Flux.fromIterable(sources)
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
.collectList()
.flatMap(this::cleanupAndSaveManifest)
.block();
}
Flux.fromIterable(sources)
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(source -> processSource(source, fileIsListingOption))
.collectList()
.flatMap(this::cleanupAndSaveManifest)
.block();

return ExitCode.OK;
}
Expand All @@ -116,9 +113,9 @@ private Mono<?> cleanupAndSaveManifest(List<Path> paths) {
});
}

private Publisher<Path> processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
private Publisher<Path> processSource(String source, boolean fileIsListing) {
if (Uris.isUri(source)) {
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
} else {
final Path path = Paths.get(source);
if (!Files.exists(path)) {
Expand All @@ -128,12 +125,12 @@ private Publisher<Path> processSource(SharedFetch sharedFetch, String source, bo
if (Files.isDirectory(path)) {
return processDirectory(path);
} else {
return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
return fileIsListing ? processListingFile(path) : processFile(path);
}
}
}

private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile) {
private Flux<Path> processListingFile(Path listingFile) {
return Mono.just(listingFile)
.publishOn(Schedulers.boundedElastic())
.flatMapMany(path -> {
Expand All @@ -142,7 +139,7 @@ private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile)
final List<String> lines = Files.readAllLines(path);
return Flux.fromIterable(lines)
.filter(this::isListingLine)
.flatMap(src -> processSource(sharedFetch, src,
.flatMap(src -> processSource(src,
// avoid recursive file-listing processing
false));
} catch (IOException e) {
Expand Down Expand Up @@ -233,8 +230,8 @@ private Flux<Path> processDirectory(Path srcDir) {
});
}

private Mono<Path> processRemoteSource(SharedFetch sharedFetch, String source) {
return sharedFetch.fetch(URI.create(source))
private Mono<Path> processRemoteSource(String source) {
return Fetch.fetch(URI.create(source))
.userAgentCommand("mcopy")
.toDirectory(dest)
.skipUpToDate(skipUpToDate)
Expand Down Expand Up @@ -276,7 +273,7 @@ private Flux<Path> processRemoteListingFile(String source) {
.flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
.filter(this::isListingLine)
)
.flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
.flatMap(this::processRemoteSource)
.doOnTerminate(sharedFetch::close)
.checkpoint("Processing remote listing at " + source, true);
}
Expand Down