Skip to content

Commit 6786ca9

Browse files
authored
http: adjust ReactiveFileUtils.writeByteBufFluxToFile (#562)
1 parent 25edc50 commit 6786ca9

File tree

5 files changed

+40
-31
lines changed

5 files changed

+40
-31
lines changed

src/main/java/me/itzg/helpers/fabric/InstallFabricLoaderCommand.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ public class InstallFabricLoaderCommand implements Callable<Integer> {
2828
@Option(names = "--results-file", description = ResultsFileWriter.OPTION_DESCRIPTION, paramLabel = "FILE")
2929
Path resultsFile;
3030

31+
@Option(names = "--force-reinstall", description = "Force reinstall of the loader even if it already exists")
32+
boolean forceReinstall;
33+
3134
@ArgGroup
3235
OriginOptions originOptions = new OriginOptions();
3336

@@ -85,7 +88,8 @@ else if (VERSION_PATTERN.matcher(value).matches()) {
8588
@Override
8689
public Integer call() throws Exception {
8790
final FabricLauncherInstaller installer = new FabricLauncherInstaller(outputDirectory)
88-
.setResultsFile(resultsFile);
91+
.setResultsFile(resultsFile)
92+
.setForceReinstall(forceReinstall);
8993

9094
if (originOptions.fromUri != null) {
9195
installer.installUsingUri(sharedFetchArgs.options(), originOptions.fromUri);

src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package me.itzg.helpers.files;
22

33
import java.io.IOException;
4+
import java.nio.channels.FileChannel;
45
import java.nio.file.Files;
56
import java.nio.file.Path;
67
import java.nio.file.StandardOpenOption;
78
import java.time.Instant;
89
import java.util.stream.Collectors;
910
import lombok.extern.slf4j.Slf4j;
11+
import reactor.core.Exceptions;
1012
import reactor.core.publisher.Mono;
1113
import reactor.core.scheduler.Schedulers;
1214
import reactor.netty.ByteBufFlux;
@@ -39,34 +41,37 @@ public static Mono<Path> createDirectories(Path dir) {
3941
.subscribeOn(Schedulers.boundedElastic());
4042
}
4143

42-
@SuppressWarnings("BlockingMethodInNonBlockingContext")
43-
public static Mono<Long> copyByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
44+
public static Mono<Long> writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
4445
return Mono.fromCallable(() -> {
4546
log.trace("Opening {} for writing", file);
46-
return Files.newByteChannel(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
47+
return FileChannel.open(file,
48+
StandardOpenOption.WRITE,
49+
StandardOpenOption.CREATE,
4750
StandardOpenOption.TRUNCATE_EXISTING
4851
);
4952
}
5053
)
51-
.flatMap(outChannel -> byteBufFlux.asByteBuffer()
52-
.flatMap(byteBuffer ->
53-
Mono.fromCallable(() -> {
54-
final int count = outChannel.write(byteBuffer);
55-
log.trace("Wrote {} bytes to {}", count, file);
56-
return count;
57-
}
58-
)
59-
)
60-
.doOnTerminate(() -> {
61-
try {
62-
log.trace("Closing file for writing: {}", file);
63-
outChannel.close();
64-
} catch (IOException e) {
65-
log.error("Failed to close file for writing: {}", file, e);
66-
}
67-
})
68-
.collect(Collectors.<Integer>summingLong(value -> value))
69-
.subscribeOn(Schedulers.boundedElastic())
54+
.subscribeOn(Schedulers.boundedElastic())
55+
.flatMap(outChannel ->
56+
byteBufFlux
57+
.asByteBuffer()
58+
.subscribeOn(Schedulers.boundedElastic())
59+
.<Integer>handle((byteBuffer, sink) -> {
60+
try {
61+
sink.next(outChannel.write(byteBuffer));
62+
} catch (IOException e) {
63+
sink.error(Exceptions.propagate(e));
64+
}
65+
})
66+
.doOnTerminate(() -> {
67+
try {
68+
outChannel.close();
69+
log.trace("Closed {}", file);
70+
} catch (IOException e) {
71+
log.warn("Failed to close {}", file, e);
72+
}
73+
})
74+
.collect(Collectors.<Integer>summingLong(value -> value))
7075
);
7176
}
7277
}

src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,17 @@
44
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
55
import static java.util.Objects.requireNonNull;
66

7+
import io.netty.handler.codec.http.HttpHeaderNames;
8+
import io.netty.handler.codec.http.HttpResponseStatus;
79
import java.io.IOException;
810
import java.nio.file.Files;
911
import java.nio.file.Path;
1012
import java.time.Instant;
11-
12-
import org.jetbrains.annotations.NotNull;
13-
14-
import io.netty.handler.codec.http.HttpHeaderNames;
15-
import io.netty.handler.codec.http.HttpResponseStatus;
1613
import lombok.Setter;
1714
import lombok.experimental.Accessors;
1815
import lombok.extern.slf4j.Slf4j;
1916
import me.itzg.helpers.files.ReactiveFileUtils;
17+
import org.jetbrains.annotations.NotNull;
2018
import reactor.core.publisher.Mono;
2119
import reactor.core.scheduler.Schedulers;
2220
import reactor.netty.ByteBufFlux;
@@ -244,7 +242,7 @@ else if (skipUpToDate) {
244242
private Mono<Path> copyBodyInputStreamToFile(ByteBufFlux byteBufFlux, Path outputFile) {
245243
log.trace("Copying response body to file={}", outputFile);
246244

247-
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, outputFile)
245+
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, outputFile)
248246
.map(fileSize -> {
249247
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
250248
downloadedHandler.call(uri(), outputFile, fileSize);

src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public Mono<Path> assemble() {
107107
return failedContentTypeMono(resp);
108108
}
109109

110-
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
110+
return ReactiveFileUtils.writeByteBufFluxToFile(byteBufFlux, file)
111111
.flatMap(fileSize -> {
112112
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
113113
downloadedHandler.call(uri, file, fileSize);

src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ private Mono<Path> processRemoteSource(String source) {
228228
.toDirectory(dest)
229229
.skipUpToDate(skipUpToDate)
230230
.skipExisting(skipExisting)
231+
.handleDownloaded((downloaded, uri, size) -> {
232+
log.debug("Downloaded {} from {} ({} bytes)", downloaded, uri, size);
233+
})
231234
.handleStatus((status, uri, file) -> {
232235
switch (status) {
233236
case DOWNLOADING:
@@ -240,7 +243,6 @@ private Mono<Path> processRemoteSource(String source) {
240243
log.info("The file {} already exists", file);
241244
break;
242245
case DOWNLOADED:
243-
log.debug("Finished downloading to file={}", file);
244246
break;
245247
}
246248
})

0 commit comments

Comments
 (0)