Skip to content

Commit 921eb11

Browse files
Merge branch 'main' into date-nanos-implicit-casting-behind-snapshot
2 parents 12c2693 + c1a4f8a commit 921eb11

File tree

16 files changed

+396
-63
lines changed

16 files changed

+396
-63
lines changed

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,11 @@ public static void main(String[] args) throws Exception {
177177
int[] nProbes = cmdLineArgs.indexType().equals(IndexType.IVF) && cmdLineArgs.numQueries() > 0
178178
? cmdLineArgs.nProbes()
179179
: new int[] { 0 };
180+
String indexType = cmdLineArgs.indexType().name().toLowerCase(Locale.ROOT);
181+
Results indexResults = new Results(cmdLineArgs.docVectors().getFileName().toString(), indexType, cmdLineArgs.numDocs());
180182
Results[] results = new Results[nProbes.length];
181183
for (int i = 0; i < nProbes.length; i++) {
182-
results[i] = new Results(cmdLineArgs.indexType().name().toLowerCase(Locale.ROOT), cmdLineArgs.numDocs());
184+
results[i] = new Results(cmdLineArgs.docVectors().getFileName().toString(), indexType, cmdLineArgs.numDocs());
183185
}
184186
logger.info("Running KNN index tester with arguments: " + cmdLineArgs);
185187
Codec codec = createCodec(cmdLineArgs);
@@ -199,12 +201,12 @@ public static void main(String[] args) throws Exception {
199201
throw new IllegalArgumentException("Index path does not exist: " + indexPath);
200202
}
201203
if (cmdLineArgs.reindex()) {
202-
knnIndexer.createIndex(results[0]);
204+
knnIndexer.createIndex(indexResults);
203205
}
204206
if (cmdLineArgs.forceMerge()) {
205-
knnIndexer.forceMerge(results[0]);
207+
knnIndexer.forceMerge(indexResults);
206208
} else {
207-
knnIndexer.numSegments(results[0]);
209+
knnIndexer.numSegments(indexResults);
208210
}
209211
}
210212
if (cmdLineArgs.queryVectors() != null && cmdLineArgs.numQueries() > 0) {
@@ -214,24 +216,27 @@ public static void main(String[] args) throws Exception {
214216
knnSearcher.runSearch(results[i], cmdLineArgs.earlyTermination());
215217
}
216218
}
217-
formattedResults.results.addAll(List.of(results));
219+
formattedResults.queryResults.addAll(List.of(results));
220+
formattedResults.indexResults.add(indexResults);
218221
}
219222
logger.info("Results: \n" + formattedResults);
220223
}
221224

