Skip to content

Conversation

@burningtnt
Copy link
Member

No description provided.

@burningtnt burningtnt marked this pull request as draft December 20, 2025 12:41
…maphore

# Conflicts:
#	HMCL/src/main/java/org/jackhuang/hmcl/ui/FXUtils.java
@burningtnt burningtnt marked this pull request as ready for review December 20, 2025 12:44
@Glavo Glavo requested a review from Copilot December 20, 2025 12:45
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces centralized download concurrency control by extracting concurrency management from FetchTask into a new DownloadConcurrency class. This allows runtime adjustment of download concurrency across all download operations and API requests.

Key Changes:

  • Introduced DownloadConcurrency class to manage concurrency limits globally with support for dynamic adjustment
  • Created ConcurrencyGuard with Token for automatic resource management via try-with-resources
  • Applied concurrency control to Modrinth and CurseForge API calls to prevent overwhelming remote services

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
HMCLCore/src/main/java/org/jackhuang/hmcl/util/io/concurrency/DownloadConcurrency.java New class providing centralized download concurrency management with dynamic resizing support
HMCLCore/src/main/java/org/jackhuang/hmcl/util/io/concurrency/ConcurrencyGuard.java New guard class wrapping semaphore with AutoCloseable token pattern
HMCLCore/src/main/java/org/jackhuang/hmcl/task/Schedulers.java Added isVirtualThreadAvailable() helper method
HMCLCore/src/main/java/org/jackhuang/hmcl/task/FetchTask.java Refactored to use ConcurrencyGuard instead of direct Semaphore, added initialization guard
HMCLCore/src/main/java/org/jackhuang/hmcl/mod/modrinth/ModrinthRemoteModRepository.java Applied concurrency control to all API methods using ConcurrencyGuard
HMCLCore/src/main/java/org/jackhuang/hmcl/mod/curse/CurseForgeRemoteModRepository.java Applied concurrency control to all API methods using ConcurrencyGuard
HMCL/src/main/java/org/jackhuang/hmcl/ui/main/DownloadSettingsPage.java Updated reference from FetchTask.DEFAULT_CONCURRENCY to DownloadConcurrency.DEFAULT_CONCURRENCY
HMCL/src/main/java/org/jackhuang/hmcl/ui/FXUtils.java Added setSignificance(MINOR) to image loading tasks for proper priority handling
HMCL/src/main/java/org/jackhuang/hmcl/setting/ProxyManager.java Added FetchTask.notifyInitialized() call to ensure proper initialization order
HMCL/src/main/java/org/jackhuang/hmcl/setting/DownloadProviders.java Updated to use DownloadConcurrency.set() instead of FetchTask.setDownloadExecutorConcurrency()

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +1191 to +1198
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}

public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)
.setSignificance(Task.TaskSignificance.MINOR)
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setSignificance(Task.TaskSignificance.MINOR) is called twice in this chain - once after the CacheFileTask and once after thenApplyAsync. The second call (line 1193) will override the first (line 1191), making the first call redundant. Consider removing the first call or clarify if both are intentional for some reason.

Suggested change
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}
public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}
public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)

Copilot uses AI. Check for mistakes.
Comment on lines 65 to 82
private static void shrinkConcurrencyVT(int delta) {
for (int i = instances.size() - 1; i >= 0; i--) {
Semaphore semaphore = instances.get(i).get();
if (semaphore == null) {
instances.remove(i);
} else {
Schedulers.io().execute(() -> {
if (semaphore.tryAcquire(delta)) {
return;
}

for (int j = 0; j < delta; j++) {
semaphore.acquireUninterruptibly();
}
});
}
}
}
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shrinkConcurrencyVT() method spawns asynchronous tasks to acquire permits, but there's no tracking or waiting for these tasks to complete. This could lead to a situation where permits are not yet acquired when the method returns. Additionally, if the tryAcquire fails and the loop begins, the permits may be acquired one at a time across multiple iterations, which could lead to unbalanced permit acquisition if multiple shrink operations happen concurrently.

Consider returning a CompletableFuture or tracking the completion of these asynchronous tasks to ensure proper synchronization.

Copilot uses AI. Check for mistakes.

static {
if (!initialized) {
throw new AssertionError();
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AssertionError thrown when the HTTP_CLIENT is accessed before initialization lacks a descriptive error message. This makes debugging difficult if this error occurs in production. Consider adding a message like "FetchTask.Holder accessed before ProxyManager initialization" to help diagnose initialization order issues.

Suggested change
throw new AssertionError();
throw new AssertionError("FetchTask.Holder accessed before ProxyManager initialization");

Copilot uses AI. Check for mistakes.
Comment on lines +1191 to +1198
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}

public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)
.setSignificance(Task.TaskSignificance.MINOR)
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setSignificance(Task.TaskSignificance.MINOR) is called twice in this chain - once after the CacheFileTask and once after thenApplyAsync. The second call (line 1200) will override the first (line 1198), making the first call redundant. Consider removing the first call or clarify if both are intentional for some reason.

