Skip to content

Conversation

benwtrent
Copy link
Member

@benwtrent benwtrent commented Sep 16, 2025

One significant cost of DirectIO is simply waiting for bytes to be read in a path dedicate for compute.

This change adds "prefetch" capabilities to DirectIO by allowing to prefetch particular file positions. For simplicity, I have it always prefetch a DirectIO page. Initially I did a bunch of work to allow prefetching multiple pages (e.g. more than 8192 bytes), but this greatly complicated the implementation. I think this can be added as a follow up.

Here are some benchmarks for vectors. Note, the recall difference indicates I am doing something wrong right now. I am thinking I have a couple off-by-one errors and I am still investigating.

Opening as a draft until I can figure out this weird bug (and of course, remove all my extraneous changes used for testing this thing)...This is labeled as 9.2, but I would be very surprise if it actually lands there.

This PR:

index_name                      index_type  visit_percentage(%)  latency(ms)  net_cpu_time(ms)  avg_cpu_count     QPS  recall   visited  filter_selectivity
------------------------------  ----------  -------------------  -----------  ----------------  -------------  ------  ------  --------  ------------------
cohere-wikipedia-docs-768d.vec         ivf                 1.00         5.57              0.00           0.00  179.53    0.92  87397.37                1.00

Baseline DirectIO:

index_name                      index_type  visit_percentage(%)  latency(ms)  net_cpu_time(ms)  avg_cpu_count     QPS  recall   visited  filter_selectivity
------------------------------  ----------  -------------------  -----------  ----------------  -------------  ------  ------  --------  ------------------
cohere-wikipedia-docs-768d.vec         ivf                 1.00         8.12              0.00           0.00  123.15    0.94  87397.37                1.00

Baseline MMAP (when many floating points can still just reside in memory):

index_name                      index_type  visit_percentage(%)  latency(ms)  net_cpu_time(ms)  avg_cpu_count     QPS  recall   visited  filter_selectivity
------------------------------  ----------  -------------------  -----------  ----------------  -------------  ------  ------  --------  ------------------
cohere-wikipedia-docs-768d.vec         ivf                 1.00         3.58              0.00           0.00  279.33    0.94  87397.37                1.00

@elasticsearchmachine
Copy link
Collaborator

Hi @benwtrent, I've created a changelog YAML for you.


import static java.nio.ByteOrder.LITTLE_ENDIAN;

public class AsyncDirectIOIndexInput extends IndexInput {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change.

@benwtrent
Copy link
Member Author

Hey reviewers, I am marking this "ready to review", but obviously, I think pieces of it need to be split out.

