Skip to content

Commit f9e66db

Browse files
authored
fix: add a WrapperChunk to allow "Invalidating" a ChunkHolder on submission (#3354)
1 parent eded42b commit f9e66db

File tree

7 files changed

+723
-27
lines changed

7 files changed

+723
-27
lines changed

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/Filter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public interface Filter {
2424
/**
2525
* Do something with the IChunk<br>
2626
*/
27-
default @Nonnull <T extends IChunk> T applyChunk(T chunk, @Nullable Region region) {
27+
default @Nonnull <U extends IChunk> U applyChunk(U chunk, @Nullable Region region) {
2828
return chunk;
2929
}
3030

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/IQueueChunk.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
package com.fastasyncworldedit.core.queue;
22

3+
import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkHolder;
4+
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
5+
import org.jetbrains.annotations.ApiStatus;
6+
37
import java.util.concurrent.Callable;
48
import java.util.concurrent.ExecutionException;
59
import java.util.concurrent.Future;
610

11+
@ApiStatus.Internal
712
public interface IQueueChunk<T extends Future<T>> extends IChunk, Callable<T> {
813

914
/**
@@ -15,6 +20,25 @@ default IQueueChunk<T> reset() {
1520
return this;
1621
}
1722

23+
/**
24+
* Set a new {@link WrapperChunk} that allows prevention of a {@link ChunkHolder} instance being cached "locally" whilst it
25+
* has been called/submitted, causing issues with processing/postprocessing, etc.
26+
* If a wrapper has already been set, throws {@link IllegalStateException} as there should be no circumstance for us to set
27+
* a new wrapper (does nothing if attempting to set the same wrapper).
28+
*
29+
* @param parentWrapper wrapper wrapping this {@link ChunkHolder instance}
30+
* @throws IllegalStateException if there is already a wrapper set and a new wrapper instance is attempted to be se
31+
* @since TODO
32+
*/
33+
void setWrapper(WrapperChunk<?> parentWrapper);
34+
35+
/**
36+
* Invalidate the {@link WrapperChunk} if present.
37+
*
38+
* @since TODO
39+
*/
40+
void invalidateWrapper();
41+
1842
/**
1943
* Apply the queued changes to the world containing this chunk.
2044
* <p>The future returned may return another future. To ensure completion keep calling {@link

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/IQueueExtent.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import com.fastasyncworldedit.core.internal.simd.SimdSupport;
66
import com.fastasyncworldedit.core.internal.simd.VectorizedCharFilterBlock;
77
import com.fastasyncworldedit.core.internal.simd.VectorizedFilter;
8+
import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkHolder;
9+
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
810
import com.sk89q.worldedit.extent.Extent;
911
import com.sk89q.worldedit.function.operation.Operation;
1012
import com.sk89q.worldedit.math.BlockVector2;
@@ -166,11 +168,24 @@ default ChunkFilterBlock apply(
166168
// if (!filter.appliesChunk(chunkX, chunkZ)) {
167169
// return block;
168170
// }
169-
T chunk = this.getOrCreateChunk(chunkX, chunkZ);
171+
T initial = this.getOrCreateChunk(chunkX, chunkZ);
172+
WrapperChunk<T> chunk = new WrapperChunk<>(initial, () -> this.getOrCreateChunk(chunkX, chunkZ));
173+
if (initial instanceof ChunkHolder<?> holder) {
174+
holder.setWrapper(chunk);
175+
}
170176

171-
T newChunk = filter.applyChunk(chunk, region);
177+
IChunk newChunk = filter.applyChunk(chunk, region);
178+
if (newChunk == chunk) {
179+
newChunk = chunk.get();
180+
} else {
181+
T c = (T) newChunk;
182+
chunk.setWrapped(c);
183+
// The IDE lies, it is possible for it to be a ChunkHolder because we're a little loose with our generic types...
184+
if (c instanceof ChunkHolder<?> holder) {
185+
holder.setWrapper(chunk);
186+
}
187+
}
172188
if (newChunk != null) {
173-
chunk = newChunk;
174189
if (block == null) {
175190
if (SimdSupport.useVectorApi() && filter instanceof VectorizedFilter) {
176191
block = new VectorizedCharFilterBlock(this);
@@ -181,12 +196,16 @@ default ChunkFilterBlock apply(
181196
block.initChunk(chunkX, chunkZ);
182197
chunk.filterBlocks(filter, block, region, full);
183198
}
184-
this.submit(chunk);
199+
// If null, then assume it has already been submitted and the WrapperChunk has therefore been invalidated
200+
T toSubmit = chunk.get();
201+
if (toSubmit != null) {
202+
this.submit(toSubmit);
203+
}
185204
return block;
186205
}
187206

188207
@Override
189-
default <T extends Filter> T apply(Region region, T filter, boolean full) {
208+
default <U extends Filter> U apply(Region region, U filter, boolean full) {
190209
final Set<BlockVector2> chunks = region.getChunks();
191210
ChunkFilterBlock block = null;
192211
for (BlockVector2 chunk : chunks) {

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.ConcurrentLinkedQueue;
3737
import java.util.concurrent.ExecutionException;
3838
import java.util.concurrent.Future;
39+
import java.util.concurrent.atomic.AtomicReference;
3940
import java.util.concurrent.locks.ReentrantLock;
4041

4142
/**
@@ -53,6 +54,7 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
5354
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
5455
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
5556
private final ReentrantLock getChunkLock = new ReentrantLock();
57+
private final AtomicReference<IQueueChunk> lastChunk = new AtomicReference<>();
5658
private World world = null;
5759
private int minY = 0;
5860
private int maxY = 255;
@@ -61,7 +63,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
6163
private boolean initialized;
6264
private Thread currentThread;
6365
// Last access pointers
64-
private volatile IQueueChunk lastChunk;
6566
private boolean enabledQueue = true;
6667
private boolean fastmode = false;
6768
// Array for lazy avoidance of concurrent modification exceptions and needless overcomplication of code (synchronisation is
@@ -161,13 +162,13 @@ protected synchronized void reset() {
161162
return;
162163
}
163164
getChunkLock.lock();
165+
this.lastChunk.set(null);
164166
try {
165167
this.chunks.clear();
166168
} finally {
167169
getChunkLock.unlock();
168170
}
169171
this.enabledQueue = true;
170-
this.lastChunk = null;
171172
this.currentThread = null;
172173
this.initialized = false;
173174
this.setProcessor(EmptyBatchProcessor.getInstance());
@@ -221,9 +222,7 @@ public boolean isEmpty() {
221222

222223
@Override
223224
public <V extends Future<V>> V submit(IQueueChunk chunk) {
224-
if (lastChunk == chunk) {
225-
lastChunk = null;
226-
}
225+
this.lastChunk.compareAndExchange(chunk, null);
227226
final long index = MathMan.pairInt(chunk.getX(), chunk.getZ());
228227
getChunkLock.lock();
229228
chunks.remove(index, chunk);
@@ -254,6 +253,8 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
254253
}
255254
}
256255

256+
chunk.invalidateWrapper();
257+
257258
if (Fawe.isMainThread()) {
258259
V result = (V) chunk.call();
259260
if (result == null) {
@@ -277,8 +278,9 @@ public <V extends Future<V>> V submitTaskUnchecked(Callable<V> callable) {
277278
public synchronized boolean trim(boolean aggressive) {
278279
cacheGet.trim(aggressive);
279280
cacheSet.trim(aggressive);
281+
LOGGER.info("trim");
280282
if (Thread.currentThread() == currentThread) {
281-
lastChunk = null;
283+
lastChunk.set(null);
282284
return chunks.isEmpty();
283285
}
284286
if (!submissions.isEmpty()) {
@@ -316,7 +318,7 @@ public IQueueChunk wrap(IQueueChunk chunk) {
316318

317319
@Override
318320
public final IQueueChunk getOrCreateChunk(int x, int z) {
319-
final IQueueChunk lastChunk = this.lastChunk;
321+
final IQueueChunk lastChunk = this.lastChunk.get();
320322
if (lastChunk != null && lastChunk.getX() == x && lastChunk.getZ() == z) {
321323
return lastChunk;
322324
}
@@ -330,7 +332,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
330332
try {
331333
IQueueChunk chunk = chunks.get(pair);
332334
if (chunk != null) {
333-
this.lastChunk = chunk;
335+
this.lastChunk.set(chunk);
334336
return chunk;
335337
}
336338
final int size = chunks.size();
@@ -340,8 +342,9 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
340342
// - queue size > target size and primary queue has less than num threads submissions
341343
int targetSize = lowMem ? Settings.settings().QUEUE.PARALLEL_THREADS + 8 : this.targetSize;
342344
if (enabledQueue && size > targetSize && (lowMem || Fawe.instance().getQueueHandler().isUnderutilized())) {
343-
chunk = chunks.removeFirst();
344-
final Future future = submitUnchecked(chunk);
345+
IQueueChunk toSubmit = chunks.removeFirst();
346+
this.lastChunk.compareAndExchange(toSubmit, null);
347+
final Future future = submitUnchecked(toSubmit);
345348
if (future != null && !future.isDone()) {
346349
pollSubmissions(targetSize, lowMem);
347350
submissions.add(future);
@@ -351,7 +354,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
351354
chunk = wrap(chunk);
352355

353356
chunks.put(pair, chunk);
354-
this.lastChunk = chunk;
357+
this.lastChunk.set(chunk);
355358

356359
return chunk;
357360
} finally {
@@ -481,6 +484,7 @@ public synchronized void flush() {
481484
if (MemUtil.isMemoryLimited()) {
482485
while (!chunks.isEmpty()) {
483486
IQueueChunk chunk = chunks.removeFirst();
487+
this.lastChunk.compareAndExchange(chunk, null);
484488
final Future future = submitUnchecked(chunk);
485489
if (future != null && !future.isDone()) {
486490
pollSubmissions(Settings.settings().QUEUE.PARALLEL_THREADS, true);
@@ -490,6 +494,7 @@ public synchronized void flush() {
490494
} else {
491495
while (!chunks.isEmpty()) {
492496
IQueueChunk chunk = chunks.removeFirst();
497+
this.lastChunk.compareAndExchange(chunk, null);
493498
final Future future = submitUnchecked(chunk);
494499
if (future != null && !future.isDone()) {
495500
submissions.add(future);

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
*/
3737
@SuppressWarnings("rawtypes")
3838
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
39+
3940
private static final Logger LOGGER = LogManagerCompat.getLogger();
4041

4142
public static ChunkHolder newInstance() {
@@ -45,7 +46,7 @@ public static ChunkHolder newInstance() {
4546
private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
4647
private volatile IChunkSet chunkSet; // The blocks to be set to the chunkExisting
4748
private IBlockDelegate delegate; // delegate handles the abstraction of the chunk layers
48-
private IQueueExtent<? extends IChunk> extent; // the parent queue extent which has this chunk
49+
private IQueueExtent<?> extent; // the parent queue extent which has this chunk
4950
private int chunkX;
5051
private int chunkZ;
5152
private boolean fastmode;
@@ -54,6 +55,7 @@ public static ChunkHolder newInstance() {
5455
private boolean createCopy = false;
5556
private long initTime = -1L;
5657
private SideEffectSet sideEffectSet;
58+
private WrapperChunk<?> parentWrapper = null;
5759

5860
private ChunkHolder() {
5961
this.delegate = NULL;
@@ -63,7 +65,28 @@ public void init(IBlockDelegate delegate) {
6365
this.delegate = delegate;
6466
}
6567

68+
@Override
69+
public void setWrapper(WrapperChunk<?> parentWrapper) {
70+
if (parentWrapper == this.parentWrapper) {
71+
return;
72+
}
73+
if (this.parentWrapper != null) {
74+
throw new IllegalStateException("Wrapper already set");
75+
}
76+
this.parentWrapper = parentWrapper;
77+
}
78+
79+
@Override
80+
public void invalidateWrapper() {
81+
if (this.parentWrapper != null) {
82+
if (!this.parentWrapper.invalidate(this)) {
83+
throw new IllegalStateException("Existing chunk not equal to expected");
84+
}
85+
}
86+
}
87+
6688
private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);
89+
6790
@Override
6891
public void recycle() {
6992
if (!recycleWarning.getAndSet(true)) {
@@ -346,10 +369,12 @@ public int[] getHeightMap(ChunkHolder chunk, HeightMapType type) {
346369

347370
@Override
348371
public void flushLightToGet(ChunkHolder chunk) {
349-
chunk.chunkExisting.setLightingToGet(chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
372+
chunk.chunkExisting.setLightingToGet(
373+
chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
350374
chunk.chunkSet.getMaxSectionPosition()
351375
);
352-
chunk.chunkExisting.setSkyLightingToGet(chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
376+
chunk.chunkExisting.setSkyLightingToGet(
377+
chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
353378
chunk.chunkSet.getMaxSectionPosition()
354379
);
355380
}
@@ -892,10 +917,15 @@ public boolean hasNonEmptySection(final int layer) {
892917
public synchronized void filterBlocks(Filter filter, ChunkFilterBlock block, @Nullable Region region, boolean full) {
893918
final IChunkGet get = getOrCreateGet();
894919
final IChunkSet set = getOrCreateSet();
920+
if (parentWrapper == null) {
921+
parentWrapper = new WrapperChunk<>(this, () -> this.extent.getOrCreateChunk(getX(), getZ()));
922+
} else if (parentWrapper.get() != this) {
923+
throw new IllegalStateException("Parent WrapperChunk is not storing this chunk!?");
924+
}
895925
try {
896-
block.filter(this, get, set, filter, region, full);
926+
block.filter(parentWrapper, get, set, filter, region, full);
897927
} finally {
898-
filter.finishChunk(this);
928+
filter.finishChunk(parentWrapper);
899929
}
900930
}
901931

@@ -998,6 +1028,12 @@ public synchronized <V extends IChunk> void init(IQueueExtent<V> extent, int chu
9981028
this.extent = extent;
9991029
this.chunkX = chunkX;
10001030
this.chunkZ = chunkZ;
1031+
if (this.parentWrapper != null) {
1032+
if (!this.parentWrapper.invalidate(this)) {
1033+
throw new IllegalStateException("Existing chunk not equal to expected");
1034+
}
1035+
this.parentWrapper = null;
1036+
}
10011037
if (chunkSet != null) {
10021038
chunkSet.reset();
10031039
delegate = SET;
@@ -1013,9 +1049,11 @@ public synchronized T call() {
10131049
if (chunkSet != null && !chunkSet.isEmpty()) {
10141050
IChunkSet copy = chunkSet.createCopy();
10151051

1016-
return this.call(extent, copy, () -> {
1017-
// Do nothing
1018-
});
1052+
return this.call(
1053+
extent, copy, () -> {
1054+
// Do nothing
1055+
}
1056+
);
10191057
}
10201058
return null;
10211059
}
@@ -1025,14 +1063,20 @@ public synchronized T call() {
10251063
*/
10261064

10271065
@Override
1028-
public <U extends Future<U>> U call(IQueueExtent<? extends IChunk> owner, IChunkSet set, Runnable finalize) {
1066+
public <U extends Future<U>> U call(IQueueExtent<?> owner, IChunkSet set, Runnable finalize) {
10291067
if (set != null) {
1068+
if (parentWrapper != null) {
1069+
if (!parentWrapper.invalidate(this)) {
1070+
throw new IllegalStateException("Existing chunk not equal to expected");
1071+
}
1072+
}
10301073
IChunkGet get = getOrCreateGet();
10311074
try {
10321075
get.lockCall();
10331076
trackExtent();
10341077
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
10351078
final int copyKey = get.setCreateCopy(postProcess);
1079+
// We should always be performing processing/postprocessing with this instance (i.e. not with this.parentWrapper)
10361080
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
10371081
Runnable finalizer;
10381082
if (postProcess) {
@@ -1067,7 +1111,7 @@ private void untrackExtent() {
10671111
/**
10681112
* Get the extent this chunk is in.
10691113
*/
1070-
public IQueueExtent<? extends IChunk> getExtent() {
1114+
public IQueueExtent<?> getExtent() {
10711115
return extent;
10721116
}
10731117

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/NullChunk.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ public boolean isEmpty() {
4747
return true;
4848
}
4949

50+
@Override
51+
public void setWrapper(final WrapperChunk parentWrapper) {
52+
}
53+
54+
@Override
55+
public void invalidateWrapper() {
56+
}
57+
5058
public Future call() {
5159
return null;
5260
}

0 commit comments

Comments
 (0)