Skip to content

Commit b520a39

Browse files
committed
Merge branch 'main' into 2025/09/09/cluster-applier-thread-watchdog
2 parents 756e30a + 7c957c3 commit b520a39

File tree

63 files changed

+1798
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1798
-235
lines changed

docs/changelog/133861.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133861
2+
summary: Implementing latency improvements for EIS integration
3+
area: Machine Learning
4+
type: bug
5+
issues: []

libs/exponential-histogram/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// TODO: publish this when ready?
1111
//apply plugin: 'elasticsearch.publish'
1212
apply plugin: 'elasticsearch.build'
13+
apply plugin: 'elasticsearch.internal-test-artifact'
1314

1415
dependencies {
1516
api project(':libs:core')

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ExponentialHistogram createAutoReleasedHistogram(Consumer<ExponentialHistogramBu
7070
return result;
7171
}
7272

73-
ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
73+
protected ExponentialHistogram createAutoReleasedHistogram(int numBuckets, double... values) {
7474
ReleasableExponentialHistogram result = ExponentialHistogram.create(numBuckets, breaker(), values);
7575
releaseBeforeEnd.add(result);
7676
return result;

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/QuantileAccuracyTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ private void testDistributionQuantileAccuracy(RealDistribution distribution) {
221221
testQuantileAccuracy(values, bucketCount);
222222
}
223223

224-
private static double[] generateSamples(RealDistribution distribution, int sampleSize) {
224+
public static double[] generateSamples(RealDistribution distribution, int sampleSize) {
225225
double[] values = new double[sampleSize];
226226
for (int i = 0; i < sampleSize; i++) {
227227
values[i] = distribution.sample();
@@ -276,7 +276,7 @@ private double testQuantileAccuracy(double[] values, int bucketCount) {
276276
* The error depends on the raw values put into the histogram and the number of buckets allowed.
277277
* This is an implementation of the error bound computation proven by Theorem 3 in the <a href="https://arxiv.org/pdf/2004.08604">UDDSketch paper</a>
278278
*/
279-
private static double getMaximumRelativeError(double[] values, int bucketCount) {
279+
public static double getMaximumRelativeError(double[] values, int bucketCount) {
280280
HashSet<Long> usedPositiveIndices = new HashSet<>();
281281
HashSet<Long> usedNegativeIndices = new HashSet<>();
282282
int bestPossibleScale = MAX_SCALE;

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,9 +474,6 @@ tests:
474474
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlClientYamlIT
475475
method: test {p0=esql/60_usage/Basic ESQL usage output (telemetry) non-snapshot version}
476476
issue: https://github.com/elastic/elasticsearch/issues/133461
477-
- class: org.elasticsearch.xpack.esql.action.LookupJoinTypesIT
478-
method: testLookupJoinOthers
479-
issue: https://github.com/elastic/elasticsearch/issues/133480
480477
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
481478
method: testStopQueryLocal
482479
issue: https://github.com/elastic/elasticsearch/issues/133481

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,18 @@
3232
import org.apache.lucene.index.MergePolicy;
3333
import org.apache.lucene.index.VectorEncoding;
3434
import org.apache.lucene.index.VectorSimilarityFunction;
35+
import org.apache.lucene.store.Directory;
3536
import org.apache.lucene.store.FSDirectory;
37+
import org.apache.lucene.store.IOContext;
38+
import org.apache.lucene.store.IndexInput;
39+
import org.apache.lucene.store.MMapDirectory;
40+
import org.apache.lucene.store.NIOFSDirectory;
41+
import org.apache.lucene.store.NativeFSLockFactory;
3642
import org.apache.lucene.util.PrintStreamInfoStream;
3743
import org.elasticsearch.common.io.Channels;
44+
import org.elasticsearch.core.IOUtils;
45+
import org.elasticsearch.index.store.LuceneFilesExtensions;
46+
import org.elasticsearch.index.store.Store;
3847

3948
import java.io.IOException;
4049
import java.io.UncheckedIOException;
@@ -124,7 +133,7 @@ public boolean isEnabled(String component) {
124133

125134
long start = System.nanoTime();
126135
AtomicInteger numDocsIndexed = new AtomicInteger();
127-
try (FSDirectory dir = FSDirectory.open(indexPath); IndexWriter iw = new IndexWriter(dir, iwc);) {
136+
try (Directory dir = getDirectory(indexPath); IndexWriter iw = new IndexWriter(dir, iwc)) {
128137
for (Path docsPath : this.docsPath) {
129138
int dim = this.dim;
130139
try (FileChannel in = FileChannel.open(docsPath)) {
@@ -212,7 +221,7 @@ public boolean isEnabled(String component) {
212221
iwc.setCodec(codec);
213222
logger.debug("KnnIndexer: forceMerge in {}", indexPath);
214223
long startNS = System.nanoTime();
215-
try (IndexWriter iw = new IndexWriter(FSDirectory.open(indexPath), iwc)) {
224+
try (IndexWriter iw = new IndexWriter(getDirectory(indexPath), iwc)) {
216225
iw.forceMerge(1);
217226
}
218227
long endNS = System.nanoTime();
@@ -221,6 +230,14 @@ public boolean isEnabled(String component) {
221230
results.forceMergeTimeMS = TimeUnit.NANOSECONDS.toMillis(elapsedNSec);
222231
}
223232

233+
static Directory getDirectory(Path indexPath) throws IOException {
234+
Directory dir = FSDirectory.open(indexPath);
235+
if (dir instanceof MMapDirectory mmapDir) {
236+
return new HybridDirectory(mmapDir);
237+
}
238+
return dir;
239+
}
240+
224241
static class IndexerThread extends Thread {
225242
private final IndexWriter iw;
226243
private final AtomicInteger numDocsIndexed;
@@ -358,4 +375,64 @@ synchronized void next(byte[] dest) throws IOException {
358375
bytes.get(dest);
359376
}
360377
}
378+
379+
// Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
380+
static final class HybridDirectory extends NIOFSDirectory {
381+
private final MMapDirectory delegate;
382+
383+
HybridDirectory(MMapDirectory delegate) throws IOException {
384+
super(delegate.getDirectory(), NativeFSLockFactory.INSTANCE);
385+
this.delegate = delegate;
386+
}
387+
388+
@Override
389+
public IndexInput openInput(String name, IOContext context) throws IOException {
390+
if (useDelegate(name, context)) {
391+
// we need to do these checks on the outer directory since the inner doesn't know about pending deletes
392+
ensureOpen();
393+
ensureCanRead(name);
394+
// we switch the context here since mmap checks for the READONCE context by identity
395+
context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context;
396+
// we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory; otherwise
397+
// we might run into trouble with files that are pendingDelete in one directory but still
398+
// listed in listAll() from the other. We on the other hand don't want to list files from both dirs
399+
// and intersect for perf reasons.
400+
return delegate.openInput(name, context);
401+
} else {
402+
return super.openInput(name, context);
403+
}
404+
}
405+
406+
@Override
407+
public void close() throws IOException {
408+
IOUtils.close(super::close, delegate);
409+
}
410+
411+
private static String getExtension(String name) {
412+
// Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmapping.
413+
final int lastDotIndex = name.lastIndexOf('.');
414+
if (lastDotIndex == -1) {
415+
return "";
416+
} else {
417+
return name.substring(lastDotIndex + 1);
418+
}
419+
}
420+
421+
static boolean useDelegate(String name, IOContext ioContext) {
422+
if (ioContext == Store.READONCE_CHECKSUM) {
423+
// If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
424+
// if pre-loading is enabled on this file.
425+
return false;
426+
}
427+
428+
final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name));
429+
if (extension == null || extension.shouldMmap() == false) {
430+
// Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
431+
// or are large and have a random access pattern and mmap leads to page cache thrashing
432+
// (e.g. stored fields and term vectors).
433+
return false;
434+
}
435+
return true;
436+
}
437+
}
361438
}

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnSearcher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.lucene.search.Weight;
5353
import org.apache.lucene.store.Directory;
5454
import org.apache.lucene.store.FSDirectory;
55-
import org.apache.lucene.store.MMapDirectory;
5655
import org.apache.lucene.util.BitSet;
5756
import org.apache.lucene.util.BitSetIterator;
5857
import org.apache.lucene.util.FixedBitSet;
@@ -178,7 +177,7 @@ void runSearch(KnnIndexTester.Results finalResults, boolean earlyTermination) th
178177
);
179178
KnnIndexer.VectorReader targetReader = KnnIndexer.VectorReader.create(input, dim, vectorEncoding, offsetByteSize);
180179
long startNS;
181-
try (MMapDirectory dir = new MMapDirectory(indexPath)) {
180+
try (Directory dir = KnnIndexer.getDirectory(indexPath)) {
182181
try (DirectoryReader reader = DirectoryReader.open(dir)) {
183182
IndexSearcher searcher = searchThreads > 1 ? new IndexSearcher(reader, executorService) : new IndexSearcher(reader);
184183
byte[] targetBytes = new byte[dim];

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -343,15 +343,8 @@ static TransportVersion def(int id) {
343343
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
344344
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
345345
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
346-
public static final TransportVersion PROJECT_RESERVED_STATE_MOVE_TO_REGISTRY = def(9_147_0_00);
347-
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_0_00);
348-
public static final TransportVersion RESOLVE_INDEX_MODE_FILTER = def(9_149_0_00);
349-
public static final TransportVersion SEMANTIC_QUERY_MULTIPLE_INFERENCE_IDS = def(9_150_0_00);
350-
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_151_0_00);
351-
public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00);
352-
public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00);
353-
public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00);
354346
public static final TransportVersion TIME_SERIES_TELEMETRY = def(9_155_0_00);
347+
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_156_0_00);
355348

356349
/*
357350
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.action.admin.indices.resolve;
1111

1212
import org.elasticsearch.TransportVersion;
13-
import org.elasticsearch.TransportVersions;
1413
import org.elasticsearch.action.ActionListener;
1514
import org.elasticsearch.action.ActionRequestValidationException;
1615
import org.elasticsearch.action.ActionResponse;
@@ -78,6 +77,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
7877
public static final RemoteClusterActionType<Response> REMOTE_TYPE = new RemoteClusterActionType<>(NAME, Response::new);
7978

8079
private static final TransportVersion RESOLVE_INDEX_MODE_ADDED = TransportVersion.fromName("resolve_index_mode_added");
80+
private static final TransportVersion RESOLVE_INDEX_MODE_FILTER = TransportVersion.fromName("resolve_index_mode_filter");
8181

8282
private ResolveIndexAction() {
8383
super(NAME);
@@ -117,7 +117,7 @@ public Request(StreamInput in) throws IOException {
117117
super(in);
118118
this.names = in.readStringArray();
119119
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
120-
if (in.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_INDEX_MODE_FILTER)) {
120+
if (in.getTransportVersion().supports(RESOLVE_INDEX_MODE_FILTER)) {
121121
this.indexModes = in.readEnumSet(IndexMode.class);
122122
} else {
123123
this.indexModes = EnumSet.noneOf(IndexMode.class);
@@ -129,7 +129,7 @@ public void writeTo(StreamOutput out) throws IOException {
129129
super.writeTo(out);
130130
out.writeStringArray(names);
131131
indicesOptions.writeIndicesOptions(out);
132-
if (out.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_INDEX_MODE_FILTER)) {
132+
if (out.getTransportVersion().supports(RESOLVE_INDEX_MODE_FILTER)) {
133133
out.writeEnumSet(indexModes);
134134
}
135135
}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.lucene.util.Accountable;
1313
import org.apache.lucene.util.RamUsageEstimator;
14+
import org.elasticsearch.TransportVersion;
1415
import org.elasticsearch.TransportVersions;
1516
import org.elasticsearch.action.ActionRequest;
1617
import org.elasticsearch.action.ActionRequestValidationException;
@@ -65,6 +66,10 @@ public class BulkRequest extends LegacyActionRequest
6566
Accountable,
6667
RawIndexingDataTransportRequest {
6768

69+
private static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = TransportVersion.fromName(
70+
"streams_endpoint_param_restrictions"
71+
);
72+
6873
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class);
6974

7075
private static final int REQUEST_OVERHEAD = 50;
@@ -110,7 +115,7 @@ public BulkRequest(StreamInput in) throws IOException {
110115
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
111116
includeSourceOnError = in.readBoolean();
112117
} // else default value is true
113-
if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
118+
if (in.getTransportVersion().supports(STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
114119
paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString);
115120
}
116121
}
@@ -479,7 +484,7 @@ public void writeTo(StreamOutput out) throws IOException {
479484
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
480485
out.writeBoolean(includeSourceOnError);
481486
}
482-
if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
487+
if (out.getTransportVersion().supports(STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
483488
out.writeCollection(paramsUsed, StreamOutput::writeString);
484489
}
485490
}

0 commit comments

Comments
 (0)