  1. Once we merge Lucene, I am gonna work on addressing reranking and bulk rescoring. We can likely replace pieces of this with the next lucene version.
  2. We need to determine if we want to make this async DirectIO thing more general. Right now, its only prefetching things you ask for directly. Maybe it should prefetch more eagerly? Prefetch more than a single buffer size?

Basically, the main focus of the review should be the AsyncDirectIOIndexInput. The rest is structure I needed to actually put it through its paces and will be moved out when I can into a separate PR.

@benwtrent benwtrent marked this pull request as ready for review September 18, 2025 21:27
@elasticsearchmachine elasticsearchmachine added Team:Search Foundations Meta label for the Search Foundations team in Elasticsearch Team:Search Relevance Meta label for the Search Relevance team in Elasticsearch labels Sep 18, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-search-relevance (Team:Search Relevance)

@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-search-foundations (Team:Search Foundations)

@benwtrent benwtrent changed the title [DRAFT] Adding asynchronous fetching for DirectIO directory Adding asynchronous fetching for DirectIO directory Sep 19, 2025
public AsyncDirectIOIndexInput clone() {
try {
var clone = new AsyncDirectIOIndexInput("clone:" + this, this, offset, length);
// TODO figure out how to make this async
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a seek expected to take a long time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thecoop yep, I have noticed that this has a measurable impact every single time we construct this thing because its a fully synchronous seek.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gosh. Async seek would be something like putting the seek on a virtual thread, and blocking all calls to the cloned instance until the seek is complete. Bit of a faff really.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Async seek would be something like putting the seek on a virtual thread, and blocking all calls to the cloned instance until the seek is complete.

Not really, async seek would just place one in the queue and kick it off. Then if the bytes are ever read (which, for vectors, doesn't really happen as we don't read from slice(pos=0)), it will join the async call and let it read.

The downside is that it will take a async buffer slot.

What it does now is block the calling thread until the seek and read is complete, which gets expensive as we very commonly request a new FloatVectorValues just to see vector count or pass around the object for other validations (never actually reading vector values at all).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we then make the seek lazy, only doing it if the values is actually going to be used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we then make the seek lazy, only doing it if the values is actually going to be used?

@thecoop I have tried, and keep running into very strange edge cases. I will create an issue to track the TODO so it can be tackled later on.

@benwtrent benwtrent requested a review from thecoop September 26, 2025 13:16
if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) {
jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED'
}
if (System.getenv("DO_ASYNC_PROFILING") != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed in the final version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it useful to have a header to just apply my own async profiler, can we keep it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, looking at: #136021

that makes this work nicer, so I will remove this :).

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
if (useDirectIO(name, context, OptionalLong.of(fileLength(name)))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useDirectIO is overridden to always return true in this class

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize that, but I wanted to make sure we handle correct logic, unrelated to underlying API constants. (maybe we have a more "custom" directIO impl that only does direct IO based on file context,etc.)

public IndexInput openInput(String name, IOContext context) throws IOException {
int blockSize = getBlockSize(path);
ensureOpen();
if (useDirectIO(name, context, OptionalLong.of(fileLength(name)))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, this will always be true here.

(could we just use the implementation in FsDirectoryFactory somehow?)


// Reading immediately after seeking past EOF should throw EOFException
expectThrows(EOFException.class, () -> i.readByte());
i.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Put IndexInput i in a try block?

}

// Ping-pong seeks should be really fast, since the position should be within buffer.
// The test should complete within sub-second times, not minutes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't check the time. Is it worth adding a stopwatch check for a long time, say 1 minute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thecoop honestly, I am not sure...that would likely make it flaky, I am inheriting these tests from Lucene.

int offset = 84;
float[] vectorActual = new float[768];
int[] toSeek = new int[] { 1, 2, 3, 5, 6, 9, 11, 14, 15, 16, 18, 23, 24, 25, 26, 29, 30, 31 };
int byteSize = 768 * 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int byteSize = 768 * 4;
int byteSize = vectorActual.length * Float.BYTES;

import java.util.ArrayList;
import java.util.List;

public class AsyncDirectIOIndexInputTests extends ESTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's quite a few magic numbers in this class. Could they be consolidated and some comments added?

buffer.flip();
buffer.position(delta);
} catch (IOException ioe) {
throw new IOException(ioe.getMessage() + ": " + this, ioe);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to rethrow as an IOException? Note that this hides any thrown subclasses of IOException

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, this is copied from Lucene. I would hope once Lucene gets async directIO, we can remove this class and rely on Lucene's.

*/
void prefetch(long pos, long length) {
// first determine how many slots we need given the length
int numSlots = (int) Math.min((length + prefetchBytesSize - 1) / prefetchBytesSize, Integer.MAX_VALUE - 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want a max number of slots as Integer.MAX_VALUE - 1? Wouldn't that cause significant numbers having that number?

void prefetch(long pos, long length) {
// first determine how many slots we need given the length
int numSlots = (int) Math.min((length + prefetchBytesSize - 1) / prefetchBytesSize, Integer.MAX_VALUE - 1);
while (numSlots > 0 && (this.posToSlot.size() + this.pendingPrefetches.size()) < maxTotalPrefetches) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this doesn't do any prefetching if we've got max in-progress then. That's probably the right thing to do, but it may be worth making a note that in high-pressure situations (IO taking a long time, exceptions thrown, large blocking prefetches, whatever), prefetching wont be doing anything. Is it worth a debug log in this case to help us diagnose IO problems around this in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thecoop let me add a logger! yes

Copy link
Member

@thecoop thecoop left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy with the overall structure of this. I've commented on a few details that could be worked on here, or separately later on

@benwtrent benwtrent requested a review from a team as a code owner October 6, 2025 18:25
}; // can we set on both - node and index level, some nodes might be running on NFS so they might need simple rather than native
}, Property.IndexScope, Property.NodeScope);

public static final Setting<Integer> ASYNC_PREFETCH_LIMIT = Setting.intSetting(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thecoop I made this change as I figured we want a way to shut it off or increase the limit. 64 is pretty conservative with only 8k buffers, but it seems better to be safer than not.

I am also not sure we actually want to document this setting. Ideally, its never touched.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>non-issue :Search Foundations/Search Catch all for Search Foundations :Search Relevance/Vectors Vector search Team:Search Foundations Meta label for the Search Foundations team in Elasticsearch Team:Search Relevance Meta label for the Search Relevance team in Elasticsearch v9.3.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants