Skip to content

Commit 8241f43

Browse files
Merge branch 'main' into eis-input-type
2 parents 6c6e6e6 + 0daa931 commit 8241f43

File tree

6 files changed

+171
-96
lines changed

6 files changed

+171
-96
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.benchmark.vector;
10+
11+
import org.elasticsearch.common.logging.LogConfigurator;
12+
import org.elasticsearch.index.codec.vectors.BQVectorUtils;
13+
import org.openjdk.jmh.annotations.Benchmark;
14+
import org.openjdk.jmh.annotations.BenchmarkMode;
15+
import org.openjdk.jmh.annotations.Fork;
16+
import org.openjdk.jmh.annotations.Measurement;
17+
import org.openjdk.jmh.annotations.Mode;
18+
import org.openjdk.jmh.annotations.OutputTimeUnit;
19+
import org.openjdk.jmh.annotations.Param;
20+
import org.openjdk.jmh.annotations.Scope;
21+
import org.openjdk.jmh.annotations.Setup;
22+
import org.openjdk.jmh.annotations.State;
23+
import org.openjdk.jmh.annotations.Warmup;
24+
import org.openjdk.jmh.infra.Blackhole;
25+
26+
import java.io.IOException;
27+
import java.util.Random;
28+
import java.util.concurrent.TimeUnit;
29+
30+
@BenchmarkMode(Mode.Throughput)
31+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
32+
@State(Scope.Benchmark)
33+
// first iteration is complete garbage, so make sure we really warmup
34+
@Warmup(iterations = 4, time = 1)
35+
// real iterations. not useful to spend tons of time here, better to fork more
36+
@Measurement(iterations = 5, time = 1)
37+
// engage some noise reduction
38+
@Fork(value = 1)
39+
public class PackAsBinaryBenchmark {
40+
41+
static {
42+
LogConfigurator.configureESLogging(); // native access requires logging to be initialized
43+
}
44+
45+
@Param({ "384", "782", "1024" })
46+
int dims;
47+
48+
int length;
49+
50+
int numVectors = 1000;
51+
52+
int[][] qVectors;
53+
byte[] packed;
54+
55+
@Setup
56+
public void setup() throws IOException {
57+
Random random = new Random(123);
58+
59+
this.length = BQVectorUtils.discretize(dims, 64) / 8;
60+
this.packed = new byte[length];
61+
62+
qVectors = new int[numVectors][dims];
63+
for (int[] qVector : qVectors) {
64+
for (int i = 0; i < dims; i++) {
65+
qVector[i] = random.nextInt(2);
66+
}
67+
}
68+
}
69+
70+
@Benchmark
71+
public void packAsBinary(Blackhole bh) {
72+
for (int i = 0; i < numVectors; i++) {
73+
BQVectorUtils.packAsBinary(qVectors[i], packed);
74+
bh.consume(packed);
75+
}
76+
}
77+
78+
@Benchmark
79+
public void packAsBinaryLegacy(Blackhole bh) {
80+
for (int i = 0; i < numVectors; i++) {
81+
BQVectorUtils.packAsBinaryLegacy(qVectors[i], packed);
82+
bh.consume(packed);
83+
}
84+
}
85+
}

muted-tests.yml

