diff --git a/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/RegionCubeStorage.java b/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/RegionCubeStorage.java index 2881fc25d..a71aa00e8 100644 --- a/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/RegionCubeStorage.java +++ b/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/RegionCubeStorage.java @@ -138,19 +138,29 @@ public boolean cubeExists(CubePos pos) throws IOException { public NBTTagCompound readColumn(ChunkPos pos) throws IOException { //we use a true here in order to force creation and caching of the new region, thus avoiding an expensive Files.exists() check for every cube/column (which // is really expensive on windows) - Optional data = this.save.load(new EntryLocation2D(pos.x, pos.z), true); - return data.isPresent() - ? CompressedStreamTools.readCompressed(new ByteArrayInputStream(data.get().array())) //decompress and parse NBT - : null; //column doesn't exist + Optional optionalData = this.save.load(new EntryLocation2D(pos.x, pos.z), true); + if (!optionalData.isPresent()) { //column doesn't exist + return null; + } + + //decompress and parse NBT + ByteBuffer data = optionalData.get(); + return CompressedStreamTools.readCompressed( + new ByteArrayInputStream(data.array(), data.arrayOffset() + data.position(), data.remaining())); } @Override public NBTTagCompound readCube(CubePos pos) throws IOException { //see comment in readColumn - Optional data = this.save.load(new EntryLocation3D(pos.getX(), pos.getY(), pos.getZ()), true); - return data.isPresent() - ? CompressedStreamTools.readCompressed(new ByteArrayInputStream(data.get().array())) //decompress and parse NBT - : null; //cube doesn't exist + Optional optionalData = this.save.load(new EntryLocation3D(pos.getX(), pos.getY(), pos.getZ()), true); + if (!optionalData.isPresent()) { //cube doesn't exist + return null; + } + + //decompress and parse NBT + ByteBuffer data = optionalData.get(); + return CompressedStreamTools.readCompressed( + new ByteArrayInputStream(data.array(), data.arrayOffset() + data.position(), data.remaining())); } @Override diff --git a/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/region/ShadowPagingRegion.java b/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/region/ShadowPagingRegion.java index c3cd34548..a09b33637 100644 --- a/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/region/ShadowPagingRegion.java +++ b/src/main/java/io/github/opencubicchunks/cubicchunks/core/server/chunkio/region/ShadowPagingRegion.java @@ -39,25 +39,24 @@ import cubicchunks.regionlib.util.CorruptedDataException; import cubicchunks.regionlib.util.Utils; import io.github.opencubicchunks.cubicchunks.core.CubicChunks; -import net.minecraft.util.Tuple; +import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; +import java.io.EOFException; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import 
java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; @@ -109,8 +108,21 @@ private static void zeroes(int length, List target) { private final int sectorSize; private final SectorTracker sectorTracker; - private final ReadWriteLock dataLock = new ReentrantReadWriteLock(); - private final ReadWriteLock reserveSectorsLock = new ReentrantReadWriteLock(); + /** + * An ordinary lock which prevents multiple writes from being started at once. + */ + private final Lock startWriteLock = new ReentrantLock(); + + /** + * An {@link OngoingWrite} instance describing an in-progress write operation, or {@code null} if there is none. + */ + private volatile OngoingWrite ongoingWrite = null; + + /** + * A read/write lock which allows writers to wait for all uncontended reads to complete. + */ + private final ReadWriteLock uncontendedReadLock = new ReentrantReadWriteLock(true); + private ShadowPagingRegion(FileChannel file, SectorTracker sectorTracker, IHeaderDataEntryProvider headerEntryProvider, RegionKey regionKey, IKeyProvider keyProvider, int sectorSize) { this.file = file; this.headerEntryProvider = headerEntryProvider; @@ -128,83 +140,113 @@ public void writeValue(K key, ByteBuffer value) throws IOException { @Override public void writeValues(Map entries) throws IOException { + //calling file.force() is slow, so we want to minimize the number of times it needs to be called. the solution is simple: we write the data + // for ALL entries at once, and don't update the headers until it's all been written to disk. + + //we assume that the given Map is safe for concurrent reads as long as we aren't writing to it (i legitimately have no idea what kind of + // implementation *wouldn't* be - what datastructure changes its internal state during a read?) + if (entries.isEmpty()) { //fast-path if there isn't anything to be written return; } - //calling file.force() is slow, so we want to minimize the number of times it needs to be called. the solution is simple: we write the data for ALL - // entries at once, and don't update the headers until it's all been written to disk. 
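The comment removed here (and re-added, expanded, at the top of the new writeValues) is the heart of the shadow-paging scheme: payload sectors are written and forced to disk before any header points at them. A minimal standalone sketch of that ordering on a bare FileChannel, not code from this patch; all names and the 4096-byte sector size are illustrative assumptions:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.StandardOpenOption;

    public class ShadowPagingOrderingSketch {
        private static final int SECTOR_SIZE = 4096;

        /** Writes a new copy of an entry into unreferenced sectors, then repoints its header slot. */
        static void writeEntry(FileChannel file, long headerSlotOffset, int freeSector, ByteBuffer payload) throws IOException {
            //1) write the payload into sectors that no header currently references
            long dataOffset = (long) freeSector * SECTOR_SIZE;
            while (payload.hasRemaining()) {
                dataOffset += file.write(payload, dataOffset);
            }

            //2) force the payload (and the new file length) to disk BEFORE any header refers to it
            file.force(true);

            //3) only now update the header slot; a crash before this point leaves the old header
            //   and the old data fully intact, which is the shadow-paging guarantee
            ByteBuffer header = ByteBuffer.allocate(Integer.BYTES).putInt(0, freeSector);
            while (header.hasRemaining()) {
                headerSlotOffset += file.write(header, headerSlotOffset);
            }
            file.force(false);
        }

        public static void main(String[] args) throws IOException {
            try (FileChannel file = FileChannel.open(Files.createTempFile("sketch", ".bin"), StandardOpenOption.WRITE)) {
                writeEntry(file, 0L, 1, ByteBuffer.wrap("example".getBytes(StandardCharsets.UTF_8)));
            }
        }
    }

A crash at any point before step 3 leaves the previous header/data pair untouched, which is why the patch can batch the data for all entries, call force() once, and only then rewrite the headers.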
+ + //allocate temporary objects List exceptions = new ArrayList<>(); - Map> pendingHeaderUpdates = new HashMap<>(entries.size()); - Map entryLocationsToUse = new HashMap<>(entries.size()); - - Lock sectorLock = reserveSectorsLock.writeLock(); - Lock mainLock = dataLock.writeLock(); - sectorLock.lock(); - mainLock.lock(); - //entries.forEach((k, v) -> CubicChunks.LOGGER.error(this + ": WRITE: " + k + ", " + v.remaining())); - try { - // first pass: reserve header locations: - reserveHeaderEntriesPass(entries, exceptions, pendingHeaderUpdates, entryLocationsToUse); - } finally { - sectorLock.unlock(); - } - try { - // second pass: write all data - boolean shouldFlush = writeDataPass(entries, exceptions, entryLocationsToUse); + Map pendingHeaderUpdates = new Object2ObjectOpenHashMap<>(entries.size()); - //flush the file's contents if any of the entries modified the region data - if (shouldFlush) { - this.file.force(true); + //actually perform the write + this.doWrite(new OngoingWrite<>(Collections.emptyMap(), entries), ongoingWrite -> { + try { + //first pass: reserve header locations + this.reserveHeaderEntriesPass(entries, exceptions, pendingHeaderUpdates); + + //second pass: write all data + this.writeDataPass(entries, exceptions, pendingHeaderUpdates); + } catch (RuntimeException | Error | IOException e) { + //something went wrong, roll back all the pending updates + pendingHeaderUpdates.forEach((key, update) -> { + try { + //roll back the update in the sector tracker + this.sectorTracker.rollbackUpdate(key, update); + } catch (RuntimeException | Error | IOException e1) { + e.addSuppressed(e1); + } + }); + + throw e; } - // third pass: execute pending header updates - doPendingHeaderUpdatesPass(pendingHeaderUpdates); + try { + //third pass: execute pending header updates + this.doPendingHeaderUpdatesPass(pendingHeaderUpdates); + } catch (RuntimeException | Error | IOException e) { + //something went wrong, roll back all the pending updates + pendingHeaderUpdates.forEach((key, update) -> { + try { + //roll back the update in the sector tracker + this.sectorTracker.rollbackUpdate(key, update); + //try to update the headers on-disk so that they're restored to their original value, as we don't want to have some + // undefined subset of the writes be applied, but not all of them. + this.updateHeaders(key); + } catch (RuntimeException | Error | IOException e1) { + e.addSuppressed(e1); + } + }); - //throw all pending exceptions at once if any occurred - if (!exceptions.isEmpty()) { - throw new MultiUnsupportedDataException(exceptions); + throw e; } - } finally { - mainLock.unlock(); + }); + + //for all successfully written entries: advance the buffer's position to the end. + // we couldn't do this while writing the data because it would have caused readers to return a clone of an empty buffer, rather than the + // actual buffer range that was written. 
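The position bookkeeping described above leans on a ByteBuffer contract that is easy to miss: reads made through a duplicate()/slice() view never move the caller's position, so "consumed" is only signalled by explicitly advancing the position afterwards. A tiny illustration, not part of the patch; the view-based copying is assumed to match what the write pass does internally:

    import java.nio.ByteBuffer;

    public class BufferPositionSketch {
        public static void main(String[] args) {
            ByteBuffer entry = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});

            ByteBuffer view = entry.duplicate();   //the write pass reads the data through a view...
            byte[] copy = new byte[view.remaining()];
            view.get(copy);                        //...which does not move entry.position()

            System.out.println(entry.remaining()); //4 - concurrent readers still see the full range

            entry.position(entry.limit());         //done only once the whole batch has succeeded
            System.out.println(entry.remaining()); //0 - the caller now sees the entry as consumed
        }
    }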
+ pendingHeaderUpdates.forEach((key, update) -> { + ByteBuffer buffer = entries.get(key); + buffer.position(buffer.limit()); + }); + + //throw all pending exceptions at once if any occurred + if (!exceptions.isEmpty()) { + throw new MultiUnsupportedDataException(exceptions); } } private void reserveHeaderEntriesPass(Map entries, List exceptions, - Map> pendingHeaderUpdates, Map entryLocationsToUse) throws IOException { + Map pendingHeaderUpdates) throws IOException { for (Iterator> itr = entries.entrySet().iterator(); itr.hasNext(); ) { Map.Entry entry = itr.next(); K key = entry.getKey(); ByteBuffer value = entry.getValue(); - try { - Optional prevLocation; - if (value == null) { - //if deleting an entry, there's no need to change anything on disk! the only thing that needs - // to be changed is the headers. - prevLocation = null; - } else { - int size = value.remaining(); - int sizeWithSizeInfo = size + Integer.BYTES; - int numSectors = this.getSectorNumber(sizeWithSizeInfo); - - //this may throw UnsupportedDataException if data is too big. - //it won't cause the sector tracker to be updated, meaning that reallocated sectors won't be overwritten by - // subsequent writes from the same batch. - Tuple headerUpdate = this.sectorTracker.reserveForKey(key, numSectors); - prevLocation = Optional.ofNullable(headerUpdate.getFirst()); - entryLocationsToUse.put(key, headerUpdate.getSecond()); + HeaderUpdate update; + if (value == null) { + //if deleting an entry, there's no need to change anything on disk! the only thing that needs + // to be changed is the headers. + update = this.sectorTracker.getUpdateForDeletion(key); + } else { + int size = value.remaining(); + int sizeWithSizeInfo = size + Integer.BYTES; + int numSectors = this.getSectorNumber(sizeWithSizeInfo); + + //this may throw UnsupportedDataException if data is too big. + //it won't cause the sector tracker to be updated, meaning that reallocated sectors won't be overwritten by + // subsequent writes from the same batch. 
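For reference, the size computation above adds the 4-byte length prefix before rounding up to whole sectors. A worked example, assuming getSectorNumber is a plain ceiling division by the region's sector size; the helper name sectorsFor is hypothetical:

    public class SectorCountSketch {
        static int sectorsFor(int payloadBytes, int sectorSize) {
            int withPrefix = payloadBytes + Integer.BYTES;     //4-byte length prefix stored in front of the data
            return (withPrefix + sectorSize - 1) / sectorSize; //ceiling division
        }

        public static void main(String[] args) {
            //with 4096-byte sectors, 4092 payload bytes still fit in one sector...
            System.out.println(sectorsFor(4092, 4096)); //1
            //...but one more byte spills into a second sector
            System.out.println(sectorsFor(4093, 4096)); //2
        }
    }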
+ try { + update = this.sectorTracker.getUpdateWithReservation(key, numSectors); + } catch (UnsupportedDataException e) { + //save exception for later + exceptions.add(new UnsupportedDataException.WithKey(e, key)); + continue; } - pendingHeaderUpdates.put(key, prevLocation); - } catch (UnsupportedDataException e) { - //save exception for later - exceptions.add(new UnsupportedDataException.WithKey(e, key)); } + + pendingHeaderUpdates.put(key, update); } } - private boolean writeDataPass(Map entries, List exceptions, - Map entryLocationsToUse) throws IOException { + private void writeDataPass(Map entries, List exceptions, + Map pendingHeaderUpdates) throws IOException { boolean shouldFlush = false; List tempBuffers = new ArrayList<>(); @@ -220,8 +262,14 @@ private boolean writeDataPass(Map entries, List entries, List entries, List> pendingHeaderUpdates) throws IOException { + private void doPendingHeaderUpdatesPass(Map pendingHeaderUpdates) throws IOException { if (!pendingHeaderUpdates.isEmpty()) { - for (Iterator>> itr = pendingHeaderUpdates.entrySet().iterator(); itr.hasNext(); ) { - Map.Entry> entry = itr.next(); - - K key = entry.getKey(); - Optional prevLocation = entry.getValue(); - if (prevLocation == null) { - //the entry is being deleted, so we need to remove the key from the headers entirely - this.sectorTracker.removeKey(key); - } else { - //the entry changed, and we need to release the previous sectors - this.sectorTracker.updateUsedSectorsFor(prevLocation.orElse(null), null); - } + //first, commit all the pending header updates to the index + pendingHeaderUpdates.forEach(this.sectorTracker::commitUpdate); - //write new header value for this key to disk + //update all the header slots on-disk + for (K key : pendingHeaderUpdates.keySet()) { this.updateHeaders(key); } @@ -271,18 +314,11 @@ private void doPendingHeaderUpdatesPass(Map> pe } @Override public void writeSpecial(K key, Object marker) throws IOException { - Lock sectorLock = reserveSectorsLock.writeLock(); - Lock mainLock = dataLock.writeLock(); - sectorLock.lock(); - mainLock.lock(); - try { + this.doWrite(new OngoingWrite<>(Collections.singletonMap(key, Optional.ofNullable(marker)), Collections.emptyMap()), ongoingWrite -> { this.sectorTracker.setSpecial(key, marker); - updateHeaders(key); - file.force(false); - } finally { - mainLock.unlock(); - sectorLock.unlock(); - } + this.updateHeaders(key); + this.file.force(false); + }); } private void updateHeaders(K key) throws IOException { @@ -293,98 +329,193 @@ private void updateHeaders(K key) throws IOException { Utils.writeFully(file.position((long) key.getId() * entryByteCount), buf); } - @Override public Optional readValue(K key) throws IOException { - - Lock sectorLock = reserveSectorsLock.readLock(); - Lock mainLock = dataLock.readLock(); - boolean mainLocked = false; + private void doWrite(OngoingWrite ongoingWrite, CheckedConsumer, ? extends IOException> writeBody) throws IOException { + //we hold startWriteLock for the entire duration of the write to ensure that there's never more than one writer at once + this.startWriteLock.lock(); try { - sectorLock.lock(); - + //set ongoingWrite, which will redirect all incoming readers to become contended reads on this write. 
+ assert this.ongoingWrite == null : "somehow, ongoingWrite returned!"; + this.ongoingWrite = ongoingWrite; - //CubicChunks.LOGGER.error(this + ": READ: " + key); - Function specialValue = sectorTracker.trySpecialValue(key).orElse(null); - if (specialValue != null) { - return Optional.of(specialValue.apply(key)); - } - Optional entryLocation = sectorTracker.getEntryLocation(key); - if (!entryLocation.isPresent()) { - return Optional.empty(); + try { + //acquire uncontendedReadLock.writeLock(), which will cause us to block until all uncontended readers are completed. once it's been + // acquired, we can be certain that any active readers are contended readers which are reading through our ongoingWrite instance, and + // we can safely update to any keys which are present 'entries' map on disk without having to worry about concurrent readers. + this.uncontendedReadLock.writeLock().lock(); + try { + //now that all the locks have been acquired, we can actually perform the write + writeBody.accept(ongoingWrite); + } finally { + //we've finished writing all the data we wanted to write, so we'll release uncontendedReadLock.writeLock() since readers should + // now be able to safely access any entry on disk without us swapping the data out from underneath them. + this.uncontendedReadLock.writeLock().unlock(); + } + } finally { + //reset ongoingWrite to null, which will prevent new readers from contending on this write. + this.ongoingWrite = null; + + //block until all readers contending on this write have completed. we never release contendedReadLock.writeLock() - making sure it + // remains locked permanently makes sure that any readers which somehow still hold a reference to it will be unable to start + // contending on this write, and the lock will eventually be garbage collected. + ongoingWrite.contendedReadLock.writeLock().lock(); } - mainLock.lock(); - mainLocked = true; - // get sector tracker entry again in case it got deleted in the meantime - return doReadKey(key); - } catch (UncheckedIOException e) { - throw e.getCause(); } finally { - sectorLock.unlock(); - if (mainLocked) { - mainLock.unlock(); - } + //this write is complete! release startWriteLock so that the next writer can start. + this.startWriteLock.unlock(); } } - private Optional doReadKey(K key) { - try { - Optional entryLocation = sectorTracker.getEntryLocation(key); - if (!entryLocation.isPresent()) { - return Optional.empty(); + @Override public Optional readValue(K key) throws IOException { + do { + OngoingWrite ongoingWrite = this.ongoingWrite; + if (ongoingWrite != null) { //there is currently a write operation in progress + if (!ongoingWrite.contendedReadLock.readLock().tryLock()) { + //if we fail to acquire the write's contendedReadLock's read lock, it means the writer thread for this OngoingWrite has completed + // and the OngoingWrite instance is outdated. spin and try again! + continue; + } + try { //do a contended read, reading to-be-updated entries from memory and unmodified entries from disk + if (ongoingWrite.inProgressWriteSpecial.containsKey(key)) { + //the key is being modified by the ongoing write, so to avoid reading from disk (which will probably result in a data race) + // we'll return the new value which is already in memory. + + //we actually can't do anything with the special value marker, as it has to be stored inside the sector map in order to + // access the associated value. instead, we'll just spin until the write completes. 
+ Thread.yield(); + continue; + } else if (ongoingWrite.inProgressWriteData.containsKey(key)) { + //the key is being modified by the ongoing write, so to avoid reading from disk (which will probably result in a data race) + // we'll return the new value which is already in memory. + + ByteBuffer originalWriteData = ongoingWrite.inProgressWriteData.get(key); + if (originalWriteData == null) { //the entry is being deleted + return Optional.empty(); + } + + //duplicate the buffer's contents to return them + originalWriteData = originalWriteData.slice(); //slice the buffer first to avoid changing the position + ByteBuffer clonedWriteData = ByteBuffer.allocate(originalWriteData.remaining()); + clonedWriteData.put(originalWriteData).clear(); + return Optional.of(clonedWriteData); + } else { + //the key is not being modified by the ongoing write. therefore, we can rest assured that nothing related to this key (either + // its header entries or the data associated with it) will be modified during the course of the ongoing write, and can read + // the data from disk like normal. + //if a new write were to start before this read is complete, it's possible that this key could be modified by the new write + // while this read is in progress. however, since the current write can't complete until it acquires + // contendedReadLock.readLock(), and we hold contendedReadLock.readLock() until we finish reading, we can be certain that + // a new write won't start modifying this key until we finish up here. + return this.readFromDisk(key); + } + } finally { + ongoingWrite.contendedReadLock.readLock().unlock(); + } } - RegionEntryLocation loc = entryLocation.get(); - int sectorOffset = loc.getOffset(); - int sectorCount = loc.getSize(); - - // read data size (one int) - ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES); - long position = (long) sectorOffset * sectorSize; - readFully(file, buf, position); - - int dataLength = buf.getInt(0); - if (dataLength > sectorCount * sectorSize) { - throw new CorruptedDataException( - "Expected data size max " + sectorCount * sectorSize + " but found " + dataLength); + + //there was no ongoing write, let's try to begin an uncontended read + + if (this.uncontendedReadLock.readLock().tryLock()) { + //no writer is waiting to start, meaning that ongoingWrite is still null (if ongoingWrite is set while we hold the uncontendedRead + // read lock it doesn't matter, because the writer won't be able to begin until all uncontended readers are finished) + try { + return this.readFromDisk(key); + } finally { + this.uncontendedReadLock.readLock().unlock(); + } } + } while (true); + } + + private Optional readFromDisk(K key) throws IOException { + Function specialValue = this.sectorTracker.trySpecialValue(key).orElse(null); + if (specialValue != null) { + return Optional.of(specialValue.apply(key)); + } + + Optional optionalLocation = this.sectorTracker.getEntryLocation(key); + if (!optionalLocation.isPresent()) { + return Optional.empty(); + } + + RegionEntryLocation location = optionalLocation.get(); + int sectorOffset = location.getOffset(); + int sectorCount = location.getSize(); + + //read all the data in one go up to the end of the entry + ByteBuffer buffer = ByteBuffer.allocate(sectorCount * this.sectorSize); + readFully(this.file, buffer, sectorOffset * (long) this.sectorSize); + buffer.flip(); - // read data - ByteBuffer bytes = ByteBuffer.allocate(dataLength); - readFully(file, bytes, position + Integer.BYTES); - bytes.flip(); - return Optional.of(bytes); - } catch 
(IOException e) { - throw new UncheckedIOException(e); + //read the actual data length + int dataLength = buffer.getInt(); + if (dataLength > sectorCount * this.sectorSize) { + throw new CorruptedDataException( + "Expected data size max " + sectorCount * this.sectorSize + " but found " + dataLength); } + + //return a slice of the full buffer, so that the user doesn't get access to the length prefix or padding bytes at the end + buffer.limit(buffer.position() + dataLength); + return Optional.of(buffer.slice()); } /** * Returns true if something was stored there before within this region. */ @Override public boolean hasValue(K key) { - reserveSectorsLock.readLock().lock(); - try { - return sectorTracker.trySpecialValue(key).isPresent() || sectorTracker.getEntryLocation(key).isPresent(); - } finally { - reserveSectorsLock.readLock().unlock(); - } + do { + OngoingWrite ongoingWrite = this.ongoingWrite; + if (ongoingWrite != null) { //there is currently a write operation in progress + if (!ongoingWrite.contendedReadLock.readLock().tryLock()) { + //if we fail to acquire the write's contendedReadLock's read lock, it means the writer thread for this OngoingWrite has completed + // and the OngoingWrite instance is outdated. spin and try again! + continue; + } + try { //do a contended read, reading to-be-updated entries from memory and unmodified entries from disk + if (ongoingWrite.inProgressWriteSpecial.containsKey(key)) { + //the key is being modified by the ongoing write, so to avoid reading from disk (which will probably result in a data race) + // we'll return the new value which is already in memory. + return true; + } else if (ongoingWrite.inProgressWriteData.containsKey(key)) { + //the key is being modified by the ongoing write, so to avoid reading from disk (which will probably result in a data race) + // we'll return the new value which is already in memory. + return ongoingWrite.inProgressWriteData.get(key) != null; + } else { + //the key is not being modified by the ongoing write. therefore, we can rest assured that nothing related to this key (either + // its header entries or the data associated with it) will be modified during the course of the ongoing write, and can read + // the data from disk like normal. 
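One consequence of readFromDisk() handing out a slice() of a larger heap buffer is that the slice's array() still contains the length prefix (and any trailing padding), offset by arrayOffset(); that is exactly what the RegionCubeStorage change at the top of this diff compensates for. A small demonstration, not part of the patch:

    import java.io.ByteArrayInputStream;
    import java.nio.ByteBuffer;

    public class SliceOffsetSketch {
        public static void main(String[] args) {
            ByteBuffer full = ByteBuffer.allocate(8);
            full.putInt(3).put(new byte[]{10, 20, 30}); //4-byte length prefix, then 3 data bytes
            full.position(Integer.BYTES).limit(Integer.BYTES + 3);
            ByteBuffer data = full.slice();             //what a reader receives

            //WRONG: sees the length prefix and the padding byte as well
            ByteArrayInputStream bad = new ByteArrayInputStream(data.array());
            System.out.println(bad.available()); //8

            //RIGHT: restricted to the written range, as done in readColumn()/readCube()
            ByteArrayInputStream good = new ByteArrayInputStream(
                    data.array(), data.arrayOffset() + data.position(), data.remaining());
            System.out.println(good.available()); //3
        }
    }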
+ return this.sectorTracker.trySpecialValue(key).isPresent() || this.sectorTracker.getEntryLocation(key).isPresent(); + } + } finally { + ongoingWrite.contendedReadLock.readLock().unlock(); + } + } + + //there was no ongoing write, let's try to begin an uncontended read + + if (this.uncontendedReadLock.readLock().tryLock()) { + //no writer is waiting to start, meaning that ongoingWrite is still null (if ongoingWrite is set while we hold the uncontendedRead + // read lock it doesn't matter, because the writer won't be able to begin until all uncontended readers are finished) + try { + return this.sectorTracker.trySpecialValue(key).isPresent() || this.sectorTracker.getEntryLocation(key).isPresent(); + } finally { + this.uncontendedReadLock.readLock().unlock(); + } + } + } while (true); } @Override public void forEachKey(CheckedConsumer cons) throws IOException { - // acquire write locks even when we are "only" reading because callbacks may write - reserveSectorsLock.writeLock().lock(); - dataLock.writeLock().lock(); - try { - int keyCount = this.keyProvider.getKeyCount(regionKey); + //acquire write lock even when we are "only" reading because callbacks may write + this.doWrite(new OngoingWrite(Collections.emptyMap(), Collections.emptyMap()), ongoingWrite -> { + final int keyCount = this.keyProvider.getKeyCount(this.regionKey); for (int id = 0; id < keyCount; id++) { int idFinal = id; // because java is stupid - K key = sectorTracker.getEntryLocation(id).map(loc -> keyProvider.fromRegionAndId(this.regionKey, idFinal)).orElse(null); + K key = this.sectorTracker.getEntryLocation(id).map(loc -> this.keyProvider.fromRegionAndId(this.regionKey, idFinal)).orElse(null); if (key != null) { cons.accept(key); } } - } finally { - dataLock.writeLock().unlock(); - reserveSectorsLock.writeLock().unlock(); - } + }); } private int getSectorNumber(int bytes) { @@ -394,9 +525,7 @@ private int getSectorNumber(int bytes) { @Override public void flush() throws IOException { //CubicChunks.bigWarning(this + ": FLUSH!!!"); - reserveSectorsLock.writeLock().lock(); - dataLock.writeLock().lock(); - try { + this.doWrite(new OngoingWrite(Collections.emptyMap(), Collections.emptyMap()), ongoingWrite -> { boolean fileLengthChanged = false; fileLengthChanged |= this.ensureSectorSizeAligned(); @@ -406,25 +535,18 @@ public void flush() throws IOException { //if the file's length changed, we want to make sure we also force metadata updates to disk this.file.force(fileLengthChanged); - } finally { - dataLock.writeLock().unlock(); - reserveSectorsLock.writeLock().unlock(); - } + }); } @Override public void close() throws IOException { //CubicChunks.bigWarning(this + ": CLOSE!!!"); - reserveSectorsLock.writeLock().lock(); - dataLock.writeLock().lock(); - - //try-with-resources on file to ensure that the file gets closed, even if the other code throws an exception - try (FileChannel file = this.file) { - this.ensureSectorSizeAligned(); - this.erasePendingSectors(); - } finally { - dataLock.writeLock().unlock(); - reserveSectorsLock.writeLock().unlock(); - } + this.doWrite(new OngoingWrite(Collections.emptyMap(), Collections.emptyMap()), ongoingWrite -> { + //try-with-resources on file to ensure that the file gets closed, even if the other code throws an exception + try (FileChannel file = this.file) { + this.ensureSectorSizeAligned(); + this.erasePendingSectors(); + } + }); } /** @@ -481,7 +603,11 @@ public static > ShadowPagingRegion.Builder builder() { public static void readFully(FileChannel src, ByteBuffer data, long position) 
throws IOException { while (data.hasRemaining()) { - src.read(data, position); + long read = src.read(data, position); + if (read < 0L) { + throw new EOFException(); + } + position += read; } } @@ -524,6 +650,50 @@ public ShadowPagingRegion build() throws IOException { } } + private static class OngoingWrite> { + /** + * A read/write lock which allows the writer to wait for all contended reads accessing this {@link OngoingWrite} instance to complete. + *
* <p>
+ * Once complete, the writer will acquire this lock's {@link ReadWriteLock#writeLock() write lock} and never release it, thus preventing any + * new readers from becoming contended reads on this {@link OngoingWrite} instance and informing them that this write is complete. */ + public final ReadWriteLock contendedReadLock = new ReentrantReadWriteLock(true); + + /** + * An immutable map representing a view of the special-value changes being made by this write operation. */ + public final Map<K, Optional<Object>> inProgressWriteSpecial; + + /** + * An immutable map representing a view of the entry data changes being made by this write operation. */ + public final Map<K, ByteBuffer> inProgressWriteData; + + public OngoingWrite(Map<K, Optional<Object>> inProgressWriteSpecial, Map<K, ByteBuffer> inProgressWriteData) { + this.inProgressWriteSpecial = inProgressWriteSpecial; + this.inProgressWriteData = inProgressWriteData; + } + } + + private static class HeaderUpdate { + private final RegionEntryLocation prev; + private final RegionEntryLocation next; + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public HeaderUpdate(Optional<RegionEntryLocation> prev, Optional<RegionEntryLocation> next) { + this.prev = prev.orElse(null); + this.next = next.orElse(null); + } + + public Optional<RegionEntryLocation> getPrev() { + return Optional.ofNullable(this.prev); + } + + public Optional<RegionEntryLocation> getNext() { + return Optional.ofNullable(this.next); + } + } + private static class SectorTracker<K extends IKey<K>> { private final BitSet usedSectors; @@ -564,16 +734,53 @@ public void removeKey(K key) throws IOException { } /** - * Returns the old offset for the given key and the new offset for the given key and requestedSize, and reserves the new sectors. - + * Reserves {@code requestedSize} new sectors, marks them as used and updates the entry location for the given {@code key}. If a valid + * location cannot be found, {@link UnsupportedDataException} will be thrown and this sector tracker will not be modified. +
* <p>
* The old sectors will not be released. + * + * @throws UnsupportedDataException if the sector map cannot store a value of the requested size */ - public Tuple reserveForKey(K key, int requestedSize) throws IOException { - Optional existing = sectorMap.getEntryLocation(key); - RegionEntryLocation found = findFree(requestedSize); - this.sectorMap.setOffsetAndSize(key, found); + public HeaderUpdate getUpdateWithReservation(K key, int requestedSize) throws IOException, UnsupportedDataException { + Optional existing = this.sectorMap.getEntryLocation(key); + RegionEntryLocation found = this.findFree(requestedSize); + + this.sectorMap.setOffsetAndSize(key, found); //this will throw UnsupportedDataException without changing anything if it fails this.updateUsedSectorsFor(null, found); //mark new sectors as allocated - return new Tuple<>(existing.orElse(null), found); + + return new HeaderUpdate(existing, Optional.of(found)); + } + + /** + * Prepares a {@link HeaderUpdate} for deleting the entry with the given {@code key}. + *
* <p>
+ * The old sectors will not be released. + * + * @throws UnsupportedDataException if the sector map rejects the cleared entry location */ + public HeaderUpdate getUpdateForDeletion(K key) throws IOException, UnsupportedDataException { + Optional<RegionEntryLocation> existing = this.sectorMap.getEntryLocation(key); + + this.sectorMap.setOffsetAndSize(key, new RegionEntryLocation(0, 0)); + + //we don't want to de-allocate the old sectors yet, so don't call updateUsedSectorsFor + + return new HeaderUpdate(existing, Optional.empty()); + } + + public void commitUpdate(K key, HeaderUpdate update) { + //the new entry location should already be stored in the sector map, so we don't have to make any changes to it here. + + //release the previously occupied sectors (we assume that the new sectors are already marked as used) + this.updateUsedSectorsFor(update.getPrev().orElse(null), null); + } + + public void rollbackUpdate(K key, HeaderUpdate update) throws IOException { + //restore original state in the sector map + this.sectorMap.setOffsetAndSize(key, update.getPrev().orElseGet(() -> new RegionEntryLocation(0, 0))); + + //free all the newly allocated sectors and re-mark the previously used sectors as used + this.updateUsedSectorsFor(update.getNext().orElse(null), update.getPrev().orElse(null)); + } + private RegionEntryLocation findFree(int requestedSize) {
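Finally, the reserve/commit/rollback split added to SectorTracker comes down to plain BitSet arithmetic. A condensed sketch, not part of the patch: the clear-then-set behaviour of updateUsedSectorsFor and the (offset, size) pairs are inferred from how it is called above, and the sentinel size 0 stands in for "no location":

    import java.util.BitSet;

    public class SectorAccountingSketch {
        private final BitSet usedSectors = new BitSet();

        //clears the sectors of the previous location (if any), then marks the sectors of the next location (if any) as used
        void updateUsedSectorsFor(int prevOffset, int prevSize, int nextOffset, int nextSize) {
            this.usedSectors.clear(prevOffset, prevOffset + prevSize);
            this.usedSectors.set(nextOffset, nextOffset + nextSize);
        }

        public static void main(String[] args) {
            SectorAccountingSketch tracker = new SectorAccountingSketch();

            //reserve: an entry currently at sectors [2,4) is rewritten into sectors [10,13)
            tracker.usedSectors.set(2, 4);
            tracker.updateUsedSectorsFor(0, 0, 10, 3);  //reservation marks only the NEW sectors
            System.out.println(tracker.usedSectors);    //{2, 3, 10, 11, 12} - the old copy is still valid on disk

            //commit: once the headers point at the new copy, the old sectors are released
            tracker.updateUsedSectorsFor(2, 2, 0, 0);
            System.out.println(tracker.usedSectors);    //{10, 11, 12}

            //a rollback would instead clear [10,13) and re-mark [2,4), restoring the original state
        }
    }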