Skip to content

Commit 6d10db7

Browse files
committed
Processes skipUpToDate
1 parent 1f2fc51 commit 6d10db7

File tree

3 files changed

+92
-85
lines changed

3 files changed

+92
-85
lines changed

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package me.itzg.helpers.http;
22

3-
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
4-
import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION;
5-
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
3+
import static io.netty.handler.codec.http.HttpHeaderNames.*;
64

5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import io.netty.handler.codec.http.HttpHeaderNames;
7+
import io.netty.handler.codec.http.HttpHeaders;
8+
import io.netty.handler.codec.http.HttpStatusClass;
9+
import io.netty.handler.codec.http.HttpUtil;
710
import java.net.URI;
811
import java.net.URISyntaxException;
912
import java.nio.charset.StandardCharsets;
@@ -22,19 +25,12 @@
2225
import java.util.regex.Matcher;
2326
import java.util.regex.Pattern;
2427
import java.util.stream.Collectors;
25-
26-
import org.slf4j.Logger;
27-
28-
import com.fasterxml.jackson.databind.ObjectMapper;
29-
30-
import io.netty.handler.codec.http.HttpHeaderNames;
31-
import io.netty.handler.codec.http.HttpHeaders;
32-
import io.netty.handler.codec.http.HttpStatusClass;
33-
import io.netty.handler.codec.http.HttpUtil;
3428
import lombok.extern.slf4j.Slf4j;
3529
import me.itzg.helpers.errors.GenericException;
3630
import me.itzg.helpers.http.SharedFetch.Options;
3731
import me.itzg.helpers.json.ObjectMappers;
32+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
33+
import org.slf4j.Logger;
3834
import reactor.core.publisher.Mono;
3935
import reactor.netty.ByteBufMono;
4036
import reactor.netty.Connection;
@@ -286,6 +282,25 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
286282
state.requestHeaders.forEach(headers::set);
287283
}
288284

