diff --git a/.run/modrinth-modpack (slug).run.xml b/.run/modrinth-modpack (slug).run.xml
index 0a275316..4bd3474d 100644
--- a/.run/modrinth-modpack (slug).run.xml
+++ b/.run/modrinth-modpack (slug).run.xml
@@ -12,6 +12,16 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java b/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java
index 1fdfb81c..3e4f3d0d 100644
--- a/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java
+++ b/src/main/java/me/itzg/helpers/curseforge/CurseForgeInstaller.java
@@ -384,6 +384,8 @@ else if (Manifests.allFilesPresent(outputDir, context.prevInstallManifest, ignor
}
}
else {
+ log.info("Downloading modpack zip for {}", modFile.getDisplayName());
+
modpackZip = context.cfApi.downloadTemp(modFile, ".zip",
(status, uri, file) ->
log.debug("Modpack file retrieval: status={} uri={} file={}", status, uri, file)
diff --git a/src/main/java/me/itzg/helpers/fabric/FabricMetaClient.java b/src/main/java/me/itzg/helpers/fabric/FabricMetaClient.java
index 60e797ae..1bdab315 100644
--- a/src/main/java/me/itzg/helpers/fabric/FabricMetaClient.java
+++ b/src/main/java/me/itzg/helpers/fabric/FabricMetaClient.java
@@ -208,7 +208,7 @@ private static void debugDownloadedContent(Path path) {
log.debug("Downloaded launcher jar content starts with: {}",
Hex.encodeHexString(ByteBuffer.wrap(buf, 0, amount))
);
- } catch (IOException e) {
+ } catch (IOException|IndexOutOfBoundsException e) {
log.warn("Failed to debug content of launcher jar", e);
}
}
diff --git a/src/main/java/me/itzg/helpers/files/ByteBufQueue.java b/src/main/java/me/itzg/helpers/files/ByteBufQueue.java
new file mode 100644
index 00000000..ea60a14d
--- /dev/null
+++ b/src/main/java/me/itzg/helpers/files/ByteBufQueue.java
@@ -0,0 +1,53 @@
+package me.itzg.helpers.files;
+
+import io.netty.buffer.ByteBuf;
+import java.util.LinkedList;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+class ByteBufQueue {
+
+ final Lock lock = new ReentrantLock();
+ final Condition readyOrFinished = lock.newCondition();
+ final LinkedList buffers = new LinkedList<>();
+ boolean finished = false;
+
+ public void add(ByteBuf buf) {
+ lock.lock();
+ try {
+ buffers.add(buf);
+ } finally {
+ readyOrFinished.signal();
+ lock.unlock();
+ }
+ }
+
+ public ByteBuf take() {
+ while (true) {
+ lock.lock();
+
+ try {
+ if (!buffers.isEmpty()) {
+ return buffers.removeFirst();
+ }
+ else if (finished) {
+ return null;
+ }
+ readyOrFinished.awaitUninterruptibly();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void finish() {
+ lock.lock();
+ try {
+ finished = true;
+ readyOrFinished.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java b/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java
index 7b2c5783..354faa80 100644
--- a/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java
+++ b/src/main/java/me/itzg/helpers/files/ReactiveFileUtils.java
@@ -1,6 +1,6 @@
package me.itzg.helpers.files;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -8,10 +8,10 @@
import java.time.Instant;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.ByteBufFlux;
+import reactor.util.function.Tuple2;
@Slf4j
public class ReactiveFileUtils {
@@ -42,36 +42,46 @@ public static Mono createDirectories(Path dir) {
}
public static Mono writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
- return Mono.fromCallable(() -> {
- log.trace("Opening {} for writing", file);
- return FileChannel.open(file,
- StandardOpenOption.WRITE,
- StandardOpenOption.CREATE,
- StandardOpenOption.TRUNCATE_EXISTING
- );
- }
- )
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(outChannel ->
- byteBufFlux
- .asByteBuffer()
- .subscribeOn(Schedulers.boundedElastic())
- .handle((byteBuffer, sink) -> {
- try {
- sink.next(outChannel.write(byteBuffer));
- } catch (IOException e) {
- sink.error(Exceptions.propagate(e));
+ final ByteBufQueue byteBufQueue = new ByteBufQueue();
+
+ // Separate this into a pair of concurrent mono's
+ return Mono.zip(
+ // ...file writer
+ Mono.fromCallable(() -> {
+ try (FileChannel channel = FileChannel.open(file,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING
+ )) {
+ ByteBuf byteBuf;
+ while ((byteBuf = byteBufQueue.take()) != null) {
+ try {
+ //noinspection ResultOfMethodCallIgnored
+ channel.write(byteBuf.nioBuffer());
+ } finally {
+ byteBuf.release();
+ }
+ }
+
+ return file;
}
})
- .doOnTerminate(() -> {
- try {
- outChannel.close();
- log.trace("Closed {}", file);
- } catch (IOException e) {
- log.warn("Failed to close {}", file, e);
- }
+ // ...which runs in a separate thread
+ .subscribeOn(Schedulers.boundedElastic()),
+ // ...and the network consumer flux
+ byteBufFlux
+ // Mark the bytebufs as retained so they can be released after
+ // they are written by the mono above
+ .retain()
+ .map(byteBuf -> {
+ final int amount = byteBuf.readableBytes();
+ byteBufQueue.add(byteBuf);
+ return amount;
})
+ .doOnTerminate(byteBufQueue::finish)
.collect(Collectors.summingLong(value -> value))
- );
+ )
+ // Just expose the total bytes read from network
+ .map(Tuple2::getT2);
}
}
diff --git a/src/main/java/me/itzg/helpers/http/SharedFetch.java b/src/main/java/me/itzg/helpers/http/SharedFetch.java
index 8ea107bc..f8d85875 100644
--- a/src/main/java/me/itzg/helpers/http/SharedFetch.java
+++ b/src/main/java/me/itzg/helpers/http/SharedFetch.java
@@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import me.itzg.helpers.McImageHelper;
import me.itzg.helpers.errors.GenericException;
+import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
@@ -60,8 +61,6 @@ public SharedFetch(String forCommand, Options options) {
reactiveClient = HttpClient.create(connectionProvider)
.proxyWithSystemProperties()
- // https://projectreactor.io/docs/netty/release/reference/http-client.html#HTTP2
- .protocol(HttpProtocol.HTTP11, HttpProtocol.H2)
.headers(headers -> {
headers
.set(HttpHeaderNames.USER_AGENT.toString(), userAgent)
@@ -72,15 +71,29 @@ public SharedFetch(String forCommand, Options options) {
}
)
// Reference https://projectreactor.io/docs/netty/release/reference/index.html#response-timeout
- .responseTimeout(options.getResponseTimeout())
- .secure(spec ->
- // Http2 SSL supports both HTTP/2 and HTTP/1.1
- spec.sslContext((GenericSslContextSpec>) Http2SslContextSpec.forClient())
- // Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
- .handshakeTimeout(options.getTlsHandshakeTimeout())
- )
-
- ;
+ .responseTimeout(options.getResponseTimeout());
+
+ if (options.isUseHttp2()) {
+ log.debug("Using HTTP/2");
+ reactiveClient
+ // https://projectreactor.io/docs/netty/release/reference/http-client.html#HTTP2
+ .protocol(HttpProtocol.HTTP11, HttpProtocol.H2)
+ .secure(spec ->
+ // Http2 SSL supports both HTTP/2 and HTTP/1.1
+ spec.sslContext((GenericSslContextSpec>) Http2SslContextSpec.forClient())
+ // Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
+ .handshakeTimeout(options.getTlsHandshakeTimeout())
+ );
+ }
+ else {
+ log.debug("Using HTTP/1.1");
+ reactiveClient
+ .secure(spec ->
+ spec.sslContext((GenericSslContextSpec>) Http11SslContextSpec.forClient())
+ // Reference https://projectreactor.io/docs/netty/release/reference/index.html#ssl-tls-timeout
+ .handshakeTimeout(options.getTlsHandshakeTimeout())
+ );
+ }
headers.put("x-fetch-session", fetchSessionId);
@@ -135,6 +148,9 @@ public static class Options {
*/
private final URI filesViaUrl;
+ @Default
+ private final boolean useHttp2 = true;
+
public Options withHeader(String key, String value) {
final Map newHeaders = extraHeaders != null ?
new HashMap<>(extraHeaders) : new HashMap<>();
@@ -142,7 +158,7 @@ public Options withHeader(String key, String value) {
return new Options(
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
- newHeaders, filesViaUrl
+ newHeaders, filesViaUrl, useHttp2
);
}
}
diff --git a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
index 6ca4d75b..30438482 100644
--- a/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
+++ b/src/main/java/me/itzg/helpers/http/SharedFetchArgs.java
@@ -48,6 +48,13 @@ public void setPendingAcquireTimeout(Duration timeout) {
optionsBuilder.pendingAcquireTimeout(timeout);
}
+ @Option(names = "--use-http2", defaultValue = "${env:FETCH_USE_HTTP2:-true}",
+ description = "Whether to use HTTP/2. Default: ${DEFAULT-VALUE}"
+ )
+ public void setUseHttp2(boolean useHttp2) {
+ optionsBuilder.useHttp2(useHttp2);
+ }
+
public Options options() {
return optionsBuilder.build();
}