222225
static class FormattedResults {
223-
List<Results> results = new ArrayList<>();
226+
List<Results> indexResults = new ArrayList<>();
227+
List<Results> queryResults = new ArrayList<>();
224228

225229
@Override
226230
public String toString() {
227-
if (results.isEmpty()) {
231+
if (indexResults.isEmpty() && queryResults.isEmpty()) {
228232
return "No results available.";
229233
}
230234

231-
String[] indexingHeaders = { "index_type", "num_docs", "index_time(ms)", "force_merge_time(ms)", "num_segments" };
235+
String[] indexingHeaders = { "index_name", "index_type", "num_docs", "index_time(ms)", "force_merge_time(ms)", "num_segments" };
232236

233237
// Define column headers
234238
String[] searchHeaders = {
239+
"index_name",
235240
"index_type",
236241
"n_probe",
237242
"latency(ms)",
@@ -245,33 +250,34 @@ public String toString() {
245250

246251
StringBuilder sb = new StringBuilder();
247252

248-
Results indexResult = results.get(0); // Assuming all results have the same index type and numDocs
249-
String[] indexData = {
250-
indexResult.indexType,
251-
Integer.toString(indexResult.numDocs),
252-
Long.toString(indexResult.indexTimeMS),
253-
Long.toString(indexResult.forceMergeTimeMS),
254-
Integer.toString(indexResult.numSegments) };
255-
256-
printBlock(sb, indexingHeaders, new String[][] { indexData });
257-
258-
String[][] searchData = new String[results.size()][];
259-
// Format and append each row of data
260-
for (int i = 0; i < results.size(); i++) {
261-
Results result = results.get(i);
262-
searchData[i] = new String[] {
263-
result.indexType,
264-
Integer.toString(result.nProbe),
265-
String.format(Locale.ROOT, "%.2f", result.avgLatency),
266-
String.format(Locale.ROOT, "%.2f", result.netCpuTimeMS),
267-
String.format(Locale.ROOT, "%.2f", result.avgCpuCount),
268-
String.format(Locale.ROOT, "%.2f", result.qps),
269-
String.format(Locale.ROOT, "%.2f", result.avgRecall),
270-
String.format(Locale.ROOT, "%.2f", result.averageVisited) };
271-
253+
String[][] indexResultsArray = new String[indexResults.size()][];
254+
for (int i = 0; i < indexResults.size(); i++) {
255+
Results indexResult = indexResults.get(i);
256+
indexResultsArray[i] = new String[] {
257+
indexResult.indexName,
258+
indexResult.indexType,
259+
Integer.toString(indexResult.numDocs),
260+
Long.toString(indexResult.indexTimeMS),
261+
Long.toString(indexResult.forceMergeTimeMS),
262+
Integer.toString(indexResult.numSegments) };
263+
}
264+
printBlock(sb, indexingHeaders, indexResultsArray);
265+
String[][] queryResultsArray = new String[queryResults.size()][];
266+
for (int i = 0; i < queryResults.size(); i++) {
267+
Results queryResult = queryResults.get(i);
268+
queryResultsArray[i] = new String[] {
269+
queryResult.indexName,
270+
queryResult.indexType,
271+
Integer.toString(queryResult.nProbe),
272+
String.format(Locale.ROOT, "%.2f", queryResult.avgLatency),
273+
String.format(Locale.ROOT, "%.2f", queryResult.netCpuTimeMS),
274+
String.format(Locale.ROOT, "%.2f", queryResult.avgCpuCount),
275+
String.format(Locale.ROOT, "%.2f", queryResult.qps),
276+
String.format(Locale.ROOT, "%.2f", queryResult.avgRecall),
277+
String.format(Locale.ROOT, "%.2f", queryResult.averageVisited) };
272278
}
273279

274-
printBlock(sb, searchHeaders, searchData);
280+
printBlock(sb, searchHeaders, queryResultsArray);
275281

276282
return sb.toString();
277283
}
@@ -331,7 +337,7 @@ private int[] calculateColumnWidths(String[] headers, String[]... data) {
331337
}
332338

333339
static class Results {
334-
final String indexType;
340+
final String indexType, indexName;
335341
final int numDocs;
336342
long indexTimeMS;
337343
long forceMergeTimeMS;
@@ -344,7 +350,8 @@ static class Results {
344350
double netCpuTimeMS;
345351
double avgCpuCount;
346352

347-
Results(String indexType, int numDocs) {
353+
Results(String indexName, String indexType, int numDocs) {
354+
this.indexName = indexName;
348355
this.indexType = indexType;
349356
this.numDocs = numDocs;
350357
}

server/src/main/java/module-info.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,8 @@
431431
org.elasticsearch.search.SearchFeatures,
432432
org.elasticsearch.script.ScriptFeatures,
433433
org.elasticsearch.search.retriever.RetrieversFeatures,
434-
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures;
434+
org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures,
435+
org.elasticsearch.ingest.IngestFeatures;
435436

436437
uses org.elasticsearch.plugins.internal.SettingsExtension;
437438
uses RestExtension;

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ static void executeDocument(
4848
pipeline.getVersion(),
4949
pipeline.getMetadata(),
5050
verbosePipelineProcessor,
51+
pipeline.getFieldAccessPattern(),
5152
pipeline.getDeprecated()
5253
);
5354
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.xcontent.XContentHelper;
2222
import org.elasticsearch.core.RestApiVersion;
2323
import org.elasticsearch.core.UpdateForV10;
24+
import org.elasticsearch.features.NodeFeature;
2425
import org.elasticsearch.index.VersionType;
2526
import org.elasticsearch.ingest.ConfigurationUtils;
2627
import org.elasticsearch.ingest.IngestDocument;
@@ -38,6 +39,7 @@
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Objects;
42+
import java.util.function.Predicate;
4143

4244
public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject {
4345
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
@@ -154,15 +156,17 @@ static Parsed parse(
154156
Map<String, Object> config,
155157
boolean verbose,
156158
IngestService ingestService,
157-
RestApiVersion restApiVersion
159+
RestApiVersion restApiVersion,
160+
Predicate<NodeFeature> hasFeature
158161
) throws Exception {
159162
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
160163
Pipeline pipeline = Pipeline.create(
161164
SIMULATED_PIPELINE_ID,
162165
pipelineConfig,
163166
ingestService.getProcessorFactories(),
164167
ingestService.getScriptService(),
165-
projectId
168+
projectId,
169+
hasFeature
166170
);
167171
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
168172
return new Parsed(pipeline, ingestDocumentList, verbose);

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
1919
import org.elasticsearch.cluster.node.DiscoveryNodes;
2020
import org.elasticsearch.cluster.project.ProjectResolver;
21+
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.Randomness;
2223
import org.elasticsearch.common.settings.Setting;
2324
import org.elasticsearch.common.util.concurrent.EsExecutors;
2425
import org.elasticsearch.common.xcontent.XContentHelper;
2526
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.features.FeatureService;
2628
import org.elasticsearch.ingest.IngestService;
2729
import org.elasticsearch.injection.guice.Inject;
2830
import org.elasticsearch.tasks.Task;
@@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
5153
private final SimulateExecutionService executionService;
5254
private final TransportService transportService;
5355
private final ProjectResolver projectResolver;
56+
private final ClusterService clusterService;
57+
private final FeatureService featureService;
5458
private volatile TimeValue ingestNodeTransportActionTimeout;
5559
// ThreadLocal because our unit testing framework does not like sharing Randoms across threads
5660
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
@@ -61,7 +65,9 @@ public SimulatePipelineTransportAction(
6165
TransportService transportService,
6266
ActionFilters actionFilters,
6367
IngestService ingestService,
64-
ProjectResolver projectResolver
68+
ProjectResolver projectResolver,
69+
ClusterService clusterService,
70+
FeatureService featureService
6571
) {
6672
super(
6773
SimulatePipelineAction.NAME,
@@ -74,6 +80,8 @@ public SimulatePipelineTransportAction(
7480
this.executionService = new SimulateExecutionService(threadPool);
7581
this.transportService = transportService;
7682
this.projectResolver = projectResolver;
83+
this.clusterService = clusterService;
84+
this.featureService = featureService;
7785
this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
7886
ingestService.getClusterService()
7987
.getClusterSettings()
@@ -117,7 +125,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
117125
source,
118126
request.isVerbose(),
119127
ingestService,
120-
request.getRestApiVersion()
128+
request.getRestApiVersion(),
129+
(feature) -> featureService.clusterHasFeature(clusterService.state(), feature)
121130
);
122131
}
123132
executionService.execute(simulateRequest, listener);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import org.elasticsearch.cluster.metadata.DataStream;
13+
import org.elasticsearch.features.FeatureSpecification;
14+
import org.elasticsearch.features.NodeFeature;
15+
16+
import java.util.Set;
17+
18+
public class IngestFeatures implements FeatureSpecification {
19+
@Override
20+
public Set<NodeFeature> getFeatures() {
21+
if (DataStream.LOGS_STREAM_FEATURE_FLAG) {
22+
return Set.of(IngestService.FIELD_ACCESS_PATTERN);
23+
} else {
24+
return Set.of();
25+
}
26+
}
27+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import java.util.Map;
13+
14+
public enum IngestPipelineFieldAccessPattern {
15+
/**
16+
* Field names will be split on the `.` character into their contingent parts. Resolution will strictly check
17+
* for nested objects following the field path.
18+
*/
19+
CLASSIC("classic"),
20+
/**
21+
* Field names will be split on the `.` character into their contingent parts. Resolution will flexibly check
22+
* for nested objects following the field path. If nested objects are not found for a key, the access pattern
23+
* will fall back to joining subsequent path elements together until it finds the next object that matches the
24+
* concatenated path. Allows for simple resolution of dotted field names.
25+
*/
26+
FLEXIBLE("flexible");
27+
28+
private final String key;
29+
30+
IngestPipelineFieldAccessPattern(String key) {
31+
this.key = key;
32+
}
33+
34+
public String getKey() {
35+
return key;
36+
}
37+
38+
private static final Map<String, IngestPipelineFieldAccessPattern> NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE);
39+
40+
public static boolean isValidAccessPattern(String accessPatternName) {
41+
return NAME_REGISTRY.containsKey(accessPatternName);
42+
}
43+
44+
public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) {
45+
IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName);
46+
if (accessPattern == null) {
47+
throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given");
48+
}
49+
return accessPattern;
50+
}
51+
}

0 commit comments

Comments
 (0)