Skip to content

Commit b0d6f46

Browse files
committed
fix: add a WrapperChunk to allow "Invalidating" a ChunkHolder on submission
- fixes issues where edits were not fully undone - invalidate a ChunkHolder when submitting, via a "wrapper" to allow filters to be "aware"/forced to use a new ChunkHolder when it is submitted - this also allows us to *not* submit a ChunkHolder in IQueueExtent#filter if the wrapper does not holder it anymore (i.e. it was forcibly submitted, and no more work was done)
1 parent f895ed7 commit b0d6f46

File tree

7 files changed

+703
-25
lines changed

7 files changed

+703
-25
lines changed

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/Filter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public interface Filter {
2424
/**
2525
* Do something with the IChunk<br>
2626
*/
27-
default @Nonnull <T extends IChunk> T applyChunk(T chunk, @Nullable Region region) {
27+
default @Nonnull <U extends IChunk> U applyChunk(U chunk, @Nullable Region region) {
2828
return chunk;
2929
}
3030

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/IQueueChunk.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.fastasyncworldedit.core.queue;
22

3+
import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkHolder;
4+
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
5+
import org.jetbrains.annotations.ApiStatus;
6+
37
import java.util.concurrent.Callable;
48
import java.util.concurrent.ExecutionException;
59
import java.util.concurrent.Future;
@@ -15,6 +19,26 @@ default IQueueChunk<T> reset() {
1519
return this;
1620
}
1721

22+
/**
23+
* Set a new {@link WrapperChunk} that allows prevention of a {@link ChunkHolder} instance being cached "locally" whilst it
24+
* has been called/submitted, causing issues with processing/postprocessing, etc.
25+
* If a wrapper has already been set, throws {@link IllegalStateException} as there should be no circumstance for us to set
26+
* a new wrapper (does nothing if attempting to set the same wrapper).
27+
*
28+
* @param parentWrapper wrapper wrapping this {@link ChunkHolder instance}
29+
* @throws IllegalStateException if there is already a wrapper set and a new wrapper instance is attempted to be se
30+
* @since TODO
31+
*/
32+
@ApiStatus.Internal
33+
void setWrapper(WrapperChunk<? extends IChunk> parentWrapper);
34+
35+
/**
36+
* Invalidate the {@link WrapperChunk} if present.
37+
*
38+
* @since TODO
39+
*/
40+
void invalidateWrapper();
41+
1842
/**
1943
* Apply the queued changes to the world containing this chunk.
2044
* <p>The future returned may return another future. To ensure completion keep calling {@link

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/IQueueExtent.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.fastasyncworldedit.core.internal.simd.SimdSupport;
66
import com.fastasyncworldedit.core.internal.simd.VectorizedCharFilterBlock;
77
import com.fastasyncworldedit.core.internal.simd.VectorizedFilter;
8+
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
89
import com.sk89q.worldedit.extent.Extent;
910
import com.sk89q.worldedit.function.operation.Operation;
1011
import com.sk89q.worldedit.math.BlockVector2;
@@ -166,11 +167,16 @@ default ChunkFilterBlock apply(
166167
// if (!filter.appliesChunk(chunkX, chunkZ)) {
167168
// return block;
168169
// }
169-
T chunk = this.getOrCreateChunk(chunkX, chunkZ);
170-
171-
T newChunk = filter.applyChunk(chunk, region);
170+
T initial = this.getOrCreateChunk(chunkX, chunkZ);
171+
WrapperChunk<T> chunk = new WrapperChunk<>(initial, () -> this.getOrCreateChunk(chunkX, chunkZ));
172+
173+
IChunk newChunk = filter.applyChunk(chunk, region);
174+
if (newChunk == chunk) {
175+
newChunk = chunk.get();
176+
} else {
177+
chunk.setWrapped((T) newChunk);
178+
}
172179
if (newChunk != null) {
173-
chunk = newChunk;
174180
if (block == null) {
175181
if (SimdSupport.useVectorApi() && filter instanceof VectorizedFilter) {
176182
block = new VectorizedCharFilterBlock(this);
@@ -181,12 +187,16 @@ default ChunkFilterBlock apply(
181187
block.initChunk(chunkX, chunkZ);
182188
chunk.filterBlocks(filter, block, region, full);
183189
}
184-
this.submit(chunk);
190+
// If null, then assume it has already been submitted and the WrapperChunk has therefore been invalidated
191+
T toSubmit = chunk.get();
192+
if (toSubmit != null) {
193+
this.submit(toSubmit);
194+
}
185195
return block;
186196
}
187197

188198
@Override
189-
default <T extends Filter> T apply(Region region, T filter, boolean full) {
199+
default <U extends Filter> U apply(Region region, U filter, boolean full) {
190200
final Set<BlockVector2> chunks = region.getChunks();
191201
ChunkFilterBlock block = null;
192202
for (BlockVector2 chunk : chunks) {

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.ConcurrentLinkedQueue;
3737
import java.util.concurrent.ExecutionException;
3838
import java.util.concurrent.Future;
39+
import java.util.concurrent.atomic.AtomicReference;
3940
import java.util.concurrent.locks.ReentrantLock;
4041

4142
/**
@@ -53,6 +54,7 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
5354
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
5455
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
5556
private final ReentrantLock getChunkLock = new ReentrantLock();
57+
private final AtomicReference<IQueueChunk> lastChunk = new AtomicReference<>();
5658
private World world = null;
5759
private int minY = 0;
5860
private int maxY = 255;
@@ -61,7 +63,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
6163
private boolean initialized;
6264
private Thread currentThread;
6365
// Last access pointers
64-
private volatile IQueueChunk lastChunk;
6566
private boolean enabledQueue = true;
6667
private boolean fastmode = false;
6768
// Array for lazy avoidance of concurrent modification exceptions and needless overcomplication of code (synchronisation is
@@ -161,13 +162,13 @@ protected synchronized void reset() {
161162
return;
162163
}
163164
getChunkLock.lock();
165+
this.lastChunk.set(null);
164166
try {
165167
this.chunks.clear();
166168
} finally {
167169
getChunkLock.unlock();
168170
}
169171
this.enabledQueue = true;
170-
this.lastChunk = null;
171172
this.currentThread = null;
172173
this.initialized = false;
173174
this.setProcessor(EmptyBatchProcessor.getInstance());
@@ -221,9 +222,7 @@ public boolean isEmpty() {
221222

222223
@Override
223224
public <V extends Future<V>> V submit(IQueueChunk chunk) {
224-
if (lastChunk == chunk) {
225-
lastChunk = null;
226-
}
225+
this.lastChunk.compareAndExchange(chunk, null);
227226
final long index = MathMan.pairInt(chunk.getX(), chunk.getZ());
228227
getChunkLock.lock();
229228
chunks.remove(index, chunk);
@@ -254,6 +253,8 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
254253
}
255254
}
256255

256+
chunk.invalidateWrapper();
257+
257258
if (Fawe.isMainThread()) {
258259
V result = (V) chunk.call();
259260
if (result == null) {
@@ -277,8 +278,9 @@ public <V extends Future<V>> V submitTaskUnchecked(Callable<V> callable) {
277278
public synchronized boolean trim(boolean aggressive) {
278279
cacheGet.trim(aggressive);
279280
cacheSet.trim(aggressive);
281+
LOGGER.info("trim");
280282
if (Thread.currentThread() == currentThread) {
281-
lastChunk = null;
283+
lastChunk.set(null);
282284
return chunks.isEmpty();
283285
}
284286
if (!submissions.isEmpty()) {
@@ -316,7 +318,7 @@ public IQueueChunk wrap(IQueueChunk chunk) {
316318

317319
@Override
318320
public final IQueueChunk getOrCreateChunk(int x, int z) {
319-
final IQueueChunk lastChunk = this.lastChunk;
321+
final IQueueChunk lastChunk = this.lastChunk.get();
320322
if (lastChunk != null && lastChunk.getX() == x && lastChunk.getZ() == z) {
321323
return lastChunk;
322324
}
@@ -330,7 +332,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
330332
try {
331333
IQueueChunk chunk = chunks.get(pair);
332334
if (chunk != null) {
333-
this.lastChunk = chunk;
335+
this.lastChunk.set(chunk);
334336
return chunk;
335337
}
336338
final int size = chunks.size();
@@ -340,8 +342,9 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
340342
// - queue size > target size and primary queue has less than num threads submissions
341343
int targetSize = lowMem ? Settings.settings().QUEUE.PARALLEL_THREADS + 8 : this.targetSize;
342344
if (enabledQueue && size > targetSize && (lowMem || Fawe.instance().getQueueHandler().isUnderutilized())) {
343-
chunk = chunks.removeFirst();
344-
final Future future = submitUnchecked(chunk);
345+
IQueueChunk toSubmit = chunks.removeFirst();
346+
this.lastChunk.compareAndExchange(toSubmit, null);
347+
final Future future = submitUnchecked(toSubmit);
345348
if (future != null && !future.isDone()) {
346349
pollSubmissions(targetSize, lowMem);
347350
submissions.add(future);
@@ -351,7 +354,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
351354
chunk = wrap(chunk);
352355

353356
chunks.put(pair, chunk);
354-
this.lastChunk = chunk;
357+
this.lastChunk.set(chunk);
355358

356359
return chunk;
357360
} finally {
@@ -481,6 +484,7 @@ public synchronized void flush() {
481484
if (MemUtil.isMemoryLimited()) {
482485
while (!chunks.isEmpty()) {
483486
IQueueChunk chunk = chunks.removeFirst();
487+
this.lastChunk.compareAndExchange(chunk, null);
484488
final Future future = submitUnchecked(chunk);
485489
if (future != null && !future.isDone()) {
486490
pollSubmissions(Settings.settings().QUEUE.PARALLEL_THREADS, true);
@@ -490,6 +494,7 @@ public synchronized void flush() {
490494
} else {
491495
while (!chunks.isEmpty()) {
492496
IQueueChunk chunk = chunks.removeFirst();
497+
this.lastChunk.compareAndExchange(chunk, null);
493498
final Future future = submitUnchecked(chunk);
494499
if (future != null && !future.isDone()) {
495500
submissions.add(future);

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
*/
3737
@SuppressWarnings("rawtypes")
3838
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
39+
3940
private static final Logger LOGGER = LogManagerCompat.getLogger();
4041

4142
public static ChunkHolder newInstance() {
@@ -54,6 +55,7 @@ public static ChunkHolder newInstance() {
5455
private boolean createCopy = false;
5556
private long initTime = -1L;
5657
private SideEffectSet sideEffectSet;
58+
private WrapperChunk<? extends IChunk> parentWrapper = null;
5759

5860
private ChunkHolder() {
5961
this.delegate = NULL;
@@ -63,7 +65,26 @@ public void init(IBlockDelegate delegate) {
6365
this.delegate = delegate;
6466
}
6567

68+
@Override
69+
public void setWrapper(WrapperChunk<? extends IChunk> parentWrapper) {
70+
if (parentWrapper == this.parentWrapper) {
71+
return;
72+
}
73+
if (this.parentWrapper != null) {
74+
throw new IllegalStateException("Wrapper already set");
75+
}
76+
this.parentWrapper = parentWrapper;
77+
}
78+
79+
@Override
80+
public void invalidateWrapper() {
81+
if (this.parentWrapper != null) {
82+
this.parentWrapper.invalidate(this);
83+
}
84+
}
85+
6686
private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);
87+
6788
@Override
6889
public void recycle() {
6990
if (!recycleWarning.getAndSet(true)) {
@@ -346,10 +367,12 @@ public int[] getHeightMap(ChunkHolder chunk, HeightMapType type) {
346367

347368
@Override
348369
public void flushLightToGet(ChunkHolder chunk) {
349-
chunk.chunkExisting.setLightingToGet(chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
370+
chunk.chunkExisting.setLightingToGet(
371+
chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
350372
chunk.chunkSet.getMaxSectionPosition()
351373
);
352-
chunk.chunkExisting.setSkyLightingToGet(chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
374+
chunk.chunkExisting.setSkyLightingToGet(
375+
chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
353376
chunk.chunkSet.getMaxSectionPosition()
354377
);
355378
}
@@ -892,10 +915,13 @@ public boolean hasNonEmptySection(final int layer) {
892915
public synchronized void filterBlocks(Filter filter, ChunkFilterBlock block, @Nullable Region region, boolean full) {
893916
final IChunkGet get = getOrCreateGet();
894917
final IChunkSet set = getOrCreateSet();
918+
if (parentWrapper == null) {
919+
parentWrapper = new WrapperChunk<>(this, () -> this.extent.getOrCreateChunk(getX(), getZ()));
920+
}
895921
try {
896-
block.filter(this, get, set, filter, region, full);
922+
block.filter(parentWrapper, get, set, filter, region, full);
897923
} finally {
898-
filter.finishChunk(this);
924+
filter.finishChunk(parentWrapper);
899925
}
900926
}
901927

@@ -998,6 +1024,13 @@ public synchronized <V extends IChunk> void init(IQueueExtent<V> extent, int chu
9981024
this.extent = extent;
9991025
this.chunkX = chunkX;
10001026
this.chunkZ = chunkZ;
1027+
if (this.parentWrapper != null) {
1028+
try {
1029+
this.parentWrapper.invalidate(this);
1030+
} catch (IllegalStateException ignored) {
1031+
}
1032+
this.parentWrapper = null;
1033+
}
10011034
if (chunkSet != null) {
10021035
chunkSet.reset();
10031036
delegate = SET;
@@ -1013,9 +1046,11 @@ public synchronized T call() {
10131046
if (chunkSet != null && !chunkSet.isEmpty()) {
10141047
IChunkSet copy = chunkSet.createCopy();
10151048

1016-
return this.call(extent, copy, () -> {
1017-
// Do nothing
1018-
});
1049+
return this.call(
1050+
extent, copy, () -> {
1051+
// Do nothing
1052+
}
1053+
);
10191054
}
10201055
return null;
10211056
}
@@ -1027,12 +1062,21 @@ public synchronized T call() {
10271062
@Override
10281063
public <U extends Future<U>> U call(IQueueExtent<? extends IChunk> owner, IChunkSet set, Runnable finalize) {
10291064
if (set != null) {
1065+
if (parentWrapper != null) {
1066+
try {
1067+
parentWrapper.invalidate(this);
1068+
} catch (Exception e) {
1069+
LOGGER.error(this.initTime + " : " + ((ChunkHolder) parentWrapper.get()).initTime);
1070+
throw e;
1071+
}
1072+
}
10301073
IChunkGet get = getOrCreateGet();
10311074
try {
10321075
get.lockCall();
10331076
trackExtent();
10341077
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
10351078
final int copyKey = get.setCreateCopy(postProcess);
1079+
// We should always be performing processing/postprocessing with this instance (i.e. not with this.parentWrapper)
10361080
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
10371081
Runnable finalizer;
10381082
if (postProcess) {

worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/NullChunk.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ public boolean isEmpty() {
4747
return true;
4848
}
4949

50+
@Override
51+
public void setWrapper(final WrapperChunk parentWrapper) {
52+
}
53+
54+
@Override
55+
public void invalidateWrapper() {
56+
}
57+
5058
public Future call() {
5159
return null;
5260
}

0 commit comments

Comments
 (0)