Lines changed: 10 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -510,84 +510,33 @@ tests:
510510
- class: org.elasticsearch.gradle.internal.transport.TransportVersionManagementPluginFuncTest
511511
method: cannot change committed ids to a branch
512512
issue: https://github.com/elastic/elasticsearch/issues/132790
513-
- class: org.elasticsearch.xpack.esql.CsvTests
514-
method: test {csv-spec:spatial.ConvertFromStringParseError}
515-
issue: https://github.com/elastic/elasticsearch/issues/132805
516-
- class: org.elasticsearch.xpack.esql.CsvTests
517-
method: test {csv-spec:floats.ScalbWithHugeScaleFactor}
518-
issue: https://github.com/elastic/elasticsearch/issues/132806
519513
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceIT
520514
method: testSettingsAppliedOnStart
521515
issue: https://github.com/elastic/elasticsearch/issues/131210
522516
- class: org.elasticsearch.index.mapper.vectors.SparseVectorFieldMapperTests
523517
method: testPruningScenarios
524518
issue: https://github.com/elastic/elasticsearch/issues/132810
525-
- class: org.elasticsearch.xpack.esql.CsvTests
526-
method: test {csv-spec:spatial_shapes.ConvertFromStringParseError}
527-
issue: https://github.com/elastic/elasticsearch/issues/132812
528-
- class: org.elasticsearch.xpack.esql.CsvTests
529-
method: test {csv-spec:string.MvStringNotEqualsLong}
530-
issue: https://github.com/elastic/elasticsearch/issues/132813
531-
- class: org.elasticsearch.xpack.esql.CsvTests
532-
method: test {csv-spec:spatial.ConvertCartesianFromStringParseError}
533-
issue: https://github.com/elastic/elasticsearch/issues/132814
534-
- class: org.elasticsearch.xpack.esql.CsvTests
535-
method: test {csv-spec:spatial_shapes.ConvertCartesianShapeFromStringParseError}
536-
issue: https://github.com/elastic/elasticsearch/issues/132815
537-
- class: org.elasticsearch.xpack.esql.CsvTests
538-
method: test {csv-spec:string.RepeatNegative}
539-
issue: https://github.com/elastic/elasticsearch/issues/132818
540-
- class: org.elasticsearch.xpack.esql.CsvTests
541-
method: test {csv-spec:stats.WeightedAvgWeightMvWarning}
542-
issue: https://github.com/elastic/elasticsearch/issues/132821
543-
- class: org.elasticsearch.xpack.esql.CsvTests
544-
method: test {csv-spec:ip.PushDownIPWithIn}
545-
issue: https://github.com/elastic/elasticsearch/issues/132822
546-
- class: org.elasticsearch.xpack.esql.CsvTests
547-
method: test {csv-spec:floats.ScalbWithHugeconstantFirstArgument}
548-
issue: https://github.com/elastic/elasticsearch/issues/132823
549-
- class: org.elasticsearch.xpack.esql.CsvTests
550-
method: test {csv-spec:boolean.MvSliceWarnings}
551-
issue: https://github.com/elastic/elasticsearch/issues/132824
552-
- class: org.elasticsearch.xpack.esql.CsvTests
553-
method: test {csv-spec:change_point.Values null column}
554-
issue: https://github.com/elastic/elasticsearch/issues/132825
555-
- class: org.elasticsearch.xpack.esql.CsvTests
556-
method: test {csv-spec:string.MvStringEqualsLongString}
557-
issue: https://github.com/elastic/elasticsearch/issues/132826
558-
- class: org.elasticsearch.xpack.esql.CsvTests
559-
method: test {csv-spec:stats.WeightedAvgFieldMvWarning}
560-
issue: https://github.com/elastic/elasticsearch/issues/132827
561519
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
562520
method: test {p0=search/160_exists_query/Test exists query on mapped date field with no doc values}
563521
issue: https://github.com/elastic/elasticsearch/issues/132828
564522
- class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT
565523
method: test {p0=search/160_exists_query/Test exists query on keyword field in empty index}
566524
issue: https://github.com/elastic/elasticsearch/issues/132829
567525
- class: org.elasticsearch.xpack.esql.CsvTests
568-
method: test {csv-spec:ip.CidrMatchFieldArg}
569-
issue: https://github.com/elastic/elasticsearch/issues/132834
570-
- class: org.elasticsearch.xpack.esql.CsvTests
571-
method: test {csv-spec:change_point.Null values}
572-
issue: https://github.com/elastic/elasticsearch/issues/132835
573-
- class: org.elasticsearch.xpack.esql.CsvTests
574-
method: test {csv-spec:ip.Conditional}
575-
issue: https://github.com/elastic/elasticsearch/issues/132836
576-
- class: org.elasticsearch.xpack.esql.CsvTests
577-
method: test {csv-spec:ip.CdirMatchOrsIPs}
578-
issue: https://github.com/elastic/elasticsearch/issues/132837
526+
method: test {csv-spec:floats.EqualToMultivalue}
527+
issue: https://github.com/elastic/elasticsearch/issues/132852
579528
- class: org.elasticsearch.xpack.esql.CsvTests
580-
method: test {csv-spec:string.SpaceNegative}
581-
issue: https://github.com/elastic/elasticsearch/issues/132838
529+
method: test {csv-spec:string.MvStringEquals}
530+
issue: https://github.com/elastic/elasticsearch/issues/132856
582531
- class: org.elasticsearch.xpack.esql.CsvTests
583-
method: test {csv-spec:string.MvStringNotEqualsFound}
584-
issue: https://github.com/elastic/elasticsearch/issues/132839
532+
method: test {csv-spec:string.LengthOfText}
533+
issue: https://github.com/elastic/elasticsearch/issues/132857
585534
- class: org.elasticsearch.xpack.esql.CsvTests
586-
method: test {csv-spec:string.ConcatOfText}
587-
issue: https://github.com/elastic/elasticsearch/issues/132840
535+
method: test {csv-spec:floats.InMultivalue}
536+
issue: https://github.com/elastic/elasticsearch/issues/132859
588537
- class: org.elasticsearch.xpack.esql.CsvTests
589-
method: test {csv-spec:floats.EqualToOrEqualToMultivalue}
590-
issue: https://github.com/elastic/elasticsearch/issues/132841
538+
method: test {csv-spec:ip.CdirMatchEqualsInsOrs}
539+
issue: https://github.com/elastic/elasticsearch/issues/132860
591540

