Skip to content

Commit 61346e5

Browse files
authored
Merge branch 'main' into downsampling-runs-only-search-nodes
2 parents 22d0310 + 7c213ba commit 61346e5

File tree

16 files changed

+212
-61
lines changed

16 files changed

+212
-61
lines changed

.buildkite/scripts/run-bc-upgrade-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ select(.active_release == true) |
2424
(.build_candidates | to_entries | sort_by(.value.completed_at))) |
2525
last | .value.manifest_url")"
2626

27-
if [[ -z "$MANIFEST_URL" ]]; then
27+
if [[ -z "$MANIFEST_URL" ]] || [[ "$MANIFEST_URL" == "null" ]]; then
2828
echo "No snapshots or build candidates for branch [$BUILDKITE_BRANCH]."
2929
echo "Skipping BC upgrade tests."
3030
exit 0

.buildkite/scripts/run-pr-upgrade-tests.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ fi
1818

1919
# Identify the merge base of the current commit (branch) and the base branch of the pull request.
2020
# PR upgrade tests are run from the merge base to the current commit.
21-
BASE_COMMIT=$(git merge-base $BUILDKITE_PULL_REQUEST_BASE_BRANCH $BUILDKITE_COMMIT)
21+
git fetch origin $BUILDKITE_PULL_REQUEST_BASE_BRANCH
22+
BASE_COMMIT=$(git merge-base origin/$BUILDKITE_PULL_REQUEST_BASE_BRANCH $BUILDKITE_COMMIT)
2223

2324
VERSION=$(sed -n 's/^elasticsearch[[:space:]]*=[[:space:]]*\(.*\)/\1/p' build-tools-internal/version.properties)
2425

docs/changelog/125921.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
pr: 125921
2+
summary: Allow direct IO for BBQ rescoring
3+
area: Vector Search
4+
type: feature
5+
highlight:
6+
title: Allow direct IO for BBQ rescoring
7+
body: |-
8+
BBQ rescoring performance can be drastically affected by the amount of available
9+
off-heap RAM for use by the system page cache. When there is not enough off-heap RAM
10+
to fit all the vector data in memory, BBQ search latencies can increase by as much as 5000x.
11+
Specifying the `vector.rescoring.directio=true` Java option on all vector search
12+
nodes modifies rescoring to use direct IO, which eliminates these very high latencies
13+
from searches in low-memory scenarios, at the cost of a reduction
14+
in vector search performance for BBQ indices when the vectors do all fit in memory.
15+
16+
This option is released in 9.1 as a tech preview whilst we analyse its effect
17+
for a variety of use cases.
18+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/direct/TransportPutDatabaseConfigurationAction.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.elasticsearch.cluster.SimpleBatchedExecutor;
2121
import org.elasticsearch.cluster.block.ClusterBlockException;
2222
import org.elasticsearch.cluster.block.ClusterBlockLevel;
23+
import org.elasticsearch.cluster.metadata.ProjectId;
2324
import org.elasticsearch.cluster.metadata.ProjectMetadata;
25+
import org.elasticsearch.cluster.project.ProjectResolver;
2426
import org.elasticsearch.cluster.service.ClusterService;
2527
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2628
import org.elasticsearch.common.Priority;
@@ -58,13 +60,15 @@ public void taskSucceeded(UpdateDatabaseConfigurationTask task, Void unused) {
5860
};
5961

6062
private final MasterServiceTaskQueue<UpdateDatabaseConfigurationTask> updateDatabaseConfigurationTaskQueue;
63+
private final ProjectResolver projectResolver;
6164

6265
@Inject
6366
public TransportPutDatabaseConfigurationAction(
6467
TransportService transportService,
6568
ClusterService clusterService,
6669
ThreadPool threadPool,
67-
ActionFilters actionFilters
70+
ActionFilters actionFilters,
71+
ProjectResolver projectResolver
6872
) {
6973
super(
7074
PutDatabaseConfigurationAction.NAME,
@@ -81,6 +85,7 @@ public TransportPutDatabaseConfigurationAction(
8185
Priority.NORMAL,
8286
UPDATE_TASK_EXECUTOR
8387
);
88+
this.projectResolver = projectResolver;
8489
}
8590

8691
@Override
@@ -89,7 +94,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
8994

9095
updateDatabaseConfigurationTaskQueue.submitTask(
9196
Strings.format("update-geoip-database-configuration-[%s]", id),
92-
new UpdateDatabaseConfigurationTask(listener, request.getDatabase()),
97+
new UpdateDatabaseConfigurationTask(projectResolver.getProjectId(), listener, request.getDatabase()),
9398
null
9499
);
95100
}
@@ -105,9 +110,9 @@ static boolean isNoopUpdate(@Nullable DatabaseConfigurationMetadata existingData
105110
}
106111
}
107112

