2121import java .util .Deque ;
2222import java .util .concurrent .ConcurrentLinkedDeque ;
2323import java .util .concurrent .atomic .AtomicBoolean ;
24+ import java .util .concurrent .locks .ReadWriteLock ;
25+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
2426
2527/**
2628 * A direct on disk implementation of LongList. This implementation creates a temporary file to store the data.
@@ -64,6 +66,28 @@ public class LongListDisk extends AbstractLongList<Long> {
6466 */
6567 private final Deque <Long > freeChunks = new ConcurrentLinkedDeque <>();
6668
69+ /**
70+ * Protects readers and writers from observing a chunk file-offset
71+ * that has been recycled by a concurrent
72+ * {@link #updateValidRange} → {@link #closeChunk} → {@link #createChunk}
73+ * cycle.
74+ *
75+ * <ul>
76+ * <li><b>Read lock</b> – held by {@link #get},
77+ * {@link #putToChunk}, and {@link #putIfEqual} while they
78+ * resolve a chunk offset and access the backing file.
 * Readers may hold it concurrently; note however that
 * {@code putToChunk} and {@code putIfEqual} are also
 * {@code synchronized}, so those two serialize with
 * each other and only overlap with {@code get}.</li>
80+ * <li><b>Write lock</b> – held by {@link #updateValidRange} for
81+ * its entire duration. Since the parent implementation may
82+ * free chunks via {@link #closeChunk} and recycle their file
83+ * offsets into {@link #freeChunks}, the write lock ensures
84+ * that all in-flight readers and writers have completed before
85+ * any chunk is freed, and that no new reader or writer can
86+ * start while offsets are being zeroed and recycled.</li>
87+ * </ul>
88+ */
89+ final ReadWriteLock chunkRecycleLock = new ReentrantReadWriteLock ();
90+
6791 /**
6892 * A helper flag to make sure close() can be called multiple times.
6993 */
@@ -243,9 +267,17 @@ Path createTempFile(final String sourceFileName, final @NonNull Configuration co
243267 return tempDir .resolve (sourceFileName );
244268 }
245269
246- /** {@inheritDoc} */
/**
 * {@inheritDoc}
 *
 * <p>Acquires the read lock to prevent the chunk offset from being
 * recycled by a concurrent {@link #closeChunk} while the file write
 * is in progress. The read lock lets this method overlap with
 * {@code get} calls; the write lock is taken by
 * {@link #updateValidRange} (and {@code close}), not by
 * {@code closeChunk} itself, which relies on its caller holding it.
 */
