Skip to content

Commit 243b718

Browse files
committed
Support non-SharedFetch usage of HC Client
1 parent 579f2b5 commit 243b718

File tree

3 files changed

+44
-27
lines changed

3 files changed

+44
-27
lines changed

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import me.itzg.helpers.errors.GenericException;
3030
import me.itzg.helpers.http.SharedFetch.Options;
3131
import me.itzg.helpers.json.ObjectMappers;
32-
import org.apache.hc.client5.http.async.HttpAsyncClient;
3332
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
3433
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
3534
import org.slf4j.Logger;
@@ -189,10 +188,6 @@ protected interface ReactiveClientUser<R> {
189188
R use(HttpClient client);
190189
}
191190

192-
protected interface HcAsyncClientUser<R> {
193-
R use(HttpAsyncClient client);
194-
}
195-
196191
protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
197192
if (state.sharedFetch != null) {
198193
return user.use(state.sharedFetch.getReactiveClient());
@@ -204,8 +199,23 @@ protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
204199
}
205200
}
206201

207-
protected CloseableHttpAsyncClient getHcAsyncClient() {
208-
return state.sharedFetch.getHcAsyncClient();
202+
@FunctionalInterface
203+
protected interface HcAsyncClientUser<R> {
204+
void use(CloseableHttpAsyncClient client, Runnable closer);
205+
}
206+
207+
protected <R> void useHcAsyncClient(HcAsyncClientUser<R> user) {
208+
if (state.sharedFetch != null) {
209+
user.use(state.sharedFetch.getHcAsyncClient(), () -> {
210+
}
211+
);
212+
}
213+
else {
214+
//noinspection resource since close callback will handle it
215+
SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());
216+
217+
user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
218+
}
209219
}
210220

211221
protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(

src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,15 @@ else if (skipUpToDate) {
219219
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
220220
.then(
221221
Mono.<Path>create(sink -> {
222-
getHcAsyncClient().execute(
223-
SimpleRequestProducer.create(reqBuilder.build()),
224-
new ResponseToFileConsumer(outputFile),
225-
new MonoSinkFutureCallbackAdapter<>(sink)
226-
);
222+
useHcAsyncClient((hcClient, close) -> {
223+
sink.onDispose(close::run);
224+
225+
hcClient.execute(
226+
SimpleRequestProducer.create(reqBuilder.build()),
227+
new ResponseToFileConsumer(outputFile),
228+
new MonoSinkFutureCallbackAdapter<>(sink)
229+
);
230+
});
227231
})
228232
);
229233
})

src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import me.itzg.helpers.http.FailedRequestException;
2121
import me.itzg.helpers.http.Fetch;
2222
import me.itzg.helpers.http.SharedFetch;
23+
import me.itzg.helpers.http.SharedFetch.Options;
2324
import me.itzg.helpers.http.Uris;
2425
import org.jetbrains.annotations.Blocking;
2526
import org.reactivestreams.Publisher;
@@ -82,13 +83,15 @@ public Integer call() throws Exception {
8283

8384
Files.createDirectories(dest);
8485

85-
Flux.fromIterable(sources)
86-
.map(String::trim)
87-
.filter(s -> !s.isEmpty())
88-
.flatMap(source -> processSource(source, fileIsListingOption))
89-
.collectList()
90-
.flatMap(this::cleanupAndSaveManifest)
91-
.block();
86+
try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
87+
Flux.fromIterable(sources)
88+
.map(String::trim)
89+
.filter(s -> !s.isEmpty())
90+
.flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
91+
.collectList()
92+
.flatMap(this::cleanupAndSaveManifest)
93+
.block();
94+
}
9295

9396
return ExitCode.OK;
9497
}
@@ -113,9 +116,9 @@ private Mono<?> cleanupAndSaveManifest(List<Path> paths) {
113116
});
114117
}
115118

116-
private Publisher<Path> processSource(String source, boolean fileIsListing) {
119+
private Publisher<Path> processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
117120
if (Uris.isUri(source)) {
118-
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
121+
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
119122
} else {
120123
final Path path = Paths.get(source);
121124
if (!Files.exists(path)) {
@@ -125,12 +128,12 @@ private Publisher<Path> processSource(String source, boolean fileIsListing) {
125128
if (Files.isDirectory(path)) {
126129
return processDirectory(path);
127130
} else {
128-
return fileIsListing ? processListingFile(path) : processFile(path);
131+
return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
129132
}
130133
}
131134
}
132135

133-
private Flux<Path> processListingFile(Path listingFile) {
136+
private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile) {
134137
return Mono.just(listingFile)
135138
.publishOn(Schedulers.boundedElastic())
136139
.flatMapMany(path -> {
@@ -139,7 +142,7 @@ private Flux<Path> processListingFile(Path listingFile) {
139142
final List<String> lines = Files.readAllLines(path);
140143
return Flux.fromIterable(lines)
141144
.filter(this::isListingLine)
142-
.flatMap(src -> processSource(src,
145+
.flatMap(src -> processSource(sharedFetch, src,
143146
// avoid recursive file-listing processing
144147
false));
145148
} catch (IOException e) {
@@ -230,8 +233,8 @@ private Flux<Path> processDirectory(Path srcDir) {
230233
});
231234
}
232235

233-
private Mono<Path> processRemoteSource(String source) {
234-
return Fetch.fetch(URI.create(source))
236+
private Mono<Path> processRemoteSource(SharedFetch sharedFetch, String source) {
237+
return sharedFetch.fetch(URI.create(source))
235238
.userAgentCommand("mcopy")
236239
.toDirectory(dest)
237240
.skipUpToDate(skipUpToDate)
@@ -273,7 +276,7 @@ private Flux<Path> processRemoteListingFile(String source) {
273276
.flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
274277
.filter(this::isListingLine)
275278
)
276-
.flatMap(this::processRemoteSource)
279+
.flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
277280
.doOnTerminate(sharedFetch::close)
278281
.checkpoint("Processing remote listing at " + source, true);
279282
}

0 commit comments

Comments
 (0)