Skip to content

Commit 0a754a3

Browse files
authored
feat: extend use of "FaweThread" to include main blocking executor (#3254)
* feat: extend use of "FaweThread" to include main blocking executor * Address comments
1 parent 5cde4dc commit 0a754a3

File tree

11 files changed

+203
-56
lines changed

11 files changed

+203
-56
lines changed

worldedit-bukkit/src/main/java/com/fastasyncworldedit/bukkit/adapter/AbstractBukkitGetBlocks.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import com.fastasyncworldedit.core.queue.implementation.QueueHandler;
1212
import com.fastasyncworldedit.core.queue.implementation.blocks.CharGetBlocks;
1313
import com.fastasyncworldedit.core.util.MemUtil;
14+
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
15+
import com.sk89q.worldedit.extent.Extent;
1416
import com.sk89q.worldedit.internal.util.LogManagerCompat;
1517
import com.sk89q.worldedit.util.formatting.text.TextComponent;
1618
import org.apache.logging.log4j.Logger;
@@ -86,16 +88,18 @@ public synchronized <T extends Future<T>> T call(IQueueExtent<? extends IChunk>
8688
final int finalCopyKey = copyKey;
8789
// Run immediately if possible
8890
if (chunk != null) {
89-
return tryWrappedInternalCall(set, finalizer, finalCopyKey, chunk, nmsWorld);
91+
return tryInternalCall(set, finalizer, finalCopyKey, chunk, nmsWorld);
9092
}
9193
// Submit via the STQE as that will help handle excessive queuing by waiting for the submission count to fall below the
9294
// target size
95+
final Extent extent = FaweThreadUtil.getCurrentExtent();
9396
nmsChunkFuture.thenApply(nmsChunk -> owner.submitTaskUnchecked(() -> (T) tryWrappedInternalCall(
9497
set,
9598
finalizer,
9699
finalCopyKey,
97100
nmsChunk,
98-
nmsWorld
101+
nmsWorld,
102+
extent
99103
)));
100104
// If we have re-submitted, return a completed future to prevent potential deadlocks where a future reliant on the
101105
// above submission is halting the BlockingExecutor, and preventing the above task from actually running. The futures
@@ -104,6 +108,22 @@ public synchronized <T extends Future<T>> T call(IQueueExtent<? extends IChunk>
104108
}
105109

106110
private <T extends Future<T>> T tryWrappedInternalCall(
111+
IChunkSet set,
112+
Runnable finalizer,
113+
int copyKey,
114+
LevelChunk nmsChunk,
115+
ServerLevel nmsWorld,
116+
Extent extent
117+
) {
118+
FaweThreadUtil.setCurrentExtent(extent);
119+
try {
120+
return tryInternalCall(set, finalizer, copyKey, nmsChunk, nmsWorld);
121+
} finally {
122+
FaweThreadUtil.clearCurrentExtent();
123+
}
124+
}
125+
126+
private <T extends Future<T>> T tryInternalCall(
107127
IChunkSet set,
108128
Runnable finalizer,
109129
int copyKey,
@@ -120,17 +140,20 @@ private <T extends Future<T>> T tryWrappedInternalCall(
120140
}
121141
}
122142

123-
protected <T extends Future<T>> T handleCallFinalizer(Runnable[] syncTasks, Runnable callback, Runnable finalizer) throws
143+
protected <T extends Future<T>> T handleCallFinalizer(
144+
final Runnable[] syncTasks,
145+
final Runnable callback,
146+
final Runnable finalizer
147+
) throws
124148
Exception {
125149
if (syncTasks != null) {
126150
QueueHandler queueHandler = Fawe.instance().getQueueHandler();
127-
Runnable[] finalSyncTasks = syncTasks;
128151

129152
// Chain the sync tasks and the callback
130153
Callable<Future<?>> chain = () -> {
131154
try {
132155
// Run the sync tasks
133-
for (Runnable task : finalSyncTasks) {
156+
for (Runnable task : syncTasks) {
134157
if (task != null) {
135158
task.run();
136159
}

worldedit-core/src/main/java/com/fastasyncworldedit/core/FaweCache.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
import com.fastasyncworldedit.core.queue.implementation.QueuePool;
1616
import com.fastasyncworldedit.core.util.MathMan;
1717
import com.fastasyncworldedit.core.util.collection.CleanableThreadLocal;
18+
import com.fastasyncworldedit.core.util.task.FaweBasicThreadFactory;
1819
import com.google.common.cache.CacheBuilder;
1920
import com.google.common.cache.CacheLoader;
2021
import com.google.common.cache.LoadingCache;
21-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2222
import com.sk89q.jnbt.ByteArrayTag;
2323
import com.sk89q.jnbt.ByteTag;
2424
import com.sk89q.jnbt.CompoundTag;
@@ -46,7 +46,6 @@
4646
import java.util.List;
4747
import java.util.Map;
4848
import java.util.Map.Entry;
49-
import java.util.concurrent.ArrayBlockingQueue;
5049
import java.util.concurrent.CancellationException;
5150
import java.util.concurrent.ExecutionException;
5251
import java.util.concurrent.Future;
@@ -647,9 +646,13 @@ public ThreadPoolExecutor newBlockingExecutor(String name) {
647646
public ThreadPoolExecutor newBlockingExecutor(String name, Logger logger) {
648647
int nThreads = Settings.settings().QUEUE.PARALLEL_THREADS;
649648
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
650-
return new ThreadPoolExecutor(nThreads, nThreads,
651-
0L, TimeUnit.MILLISECONDS, queue,
652-
new ThreadFactoryBuilder().setNameFormat(name).build(),
649+
return new ThreadPoolExecutor(
650+
nThreads,
651+
nThreads,
652+
0L,
653+
TimeUnit.MILLISECONDS,
654+
queue,
655+
new FaweBasicThreadFactory(name),
653656
new ThreadPoolExecutor.CallerRunsPolicy()
654657
) {
655658

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import com.fastasyncworldedit.core.queue.Filter;
1919
import com.fastasyncworldedit.core.queue.IQueueChunk;
2020
import com.fastasyncworldedit.core.queue.IQueueExtent;
21-
import com.fastasyncworldedit.core.util.task.FaweThread;
21+
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
2222
import com.sk89q.worldedit.MaxChangedBlocksException;
2323
import com.sk89q.worldedit.WorldEditException;
2424
import com.sk89q.worldedit.extent.Extent;
@@ -85,29 +85,29 @@ public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode,
8585
*/
8686
@Deprecated(forRemoval = true, since = "2.13.0")
8787
public static void clearCurrentExtent() {
88-
FaweThread.clearCurrentExtent();
88+
FaweThreadUtil.clearCurrentExtent();
8989
}
9090

9191
/**
9292
* Sets the extent associated with the calling thread.
9393
*/
9494
@Deprecated(forRemoval = true, since = "2.13.0")
9595
public static void setCurrentExtent(Extent extent) {
96-
FaweThread.setCurrentExtent(extent);
96+
FaweThreadUtil.setCurrentExtent(extent);
9797
}
9898

9999
void enter(Extent extent) {
100-
FaweThread.setCurrentExtent(extent);
100+
FaweThreadUtil.setCurrentExtent(extent);
101101
}
102102

103103
void exit() {
104-
FaweThread.clearCurrentExtent();
104+
FaweThreadUtil.clearCurrentExtent();
105105
}
106106

107107
@Override
108108
@SuppressWarnings({"unchecked", "rawtypes"})
109109
public IQueueExtent<IQueueChunk> getExtent() {
110-
Extent extent = FaweThread.getCurrentExtent();
110+
Extent extent = FaweThreadUtil.getCurrentExtent();
111111
if (extent == null) {
112112
extent = super.getExtent();
113113
}

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public synchronized void init(Extent extent, IChunkCache<IChunkGet> get, IChunkC
192192
};
193193
}
194194
if (set == null) {
195-
set = (x, z) -> CharSetBlocks.newInstance(x, z);
195+
set = CharSetBlocks::newInstance;
196196
}
197197
this.cacheGet = get;
198198
this.cacheSet = set;

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import com.fastasyncworldedit.core.queue.IQueueChunk;
1212
import com.fastasyncworldedit.core.queue.IQueueExtent;
1313
import com.fastasyncworldedit.core.util.MemUtil;
14+
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
1415
import com.sk89q.worldedit.entity.Entity;
15-
import com.fastasyncworldedit.core.util.task.FaweThread;
1616
import com.sk89q.worldedit.internal.util.LogManagerCompat;
1717
import com.sk89q.worldedit.math.BlockVector3;
1818
import com.sk89q.worldedit.regions.Region;
@@ -1057,11 +1057,11 @@ public <U extends Future<U>> U call(IQueueExtent<? extends IChunk> owner, IChunk
10571057
// This way, locking is spread across multiple STQEs, allowing for better performance
10581058

10591059
private void trackExtent() {
1060-
FaweThread.setCurrentExtent(extent);
1060+
FaweThreadUtil.setCurrentExtent(extent);
10611061
}
10621062

10631063
private void untrackExtent() {
1064-
FaweThread.clearCurrentExtent();
1064+
FaweThreadUtil.clearCurrentExtent();
10651065
}
10661066

10671067
/**
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.fastasyncworldedit.core.util.task;
2+
3+
import com.sk89q.worldedit.extent.Extent;
4+
import org.jetbrains.annotations.ApiStatus;
5+
6+
@ApiStatus.Internal
7+
public final class FaweBasicThread extends Thread implements FaweThread {
8+
9+
private Extent currentExtent;
10+
11+
FaweBasicThread(Runnable runnable) {
12+
super(runnable);
13+
}
14+
15+
@Override
16+
public void clearCurrentExtent() {
17+
this.currentExtent = null;
18+
}
19+
20+
@Override
21+
public void setCurrentExtent(final Extent extent) {
22+
this.currentExtent = extent;
23+
}
24+
25+
@Override
26+
public Extent getCurrentExtent() {
27+
return this.currentExtent;
28+
}
29+
30+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.fastasyncworldedit.core.util.task;
2+
3+
4+
import org.jetbrains.annotations.ApiStatus;
5+
6+
import javax.annotation.Nonnull;
7+
import javax.annotation.Nullable;
8+
import java.util.Locale;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.ThreadFactory;
11+
import java.util.concurrent.atomic.AtomicLong;
12+
13+
@ApiStatus.Internal
14+
public class FaweBasicThreadFactory implements ThreadFactory {
15+
16+
private final String nameFormat;
17+
private final AtomicLong count;
18+
19+
public FaweBasicThreadFactory(@Nullable String nameFormat) {
20+
this.nameFormat = nameFormat;
21+
this.count = (nameFormat != null) ? new AtomicLong(0) : null;
22+
}
23+
24+
@Override
25+
public Thread newThread(@Nonnull final Runnable runnable) {
26+
Thread thread = new FaweBasicThread(runnable);
27+
if (nameFormat != null) {
28+
// requireNonNull is safe because we create `count` if (and only if) we have a nameFormat.
29+
thread.setName(String.format(Locale.ROOT, nameFormat, count.getAndIncrement()));
30+
}
31+
if (thread.isDaemon())
32+
thread.setDaemon(false);
33+
if (thread.getPriority() != Thread.NORM_PRIORITY)
34+
thread.setPriority(Thread.NORM_PRIORITY);
35+
return thread;
36+
}
37+
38+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.fastasyncworldedit.core.util.task;
2+
3+
import com.sk89q.worldedit.extent.Extent;
4+
import org.jetbrains.annotations.ApiStatus;
5+
6+
import java.util.concurrent.ForkJoinPool;
7+
import java.util.concurrent.ForkJoinWorkerThread;
8+
9+
@ApiStatus.Internal
10+
public final class FaweForkJoinThread extends ForkJoinWorkerThread implements FaweThread {
11+
12+
private Extent currentExtent;
13+
14+
FaweForkJoinThread(final ForkJoinPool pool) {
15+
super(null, pool, true);
16+
}
17+
18+
@Override
19+
public void clearCurrentExtent() {
20+
this.currentExtent = null;
21+
}
22+
23+
@Override
24+
public void setCurrentExtent(final Extent extent) {
25+
this.currentExtent = extent;
26+
}
27+
28+
@Override
29+
public Extent getCurrentExtent() {
30+
return this.currentExtent;
31+
}
32+
33+
}

worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweForkJoinWorkerThreadFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.fastasyncworldedit.core.util.task;
22

3+
import org.jetbrains.annotations.ApiStatus;
4+
35
import java.util.concurrent.ForkJoinPool;
46
import java.util.concurrent.ForkJoinWorkerThread;
57
import java.util.concurrent.atomic.AtomicInteger;
68

9+
@ApiStatus.Internal
710
public class FaweForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
811

912
private final String nameFormat;
@@ -16,7 +19,7 @@ public FaweForkJoinWorkerThreadFactory(String nameFormat) {
1619

1720
@Override
1821
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
19-
final ForkJoinWorkerThread worker = new FaweThread(pool);
22+
final ForkJoinWorkerThread worker = new FaweForkJoinThread(pool);
2023
worker.setName(String.format(nameFormat, idCounter.getAndIncrement()));
2124
return worker;
2225
}

worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweThread.java

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,21 @@
33
import com.sk89q.worldedit.extent.Extent;
44
import org.jetbrains.annotations.ApiStatus;
55

6-
import java.util.concurrent.ForkJoinPool;
7-
import java.util.concurrent.ForkJoinWorkerThread;
8-
96
@ApiStatus.Internal
10-
public class FaweThread extends ForkJoinWorkerThread {
11-
12-
private static final ThreadLocal<Extent> EXTENTS = new ThreadLocal<>();
13-
14-
private Extent currentExtent;
15-
16-
protected FaweThread(final ForkJoinPool pool) {
17-
super(null, pool, true);
18-
}
7+
public sealed interface FaweThread permits FaweBasicThread, FaweForkJoinThread {
198

209
/**
2110
* Removes the extent currently associated with the calling thread.
2211
*/
23-
public static void clearCurrentExtent() {
24-
if (Thread.currentThread() instanceof FaweThread ft) {
25-
ft.currentExtent = null;
26-
} else {
27-
EXTENTS.remove();
28-
}
29-
}
12+
void clearCurrentExtent();
3013

3114
/**
3215
* Sets the extent associated with the calling thread.
3316
*/
34-
public static void setCurrentExtent(Extent extent) {
35-
if (Thread.currentThread() instanceof FaweThread ft) {
36-
ft.currentExtent = extent;
37-
} else {
38-
EXTENTS.set(extent);
39-
}
40-
41-
}
42-
43-
public static Extent getCurrentExtent() {
44-
if (Thread.currentThread() instanceof FaweThread ft) {
45-
return ft.currentExtent;
46-
} else {
47-
return EXTENTS.get();
48-
}
49-
}
17+
void setCurrentExtent(Extent extent);
5018

19+
/**
20+
* Gets the extent associated with the calling thread.
21+
*/
22+
Extent getCurrentExtent();
5123
}

0 commit comments

Comments
 (0)