Skip to content

Commit 73f93c7

Browse files
committed
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndexingES11516
refresh
2 parents e1f5de2 + 1a7d630 commit 73f93c7

File tree

11 files changed

+317
-35
lines changed

11 files changed

+317
-35
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import org.elasticsearch.compute.data.Block;
2222
import org.elasticsearch.compute.data.BlockFactory;
2323
import org.elasticsearch.compute.data.BytesRefBlock;
24+
import org.elasticsearch.compute.data.BytesRefVector;
2425
import org.elasticsearch.compute.data.ElementType;
2526
import org.elasticsearch.compute.data.IntBlock;
27+
import org.elasticsearch.compute.data.IntVector;
2628
import org.elasticsearch.compute.data.LongBlock;
2729
import org.elasticsearch.compute.data.LongVector;
30+
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2831
import org.elasticsearch.compute.data.Page;
2932
import org.elasticsearch.compute.operator.AggregationOperator;
3033
import org.elasticsearch.compute.operator.DriverContext;
@@ -282,11 +285,18 @@ private static Block dataBlock(int groups, String dataType) {
282285
int blockLength = blockLength(groups);
283286
return switch (dataType) {
284287
case BYTES_REF -> {
285-
try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(blockLength)) {
288+
try (
289+
BytesRefVector.Builder dict = blockFactory.newBytesRefVectorBuilder(blockLength);
290+
IntVector.Builder ords = blockFactory.newIntVectorBuilder(blockLength)
291+
) {
292+
final int dictLength = Math.min(blockLength, KEYWORDS.length);
293+
for (int i = 0; i < dictLength; i++) {
294+
dict.appendBytesRef(KEYWORDS[i]);
295+
}
286296
for (int i = 0; i < blockLength; i++) {
287-
builder.appendBytesRef(KEYWORDS[i % KEYWORDS.length]);
297+
ords.appendInt(i % dictLength);
288298
}
289-
yield builder.build();
299+
yield new OrdinalBytesRefVector(ords.build(), dict.build()).asBlock();
290300
}
291301
}
292302
case INT -> {

docs/changelog/127849.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127849
2+
summary: Optimize ordinal inputs in Values aggregation
3+
area: "ES|QL"
4+
type: enhancement
5+
issues: []

muted-tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,12 @@ tests:
459459
- class: org.elasticsearch.xpack.inference.qa.mixed.HuggingFaceServiceMixedIT
460460
method: testElser
461461
issue: https://github.com/elastic/elasticsearch/issues/127876
462+
- class: org.elasticsearch.xpack.inference.qa.mixed.OpenAIServiceMixedIT
463+
method: testOpenAiCompletions
464+
issue: https://github.com/elastic/elasticsearch/issues/127878
465+
- class: org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT
466+
method: test {p0=esql/120_profile/avg 8.14 or after}
467+
issue: https://github.com/elastic/elasticsearch/issues/127879
462468

463469
# Examples:
464470
#

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,34 +1998,31 @@ public Builder numberOfShards(int numberOfShards) {
19981998

19991999
/**
20002000
* Builder to create IndexMetadata that has an increased shard count (used for re-shard).
2001-
* The new shard count must be a multiple of the original shardcount.
2001+
* The new shard count must be a multiple of the original shardcount as well as a factor
2002+
* of routingNumShards.
20022003
* We do not support shrinking the shard count.
2003-
* @param shardCount updated shardCount
2004-
*
2005-
* TODO: Check if this.version needs to be incremented
2004+
* @param targetShardCount target shard count after resharding
20062005
*/
2007-
public Builder reshardAddShards(int shardCount) {
2008-
// Assert routingNumShards is null ?
2009-
// Assert numberOfShards > 0
2010-
if (shardCount % numberOfShards() != 0) {
2006+
public Builder reshardAddShards(int targetShardCount) {
2007+
final int sourceNumShards = numberOfShards();
2008+
if (targetShardCount % sourceNumShards != 0) {
20112009
throw new IllegalArgumentException(
20122010
"New shard count ["
2013-
+ shardCount
2011+
+ targetShardCount
20142012
+ "] should be a multiple"
20152013
+ " of current shard count ["
2016-
+ numberOfShards()
2014+
+ sourceNumShards
20172015
+ "] for ["
20182016
+ index
20192017
+ "]"
20202018
);
20212019
}
2022-
IndexVersion indexVersionCreated = indexCreatedVersion(settings);
2023-
settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_SHARDS, shardCount).build();
2024-
var newPrimaryTerms = new long[shardCount];
2020+
settings = Settings.builder().put(settings).put(SETTING_NUMBER_OF_SHARDS, targetShardCount).build();
2021+
var newPrimaryTerms = new long[targetShardCount];
20252022
Arrays.fill(newPrimaryTerms, this.primaryTerms.length, newPrimaryTerms.length, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
20262023
System.arraycopy(primaryTerms, 0, newPrimaryTerms, 0, this.primaryTerms.length);
20272024
primaryTerms = newPrimaryTerms;
2028-
routingNumShards = MetadataCreateIndexService.calculateNumRoutingShards(shardCount, indexVersionCreated);
2025+
routingNumShards = MetadataCreateIndexService.getIndexNumberOfRoutingShards(settings, sourceNumShards, this.routingNumShards);
20292026
return this;
20302027
}
20312028

@@ -3034,7 +3031,7 @@ public static ShardId selectCloneShard(int shardId, IndexMetadata sourceIndexMet
30343031
return new ShardId(sourceIndexMetadata.getIndex(), shardId);
30353032
}
30363033

3037-
private static void assertSplitMetadata(int numSourceShards, int numTargetShards, IndexMetadata sourceIndexMetadata) {
3034+
public static void assertSplitMetadata(int numSourceShards, int numTargetShards, IndexMetadata sourceIndexMetadata) {
30383035
if (numSourceShards > numTargetShards) {
30393036
throw new IllegalArgumentException(
30403037
"the number of source shards ["

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,10 +1282,23 @@ private static void validateSoftDeleteSettings(Settings indexSettings) {
12821282
* it will return the value configured for that index.
12831283
*/
12841284
static int getIndexNumberOfRoutingShards(Settings indexSettings, @Nullable IndexMetadata sourceMetadata) {
1285+
final int routingNumShards = getIndexNumberOfRoutingShards(
1286+
indexSettings,
1287+
sourceMetadata == null ? 1 : sourceMetadata.getNumberOfShards(),
1288+
sourceMetadata == null ? 0 : sourceMetadata.getRoutingNumShards()
1289+
);
1290+
return routingNumShards;
1291+
}
1292+
1293+
/**
1294+
* Calculates the number of routing shards based on the configured value in indexSettings or if recovering from another index
1295+
* it will return the value configured for that index.
1296+
*/
1297+
static int getIndexNumberOfRoutingShards(Settings indexSettings, final int sourceNumShards, final int sourceRoutingNumShards) {
12851298
final int numTargetShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings);
12861299
final IndexVersion indexVersionCreated = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(indexSettings);
12871300
final int routingNumShards;
1288-
if (sourceMetadata == null || sourceMetadata.getNumberOfShards() == 1) {
1301+
if (sourceNumShards == 1) {
12891302
// in this case we either have no index to recover from or
12901303
// we have a source index with 1 shard and without an explicit split factor
12911304
// or one that is valid in that case we can split into whatever and auto-generate a new factor.
@@ -1299,7 +1312,7 @@ static int getIndexNumberOfRoutingShards(Settings indexSettings, @Nullable Index
12991312
} else {
13001313
assert IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettings) == false
13011314
: "index.number_of_routing_shards should not be present on the target index on resize";
1302-
routingNumShards = sourceMetadata.getRoutingNumShards();
1315+
routingNumShards = sourceRoutingNumShards;
13031316
}
13041317
return routingNumShards;
13051318
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import static java.util.stream.Collectors.joining;
3737
import static org.elasticsearch.compute.gen.AggregatorImplementer.capitalize;
38+
import static org.elasticsearch.compute.gen.Methods.optionalStaticMethod;
3839
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
3940
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
4041
import static org.elasticsearch.compute.gen.Methods.requireArgs;
@@ -336,10 +337,32 @@ private MethodSpec prepareProcessPage() {
336337
builder.beginControlFlow("if (valuesBlock.mayHaveNulls())");
337338
builder.addStatement("state.enableGroupIdTracking(seenGroupIds)");
338339
builder.endControlFlow();
339-
builder.addStatement("return $L", addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra)));
340+
if (shouldWrapAddInput(blockType(aggParam.type()))) {
341+
builder.addStatement(
342+
"var addInput = $L",
343+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra))
344+
);
345+
builder.addStatement("return $T.wrapAddInput(addInput, state, valuesBlock)", declarationType);
346+
} else {
347+
builder.addStatement(
348+
"return $L",
349+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesBlock$L)", extra))
350+
);
351+
}
340352
}
341353
builder.endControlFlow();
342-
builder.addStatement("return $L", addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra)));
354+
if (shouldWrapAddInput(vectorType(aggParam.type()))) {
355+
builder.addStatement(
356+
"var addInput = $L",
357+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra))
358+
);
359+
builder.addStatement("return $T.wrapAddInput(addInput, state, valuesVector)", declarationType);
360+
} else {
361+
builder.addStatement(
362+
"return $L",
363+
addInput(b -> b.addStatement("addRawInput(positionOffset, groupIds, valuesVector$L)", extra))
364+
);
365+
}
343366
return builder.build();
344367
}
345368

@@ -526,6 +549,15 @@ private void combineRawInputForArray(MethodSpec.Builder builder, String arrayVar
526549
warningsBlock(builder, () -> builder.addStatement("$T.combine(state, groupId, $L)", declarationType, arrayVariable));
527550
}
528551

552+
private boolean shouldWrapAddInput(TypeName valuesType) {
553+
return optionalStaticMethod(
554+
declarationType,
555+
requireType(GROUPING_AGGREGATOR_FUNCTION_ADD_INPUT),
556+
requireName("wrapAddInput"),
557+
requireArgs(requireType(GROUPING_AGGREGATOR_FUNCTION_ADD_INPUT), requireType(aggState.declaredType()), requireType(valuesType))
558+
) != null;
559+
}
560+
529561
private void warningsBlock(MethodSpec.Builder builder, Runnable block) {
530562
if (warnExceptions.isEmpty() == false) {
531563
builder.beginControlFlow("try");

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Methods.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,31 @@ static ExecutableElement requireStaticMethod(
5959
TypeMatcher returnTypeMatcher,
6060
NameMatcher nameMatcher,
6161
ArgumentMatcher argumentMatcher
62+
) {
63+
ExecutableElement method = optionalStaticMethod(declarationType, returnTypeMatcher, nameMatcher, argumentMatcher);
64+
if (method == null) {
65+
var message = nameMatcher.names.size() == 1 ? "Requires method: " : "Requires one of methods: ";
66+
var signatures = nameMatcher.names.stream()
67+
.map(name -> "public static " + returnTypeMatcher + " " + declarationType + "#" + name + "(" + argumentMatcher + ")")
68+
.collect(joining(" or "));
69+
throw new IllegalArgumentException(message + signatures);
70+
}
71+
return method;
72+
}
73+
74+
static ExecutableElement optionalStaticMethod(
75+
TypeElement declarationType,
76+
TypeMatcher returnTypeMatcher,
77+
NameMatcher nameMatcher,
78+
ArgumentMatcher argumentMatcher
6279
) {
6380
return typeAndSuperType(declarationType).flatMap(type -> ElementFilter.methodsIn(type.getEnclosedElements()).stream())
6481
.filter(method -> method.getModifiers().contains(Modifier.STATIC))
6582
.filter(method -> nameMatcher.test(method.getSimpleName().toString()))
6683
.filter(method -> returnTypeMatcher.test(TypeName.get(method.getReturnType())))
6784
.filter(method -> argumentMatcher.test(method.getParameters().stream().map(it -> TypeName.get(it.asType())).toList()))
6885
.findFirst()
69-
.orElseThrow(() -> {
70-
var message = nameMatcher.names.size() == 1 ? "Requires method: " : "Requires one of methods: ";
71-
var signatures = nameMatcher.names.stream()
72-
.map(name -> "public static " + returnTypeMatcher + " " + declarationType + "#" + name + "(" + argumentMatcher + ")")
73-
.collect(joining(" or "));
74-
return new IllegalArgumentException(message + signatures);
75-
});
86+
.orElse(null);
7687
}
7788

7889
static NameMatcher requireName(String... names) {

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java

Lines changed: 18 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunction.java

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)