108-
static void validatePrerequisites(DatabaseConfiguration database, ClusterState state) {
113+
static void validatePrerequisites(ProjectId projectId, DatabaseConfiguration database, ClusterState state) {
109114
// we need to verify that the database represents a unique file (name) among the various databases for this same provider
110-
IngestGeoIpMetadata geoIpMeta = state.metadata().getProject().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
115+
IngestGeoIpMetadata geoIpMeta = state.metadata().getProject(projectId).custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
111116

112117
Optional<DatabaseConfiguration> sameName = geoIpMeta.getDatabases()
113118
.values()
@@ -125,12 +130,14 @@ static void validatePrerequisites(DatabaseConfiguration database, ClusterState s
125130
});
126131
}
127132

128-
private record UpdateDatabaseConfigurationTask(ActionListener<AcknowledgedResponse> listener, DatabaseConfiguration database)
129-
implements
130-
ClusterStateTaskListener {
133+
private record UpdateDatabaseConfigurationTask(
134+
ProjectId projectId,
135+
ActionListener<AcknowledgedResponse> listener,
136+
DatabaseConfiguration database
137+
) implements ClusterStateTaskListener {
131138

132139
ClusterState execute(ClusterState currentState) throws Exception {
133-
final var project = currentState.metadata().getProject();
140+
final var project = currentState.metadata().getProject(projectId);
134141
IngestGeoIpMetadata geoIpMeta = project.custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
135142

136143
String id = database.id();
@@ -140,7 +147,7 @@ ClusterState execute(ClusterState currentState) throws Exception {
140147
return currentState;
141148
}
142149

143-
validatePrerequisites(database, currentState);
150+
validatePrerequisites(projectId, database, currentState);
144151

145152
Map<String, DatabaseConfigurationMetadata> databases = new HashMap<>(geoIpMeta.getDatabases());
146153
databases.put(

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/direct/TransportPutDatabaseConfigurationActionTests.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
package org.elasticsearch.ingest.geoip.direct;
1111

1212
import org.elasticsearch.cluster.ClusterState;
13-
import org.elasticsearch.cluster.metadata.Metadata;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1415
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
1516
import org.elasticsearch.test.ESTestCase;
1617

@@ -20,33 +21,34 @@
2021
public class TransportPutDatabaseConfigurationActionTests extends ESTestCase {
2122

2223
public void testValidatePrerequisites() {
24+
ProjectId projectId = randomProjectIdOrDefault();
2325
// Test that we reject two configurations with the same database name but different ids:
2426
String name = randomAlphaOfLengthBetween(1, 50);
2527
IngestGeoIpMetadata ingestGeoIpMetadata = randomIngestGeoIpMetadata(name);
2628
ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE)
27-
.metadata(Metadata.builder(Metadata.EMPTY_METADATA).putCustom(IngestGeoIpMetadata.TYPE, ingestGeoIpMetadata))
29+
.putProjectMetadata(ProjectMetadata.builder(projectId).putCustom(IngestGeoIpMetadata.TYPE, ingestGeoIpMetadata).build())
2830
.build();
2931
DatabaseConfiguration databaseConfiguration = randomDatabaseConfiguration(randomIdentifier(), name);
3032
expectThrows(
3133
IllegalArgumentException.class,
32-
() -> TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfiguration, state)
34+
() -> TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfiguration, state)
3335
);
3436

3537
// Test that we do not reject two configurations with different database names:
3638
String differentName = randomValueOtherThan(name, () -> randomAlphaOfLengthBetween(1, 50));
3739
DatabaseConfiguration databaseConfigurationForDifferentName = randomDatabaseConfiguration(randomIdentifier(), differentName);
38-
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfigurationForDifferentName, state);
40+
TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfigurationForDifferentName, state);
3941

4042
// Test that we do not reject a configuration if none already exists:
41-
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfiguration, ClusterState.EMPTY_STATE);
43+
TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfiguration, ClusterState.EMPTY_STATE);
4244

4345
// Test that we do not reject a configuration if one with the same database name AND id already exists:
4446
DatabaseConfiguration databaseConfigurationSameNameSameId = ingestGeoIpMetadata.getDatabases()
4547
.values()
4648
.iterator()
4749
.next()
4850
.database();
49-
TransportPutDatabaseConfigurationAction.validatePrerequisites(databaseConfigurationSameNameSameId, state);
51+
TransportPutDatabaseConfigurationAction.validatePrerequisites(projectId, databaseConfigurationSameNameSameId, state);
5052
}
5153

