diff --git a/qa/vector/src/main/java/module-info.java b/qa/vector/src/main/java/module-info.java index 0bcb7bc98b651..8232c82f76c43 100644 --- a/qa/vector/src/main/java/module-info.java +++ b/qa/vector/src/main/java/module-info.java @@ -18,5 +18,6 @@ requires org.elasticsearch.logging; requires java.management; requires jdk.management; + requires org.apache.lucene.misc; requires org.elasticsearch.gpu; } diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java index 1534448abee87..ad7654a3f428c 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java @@ -34,16 +34,11 @@ import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MMapDirectory; -import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.util.PrintStreamInfoStream; import org.elasticsearch.common.io.Channels; -import org.elasticsearch.core.IOUtils; -import org.elasticsearch.index.store.LuceneFilesExtensions; -import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.FsDirectoryFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -233,7 +228,7 @@ public boolean isEnabled(String component) { static Directory getDirectory(Path indexPath) throws IOException { Directory dir = FSDirectory.open(indexPath); if (dir instanceof MMapDirectory mmapDir) { - return new HybridDirectory(mmapDir); + return new FsDirectoryFactory.HybridDirectory(NativeFSLockFactory.INSTANCE, mmapDir, 64); } return dir; } @@ -375,64 +370,4 @@ synchronized void next(byte[] dest) throws IOException { bytes.get(dest); } } - - // Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files. - static final class HybridDirectory extends NIOFSDirectory { - private final MMapDirectory delegate; - - HybridDirectory(MMapDirectory delegate) throws IOException { - super(delegate.getDirectory(), NativeFSLockFactory.INSTANCE); - this.delegate = delegate; - } - - @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - if (useDelegate(name, context)) { - // we need to do these checks on the outer directory since the inner doesn't know about pending deletes - ensureOpen(); - ensureCanRead(name); - // we switch the context here since mmap checks for the READONCE context by identity - context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context; - // we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise - // we might run into trouble with files that are pendingDelete in one directory but still - // listed in listAll() from the other. We on the other hand don't want to list files from both dirs - // and intersect for perf reasons. - return delegate.openInput(name, context); - } else { - return super.openInput(name, context); - } - } - - @Override - public void close() throws IOException { - IOUtils.close(super::close, delegate); - } - - private static String getExtension(String name) { - // Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmaping. - final int lastDotIndex = name.lastIndexOf('.'); - if (lastDotIndex == -1) { - return ""; - } else { - return name.substring(lastDotIndex + 1); - } - } - - static boolean useDelegate(String name, IOContext ioContext) { - if (ioContext == Store.READONCE_CHECKSUM) { - // If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient - // if pre-loading is enabled on this file. - return false; - } - - final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name)); - if (extension == null || extension.shouldMmap() == false) { - // Other files are either less performance-sensitive (e.g. stored field index, norms metadata) - // or are large and have a random access pattern and mmap leads to page cache trashing - // (e.g. stored fields and term vectors). - return false; - } - return true; - } - } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index d4fef4e9bb489..aa3dfd4b37b24 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -179,6 +179,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexModule.INDEX_RECOVERY_TYPE_SETTING, IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING, FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING, + FsDirectoryFactory.ASYNC_PREFETCH_LIMIT, EngineConfig.INDEX_CODEC_SETTING, IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS, IndexSettings.DEFAULT_PIPELINE, diff --git a/server/src/main/java/org/elasticsearch/index/store/AsyncDirectIOIndexInput.java b/server/src/main/java/org/elasticsearch/index/store/AsyncDirectIOIndexInput.java new file mode 100644 index 0000000000000..2bf041dfac23b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/store/AsyncDirectIOIndexInput.java @@ -0,0 +1,560 @@ +/* + * @notice + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications copyright (C) 2025 Elasticsearch B.V. + */ + +package org.elasticsearch.index.store; + +import com.carrotsearch.hppc.IntArrayDeque; +import com.carrotsearch.hppc.IntDeque; +import com.carrotsearch.hppc.LongArrayDeque; +import com.carrotsearch.hppc.LongDeque; + +import org.apache.lucene.store.IndexInput; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static java.nio.ByteOrder.LITTLE_ENDIAN; + +/** + * An implementation of {@link IndexInput} that uses Direct I/O to bypass OS cache and + * provides asynchronous prefetching of data. + */ +public class AsyncDirectIOIndexInput extends IndexInput { + + private static final Logger LOGGER = LogManager.getLogger(AsyncDirectIOIndexInput.class); + + /** + * Copied from Lucene + */ + static final OpenOption ExtendedOpenOption_DIRECT; // visible for test + + static { + OpenOption option; + try { + final Class clazz = Class.forName("com.sun.nio.file.ExtendedOpenOption").asSubclass(OpenOption.class); + option = Arrays.stream(clazz.getEnumConstants()).filter(e -> e.toString().equalsIgnoreCase("DIRECT")).findFirst().orElse(null); + } catch (Exception ex) { + option = null; + } + ExtendedOpenOption_DIRECT = option; + } + + static OpenOption getDirectOpenOption() { + if (ExtendedOpenOption_DIRECT == null) { + throw new UnsupportedOperationException( + "com.sun.nio.file.ExtendedOpenOption.DIRECT is not available in the current JDK version." + ); + } + return ExtendedOpenOption_DIRECT; + } + + @SuppressForbidden(reason = "requires FileChannel#read") + private static void readDirectChannel(FileChannel c, ByteBuffer bb, long p) throws IOException { + c.read(bb, p); + } + + private final DirectIOPrefetcher prefetcher; + private final ByteBuffer buffer; + private final FileChannel channel; + private final int blockSize; + private final long offset; + private final long length; + private final boolean isClosable; // clones and slices are not closable + private boolean isOpen; + private long filePos; + + /** + * Creates a new instance of AsyncDirectIOIndexInput for reading index input with direct IO bypassing + * OS buffer + * + * @param path the path to the file to read + * @param blockSize the block size to use for alignment. This must match the filesystem + * block size, otherwise an IOException will be thrown. + * @param bufferSize the size of the read buffer. This must be a multiple of blockSize. + * @param maxPrefetches the maximum number of concurrent prefetches to allow. + * This also determines the maximum number of total prefetches that can be + * outstanding. The total number of prefetches is maxPrefetches * 16. + * A larger number of maxPrefetches allows for more aggressive prefetching, + * but also uses more memory (maxPrefetches * bufferSize). + * @throws UnsupportedOperationException if the JDK does not support Direct I/O + * @throws IOException if the operating system or filesystem does not support Direct I/O + * or a sufficient equivalent. + */ + public AsyncDirectIOIndexInput(Path path, int blockSize, int bufferSize, int maxPrefetches) throws IOException { + super("DirectIOIndexInput(path=\"" + path + "\")"); + this.channel = FileChannel.open(path, StandardOpenOption.READ, getDirectOpenOption()); + this.blockSize = blockSize; + this.prefetcher = new DirectIOPrefetcher(blockSize, this.channel, bufferSize, maxPrefetches, maxPrefetches * 16); + this.buffer = allocateBuffer(bufferSize, blockSize); + this.isOpen = true; + this.isClosable = true; + this.length = channel.size(); + this.offset = 0L; + this.filePos = -bufferSize; + this.buffer.limit(0); + } + + // for clone/slice + private AsyncDirectIOIndexInput(String description, AsyncDirectIOIndexInput other, long offset, long length) throws IOException { + super(description); + Objects.checkFromIndexSize(offset, length, other.channel.size()); + final int bufferSize = other.buffer.capacity(); + this.buffer = allocateBuffer(bufferSize, other.blockSize); + this.blockSize = other.blockSize; + this.channel = other.channel; + this.prefetcher = new DirectIOPrefetcher( + this.blockSize, + this.channel, + bufferSize, + other.prefetcher.maxConcurrentPrefetches, + other.prefetcher.maxTotalPrefetches + ); + this.isOpen = true; + this.isClosable = false; + this.length = length; + this.offset = offset; + this.filePos = -bufferSize; + buffer.limit(0); + } + + private static ByteBuffer allocateBuffer(int bufferSize, int blockSize) { + return ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize).order(LITTLE_ENDIAN); + } + + /** + * Prefetches the given range of bytes. The range will be aligned to blockSize and will + * be chopped up into chunks of buffer size. + * @param pos the position to prefetch from, must be non-negative and within file length + * @param length the length to prefetch, must be non-negative. This length may cause multiple + * prefetches to be issued, depending on the buffer size. + */ + @Override + public void prefetch(long pos, long length) throws IOException { + if (pos < 0 || length < 0 || pos + length > this.length) { + throw new IllegalArgumentException("Invalid prefetch range: pos=" + pos + ", length=" + length + ", fileLength=" + this.length); + } + // check if our current buffer already contains the requested range + final long absPos = pos + offset; + final long alignedPos = absPos - (absPos % blockSize); + // we only prefetch into a single buffer, even if length exceeds buffer size + // maybe we should improve this... + prefetcher.prefetch(alignedPos, length); + } + + @Override + public void close() throws IOException { + prefetcher.close(); + if (isOpen && isClosable) { + channel.close(); + isOpen = false; + } + } + + @Override + public long getFilePointer() { + long filePointer = filePos + buffer.position() - offset; + assert filePointer == -buffer.capacity() - offset || filePointer >= 0 + : "filePointer should either be initial value equal to negative buffer capacity, or larger than or equal to 0"; + return Math.max(filePointer, 0); + } + + @Override + public void seek(long pos) throws IOException { + if (pos != getFilePointer()) { + final long absolutePos = pos + offset; + if (absolutePos >= filePos && absolutePos < filePos + buffer.limit()) { + // the new position is within the existing buffer + buffer.position(Math.toIntExact(absolutePos - filePos)); + } else { + seekInternal(pos); // do an actual seek/read + } + } + assert pos == getFilePointer(); + } + + private void seekInternal(long pos) throws IOException { + final long absPos = pos + offset; + final long alignedPos = absPos - (absPos % blockSize); + filePos = alignedPos - buffer.capacity(); + + final int delta = (int) (absPos - alignedPos); + refill(delta, delta); + } + + private void refill(int bytesToRead) throws IOException { + assert filePos % blockSize == 0; + refill(bytesToRead, 0); + } + + private void refill(int bytesToRead, int delta) throws IOException { + long nextFilePos = filePos + buffer.capacity(); + // BaseDirectoryTestCase#testSeekPastEOF test for consecutive read past EOF, + // hence throwing EOFException early to maintain buffer state (position in particular) + if (nextFilePos > offset + length || ((offset + length) - nextFilePos < bytesToRead)) { + filePos = nextFilePos; + throw new EOFException("read past EOF: " + this); + } + buffer.clear(); + try { + if (prefetcher.readBytes(nextFilePos, buffer, delta)) { + // handle potentially differently aligned prefetch buffer + // this gets tricky as the prefetch buffer is always blockSize aligned + // but the prefetches might be aligned on an earlier block boundary + // so we need to adjust the filePos accordingly + long currentLogicalPos = nextFilePos + delta; + filePos = currentLogicalPos - buffer.position(); + return; + } + filePos = nextFilePos; + // read may return -1 here iff filePos == channel.size(), but that's ok as it just reaches + // EOF + // when filePos > channel.size(), an EOFException will be thrown from above + // we failed, log stacktrace to figure out why + assert filePos % blockSize == 0 : "filePos [" + filePos + "] must be aligned to block size [" + blockSize + "]"; + readDirectChannel(channel, buffer, filePos); + buffer.flip(); + buffer.position(delta); + } catch (IOException ioe) { + throw new IOException(ioe.getMessage() + ": " + this, ioe); + } + } + + @Override + public long length() { + return length; + } + + @Override + public byte readByte() throws IOException { + if (buffer.hasRemaining() == false) { + refill(1); + } + + return buffer.get(); + } + + @Override + public short readShort() throws IOException { + if (buffer.remaining() >= Short.BYTES) { + return buffer.getShort(); + } else { + return super.readShort(); + } + } + + @Override + public int readInt() throws IOException { + if (buffer.remaining() >= Integer.BYTES) { + return buffer.getInt(); + } else { + return super.readInt(); + } + } + + @Override + public long readLong() throws IOException { + if (buffer.remaining() >= Long.BYTES) { + return buffer.getLong(); + } else { + return super.readLong(); + } + } + + @Override + public void readBytes(byte[] dst, int offset, int len) throws IOException { + int toRead = len; + while (true) { + final int left = buffer.remaining(); + if (left < toRead) { + buffer.get(dst, offset, left); + toRead -= left; + offset += left; + refill(toRead); + } else { + buffer.get(dst, offset, toRead); + break; + } + } + } + + @Override + public void readInts(int[] dst, int offset, int len) throws IOException { + int remainingDst = len; + while (remainingDst > 0) { + int cnt = Math.min(buffer.remaining() / Integer.BYTES, remainingDst); + buffer.asIntBuffer().get(dst, offset + len - remainingDst, cnt); + buffer.position(buffer.position() + Integer.BYTES * cnt); + remainingDst -= cnt; + if (remainingDst > 0) { + if (buffer.hasRemaining()) { + dst[offset + len - remainingDst] = readInt(); + --remainingDst; + } else { + refill(remainingDst * Integer.BYTES); + } + } + } + } + + @Override + public void readFloats(float[] dst, int offset, int len) throws IOException { + int remainingDst = len; + while (remainingDst > 0) { + int cnt = Math.min(buffer.remaining() / Float.BYTES, remainingDst); + buffer.asFloatBuffer().get(dst, offset + len - remainingDst, cnt); + buffer.position(buffer.position() + Float.BYTES * cnt); + remainingDst -= cnt; + if (remainingDst > 0) { + if (buffer.hasRemaining()) { + dst[offset + len - remainingDst] = Float.intBitsToFloat(readInt()); + --remainingDst; + } else { + refill(remainingDst * Float.BYTES); + } + } + } + } + + @Override + public void readLongs(long[] dst, int offset, int len) throws IOException { + int remainingDst = len; + while (remainingDst > 0) { + int cnt = Math.min(buffer.remaining() / Long.BYTES, remainingDst); + buffer.asLongBuffer().get(dst, offset + len - remainingDst, cnt); + buffer.position(buffer.position() + Long.BYTES * cnt); + remainingDst -= cnt; + if (remainingDst > 0) { + if (buffer.hasRemaining()) { + dst[offset + len - remainingDst] = readLong(); + --remainingDst; + } else { + refill(remainingDst * Long.BYTES); + } + } + } + } + + @Override + public AsyncDirectIOIndexInput clone() { + try { + var clone = new AsyncDirectIOIndexInput("clone:" + this, this, offset, length); + // TODO figure out how to make this async + // https://github.com/elastic/elasticsearch/issues/136046 + clone.seekInternal(getFilePointer()); + return clone; + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if ((length | offset) < 0 || length > this.length - offset) { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); + } + var slice = new AsyncDirectIOIndexInput(sliceDescription, this, this.offset + offset, length); + // TODO figure out how to make this async + // https://github.com/elastic/elasticsearch/issues/136046 + slice.seekInternal(0L); + return slice; + } + + /** + * A simple prefetcher that uses virtual threads to prefetch data into direct byte buffers. + */ + private static class DirectIOPrefetcher implements Closeable { + private final int maxConcurrentPrefetches; + private final int maxTotalPrefetches; + private final FileChannel channel; + private final int blockSize; + private final long[] prefetchPos; + // statically initialized to maxConcurrentPrefetches + private final List> prefetchThreads; + private final TreeMap posToSlot; + private final IntDeque slots; + private final ByteBuffer[] prefetchBuffers; + private final int prefetchBytesSize; + private final LongDeque pendingPrefetches = new LongArrayDeque(); + private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + + DirectIOPrefetcher(int blockSize, FileChannel channel, int prefetchBytesSize, int maxConcurrentPrefetches, int maxTotalPrefetches) { + this.blockSize = blockSize; + this.channel = channel; + this.maxConcurrentPrefetches = maxConcurrentPrefetches; + this.prefetchPos = new long[maxConcurrentPrefetches]; + this.prefetchThreads = new ArrayList<>(maxConcurrentPrefetches); + this.slots = new IntArrayDeque(maxConcurrentPrefetches); + for (int i = 0; i < maxConcurrentPrefetches; i++) { + prefetchThreads.add(null); + slots.addLast(i); + } + this.posToSlot = new TreeMap<>(); + this.prefetchBuffers = new ByteBuffer[maxConcurrentPrefetches]; + this.prefetchBytesSize = prefetchBytesSize; + this.maxTotalPrefetches = maxTotalPrefetches; + } + + /** + * Initiate prefetch of the given range. The range will be aligned to blockSize and + * chopped up into chunks of prefetchBytesSize. + * If there are not enough slots available, the prefetch request will be queued + * until a slot becomes available. This throttling may occur if the number of + * concurrent prefetches is exceeded, or if there is significant IO pressure. + * @param pos the position to prefetch from, must be non-negative and within file length + * @param length the length to prefetch, must be non-negative. + */ + void prefetch(long pos, long length) { + // first determine how many slots we need given the length + int numSlots = (int) Math.min( + (length + prefetchBytesSize - 1) / prefetchBytesSize, + maxTotalPrefetches - (this.posToSlot.size() + this.pendingPrefetches.size()) + ); + while (numSlots > 0 && (this.posToSlot.size() + this.pendingPrefetches.size()) < maxTotalPrefetches) { + final int slot; + Integer existingSlot = this.posToSlot.get(pos); + if (existingSlot != null && prefetchThreads.get(existingSlot) != null) { + // already being prefetched and hasn't been consumed. + // return early + return; + } + if (this.posToSlot.size() < maxConcurrentPrefetches && slots.isEmpty() == false) { + slot = slots.removeFirst(); + posToSlot.put(pos, slot); + prefetchPos[slot] = pos; + } else { + slot = -1; + LOGGER.debug("queueing prefetch of pos [{}] with length [{}], waiting for open slot", pos, length); + pendingPrefetches.addLast(pos); + } + if (slot != -1) { + startPrefetch(pos, slot); + } + pos += prefetchBytesSize; + numSlots--; + } + + } + + /** + * Try to read the requested bytes from an already prefetched buffer. + * If the requested bytes are not in a prefetched buffer, return false. + * @param pos the absolute position to read from + * @param slice the buffer to read into, must be pre-sized to the required length + * @param delta an offset into the slice buffer to start writing at + * @return true if the requested bytes were read from a prefetched buffer, false otherwise + * @throws IOException if an I/O error occurs + */ + boolean readBytes(long pos, ByteBuffer slice, int delta) throws IOException { + final var entry = this.posToSlot.floorEntry(pos + delta); + if (entry == null) { + return false; + } + final int slot = entry.getValue(); + final long prefetchedPos = entry.getKey(); + // determine if the requested pos is within the prefetched range + if (pos + delta >= prefetchedPos + prefetchBytesSize) { + return false; + } + final Future thread = prefetchThreads.get(slot); + ByteBuffer prefetchBuffer = null; + try { + prefetchBuffer = thread == null ? null : thread.get(); + } catch (ExecutionException e) { + IOException ioException = (IOException) ExceptionsHelper.unwrap(e, IOException.class); + if (ioException != null) { + throw ioException; + } + throw new IOException(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if (prefetchBuffer == null) { + clearSlotAndMaybeStartPending(slot); + } + } + if (prefetchBuffer == null) { + return false; + } + + // our buffer sizes are uniform, and match the required buffer size, however, the position here + // might be before the requested pos, so offset it + slice.put(prefetchBuffer); + slice.flip(); + slice.position(Math.toIntExact(pos - prefetchedPos) + delta); + clearSlotAndMaybeStartPending(slot); + return true; + } + + void clearSlotAndMaybeStartPending(int slot) { + prefetchThreads.set(slot, null); + posToSlot.remove(prefetchPos[slot]); + if (pendingPrefetches.isEmpty()) { + slots.addLast(slot); + return; + } + final long req = pendingPrefetches.removeFirst(); + posToSlot.put(req, slot); + startPrefetch(req, slot); + } + + void startPrefetch(long pos, int slot) { + Future future = executor.submit(() -> { + var prefetchBuffers = DirectIOPrefetcher.this.prefetchBuffers; + ByteBuffer prefetchBuffer = prefetchBuffers[slot]; + if (prefetchBuffer == null) { + prefetchBuffer = allocateBuffer(prefetchBytesSize, blockSize); + prefetchBuffers[slot] = prefetchBuffer; + } else { + prefetchBuffer.clear(); + } + assert pos % blockSize == 0 : "prefetch pos [" + pos + "] must be aligned to block size [" + blockSize + "]"; + readDirectChannel(channel, prefetchBuffer, pos); + prefetchBuffer.flip(); + return prefetchBuffer; + }); + prefetchThreads.set(slot, future); + } + + @Override + public void close() throws IOException { + executor.shutdownNow(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java index 226e12a4c5119..f1ddb871a7ae6 100644 --- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java @@ -25,8 +25,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.StandardIOBehaviorHint; @@ -66,8 +66,6 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory { sharedArenaMaxPermits = value; // default to 1 } - private static final FeatureFlag MADV_RANDOM_FEATURE_FLAG = new FeatureFlag("madv_random"); - public static final Setting INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> { return switch (s) { case "native" -> NativeFSLockFactory.INSTANCE; @@ -76,6 +74,17 @@ public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory { }; // can we set on both - node and index level, some nodes might be running on NFS so they might need simple rather than native }, Property.IndexScope, Property.NodeScope); + public static final Setting ASYNC_PREFETCH_LIMIT = Setting.intSetting( + "index.store.fs.directio_async_prefetch_limit", + 64, + // 0 disables async prefetching + 0, + // creates 256 * 8k buffers, which is 2MB + 256, + Property.IndexScope, + Property.NodeScope + ); + @Override public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException { final Path location = path.resolveIndex(); @@ -85,6 +94,7 @@ public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throw } protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException { + final int asyncPrefetchLimit = indexSettings.getValue(ASYNC_PREFETCH_LIMIT); final String storeType = indexSettings.getSettings() .get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()); IndexModule.Type type; @@ -100,7 +110,7 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory); if (primaryDirectory instanceof MMapDirectory mMapDirectory) { mMapDirectory = adjustSharedArenaGrouping(mMapDirectory); - return new HybridDirectory(lockFactory, setMMapFunctions(mMapDirectory, preLoadExtensions)); + return new HybridDirectory(lockFactory, setMMapFunctions(mMapDirectory, preLoadExtensions), asyncPrefetchLimit); } else { return primaryDirectory; } @@ -159,23 +169,23 @@ public static boolean isHybridFs(Directory directory) { return unwrap instanceof HybridDirectory; } - static final class HybridDirectory extends NIOFSDirectory { + @SuppressForbidden(reason = "requires Files.getFileStore for blockSize") + private static int getBlockSize(Path path) throws IOException { + return Math.toIntExact(Files.getFileStore(path).getBlockSize()); + } + + public static final class HybridDirectory extends NIOFSDirectory { private final MMapDirectory delegate; private final DirectIODirectory directIODelegate; - HybridDirectory(LockFactory lockFactory, MMapDirectory delegate) throws IOException { + public HybridDirectory(LockFactory lockFactory, MMapDirectory delegate, int asyncPrefetchLimit) throws IOException { super(delegate.getDirectory(), lockFactory); this.delegate = delegate; DirectIODirectory directIO; try { // use 8kB buffer (two pages) to guarantee it can load all of an un-page-aligned 1024-dim float vector - directIO = new DirectIODirectory(delegate, 8192, DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT) { - @Override - protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { - return true; - } - }; + directIO = new AlwaysDirectIODirectory(delegate, 8192, DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT, asyncPrefetchLimit); } catch (Exception e) { // directio not supported Log.warn("Could not initialize DirectIO access", e); @@ -293,4 +303,30 @@ MMapDirectory getDelegate() { return delegate; } } + + static final class AlwaysDirectIODirectory extends DirectIODirectory { + private final int blockSize; + private final int asyncPrefetchLimit; + + AlwaysDirectIODirectory(FSDirectory delegate, int mergeBufferSize, long minBytesDirect, int asyncPrefetchLimit) throws IOException { + super(delegate, mergeBufferSize, minBytesDirect); + blockSize = getBlockSize(delegate.getDirectory()); + this.asyncPrefetchLimit = asyncPrefetchLimit; + } + + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + ensureOpen(); + if (asyncPrefetchLimit > 0) { + return new AsyncDirectIOIndexInput(getDirectory().resolve(name), blockSize, 8192, asyncPrefetchLimit); + } else { + return in.openInput(name, context); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIODirectoryTests.java b/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIODirectoryTests.java new file mode 100644 index 0000000000000..01165a98017af --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIODirectoryTests.java @@ -0,0 +1,193 @@ +/* + * @notice + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modifications copyright (C) 2025 Elasticsearch B.V. + */ +package org.elasticsearch.index.store; + +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.PhraseQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.tests.index.RandomIndexWriter; +import org.apache.lucene.tests.store.BaseDirectoryTestCase; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.core.SuppressForbidden; +import org.junit.BeforeClass; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.apache.lucene.store.FSDirectory.open; + +public class AsyncDirectIODirectoryTests extends BaseDirectoryTestCase { + + static { + LogConfigurator.loadLog4jPlugins(); + LogConfigurator.configureESLogging(); // native access requires logging to be initialized + } + + @BeforeClass + public static void checkSupported() throws IOException { + assumeTrue( + "This test required a JDK version that has support for ExtendedOpenOption.DIRECT", + AsyncDirectIOIndexInput.ExtendedOpenOption_DIRECT != null + ); + // jdk supports it, let's check that the filesystem does too + Path path = createTempDir("directIOProbe"); + try (Directory dir = open(path); IndexOutput out = dir.createOutput("out", IOContext.DEFAULT)) { + out.writeString("test"); + } catch (IOException e) { + assumeNoException("test requires filesystem that supports Direct IO", e); + } + } + + @SuppressForbidden(reason = "requires Files.getFileStore") + private static int getBlockSize(Path path) throws IOException { + return Math.toIntExact(Files.getFileStore(path).getBlockSize()); + } + + @Override + protected Directory getDirectory(Path path) throws IOException { + return new FsDirectoryFactory.AlwaysDirectIODirectory(open(path), 8192, 8192, 32); + } + + public void testIndexWriteRead() throws IOException { + try (Directory dir = getDirectory(createTempDir("testDirectIODirectory"))) { + try (RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) { + Document doc = new Document(); + Field field = newField("field", "foo bar", TextField.TYPE_STORED); + doc.add(field); + + iw.addDocument(doc); + iw.commit(); + } + + try (IndexReader ir = DirectoryReader.open(dir)) { + IndexSearcher s = newSearcher(ir); + assertEquals(1, s.count(new PhraseQuery("field", "foo", "bar"))); + } + } + } + + public void testIllegalEOFWithFileSizeMultipleOfBlockSize() throws Exception { + Path path = createTempDir("testIllegalEOF"); + final int fileSize = getBlockSize(path) * 2; + + try (Directory dir = getDirectory(path)) { + IndexOutput o = dir.createOutput("out", newIOContext(random())); + byte[] b = new byte[fileSize]; + o.writeBytes(b, 0, fileSize); + o.close(); + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + i.seek(fileSize); + + // Seeking past EOF should always throw EOFException + expectThrows(EOFException.class, () -> i.seek(fileSize + RandomizedTest.randomIntBetween(1, 2048))); + + // Reading immediately after seeking past EOF should throw EOFException + expectThrows(EOFException.class, () -> i.readByte()); + } + } + } + + public void testReadPastEOFShouldThrowEOFExceptionWithEmptyFile() throws Exception { + // fileSize needs to be 0 to test this condition. Do not randomize. + final int fileSize = 0; + try (Directory dir = getDirectory(createTempDir("testReadPastEOF"))) { + try (IndexOutput o = dir.createOutput("out", newIOContext(random()))) { + o.writeBytes(new byte[fileSize], 0, fileSize); + } + + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + i.seek(fileSize); + expectThrows(EOFException.class, () -> i.readByte()); + expectThrows(EOFException.class, () -> i.readBytes(new byte[1], 0, 1)); + } + + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + expectThrows(EOFException.class, () -> i.seek(fileSize + RandomizedTest.randomIntBetween(1, 2048))); + expectThrows(EOFException.class, () -> i.readByte()); + expectThrows(EOFException.class, () -> i.readBytes(new byte[1], 0, 1)); + } + + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + expectThrows(EOFException.class, () -> i.readByte()); + } + + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + expectThrows(EOFException.class, () -> i.readBytes(new byte[1], 0, 1)); + } + } + } + + public void testSeekPastEOFAndRead() throws Exception { + try (Directory dir = getDirectory(createTempDir("testSeekPastEOF"))) { + final int len = random().nextInt(2048); + + try (IndexOutput o = dir.createOutput("out", newIOContext(random()))) { + byte[] b = new byte[len]; + o.writeBytes(b, 0, len); + } + + try (IndexInput i = dir.openInput("out", newIOContext(random()))) { + // Seeking past EOF should always throw EOFException + expectThrows(EOFException.class, () -> i.seek(len + RandomizedTest.randomIntBetween(1, 2048))); + + // Reading immediately after seeking past EOF should throw EOFException + expectThrows(EOFException.class, () -> i.readByte()); + } + } + } + + // Ping-pong seeks should be really fast, since the position should be within buffer. + // The test should complete within sub-second times, not minutes. + public void testSeekSmall() throws IOException { + Path tmpDir = createTempDir("testSeekSmall"); + try (Directory dir = getDirectory(tmpDir)) { + int len = atLeast(100); + try (IndexOutput o = dir.createOutput("out", newIOContext(random()))) { + byte[] b = new byte[len]; + for (int i = 0; i < len; i++) { + b[i] = (byte) i; + } + o.writeBytes(b, 0, len); + } + try (IndexInput in = dir.openInput("out", newIOContext(random()))) { + for (int i = 0; i < 100_000; i++) { + in.seek(2); + assertEquals(2, in.readByte()); + in.seek(1); + assertEquals(1, in.readByte()); + in.seek(0); + assertEquals(0, in.readByte()); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIOIndexInputTests.java b/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIOIndexInputTests.java new file mode 100644 index 0000000000000..a9beebf16e37e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/store/AsyncDirectIOIndexInputTests.java @@ -0,0 +1,172 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.NIOFSDirectory; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +public class AsyncDirectIOIndexInputTests extends ESTestCase { + + @SuppressForbidden(reason = "requires Files.getFileStore") + private static int getBlockSize(Path path) throws IOException { + return Math.toIntExact(Files.getFileStore(path).getBlockSize()); + } + + private static final int BASE_BUFFER_SIZE = 8192; + + public void testPrefetchEdgeCase() throws IOException { + byte[] bytes = new byte[BASE_BUFFER_SIZE * 32 + randomIntBetween(1, BASE_BUFFER_SIZE)]; + int offset = 84; + float[] vectorActual = new float[768]; + int[] toSeek = new int[] { 1, 2, 3, 5, 6, 9, 11, 14, 15, 16, 18, 23, 24, 25, 26, 29, 30, 31 }; + int byteSize = vectorActual.length * Float.BYTES; + Path path = createTempDir("testDirectIODirectory"); + int blockSize = getBlockSize(path); + random().nextBytes(bytes); + try (Directory dir = new NIOFSDirectory(path)) { + try (var output = dir.createOutput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + output.writeBytes(bytes, bytes.length); + } + try ( + AsyncDirectIOIndexInput actualInput = new AsyncDirectIOIndexInput( + path.resolve("test"), + blockSize, + BASE_BUFFER_SIZE, + toSeek.length + 1 + ); + ) { + IndexInput actualSlice = actualInput.slice("vectors", offset, bytes.length - offset); + for (int seek : toSeek) { + actualSlice.prefetch((long) seek * byteSize, byteSize); + } + for (int seek : toSeek) { + actualSlice.seek((long) seek * byteSize); + actualSlice.readFloats(vectorActual, 0, vectorActual.length); + assertEquals("mismatch at seek: " + seek, (seek + 1) * byteSize, actualSlice.getFilePointer()); + } + } + } + } + + public void testLargePrefetch() throws IOException { + byte[] bytes = new byte[BASE_BUFFER_SIZE * 10 + randomIntBetween(1, BASE_BUFFER_SIZE)]; + int offset = randomIntBetween(1, BASE_BUFFER_SIZE); + int numBytes = randomIntBetween(BASE_BUFFER_SIZE + 1, BASE_BUFFER_SIZE * 8); + random().nextBytes(bytes); + byte[] trueBytes = new byte[numBytes]; + System.arraycopy(bytes, offset, trueBytes, 0, numBytes); + Path path = createTempDir("testDirectIODirectory"); + int blockSize = getBlockSize(path); + try (Directory dir = new NIOFSDirectory(path)) { + try (var output = dir.createOutput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + output.writeBytes(bytes, bytes.length); + } + try ( + AsyncDirectIOIndexInput actualInput = new AsyncDirectIOIndexInput( + path.resolve("test"), + blockSize, + blockSize, + randomIntBetween(2, 16) + ); + ) { + byte[] actualBytes = new byte[numBytes]; + // prefetch everything at once + actualInput.prefetch(offset, numBytes); + actualInput.seek(offset); + actualInput.readBytes(actualBytes, 0, actualBytes.length); + assertArrayEquals(trueBytes, actualBytes); + } + } + } + + public void testWriteThenReadBytesConsistency() throws IOException { + byte[] bytes = new byte[BASE_BUFFER_SIZE * 8 + randomIntBetween(1, BASE_BUFFER_SIZE)]; + random().nextBytes(bytes); + Path path = createTempDir("testDirectIODirectory"); + int blockSize = getBlockSize(path); + int bufferSize = 1024 * 4; + List seeks = new ArrayList<>(); + int lastSeek = 0; + seeks.add(0); + while (lastSeek < bytes.length) { + int nextSeek = randomIntBetween(lastSeek, Math.min(lastSeek + bufferSize, bytes.length - 1)); + seeks.add(nextSeek); + lastSeek = nextSeek + 1; + } + try (Directory dir = new NIOFSDirectory(path)) { + try (var output = dir.createOutput("test", org.apache.lucene.store.IOContext.DEFAULT)) { + output.writeBytes(bytes, bytes.length); + } + try ( + AsyncDirectIOIndexInput actualInput = new AsyncDirectIOIndexInput( + path.resolve("test"), + blockSize, + bufferSize, + seeks.size() + ); + IndexInput expectedInput = dir.openInput("test", org.apache.lucene.store.IOContext.DEFAULT) + ) { + assert expectedInput instanceof AsyncDirectIOIndexInput == false; + byte[] actualBytes = new byte[bufferSize / 2]; + byte[] expectedBytes = new byte[bufferSize / 2]; + int prevSeek = 0; + for (int j = 1; j < seeks.size(); j++) { + actualInput.seek(prevSeek); + expectedInput.seek(prevSeek); + int seek = seeks.get(j); + int toRead = Math.min(actualBytes.length, bytes.length - prevSeek); + expectedInput.readBytes(expectedBytes, 0, toRead); + actualInput.readBytes(actualBytes, 0, toRead); + prevSeek = seek; + assertArrayEquals(expectedBytes, actualBytes); + } + } + + try ( + AsyncDirectIOIndexInput actualPretchingInput = new AsyncDirectIOIndexInput( + path.resolve("test"), + blockSize, + bufferSize, + seeks.size() + ); + IndexInput expectedInput = dir.openInput("test", org.apache.lucene.store.IOContext.DEFAULT) + ) { + assert expectedInput instanceof AsyncDirectIOIndexInput == false; + byte[] actualBytes = new byte[bufferSize / 2]; + byte[] expectedBytes = new byte[bufferSize / 2]; + for (int seek : seeks) { + // always prefetch just a page + actualPretchingInput.prefetch(seek, 1); + } + int prevSeek = 0; + for (int j = 1; j < seeks.size(); j++) { + actualPretchingInput.seek(prevSeek); + expectedInput.seek(prevSeek); + int seek = seeks.get(j); + int toRead = Math.min(actualBytes.length, bytes.length - prevSeek); + actualPretchingInput.readBytes(actualBytes, 0, toRead); + expectedInput.readBytes(expectedBytes, 0, toRead); + prevSeek = seek; + assertArrayEquals(expectedBytes, actualBytes); + } + } + } + } + +}