Suggested change
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}
public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)
.setSignificance(Task.TaskSignificance.MINOR)
.thenApplyAsync(file -> loadImage(file, requestedWidth, requestedHeight, preserveRatio, smooth))
.setSignificance(Task.TaskSignificance.MINOR);
}
public static Task<Image> getRemoteImageTask(URI uri, int requestedWidth, int requestedHeight, boolean preserveRatio, boolean smooth) {
return new CacheFileTask(uri)

Copilot uses AI. Check for mistakes.
Comment on lines 14 to 127
public final class DownloadConcurrency {
private DownloadConcurrency() {
}

public static int DEFAULT_CONCURRENCY = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);

private static volatile int downloadConcurrency = DEFAULT_CONCURRENCY;
private static final List<WeakReference<Semaphore>> instances = new CopyOnWriteArrayList<>();

public static ConcurrencyGuard of() {
return of(false);
}

public static ConcurrencyGuard of(boolean fair) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}

@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}

FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
}
}
}

private static void growConcurrency(int delta) {
for (int i = instances.size() - 1; i >= 0; i--) {
Semaphore semaphore = instances.get(i).get();
if (semaphore == null) {
instances.remove(i);
} else {
semaphore.release(delta);
}
}
}

private static void shrinkConcurrencyVT(int delta) {
for (int i = instances.size() - 1; i >= 0; i--) {
Semaphore semaphore = instances.get(i).get();
if (semaphore == null) {
instances.remove(i);
} else {
Schedulers.io().execute(() -> {
if (semaphore.tryAcquire(delta)) {
return;
}

for (int j = 0; j < delta; j++) {
semaphore.acquireUninterruptibly();
}
});
}
}
}

private static final class SemaphorePair {
private final Semaphore semaphore;
private int count;

public SemaphorePair(Semaphore semaphore, int count) {
this.semaphore = semaphore;
this.count = count;
}
}

private static void shrinkConcurrencySlow(int delta) {
List<SemaphorePair> semaphores = new CircularArrayList<>();
for (int i = instances.size() - 1; i >= 0; i--) {
Semaphore semaphore = instances.get(i).get();
if (semaphore == null) {
instances.remove(i);
} else {
semaphores.add(new SemaphorePair(semaphore, delta));
}
}

Schedulers.defaultScheduler().execute(() -> {
while (!semaphores.isEmpty()) {
SemaphorePair pair = semaphores.get(0);
boolean acquired;
try {
acquired = pair.semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
continue;
}

if (acquired) {
pair.count--;
if (pair.count == 0) {
semaphores.remove(0);
}
} else {
semaphores.remove(0);
semaphores.add(pair);
}
}
});
}
}
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new DownloadConcurrency class introduces critical concurrency control logic but lacks any test coverage. Given the complex synchronization logic in set(), growConcurrency(), shrinkConcurrencyVT(), and shrinkConcurrencySlow(), and the presence of comprehensive tests for similar utility classes in this repository (e.g., CircularArrayListTest, TaskTest), tests should be added to verify:

  • Correct permit allocation for newly created semaphores
  • Proper permit adjustment when concurrency is increased/decreased
  • Thread-safety of concurrent of() and set() calls
  • Proper cleanup of garbage-collected semaphore references

Copilot uses AI. Check for mistakes.
Comment on lines 22 to 49

public static ConcurrencyGuard of() {
return of(false);
}

public static ConcurrencyGuard of(boolean fair) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}

@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}

FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition between reading downloadConcurrency and updating the semaphore concurrency. If set() is called concurrently with of(), a newly created semaphore could be initialized with a stale concurrency value. The new semaphore will then be added to instances but will not be adjusted by the ongoing set() call's grow/shrink logic, resulting in inconsistent permit counts across semaphores.

Consider synchronizing the of() method with set() or reading downloadConcurrency inside a synchronized block to ensure consistency.

Suggested change
public static ConcurrencyGuard of() {
return of(false);
}
public static ConcurrencyGuard of(boolean fair) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}
@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}
FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
private static final Object LOCK = new Object();
public static ConcurrencyGuard of() {
return of(false);
}
public static ConcurrencyGuard of(boolean fair) {
Semaphore instance;
synchronized (LOCK) {
instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
}
return new ConcurrencyGuard(instance);
}
@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
synchronized (LOCK) {
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}
FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
}

Copilot uses AI. Check for mistakes.
Comment on lines 22 to 50

public static ConcurrencyGuard of() {
return of(false);
}

