Skip to content

Commit 4dfde5a

Browse files
committed
fetch: download specific file to adjacent temp
1 parent 43204cd commit 4dfde5a

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import java.nio.channels.FileChannel;
66
import java.nio.file.Files;
77
import java.nio.file.Path;
8+
import java.nio.file.StandardCopyOption;
89
import java.nio.file.StandardOpenOption;
910
import java.time.Instant;
11+
import java.util.function.Function;
1012
import java.util.stream.Collectors;
1113
import lombok.extern.slf4j.Slf4j;
1214
import reactor.core.publisher.Mono;
@@ -107,4 +109,13 @@ public static <T> Mono<T> removeFailedDownload(Throwable throwable, Path outputF
107109
})
108110
.subscribeOn(Schedulers.boundedElastic());
109111
}
112+
113+
public static Function<Path, Mono<Path>> moveTo(Path to) {
114+
return from -> Mono.fromCallable(() -> {
115+
log.debug("Moving {} to {}", from, to);
116+
Files.move(from, to, StandardCopyOption.REPLACE_EXISTING);
117+
return to;
118+
})
119+
.subscribeOn(Schedulers.boundedElastic());
120+
}
110121
}

src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public Mono<Path> assemble() {
6464

6565
final boolean useIfModifiedSince = skipUpToDate && Files.exists(file);
6666

67+
final Path tempDownloadFile = file.getParent().resolve(file.getFileName() + ".download");
6768
return useReactiveClient(client ->
6869
client
6970
.doOnRequest((httpClientRequest, connection) ->
@@ -107,7 +108,7 @@ public Mono<Path> assemble() {
107108
return failedContentTypeMono(resp);
108109
}
109110

110-
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, file)
111+
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, tempDownloadFile)
111112
.flatMap(fileSize -> {
112113
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
113114
downloadedHandler.call(uri, file, fileSize);
@@ -120,12 +121,13 @@ public Mono<Path> assemble() {
120121
uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
121122
);
122123
}
123-
return Mono.just(file);
124+
return Mono.just(tempDownloadFile);
124125
});
125126
});
126127

127128
})
128129
.last()
130+
.flatMap(ReactiveFileUtils.moveTo(file))
129131
.contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
130132
);
131133
}

0 commit comments

Comments
 (0)