592541
# Examples:
593542
#

server/src/main/java/org/elasticsearch/index/codec/vectors/BQVectorUtils.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static boolean isUnitVector(float[] v) {
4040
return Math.abs(l1norm - 1.0d) <= EPSILON;
4141
}
4242

43-
public static void packAsBinary(int[] vector, byte[] packed) {
43+
public static void packAsBinaryLegacy(int[] vector, byte[] packed) {
4444
for (int i = 0; i < vector.length;) {
4545
byte result = 0;
4646
for (int j = 7; j >= 0 && i < vector.length; j--) {
@@ -54,6 +54,34 @@ public static void packAsBinary(int[] vector, byte[] packed) {
5454
}
5555
}
5656

57+
public static void packAsBinary(int[] vector, byte[] packed) {
58+
int limit = vector.length - 7;
59+
int i = 0;
60+
int index = 0;
61+
for (; i < limit; i += 8, index++) {
62+
assert vector[i] == 0 || vector[i] == 1;
63+
assert vector[i + 1] == 0 || vector[i + 1] == 1;
64+
assert vector[i + 2] == 0 || vector[i + 2] == 1;
65+
assert vector[i + 3] == 0 || vector[i + 3] == 1;
66+
assert vector[i + 4] == 0 || vector[i + 4] == 1;
67+
assert vector[i + 5] == 0 || vector[i + 5] == 1;
68+
assert vector[i + 6] == 0 || vector[i + 6] == 1;
69+
assert vector[i + 7] == 0 || vector[i + 7] == 1;
70+
int result = vector[i] << 7 | (vector[i + 1] << 6) | (vector[i + 2] << 5) | (vector[i + 3] << 4) | (vector[i + 4] << 3)
71+
| (vector[i + 5] << 2) | (vector[i + 6] << 1) | (vector[i + 7]);
72+
packed[index] = (byte) result;
73+
}
74+
if (i == vector.length) {
75+
return;
76+
}
77+
byte result = 0;
78+
for (int j = 7; j >= 0 && i < vector.length; i++, j--) {
79+
assert vector[i] == 0 || vector[i] == 1;
80+
result |= (byte) ((vector[i] & 1) << j);
81+
}
82+
packed[index] = result;
83+
}
84+
5785
public static int discretize(int value, int bucket) {
5886
return ((value + (bucket - 1)) / bucket) * bucket;
5987
}

server/src/test/java/org/elasticsearch/index/codec/vectors/BQVectorUtilsTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ public void testPackAsBinary() {
6464
assertArrayEquals(new byte[] { (byte) 0b11001010, (byte) 0b11100110 }, packed);
6565
}
6666

67+
public void testPackAsBinaryDuel() {
68+
int dims = random().nextInt(16, 2049);
69+
int[] toPack = new int[dims];
70+
for (int i = 0; i < dims; i++) {
71+
toPack[i] = random().nextInt(2);
72+
}
73+
int length = BQVectorUtils.discretize(dims, 64) / 8;
74+
byte[] packed = new byte[length];
75+
byte[] packedLegacy = new byte[length];
76+
BQVectorUtils.packAsBinaryLegacy(toPack, packedLegacy);
77+
BQVectorUtils.packAsBinary(toPack, packed);
78+
assertArrayEquals(packedLegacy, packed);
79+
}
80+
6781
public void testPadFloat() {
6882
assertArrayEquals(new float[] { 1, 2, 3, 4 }, BQVectorUtils.pad(new float[] { 1, 2, 3, 4 }, 4), DELTA);
6983
assertArrayEquals(new float[] { 1, 2, 3, 4 }, BQVectorUtils.pad(new float[] { 1, 2, 3, 4 }, 3), DELTA);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -180,31 +180,19 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
180180
analyzedPlan(parsed, executionInfo, request.filter(), new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
181181
@Override
182182
public void onResponse(LogicalPlan analyzedPlan) {
183-
optimizeAndExecute(request, executionInfo, planRunner, analyzedPlan, listener);
183+
SubscribableListener.<LogicalPlan>newForked(l -> preOptimizedPlan(analyzedPlan, l))
184+
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(optimizedPlan(p), l))
185+
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
186+
.addListener(listener);
184187
}
185188
});
186189
}
187190

188-
// visible for testing in CsvTests
189-
public void optimizeAndExecute(
190-
EsqlQueryRequest request,
191-
EsqlExecutionInfo executionInfo,
192-
PlanRunner planRunner,
193-
LogicalPlan analyzedPlan,
194-
ActionListener<Result> listener
195-
) {
196-
SubscribableListener.<LogicalPlan>newForked(l -> logicalPlanPreOptimizer.preOptimize(analyzedPlan, l))
197-
.andThenApply(this::optimizedPlan)
198-
.<LogicalPlan>andThen((l, p) -> preMapper.preMapper(p, l))
199-
.<Result>andThen((l, p) -> executeOptimizedPlan(request, executionInfo, planRunner, p, l))
200-
.addListener(listener);
201-
}
202-
203191
/**
204192
* Execute an optimized plan. Most code should prefer calling {@link #execute} but
205193
* this is public for testing.
206194
*/
207-
private void executeOptimizedPlan(
195+
public void executeOptimizedPlan(
208196
EsqlQueryRequest request,
209197
EsqlExecutionInfo executionInfo,
210198
PlanRunner planRunner,
@@ -804,7 +792,7 @@ private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQu
804792
return EstimatesRowSize.estimateRowSize(0, physicalPlan);
805793
}
806794

807-
private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
795+
public LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
808796
if (logicalPlan.preOptimized() == false) {
809797
throw new IllegalStateException("Expected pre-optimized plan");
810798
}
@@ -813,7 +801,11 @@ private LogicalPlan optimizedPlan(LogicalPlan logicalPlan) {
813801
return plan;
814802
}
815803

816-
private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
804+
/**
 * Runs the pre-optimization step on a logical plan by handing the plan and the listener
 * straight to {@code logicalPlanPreOptimizer.preOptimize}. No other work is done here.
 *
 * @param logicalPlan the plan passed to the pre-optimizer
 * @param listener    passed to the pre-optimizer unchanged
 */
public void preOptimizedPlan(LogicalPlan logicalPlan, ActionListener<LogicalPlan> listener) {
805+
logicalPlanPreOptimizer.preOptimize(logicalPlan, listener);
806+
}
807+
808+
public PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
817809
if (optimizedPlan.optimized() == false) {
818810
throw new IllegalStateException("Expected optimized plan");
819811
}
@@ -823,7 +815,7 @@ private PhysicalPlan physicalPlan(LogicalPlan optimizedPlan) {
823815
return plan;
824816
}
825817

826-
private PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
818+
public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
827819
var plan = physicalPlanOptimizer.optimize(physicalPlan(optimizedPlan));
828820
LOGGER.debug("Optimized physical plan:\n{}", plan);
829821
return plan;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -596,21 +596,28 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {
596596
TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets);
597597

598598
PlainActionFuture<ActualResults> listener = new PlainActionFuture<>();
599-
session.optimizeAndExecute(
600-
new EsqlQueryRequest(),
601-
new EsqlExecutionInfo(randomBoolean()),
602-
planRunner(bigArrays, foldCtx, physicalOperationProviders),
603-
analyzed,
604-
listener.map(
605-
result -> new ActualResults(
606-
result.schema().stream().map(Attribute::name).toList(),
607-
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
608-
result.schema().stream().map(Attribute::dataType).toList(),
609-
result.pages(),
610-
threadPool.getThreadContext().getResponseHeaders()
599+
600+
session.preOptimizedPlan(analyzed, listener.delegateFailureAndWrap((l, preOptimized) -> {
601+
session.executeOptimizedPlan(
602+
new EsqlQueryRequest(),
603+
new EsqlExecutionInfo(randomBoolean()),
604+
planRunner(bigArrays, foldCtx, physicalOperationProviders),
605+
session.optimizedPlan(preOptimized),
606+
listener.delegateFailureAndWrap(
607+
// Wrap so we can capture the warnings in the calling thread
608+
(next, result) -> next.onResponse(
609+
new ActualResults(
610+
result.schema().stream().map(Attribute::name).toList(),
611+
result.schema().stream().map(a -> Type.asType(a.dataType().nameUpper())).toList(),
612+
result.schema().stream().map(Attribute::dataType).toList(),
613+
result.pages(),
614+
threadPool.getThreadContext().getResponseHeaders()
615+
)
616+
)
611617
)
612-
)
613-
);
618+
);
619+
}));
620+
614621
return listener.get();
615622
}
616623

0 commit comments

Comments
 (0)