Skip to content

Commit d3d52f4

Browse files
authored
Fix bug with async direct IO fetching that caused misreading of bytes (#136213)
1 parent 18a1a91 commit d3d52f4

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

server/src/main/java/org/elasticsearch/index/store/AsyncDirectIOIndexInput.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ boolean readBytes(long pos, ByteBuffer slice, int delta) throws IOException {
523523
}
524524

525525
void clearSlotAndMaybeStartPending(int slot) {
526+
assert prefetchThreads.get(slot) != null && prefetchThreads.get(slot).isDone();
526527
prefetchThreads.set(slot, null);
527528
posToSlot.remove(prefetchPos[slot]);
528529
if (pendingPrefetches.isEmpty()) {
@@ -531,9 +532,22 @@ void clearSlotAndMaybeStartPending(int slot) {
531532
}
532533
final long req = pendingPrefetches.removeFirst();
533534
posToSlot.put(req, slot);
535+
prefetchPos[slot] = req;
534536
startPrefetch(req, slot);
535537
}
536538

539+
private boolean assertSlotsConsistent() {
540+
posToSlot.forEach((k, v) -> {
541+
if (prefetchThreads.get(v) == null) {
542+
throw new AssertionError("posToSlot inconsistent: slot " + v + " for pos " + k + " has no prefetch thread");
543+
}
544+
if (prefetchPos[v] != k) {
545+
throw new AssertionError("posToSlot inconsistent: slot " + v + " for pos " + k + " has prefetchPos " + prefetchPos[v]);
546+
}
547+
});
548+
return true;
549+
}
550+
537551
void startPrefetch(long pos, int slot) {
538552
Future<ByteBuffer> future = executor.submit(() -> {
539553
var prefetchBuffers = DirectIOPrefetcher.this.prefetchBuffers;
@@ -550,6 +564,7 @@ void startPrefetch(long pos, int slot) {
550564
return prefetchBuffer;
551565
});
552566
prefetchThreads.set(slot, future);
567+
assert assertSlotsConsistent();
553568
}
554569

555570
@Override

server/src/test/java/org/elasticsearch/index/store/AsyncDirectIODirectoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.nio.file.Path;
4545

4646
import static org.apache.lucene.store.FSDirectory.open;
47+
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
4748

4849
public class AsyncDirectIODirectoryTests extends BaseDirectoryTestCase {
4950

@@ -74,7 +75,7 @@ private static int getBlockSize(Path path) throws IOException {
7475

7576
@Override
7677
protected Directory getDirectory(Path path) throws IOException {
77-
return new FsDirectoryFactory.AlwaysDirectIODirectory(open(path), 8192, 8192, 32);
78+
return new FsDirectoryFactory.AlwaysDirectIODirectory(open(path), 8192, 8192, randomIntBetween(0, 32));
7879
}
7980

8081
public void testIndexWriteRead() throws IOException {

0 commit comments

Comments
 (0)