285+
protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
286+
final Set<String> contentTypes = getAcceptContentTypes();
287+
if (contentTypes != null && !contentTypes.isEmpty()) {
288+
contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
289+
}
290+
291+
if (state.userInfo != null) {
292+
requestBuilder.setHeader(
293+
AUTHORIZATION.toString(),
294+
"Basic " +
295+
Base64.getEncoder().encodeToString(
296+
state.userInfo.getBytes(StandardCharsets.UTF_8)
297+
)
298+
);
299+
}
300+
301+
state.requestHeaders.forEach(requestBuilder::setHeader);
302+
}
303+
289304
static String formatDuration(long millis) {
290305
final StringBuilder sb = new StringBuilder();
291306
final long minutes = millis / 60000;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package me.itzg.helpers.http;
2+
3+
import org.apache.hc.core5.concurrent.FutureCallback;
4+
import reactor.core.publisher.MonoSink;
5+
6+
class MonoSinkFutureCallbackAdapter<T> implements FutureCallback<T> {
7+
8+
private final MonoSink<T> sink;
9+
10+
public MonoSinkFutureCallbackAdapter(MonoSink<T> sink) {
11+
this.sink = sink;
12+
}
13+
14+
@Override
15+
public void completed(T result) {
16+
sink.success(result);
17+
}
18+
19+
@Override
20+
public void failed(Exception ex) {
21+
sink.error(ex);
22+
}
23+
24+
@Override
25+
public void cancelled() {
26+
sink.success();
27+
}
28+
}

src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java

Lines changed: 37 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
2121
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
2222
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
23-
import org.apache.hc.core5.concurrent.FutureCallback;
2423
import org.apache.hc.core5.http.ContentType;
2524
import org.apache.hc.core5.http.HttpException;
2625
import org.apache.hc.core5.http.HttpResponse;
26+
import org.apache.hc.core5.http.HttpStatus;
2727
import org.jetbrains.annotations.NotNull;
2828
import reactor.core.publisher.Mono;
2929
import reactor.core.scheduler.Schedulers;
@@ -216,69 +216,23 @@ else if (skipUpToDate) {
216216

217217
return alreadyUpToDateMono
218218
.filter(alreadyUpToDate -> !alreadyUpToDate)
219-
.flatMap(notUsed ->
220-
221-
Mono.<Path>create(sink -> {
222-
223-
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
224-
225-
hcHttpClient.execute(
226-
SimpleRequestProducer.create(reqBuilder.build()),
227-
new ResponseToFileConsumer(outputFile),
228-
new FutureCallback<Path>() {
229-
230-
@Override
231-
public void completed(Path result) {
232-
sink.success(result);
233-
}
234-
235-
@Override
236-
public void failed(Exception ex) {
237-
sink.error(ex);
238-
}
239-
240-
@Override
241-
public void cancelled() {
242-
sink.success();
243-
}
244-
}
219+
.flatMap(notUsed -> {
220+
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
221+
applyHeaders(reqBuilder);
222+
223+
return
224+
fileLastModifiedMono
225+
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
226+
.then(
227+
Mono.<Path>create(sink -> {
228+
hcHttpClient.execute(
229+
SimpleRequestProducer.create(reqBuilder.build()),
230+
new ResponseToFileConsumer(outputFile),
231+
new MonoSinkFutureCallbackAdapter<>(sink)
232+
);
233+
})
245234
);
246-
})
247-
248-
/*client
249-
.headers(this::applyHeaders)
250-
.headersWhen(headers ->
251-
skipUpToDate ?
252-
fileLastModifiedMono
253-
.map(outputLastModified -> headers.set(
254-
IF_MODIFIED_SINCE,
255-
httpDateTimeFormatter.format(outputLastModified)
256-
))
257-
.defaultIfEmpty(headers)
258-
: Mono.just(headers)
259-
)
260-
.followRedirect(true)
261-
.doOnRequest(debugLogRequest(log, "file fetch"))
262-
.doOnRequest(
263-
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
264-
.get()
265-
.uri(resourceUrl)
266-
.response((resp, byteBufFlux) -> {
267-
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
268-
log.debug("The file {} is already up to date", outputFile);
269-
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
270-
return Mono.just(outputFile);
271-
}
272-
273-
if (notSuccess(resp)) {
274-
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
275-
}
276-
277-
return copyBodyInputStreamToFile(byteBufFlux, outputFile);
278-
})
279-
.last()
280-
.checkpoint("Fetching file into directory")*/
281-
)
235+
})
282236
.defaultIfEmpty(outputFile);
283237
}
284238

@@ -317,33 +271,43 @@ public ResponseToFileConsumer(Path outputFile) {
317271

318272
@Override
319273
public void releaseResources() {
320-
try {
321-
channel.close();
322-
} catch (IOException e) {
323-
throw new RuntimeException(e);
274+
if (channel != null) {
275+
try {
276+
channel.close();
277+
} catch (IOException e) {
278+
throw new RuntimeException(e);
279+
}
324280
}
325281
}
326282

327283
@Override
328284
protected int capacityIncrement() {
329-
return 1024;
285+
return 4096;
330286
}
331287

332288
@Override
333289
protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
334-
amount += channel.write(src);
335-
if (endOfStream) {
336-
channel.close();
290+
if (channel != null) {
291+
amount += channel.write(src);
292+
if (endOfStream) {
293+
channel.close();
337294

338-
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
339-
downloadedHandler.call(uri(), outputFile, amount);
295+
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
296+
downloadedHandler.call(uri(), outputFile, amount);
297+
}
340298
}
341299
}
342300

343301
@Override
344302
protected void start(HttpResponse response,
345303
ContentType contentType
346304
) throws HttpException, IOException {
305+
if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
306+
log.debug("The file {} is already up to date", outputFile);
307+
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
308+
return;
309+
}
310+
347311
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);
348312

349313
channel = Files.newByteChannel(outputFile,

0 commit comments

Comments
 (0)