5254
private IngestGeoIpMetadata randomIngestGeoIpMetadata(String name) {

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ tests:
569569
- class: org.elasticsearch.compute.aggregation.TopIntAggregatorFunctionTests
570570
method: testManyInitialManyPartialFinalRunnerThrowing
571571
issue: https://github.com/elastic/elasticsearch/issues/130145
572+
- class: org.elasticsearch.ingest.geoip.direct.TransportPutDatabaseConfigurationActionTests
573+
method: testValidatePrerequisites
574+
issue: https://github.com/elastic/elasticsearch/issues/130178
572575

573576
# Examples:
574577
#

server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.lucene.store.IndexOutput;
1818
import org.apache.lucene.tests.util.LuceneTestCase;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.index.codec.vectors.es818.ES818BinaryQuantizedVectorsFormat;
2021
import org.elasticsearch.plugins.Plugin;
2122
import org.elasticsearch.search.vectors.KnnSearchBuilder;
2223
import org.elasticsearch.search.vectors.VectorData;
@@ -43,7 +44,9 @@
4344
public class DirectIOIT extends ESIntegTestCase {
4445

4546
@BeforeClass
46-
public static void checkSupported() throws IOException {
47+
public static void checkSupported() {
48+
assumeTrue("Direct IO is not enabled", ES818BinaryQuantizedVectorsFormat.USE_DIRECT_IO);
49+
4750
Path path = createTempDir("directIOProbe");
4851
try (Directory dir = open(path); IndexOutput out = dir.createOutput("out", IOContext.DEFAULT)) {
4952
out.writeString("test");

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@
8787
*/
8888
public class ES818BinaryQuantizedVectorsFormat extends FlatVectorsFormat {
8989

90-
static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true"));
90+
public static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "false"));
9191

9292
public static final String BINARIZED_VECTOR_COMPONENT = "BVEC";
9393
public static final String NAME = "ES818BinaryQuantizedVectorsFormat";

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsReader.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class ES818BinaryQuantizedVectorsReader extends FlatVectorsReader impleme
6565

6666
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ES818BinaryQuantizedVectorsReader.class);
6767

68-
private final Map<String, FieldEntry> fields = new HashMap<>();
68+
private final Map<String, FieldEntry> fields;
6969
private final IndexInput quantizedVectorData;
7070
private final FlatVectorsReader rawVectorsReader;
7171
private final ES818BinaryFlatVectorsScorer vectorScorer;
@@ -77,6 +77,7 @@ public class ES818BinaryQuantizedVectorsReader extends FlatVectorsReader impleme
7777
ES818BinaryFlatVectorsScorer vectorsScorer
7878
) throws IOException {
7979
super(vectorsScorer);
80+
this.fields = new HashMap<>();
8081
this.vectorScorer = vectorsScorer;
8182
this.rawVectorsReader = rawVectorsReader;
8283
int versionMeta = -1;
@@ -120,6 +121,24 @@ public class ES818BinaryQuantizedVectorsReader extends FlatVectorsReader impleme
120121
}
121122
}
122123

124+
private ES818BinaryQuantizedVectorsReader(ES818BinaryQuantizedVectorsReader clone, FlatVectorsReader rawVectorsReader) {
125+
super(clone.vectorScorer);
126+
this.rawVectorsReader = rawVectorsReader;
127+
this.vectorScorer = clone.vectorScorer;
128+
this.quantizedVectorData = clone.quantizedVectorData;
129+
this.fields = clone.fields;
130+
}
131+
132+
// For testing
133+
FlatVectorsReader getRawVectorsReader() {
134+
return rawVectorsReader;
135+
}
136+
137+
@Override
138+
public FlatVectorsReader getMergeInstance() {
139+
return new ES818BinaryQuantizedVectorsReader(this, rawVectorsReader.getMergeInstance());
140+
}
141+
123142
private void readFields(ChecksumIndexInput meta, FieldInfos infos) throws IOException {
124143
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
125144
FieldInfo info = infos.fieldInfo(fieldNumber);

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/MergeReaderWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ protected MergeReaderWrapper(FlatVectorsReader mainReader, FlatVectorsReader mer
3636
this.mergeReader = mergeReader;
3737
}
3838

39+
// For testing
40+
FlatVectorsReader getMainReader() {
41+
return mainReader;
42+
}
43+
3944
@Override
4045
public RandomVectorScorer getRandomVectorScorer(String field, float[] target) throws IOException {
4146
return mainReader.getRandomVectorScorer(field, target);

0 commit comments

Comments
 (0)