247278 @ Override
248279 protected synchronized void putToChunk (final Long chunk , final int subIndex , final long value ) {
280+ chunkRecycleLock .readLock ().lock ();
249281 try {
250282 final ByteBuffer buf = TEMP_LONG_BUFFER_THREAD_LOCAL .get ();
251283 final long offset = chunk + (long ) subIndex * Long .BYTES ;
@@ -255,32 +287,103 @@ protected synchronized void putToChunk(final Long chunk, final int subIndex, fin
255287 MerkleDbFileUtils .completelyWrite (currentFileChannel , buf , offset );
256288 } catch (final IOException e ) {
257289 throw new UncheckedIOException (e );
290+ } finally {
291+ chunkRecycleLock .readLock ().unlock ();
258292 }
259293 }
260294
261- /** {@inheritDoc} */
295+ /**
296+ * {@inheritDoc}
297+ *
298+ * <p>Acquires the read lock for the duration of the chunk-offset
299+ * lookup and the subsequent file read. This guarantees that no
300+ * {@link #closeChunk} can recycle the offset into {@link #freeChunks}
301+ * while this read is in progress. Multiple readers hold the read
302+ * lock concurrently without blocking each other.
303+ */
304+ @ Override
305+ public long get (final long index , final long defaultValue ) {
306+ if (index < 0 || index >= capacity ) {
307+ throw new IndexOutOfBoundsException (index );
308+ }
309+
310+ chunkRecycleLock .readLock ().lock ();
311+ try {
312+ if (index >= size .get ()) {
313+ return defaultValue ;
314+ }
315+ final int chunkIndex = toIntExact (index / longsPerChunk );
316+ final long subIndex = index % longsPerChunk ;
317+ final Long chunk = chunkList .get (chunkIndex );
318+ if (chunk == null ) {
319+ return defaultValue ;
320+ }
321+ final long presentValue = lookupInChunk (chunk , subIndex );
322+ return presentValue == IMPERMISSIBLE_VALUE ? defaultValue : presentValue ;
323+ } finally {
324+ chunkRecycleLock .readLock ().unlock ();
325+ }
326+ }
327+
328+ /**
329+ * {@inheritDoc}
330+ *
331+ * <p>Acquires the read lock for the same reason as
332+ * {@link #putToChunk}: the chunk offset must not be recycled between
333+ * the compare-read and the conditional write.
334+ */
262335 @ Override
263336 protected synchronized boolean putIfEqual (
264337 final Long chunk , final int subIndex , final long oldValue , long newValue ) {
265- final ByteBuffer buf = TEMP_LONG_BUFFER_THREAD_LOCAL .get ();
266- buf .position (0 );
338+ chunkRecycleLock .readLock ().lock ();
267339 try {
340+ final ByteBuffer buf = TEMP_LONG_BUFFER_THREAD_LOCAL .get ();
341+ buf .position (0 );
268342 final long offset = chunk + (long ) subIndex * Long .BYTES ;
269343 MerkleDbFileUtils .completelyRead (currentFileChannel , buf , offset );
270344 final long filesOldValue = buf .getLong (0 );
271345 if (filesOldValue == oldValue ) {
272- // write new value to file
273346 buf .putLong (0 , newValue );
274347 buf .position (0 );
275348 MerkleDbFileUtils .completelyWrite (currentFileChannel , buf , offset );
276349 return true ;
277350 }
278351 } catch (final IOException e ) {
279352 throw new UncheckedIOException (e );
353+ } finally {
354+ chunkRecycleLock .readLock ().unlock ();
280355 }
281356 return false ;
282357 }
283358
359+ /**
360+ * {@inheritDoc}
361+ *
362+ * <p>Acquires the write lock for the entire operation. The parent
363+ * implementation may free chunks (via {@link #closeChunk}) and
364+ * recycle their file offsets into {@link #freeChunks}. Holding the
365+ * write lock for the full duration guarantees that:
366+ * <ol>
367+ * <li>All in-flight readers and writers that may hold a reference
368+ * to a chunk offset have completed before any chunk is freed.</li>
369+ * <li>No new reader or writer can start while offsets are being
370+ * zeroed and recycled.</li>
371+ * </ol>
372+ *
373+ * <p>This is a rare operation, so holding the write lock for its
374+ * entire duration — including file I/O in {@code closeChunk} — is
375+ * acceptable.
376+ */
377+ @ Override
378+ public void updateValidRange (final long newMinValidIndex , final long newMaxValidIndex ) {
379+ chunkRecycleLock .writeLock ().lock ();
380+ try {
381+ super .updateValidRange (newMinValidIndex , newMaxValidIndex );
382+ } finally {
383+ chunkRecycleLock .writeLock ().unlock ();
384+ }
385+ }
386+
284387 /**
285388 * Calculate the offset in the chunk for the given index.
286389 * @param index the index to use
@@ -377,6 +480,7 @@ public void close() {
377480 // Already closed
378481 return ;
379482 }
483+ chunkRecycleLock .writeLock ().lock ();
380484 try {
381485 // flush
382486 if (currentFileChannel .isOpen ()) {
@@ -392,12 +496,29 @@ public void close() {
392496 Files .delete (tempDir );
393497 } catch (final IOException e ) {
394498 throw new UncheckedIOException (e );
499+ } finally {
500+ chunkRecycleLock .writeLock ().unlock ();
395501 }
396502 }
397503
398- /** {@inheritDoc} */
504+ /**
505+ * {@inheritDoc}
506+ *
507+ * <p>Zeros the chunk region in the backing file and recycles the
508+ * file offset into {@link #freeChunks} so that {@link #createChunk()}
509+ * can reuse it for a new index range.
510+ *
511+ * <p>When called as part of {@link #updateValidRange}, the
512+ * {@link #chunkRecycleLock} write lock is already held by the
513+ * caller, ensuring that no concurrent reader or writer can access
514+ * the backing file with this chunk's offset while it is being
515+ * zeroed and recycled.
516+ */
399517 @ Override
400518 protected void closeChunk (@ NonNull final Long chunk ) {
// Zero out the chunk region in the backing file. When invoked via
// updateValidRange the chunkRecycleLock write lock is already held
// (see javadoc above) and the chunk-list entry is already null, so no
// reader or writer can resolve or use this offset concurrently.
401522 final ByteBuffer transferBuffer = initOrGetTransferBuffer ();
402523 fillBufferWithZeroes (transferBuffer );
403524 try {
0 commit comments