Skip to content

Commit 20e5980

Browse files
authored
Locality-aware chunk distribution (#3102)
* Locality-aware chunk distribution * fix * remove jfr event * simplify completion logic * move logic closer together * exception handling * change thread limit * address comments pt.1 * Rework fork heuristics * imports
1 parent 23636f3 commit 20e5980

File tree

4 files changed

+267
-64
lines changed

4 files changed

+267
-64
lines changed
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package com.fastasyncworldedit.core.queue.implementation;
2+
3+
import com.fastasyncworldedit.core.Fawe;
4+
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
5+
import com.fastasyncworldedit.core.internal.exception.FaweException;
6+
import com.fastasyncworldedit.core.queue.Filter;
7+
import com.sk89q.worldedit.internal.util.LogManagerCompat;
8+
import com.sk89q.worldedit.math.BlockVector3;
9+
import com.sk89q.worldedit.regions.Region;
10+
import org.apache.logging.log4j.Logger;
11+
12+
import java.util.Collection;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.ConcurrentMap;
15+
import java.util.concurrent.ForkJoinTask;
16+
import java.util.concurrent.RecursiveAction;
17+
18+
class ApplyTask<F extends Filter> extends RecursiveAction implements Runnable {
19+
20+
private static final Logger LOGGER = LogManagerCompat.getLogger();
21+
22+
private static final int INITIAL_REGION_SHIFT = 5;
23+
private static final int SHIFT_REDUCTION = 1;
24+
25+
private final CommonState<F> commonState;
26+
private final ApplyTask<F> before;
27+
private final int minChunkX;
28+
private final int minChunkZ;
29+
private final int maxChunkX;
30+
private final int maxChunkZ;
31+
// Note: shift == INITIAL_REGION_SHIFT means we are in the root node.
32+
// compute() relies on that when triggering postProcess
33+
private final int shift;
34+
35+
@Override
36+
public void run() {
37+
compute();
38+
}
39+
40+
private record CommonState<F extends Filter>(
41+
F originalFilter,
42+
Region region,
43+
ParallelQueueExtent parallelQueueExtent,
44+
ConcurrentMap<Thread, ThreadState<F>> stateCache,
45+
boolean full,
46+
boolean[] faweExceptionReasonsUsed
47+
) {
48+
49+
}
50+
51+
private static final class ThreadState<F extends Filter> {
52+
53+
private final SingleThreadQueueExtent queue;
54+
private final F filter;
55+
private ChunkFilterBlock block;
56+
57+
private ThreadState(SingleThreadQueueExtent queue, F filter) {
58+
this.queue = queue;
59+
this.filter = filter;
60+
}
61+
62+
}
63+
64+
ApplyTask(
65+
final Region region,
66+
final F filter,
67+
final ParallelQueueExtent parallelQueueExtent,
68+
final boolean full, final boolean[] faweExceptionReasonsUsed
69+
) {
70+
this.commonState = new CommonState<>(
71+
filter,
72+
region.clone(), // clone only once, assuming the filter doesn't modify that clone
73+
parallelQueueExtent,
74+
new ConcurrentHashMap<>(),
75+
full,
76+
faweExceptionReasonsUsed
77+
);
78+
this.before = null;
79+
final BlockVector3 minimumPoint = region.getMinimumPoint();
80+
this.minChunkX = minimumPoint.x() >> 4;
81+
this.minChunkZ = minimumPoint.z() >> 4;
82+
final BlockVector3 maximumPoint = region.getMaximumPoint();
83+
this.maxChunkX = maximumPoint.x() >> 4;
84+
this.maxChunkZ = maximumPoint.z() >> 4;
85+
this.shift = INITIAL_REGION_SHIFT;
86+
87+
}
88+
89+
private ApplyTask(
90+
final CommonState<F> commonState,
91+
final ApplyTask<F> before,
92+
final int minChunkX,
93+
final int maxChunkX,
94+
final int minChunkZ,
95+
final int maxChunkZ,
96+
final int higherShift
97+
) {
98+
this.commonState = commonState;
99+
this.minChunkX = minChunkX;
100+
this.maxChunkX = maxChunkX;
101+
this.minChunkZ = minChunkZ;
102+
this.maxChunkZ = maxChunkZ;
103+
this.before = before;
104+
this.shift = Math.max(0, higherShift - SHIFT_REDUCTION);
105+
}
106+
107+
@Override
108+
protected void compute() {
109+
if (this.minChunkX != this.maxChunkX || this.minChunkZ != this.maxChunkZ) {
110+
ApplyTask<F> subtask = null;
111+
int minRegionX = this.minChunkX >> this.shift;
112+
int minRegionZ = this.minChunkZ >> this.shift;
113+
int maxRegionX = this.maxChunkX >> this.shift;
114+
int maxRegionZ = this.maxChunkZ >> this.shift;
115+
// This task covers multiple regions. Create one subtask per region
116+
for (int regionX = minRegionX; regionX <= maxRegionX; regionX++) {
117+
for (int regionZ = minRegionZ; regionZ <= maxRegionZ; regionZ++) {
118+
if (shouldProcessDirectly()) {
119+
// assume we should do a bigger batch of work here - the other threads are busy for a while
120+
processRegion(regionX, regionZ, this.shift);
121+
continue;
122+
}
123+
if (this.shift == 0 && !this.commonState.region.containsChunk(regionX, regionZ)) {
124+
// if shift == 0, region coords are chunk coords
125+
continue; // chunks not intersecting with the region don't need a task
126+
}
127+
128+
// creating more tasks will likely help parallelism as other threads aren't *that* busy
129+
subtask = new ApplyTask<>(
130+
this.commonState,
131+
subtask,
132+
regionX << this.shift,
133+
((regionX + 1) << this.shift) - 1,
134+
regionZ << this.shift,
135+
((regionZ + 1) << this.shift) - 1,
136+
this.shift
137+
);
138+
subtask.fork();
139+
}
140+
}
141+
// try processing tasks in reverse order if not processed already, otherwise "wait" for completion
142+
while (subtask != null) {
143+
if (subtask.tryUnfork()) {
144+
subtask.invoke();
145+
} else {
146+
subtask.join();
147+
}
148+
subtask = subtask.before;
149+
}
150+
} else {
151+
// we reached a task for a single chunk, let's process it
152+
processChunk(this.minChunkX, this.minChunkZ);
153+
}
154+
if (this.shift == INITIAL_REGION_SHIFT) {
155+
onCompletion();
156+
}
157+
}
158+
159+
private boolean shouldProcessDirectly() {
160+
return ForkJoinTask.getSurplusQueuedTaskCount() > Math.max(3, 1 << this.shift);
161+
}
162+
163+
private void processRegion(int regionX, int regionZ, int shift) {
164+
final ThreadState<F> state = getState();
165+
this.commonState.parallelQueueExtent.enter(state.queue);
166+
try {
167+
for (int chunkX = regionX << shift; chunkX <= ((regionX + 1) << shift) - 1; chunkX++) {
168+
for (int chunkZ = regionZ << shift; chunkZ <= ((regionZ + 1) << shift) - 1; chunkZ++) {
169+
if (!this.commonState.region.containsChunk(chunkX, chunkZ)) {
170+
continue; // chunks not intersecting with the region must not be processed
171+
}
172+
applyChunk(chunkX, chunkZ, state);
173+
}
174+
}
175+
} finally {
176+
this.commonState.parallelQueueExtent.exit();
177+
}
178+
179+
}
180+
181+
@SuppressWarnings("unchecked")
182+
private ThreadState<F> getState() {
183+
return this.commonState.stateCache.computeIfAbsent(
184+
Thread.currentThread(),
185+
__ -> new ThreadState<>(
186+
(SingleThreadQueueExtent) this.commonState.parallelQueueExtent.getNewQueue(),
187+
(F) this.commonState.originalFilter.fork()
188+
)
189+
);
190+
}
191+
192+
private void processChunk(int chunkX, int chunkZ) {
193+
final ThreadState<F> state = getState();
194+
this.commonState.parallelQueueExtent.enter(state.queue);
195+
try {
196+
applyChunk(chunkX, chunkZ, state);
197+
} finally {
198+
this.commonState.parallelQueueExtent.exit();
199+
}
200+
}
201+
202+
private void applyChunk(int chunkX, int chunkZ, ThreadState<F> state) {
203+
try {
204+
state.block = state.queue.apply(
205+
state.block,
206+
state.filter,
207+
this.commonState.region,
208+
chunkX,
209+
chunkZ,
210+
this.commonState.full
211+
);
212+
} catch (Throwable t) {
213+
if (t instanceof FaweException faweException) {
214+
Fawe.handleFaweException(this.commonState.faweExceptionReasonsUsed, faweException, LOGGER);
215+
} else if (t.getCause() instanceof FaweException faweException) {
216+
Fawe.handleFaweException(this.commonState.faweExceptionReasonsUsed, faweException, LOGGER);
217+
} else {
218+
throw t;
219+
}
220+
}
221+
}
222+
223+
private void onCompletion() {
224+
for (ForkJoinTask<?> task : flushQueues()) {
225+
if (task.tryUnfork()) {
226+
task.invoke();
227+
} else {
228+
task.join();
229+
}
230+
}
231+
}
232+
233+
private ForkJoinTask<?>[] flushQueues() {
234+
final Collection<ThreadState<F>> values = this.commonState.stateCache.values();
235+
ForkJoinTask<?>[] tasks = new ForkJoinTask[values.size()];
236+
int i = values.size() - 1;
237+
for (final ThreadState<F> value : values) {
238+
tasks[i] = ForkJoinTask.adapt(value.queue::flush).fork();
239+
i--;
240+
}
241+
return tasks;
242+
}
243+
244+
}

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.fastasyncworldedit.core.queue.implementation;
22

3-
import com.fastasyncworldedit.core.Fawe;
43
import com.fastasyncworldedit.core.FaweCache;
54
import com.fastasyncworldedit.core.configuration.Settings;
65
import com.fastasyncworldedit.core.extent.NullExtent;
@@ -49,7 +48,6 @@
4948
import java.util.List;
5049
import java.util.Set;
5150
import java.util.concurrent.ForkJoinTask;
52-
import java.util.stream.IntStream;
5351

5452
public class ParallelQueueExtent extends PassthroughExtent {
5553

@@ -65,8 +63,6 @@ public class ParallelQueueExtent extends PassthroughExtent {
6563
private final boolean fastmode;
6664
private final SideEffectSet sideEffectSet;
6765
private int changes;
68-
private int lastException = Integer.MIN_VALUE;
69-
private int exceptionCount = 0;
7066

7167
public ParallelQueueExtent(QueueHandler handler, World world, boolean fastmode, @Nullable SideEffectSet sideEffectSet) {
7268
super(handler.getQueue(world, new BatchProcessorHolder(), new BatchProcessorHolder()));
@@ -100,11 +96,11 @@ public static void setCurrentExtent(Extent extent) {
10096
FaweThread.setCurrentExtent(extent);
10197
}
10298

103-
private void enter(Extent extent) {
99+
void enter(Extent extent) {
104100
FaweThread.setCurrentExtent(extent);
105101
}
106102

107-
private void exit() {
103+
void exit() {
108104
FaweThread.clearCurrentExtent();
109105
}
110106

@@ -129,13 +125,12 @@ public boolean cancel() {
129125
}
130126

131127
@SuppressWarnings("rawtypes")
132-
private IQueueExtent<IQueueChunk> getNewQueue() {
128+
IQueueExtent<IQueueChunk> getNewQueue() {
133129
SingleThreadQueueExtent queue = (SingleThreadQueueExtent) handler.getQueue(world, this.processor, this.postProcessor);
134130
queue.setFastMode(fastmode);
135131
queue.setSideEffectSet(sideEffectSet);
136132
queue.setFaweExceptionArray(faweExceptionReasonsUsed);
137133
queue.setTargetSize(Settings.settings().QUEUE.TARGET_SIZE * Settings.settings().QUEUE.THREAD_TARGET_SIZE_PERCENT / 100);
138-
enter(queue);
139134
return queue;
140135
}
141136

@@ -158,62 +153,16 @@ public <T extends Filter> T apply(Region region, T filter, boolean full) {
158153
getExtent().flush();
159154
filter.finish();
160155
} else {
161-
final ForkJoinTask[] tasks = IntStream.range(0, size).mapToObj(i -> handler.submit(() -> {
162-
try {
163-
final Filter newFilter = filter.fork();
164-
final Region newRegion = region.clone();
165-
// Create a chunk that we will reuse/reset for each operation
166-
final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue();
167-
synchronized (queue) {
168-
try {
169-
ChunkFilterBlock block = null;
170-
while (true) {
171-
// Get the next chunk posWeakChunk
172-
final int chunkX;
173-
final int chunkZ;
174-
synchronized (chunksIter) {
175-
if (!chunksIter.hasNext()) {
176-
break;
177-
}
178-
final BlockVector2 pos = chunksIter.next();
179-
chunkX = pos.x();
180-
chunkZ = pos.z();
181-
}
182-
block = queue.apply(block, newFilter, newRegion, chunkX, chunkZ, full);
183-
}
184-
queue.flush();
185-
filter.finish();
186-
} catch (Throwable t) {
187-
if (t instanceof FaweException) {
188-
Fawe.handleFaweException(faweExceptionReasonsUsed, (FaweException) t, LOGGER);
189-
} else if (t.getCause() instanceof FaweException) {
190-
Fawe.handleFaweException(faweExceptionReasonsUsed, (FaweException) t.getCause(), LOGGER);
191-
} else {
192-
throw t;
193-
}
194-
}
195-
}
196-
} catch (Throwable e) {
197-
String message = e.getMessage();
198-
int hash = message != null ? message.hashCode() : 0;
199-
if (lastException != hash) {
200-
lastException = hash;
201-
exceptionCount = 0;
202-
LOGGER.catching(e);
203-
} else if (exceptionCount < Settings.settings().QUEUE.PARALLEL_THREADS) {
204-
exceptionCount++;
205-
LOGGER.warn(message);
206-
}
207-
} finally {
208-
exit();
209-
}
210-
})).toArray(ForkJoinTask[]::new);
211-
// Join filters
212-
for (ForkJoinTask task : tasks) {
213-
if (task != null) {
214-
task.quietlyJoin();
215-
}
156+
ForkJoinTask<?> task = this.handler.submit(
157+
new ApplyTask<>(region, filter, this, full, this.faweExceptionReasonsUsed)
158+
);
159+
// wait for task to finish
160+
try {
161+
task.join();
162+
} catch (Throwable e) {
163+
LOGGER.catching(e);
216164
}
165+
// Join filters
217166
filter.join();
218167
}
219168
return filter;

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.Future;
3535
import java.util.concurrent.FutureTask;
3636
import java.util.concurrent.ThreadPoolExecutor;
37+
import java.util.concurrent.TimeUnit;
3738
import java.util.function.Supplier;
3839

3940
/**
@@ -50,7 +51,13 @@ public abstract class QueueHandler implements Trimable, Runnable {
5051
Settings.settings().QUEUE.PARALLEL_THREADS,
5152
new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Primary - %s"),
5253
null,
53-
false
54+
false,
55+
Settings.settings().QUEUE.PARALLEL_THREADS,
56+
Settings.settings().QUEUE.PARALLEL_THREADS,
57+
0,
58+
pool -> true,
59+
60,
60+
TimeUnit.SECONDS
5461
);
5562

5663
/**

0 commit comments

Comments
 (0)