public static ConcurrencyGuard of(boolean fair) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}

@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}

FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
}
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition in the set() method. The downloadConcurrency variable is updated before delta is calculated, which creates a window where downloadConcurrency has the new value but the semaphores haven't been adjusted yet. If of() is called during this window, it will create a semaphore with the new concurrency value but it won't appear in the grow/shrink adjustments that follow.

Consider calculating delta first, then updating downloadConcurrency after the adjustments, or synchronize both methods.

Suggested change
public static ConcurrencyGuard of() {
return of(false);
}
public static ConcurrencyGuard of(boolean fair) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}
@FXThread
public static void set(int concurrency) {
concurrency = Math.max(concurrency, 1);
int delta = concurrency - downloadConcurrency;
downloadConcurrency = concurrency;
if (delta == 0) {
return;
}
FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
}
private static final Object LOCK = new Object();
public static ConcurrencyGuard of() {
return of(false);
}
public static ConcurrencyGuard of(boolean fair) {
synchronized (LOCK) {
Semaphore instance = new Semaphore(downloadConcurrency, fair);
instances.add(new WeakReference<>(instance));
return new ConcurrencyGuard(instance);
}
}
@FXThread
public static void set(int concurrency) {
synchronized (LOCK) {
concurrency = Math.max(concurrency, 1);
int oldConcurrency = downloadConcurrency;
int delta = concurrency - oldConcurrency;
if (delta == 0) {
downloadConcurrency = concurrency;
FetchTask.setDownloadExecutorConcurrency(concurrency);
return;
}
FetchTask.setDownloadExecutorConcurrency(concurrency);
if (delta > 0) {
growConcurrency(delta);
} else {
if (Schedulers.isVirtualThreadAvailable()) {
shrinkConcurrencyVT(-delta);
} else {
shrinkConcurrencySlow(-delta);
}
}
downloadConcurrency = concurrency;

Copilot uses AI. Check for mistakes.
private DownloadConcurrency() {
}

public static int DEFAULT_CONCURRENCY = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field DEFAULT_CONCURRENCY should be declared as final since it's initialized once and never reassigned. This makes the intent clearer and prevents accidental modification.

Suggested change
public static int DEFAULT_CONCURRENCY = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);
public static final int DEFAULT_CONCURRENCY = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);

Copilot uses AI. Check for mistakes.
Comment on lines 138 to 139
} catch (InterruptedException ignored) {
// Cancelled
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catch block for InterruptedException is now unreachable code. The SEMAPHORE.acquire() method uses acquireUninterruptibly() internally and returns a Token, so it never throws InterruptedException. This catch block should be removed since it can never be executed and may confuse readers about the actual interruption behavior.

Suggested change
} catch (InterruptedException ignored) {
// Cancelled

Copilot uses AI. Check for mistakes.
Copy link
Member

@Glavo Glavo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得 CF/MR 不应该使用设置中的并发限制,而应该有一个独立的更严格的限制。下载中那个线程数对于这些 API 来说可能还是太暴力了一点。所以这个 DownloadConcurrency 感觉必要性都不高。

instances.remove(i);
} else {
Schedulers.io().execute(() -> {
if (semaphore.tryAcquire(delta)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么要在 execute 内部执行这项操作?我认为在外部 FX 线程上直接执行更好,tryAcquire 不会阻塞 FX 线程,当可以直接 tryAcquire 的时候能够避免整个 execute 任务调度的开销。

而且目前看来,ConcurrencyGuard 的实例数极少,直接共用 shrinkConcurrencyVT 就差不多了。这本来就是一个极低频的操作,不太在意开销,而且就算都用 shrinkConcurrencyVT 的实现开销也是可以接受的,只要能在 FX 线程上完成 tryAcquire 就不会创建新线程,就算用到了 io 线程池也可能会复用已缓存的线程,哪怕要创建新线程,代价也是可接受的。我认为不太值得为此新增一条实现相对复杂的优化分支。

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public final class ConcurrencyGuard {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个类看起来只是为了支持 twr 语句吗?原生 Semaphore 的用法相比 twr 感觉也没复杂到那里,这个类我觉得不是很必要。就算要做的话,我认为也不需要外层这个 ConcurrencyGuard,只用里面的这个 Token 配合一个静态的工具方法就行。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用 try-with-resource 能避免 lock 了没 unlock 的问题,避免 #4913 重演

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

使用 try-with-resource 能避免 lock 了没 unlock 的问题,避免 #4913 重演

但我还是认为不应该有 ConcurrencyGuard 这个东西,就算要 twr 那也只需要里面这个 Token 类,外面的完全没有必要。

@burningtnt
Copy link
Member Author

回退了相关代码。现在,CurseForge 和 Modrinth 的并发数被硬编码为 16。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants