Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.fastasyncworldedit.core.queue.implementation.QueueHandler;
import com.fastasyncworldedit.core.queue.implementation.blocks.CharGetBlocks;
import com.fastasyncworldedit.core.util.MemUtil;
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
import com.sk89q.worldedit.extent.Extent;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.util.formatting.text.TextComponent;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -86,16 +88,18 @@ public synchronized <T extends Future<T>> T call(IQueueExtent<? extends IChunk>
final int finalCopyKey = copyKey;
// Run immediately if possible
if (chunk != null) {
return tryWrappedInternalCall(set, finalizer, finalCopyKey, chunk, nmsWorld);
return tryInternalCall(set, finalizer, finalCopyKey, chunk, nmsWorld);
}
// Submit via the STQE as that will help handle excessive queuing by waiting for the submission count to fall below the
// target size
final Extent extent = FaweThreadUtil.getCurrentExtent();
nmsChunkFuture.thenApply(nmsChunk -> owner.submitTaskUnchecked(() -> (T) tryWrappedInternalCall(
set,
finalizer,
finalCopyKey,
nmsChunk,
nmsWorld
nmsWorld,
extent
)));
// If we have re-submitted, return a completed future to prevent potential deadlocks where a future reliant on the
// above submission is halting the BlockingExecutor, and preventing the above task from actually running. The futures
Expand All @@ -104,6 +108,22 @@ public synchronized <T extends Future<T>> T call(IQueueExtent<? extends IChunk>
}

private <T extends Future<T>> T tryWrappedInternalCall(
IChunkSet set,
Runnable finalizer,
int copyKey,
LevelChunk nmsChunk,
ServerLevel nmsWorld,
Extent extent
) {
FaweThreadUtil.setCurrentExtent(extent);
try {
return tryInternalCall(set, finalizer, copyKey, nmsChunk, nmsWorld);
} finally {
FaweThreadUtil.clearCurrentExtent();
}
}

private <T extends Future<T>> T tryInternalCall(
IChunkSet set,
Runnable finalizer,
int copyKey,
Expand All @@ -120,17 +140,20 @@ private <T extends Future<T>> T tryWrappedInternalCall(
}
}

protected <T extends Future<T>> T handleCallFinalizer(Runnable[] syncTasks, Runnable callback, Runnable finalizer) throws
protected <T extends Future<T>> T handleCallFinalizer(
final Runnable[] syncTasks,
final Runnable callback,
final Runnable finalizer
) throws
Exception {
if (syncTasks != null) {
QueueHandler queueHandler = Fawe.instance().getQueueHandler();
Runnable[] finalSyncTasks = syncTasks;

// Chain the sync tasks and the callback
Callable<Future<?>> chain = () -> {
try {
// Run the sync tasks
for (Runnable task : finalSyncTasks) {
for (Runnable task : syncTasks) {
if (task != null) {
task.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import com.fastasyncworldedit.core.queue.implementation.QueuePool;
import com.fastasyncworldedit.core.util.MathMan;
import com.fastasyncworldedit.core.util.collection.CleanableThreadLocal;
import com.fastasyncworldedit.core.util.task.FaweBasicThreadFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sk89q.jnbt.ByteArrayTag;
import com.sk89q.jnbt.ByteTag;
import com.sk89q.jnbt.CompoundTag;
Expand Down Expand Up @@ -46,7 +46,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -647,9 +646,13 @@ public ThreadPoolExecutor newBlockingExecutor(String name) {
public ThreadPoolExecutor newBlockingExecutor(String name, Logger logger) {
int nThreads = Settings.settings().QUEUE.PARALLEL_THREADS;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, queue,
new ThreadFactoryBuilder().setNameFormat(name).build(),
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
queue,
new FaweBasicThreadFactory(name),
new ThreadPoolExecutor.CallerRunsPolicy()
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.fastasyncworldedit.core.queue.Filter;
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.fastasyncworldedit.core.util.task.FaweThread;
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.WorldEditException;
import com.sk89q.worldedit.extent.Extent;
Expand Down Expand Up @@ -85,29 +85,29 @@ public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode,
*/
@Deprecated(forRemoval = true, since = "2.13.0")
public static void clearCurrentExtent() {
FaweThread.clearCurrentExtent();
FaweThreadUtil.clearCurrentExtent();
}

/**
* Sets the extent associated with the calling thread.
*/
@Deprecated(forRemoval = true, since = "2.13.0")
public static void setCurrentExtent(Extent extent) {
FaweThread.setCurrentExtent(extent);
FaweThreadUtil.setCurrentExtent(extent);
}

void enter(Extent extent) {
FaweThread.setCurrentExtent(extent);
FaweThreadUtil.setCurrentExtent(extent);
}

void exit() {
FaweThread.clearCurrentExtent();
FaweThreadUtil.clearCurrentExtent();
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public IQueueExtent<IQueueChunk> getExtent() {
Extent extent = FaweThread.getCurrentExtent();
Extent extent = FaweThreadUtil.getCurrentExtent();
if (extent == null) {
extent = super.getExtent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public synchronized void init(Extent extent, IChunkCache<IChunkGet> get, IChunkC
};
}
if (set == null) {
set = (x, z) -> CharSetBlocks.newInstance(x, z);
set = CharSetBlocks::newInstance;
}
this.cacheGet = get;
this.cacheSet = set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import com.fastasyncworldedit.core.queue.IQueueChunk;
import com.fastasyncworldedit.core.queue.IQueueExtent;
import com.fastasyncworldedit.core.util.MemUtil;
import com.fastasyncworldedit.core.util.task.FaweThreadUtil;
import com.sk89q.worldedit.entity.Entity;
import com.fastasyncworldedit.core.util.task.FaweThread;
import com.sk89q.worldedit.internal.util.LogManagerCompat;
import com.sk89q.worldedit.math.BlockVector3;
import com.sk89q.worldedit.regions.Region;
Expand Down Expand Up @@ -1057,11 +1057,11 @@ public <U extends Future<U>> U call(IQueueExtent<? extends IChunk> owner, IChunk
// This way, locking is spread across multiple STQEs, allowing for better performance

private void trackExtent() {
FaweThread.setCurrentExtent(extent);
FaweThreadUtil.setCurrentExtent(extent);
}

private void untrackExtent() {
FaweThread.clearCurrentExtent();
FaweThreadUtil.clearCurrentExtent();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.fastasyncworldedit.core.util.task;

import com.sk89q.worldedit.extent.Extent;
import org.jetbrains.annotations.ApiStatus;

@ApiStatus.Internal
public final class FaweBasicThread extends Thread implements FaweThread {

private Extent currentExtent;

FaweBasicThread(Runnable runnable) {
super(runnable);
}

@Override
public void clearCurrentExtent() {
this.currentExtent = null;
}

@Override
public void setCurrentExtent(final Extent extent) {
this.currentExtent = extent;
}

@Override
public Extent getCurrentExtent() {
return this.currentExtent;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.fastasyncworldedit.core.util.task;


import org.jetbrains.annotations.ApiStatus;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

@ApiStatus.Internal
public class FaweBasicThreadFactory implements ThreadFactory {

private final String nameFormat;
private final AtomicLong count;

public FaweBasicThreadFactory(@Nullable String nameFormat) {
this.nameFormat = nameFormat;
this.count = (nameFormat != null) ? new AtomicLong(0) : null;
}

@Override
public Thread newThread(@Nonnull final Runnable runnable) {
Thread thread = new FaweBasicThread(runnable);
if (nameFormat != null) {
// requireNonNull is safe because we create `count` if (and only if) we have a nameFormat.
thread.setName(String.format(Locale.ROOT, nameFormat, count.getAndIncrement()));
}
if (thread.isDaemon())
thread.setDaemon(false);
if (thread.getPriority() != Thread.NORM_PRIORITY)
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.fastasyncworldedit.core.util.task;

import com.sk89q.worldedit.extent.Extent;
import org.jetbrains.annotations.ApiStatus;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

@ApiStatus.Internal
public final class FaweForkJoinThread extends ForkJoinWorkerThread implements FaweThread {

private Extent currentExtent;

FaweForkJoinThread(final ForkJoinPool pool) {
super(null, pool, true);
}

@Override
public void clearCurrentExtent() {
this.currentExtent = null;
}

@Override
public void setCurrentExtent(final Extent extent) {
this.currentExtent = extent;
}

@Override
public Extent getCurrentExtent() {
return this.currentExtent;
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.fastasyncworldedit.core.util.task;

import org.jetbrains.annotations.ApiStatus;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

@ApiStatus.Internal
public class FaweForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {

private final String nameFormat;
Expand All @@ -16,7 +19,7 @@ public FaweForkJoinWorkerThreadFactory(String nameFormat) {

@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final ForkJoinWorkerThread worker = new FaweThread(pool);
final ForkJoinWorkerThread worker = new FaweForkJoinThread(pool);
worker.setName(String.format(nameFormat, idCounter.getAndIncrement()));
return worker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,21 @@
import com.sk89q.worldedit.extent.Extent;
import org.jetbrains.annotations.ApiStatus;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

@ApiStatus.Internal
public class FaweThread extends ForkJoinWorkerThread {

private static final ThreadLocal<Extent> EXTENTS = new ThreadLocal<>();

private Extent currentExtent;

protected FaweThread(final ForkJoinPool pool) {
super(null, pool, true);
}
public sealed interface FaweThread permits FaweBasicThread, FaweForkJoinThread {

/**
* Removes the extent currently associated with the calling thread.
*/
public static void clearCurrentExtent() {
if (Thread.currentThread() instanceof FaweThread ft) {
ft.currentExtent = null;
} else {
EXTENTS.remove();
}
}
void clearCurrentExtent();

/**
* Sets the extent associated with the calling thread.
*/
public static void setCurrentExtent(Extent extent) {
if (Thread.currentThread() instanceof FaweThread ft) {
ft.currentExtent = extent;
} else {
EXTENTS.set(extent);
}

}

public static Extent getCurrentExtent() {
if (Thread.currentThread() instanceof FaweThread ft) {
return ft.currentExtent;
} else {
return EXTENTS.get();
}
}
void setCurrentExtent(Extent extent);

/**
* Gets the extent associated with the calling thread.
*/
Extent getCurrentExtent();
}
Loading
Loading