Skip to content

Commit 3de1186

Browse files
authored
Revert "Use hc5 for file downloads (#597)" (#598)
This reverts commit d42b9ec.
1 parent d42b9ec commit 3de1186

File tree

5 files changed

+51
-201
lines changed

5 files changed

+51
-201
lines changed

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import me.itzg.helpers.errors.GenericException;
3030
import me.itzg.helpers.http.SharedFetch.Options;
3131
import me.itzg.helpers.json.ObjectMappers;
32-
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
33-
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
3432
import org.slf4j.Logger;
3533
import reactor.core.publisher.Mono;
3634
import reactor.netty.ByteBufMono;
@@ -199,25 +197,6 @@ protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
199197
}
200198
}
201199

202-
@FunctionalInterface
203-
protected interface HcAsyncClientUser<R> {
204-
void use(CloseableHttpAsyncClient client, Runnable closer);
205-
}
206-
207-
protected <R> void useHcAsyncClient(HcAsyncClientUser<R> user) {
208-
if (state.sharedFetch != null) {
209-
user.use(state.sharedFetch.getHcAsyncClient(), () -> {
210-
}
211-
);
212-
}
213-
else {
214-
//noinspection resource since close callback will handle it
215-
SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());
216-
217-
user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
218-
}
219-
}
220-
221200
protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
222201
Logger log, String operation
223202
) {
@@ -312,25 +291,6 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
312291
state.requestHeaders.forEach(headers::set);
313292
}
314293

315-
protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
316-
final Set<String> contentTypes = getAcceptContentTypes();
317-
if (contentTypes != null && !contentTypes.isEmpty()) {
318-
contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
319-
}
320-
321-
if (state.userInfo != null) {
322-
requestBuilder.setHeader(
323-
AUTHORIZATION.toString(),
324-
"Basic " +
325-
Base64.getEncoder().encodeToString(
326-
state.userInfo.getBytes(StandardCharsets.UTF_8)
327-
)
328-
);
329-
}
330-
331-
state.requestHeaders.forEach(requestBuilder::setHeader);
332-
}
333-
334294
static String formatDuration(long millis) {
335295
final StringBuilder sb = new StringBuilder();
336296
final long minutes = millis / 60000;

src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java

Lines changed: 36 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,19 @@
11
package me.itzg.helpers.http;
22

3+
import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
34
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
45
import static java.util.Objects.requireNonNull;
56

67
import io.netty.handler.codec.http.HttpHeaderNames;
8+
import io.netty.handler.codec.http.HttpResponseStatus;
79
import java.io.IOException;
8-
import java.nio.ByteBuffer;
9-
import java.nio.channels.SeekableByteChannel;
1010
import java.nio.file.Files;
1111
import java.nio.file.Path;
12-
import java.nio.file.StandardOpenOption;
1312
import java.time.Instant;
1413
import lombok.Setter;
1514
import lombok.experimental.Accessors;
1615
import lombok.extern.slf4j.Slf4j;
1716
import me.itzg.helpers.files.ReactiveFileUtils;
18-
import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
19-
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
20-
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
21-
import org.apache.hc.core5.http.ContentType;
22-
import org.apache.hc.core5.http.HttpException;
23-
import org.apache.hc.core5.http.HttpResponse;
24-
import org.apache.hc.core5.http.HttpStatus;
2517
import org.jetbrains.annotations.NotNull;
2618
import reactor.core.publisher.Mono;
2719
import reactor.core.scheduler.Schedulers;
@@ -210,27 +202,40 @@ else if (skipUpToDate) {
210202

211203
return alreadyUpToDateMono
212204
.filter(alreadyUpToDate -> !alreadyUpToDate)
213-
.flatMap(notUsed -> {
214-
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
215-
applyHeaders(reqBuilder);
216-
217-
return
218-
fileLastModifiedMono
219-
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
220-
.then(
221-
Mono.<Path>create(sink -> {
222-
useHcAsyncClient((hcClient, close) -> {
223-
sink.onDispose(close::run);
224-
225-
hcClient.execute(
226-
SimpleRequestProducer.create(reqBuilder.build()),
227-
new ResponseToFileConsumer(outputFile),
228-
new MonoSinkFutureCallbackAdapter<>(sink)
229-
);
230-
});
231-
})
232-
);
233-
})
205+
.flatMap(notUsed -> client
206+
.headers(this::applyHeaders)
207+
.headersWhen(headers ->
208+
skipUpToDate ?
209+
fileLastModifiedMono
210+
.map(outputLastModified -> headers.set(
211+
IF_MODIFIED_SINCE,
212+
httpDateTimeFormatter.format(outputLastModified)
213+
))
214+
.defaultIfEmpty(headers)
215+
: Mono.just(headers)
216+
)
217+
.followRedirect(true)
218+
.doOnRequest(debugLogRequest(log, "file fetch"))
219+
.doOnRequest(
220+
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
221+
.get()
222+
.uri(resourceUrl)
223+
.response((resp, byteBufFlux) -> {
224+
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
225+
log.debug("The file {} is already up to date", outputFile);
226+
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
227+
return Mono.just(outputFile);
228+
}
229+
230+
if (notSuccess(resp)) {
231+
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
232+
}
233+
234+
return copyBodyInputStreamToFile(byteBufFlux, outputFile);
235+
})
236+
.last()
237+
.checkpoint("Fetching file into directory")
238+
)
234239
.defaultIfEmpty(outputFile);
235240
}
236241

@@ -256,68 +261,4 @@ private String extractFilename(HttpClientResponse resp) {
256261
return resp.path().substring(pos + 1);
257262
}
258263

259-
private class ResponseToFileConsumer extends AbstractBinResponseConsumer<Path> {
260-
261-
private final Path outputFile;
262-
263-
private SeekableByteChannel channel;
264-
private long amount;
265-
266-
public ResponseToFileConsumer(Path outputFile) {
267-
this.outputFile = outputFile;
268-
}
269-
270-
@Override
271-
public void releaseResources() {
272-
if (channel != null) {
273-
try {
274-
channel.close();
275-
} catch (IOException e) {
276-
throw new RuntimeException(e);
277-
}
278-
}
279-
}
280-
281-
@Override
282-
protected int capacityIncrement() {
283-
return 4096;
284-
}
285-
286-
@Override
287-
protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
288-
if (channel != null) {
289-
amount += channel.write(src);
290-
if (endOfStream) {
291-
channel.close();
292-
293-
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
294-
downloadedHandler.call(uri(), outputFile, amount);
295-
}
296-
}
297-
}
298-
299-
@Override
300-
protected void start(HttpResponse response,
301-
ContentType contentType
302-
) throws HttpException, IOException {
303-
if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
304-
log.debug("The file {} is already up to date", outputFile);
305-
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
306-
return;
307-
}
308-
309-
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);
310-
311-
channel = Files.newByteChannel(outputFile,
312-
StandardOpenOption.WRITE,
313-
StandardOpenOption.CREATE,
314-
StandardOpenOption.TRUNCATE_EXISTING
315-
);
316-
}
317-
318-
@Override
319-
protected Path buildResult() {
320-
return outputFile;
321-
}
322-
}
323264
}

src/main/java/me/itzg/helpers/http/SharedFetch.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package me.itzg.helpers.http;
22

33
import io.netty.handler.codec.http.HttpHeaderNames;
4-
import java.io.IOException;
54
import java.net.URI;
65
import java.time.Duration;
76
import java.util.HashMap;
@@ -13,8 +12,6 @@
1312
import lombok.extern.slf4j.Slf4j;
1413
import me.itzg.helpers.McImageHelper;
1514
import me.itzg.helpers.errors.GenericException;
16-
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
17-
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
1815
import reactor.netty.http.Http11SslContextSpec;
1916
import reactor.netty.http.client.HttpClient;
2017
import reactor.netty.resources.ConnectionProvider;
@@ -35,8 +32,6 @@ public class SharedFetch implements AutoCloseable {
3532
final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor();
3633

3734
private final HttpClient reactiveClient;
38-
private final CloseableHttpAsyncClient hcAsyncClient;
39-
private boolean hcAsyncClientStarted = false;
4035

4136
private final URI filesViaUrl;
4237

@@ -83,16 +78,6 @@ public SharedFetch(String forCommand, Options options) {
8378
headers.put("x-fetch-session", fetchSessionId);
8479

8580
this.filesViaUrl = options.getFilesViaUrl();
86-
87-
hcAsyncClient = HttpAsyncClients.createSystem();
88-
}
89-
90-
public synchronized CloseableHttpAsyncClient getHcAsyncClient() {
91-
if (!hcAsyncClientStarted) {
92-
hcAsyncClient.start();
93-
hcAsyncClientStarted = true;
94-
}
95-
return hcAsyncClient;
9681
}
9782

9883
public FetchBuilderBase<?> fetch(URI uri) {
@@ -107,11 +92,6 @@ public SharedFetch addHeader(String name, String value) {
10792

10893
@Override
10994
public void close() {
110-
try {
111-
hcAsyncClient.close();
112-
} catch (IOException e) {
113-
log.warn("Failed to close async client for shared fetch", e);
114-
}
11595
}
11696

11797
@Builder

src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import me.itzg.helpers.http.FailedRequestException;
2121
import me.itzg.helpers.http.Fetch;
2222
import me.itzg.helpers.http.SharedFetch;
23-
import me.itzg.helpers.http.SharedFetch.Options;
2423
import me.itzg.helpers.http.Uris;
2524
import org.jetbrains.annotations.Blocking;
2625
import org.reactivestreams.Publisher;
@@ -83,15 +82,13 @@ public Integer call() throws Exception {
8382

8483
Files.createDirectories(dest);
8584

86-
try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
87-
Flux.fromIterable(sources)
88-
.map(String::trim)
89-
.filter(s -> !s.isEmpty())
90-
.flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
91-
.collectList()
92-
.flatMap(this::cleanupAndSaveManifest)
93-
.block();
94-
}
85+
Flux.fromIterable(sources)
86+
.map(String::trim)
87+
.filter(s -> !s.isEmpty())
88+
.flatMap(source -> processSource(source, fileIsListingOption))
89+
.collectList()
90+
.flatMap(this::cleanupAndSaveManifest)
91+
.block();
9592

9693
return ExitCode.OK;
9794
}
@@ -116,9 +113,9 @@ private Mono<?> cleanupAndSaveManifest(List<Path> paths) {
116113
});
117114
}
118115

119-
private Publisher<Path> processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
116+
private Publisher<Path> processSource(String source, boolean fileIsListing) {
120117
if (Uris.isUri(source)) {
121-
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
118+
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
122119
} else {
123120
final Path path = Paths.get(source);
124121
if (!Files.exists(path)) {
@@ -128,12 +125,12 @@ private Publisher<Path> processSource(SharedFetch sharedFetch, String source, bo
128125
if (Files.isDirectory(path)) {
129126
return processDirectory(path);
130127
} else {
131-
return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
128+
return fileIsListing ? processListingFile(path) : processFile(path);
132129
}
133130
}
134131
}
135132

136-
private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile) {
133+
private Flux<Path> processListingFile(Path listingFile) {
137134
return Mono.just(listingFile)
138135
.publishOn(Schedulers.boundedElastic())
139136
.flatMapMany(path -> {
@@ -142,7 +139,7 @@ private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile)
142139
final List<String> lines = Files.readAllLines(path);
143140
return Flux.fromIterable(lines)
144141
.filter(this::isListingLine)
145-
.flatMap(src -> processSource(sharedFetch, src,
142+
.flatMap(src -> processSource(src,
146143
// avoid recursive file-listing processing
147144
false));
148145
} catch (IOException e) {
@@ -233,8 +230,8 @@ private Flux<Path> processDirectory(Path srcDir) {
233230
});
234231
}
235232

236-
private Mono<Path> processRemoteSource(SharedFetch sharedFetch, String source) {
237-
return sharedFetch.fetch(URI.create(source))
233+
private Mono<Path> processRemoteSource(String source) {
234+
return Fetch.fetch(URI.create(source))
238235
.userAgentCommand("mcopy")
239236
.toDirectory(dest)
240237
.skipUpToDate(skipUpToDate)
@@ -276,7 +273,7 @@ private Flux<Path> processRemoteListingFile(String source) {
276273
.flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
277274
.filter(this::isListingLine)
278275
)
279-
.flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
276+
.flatMap(this::processRemoteSource)
280277
.doOnTerminate(sharedFetch::close)
281278
.checkpoint("Processing remote listing at " + source, true);
282279
}

0 commit comments

Comments
 (0)