Skip to content

Commit b0de175

Browse files
authored
ESQL: Small topn speed up (elastic#142228)
Replaces the per-row asc/desc tracking with slightly better encodings, paying a small price at encode time to speed up comparisons and remove some per-`n` memory usage. The per row speed up is about 20%. The memory savings tops out around 10% and goes much much lower for complex rows. But the simple rows that get the saving are quite common because of our late materialization. ``` (data) (sortedIn) (topCount) Score Error -> Score Error longs_asc true 1000 0.819 ± 0.034 -> 0.705 ± 0.017 longs_asc false 1000 14.333 ± 0.444 -> 11.609 ± 0.414 longs_desc true 1000 0.844 ± 0.189 -> 0.705 ± 0.016 longs_desc false 1000 14.548 ± 0.663 -> 13.056 ± 0.425 longs_asc true 100000 124.933 ± 25.351 -> 93.528 ± 1.050 longs_asc false 100000 144.747 ± 2.633 -> 112.780 ± 2.195 longs_desc true 100000 120.441 ± 0.991 -> 93.110 ± 1.716 longs_desc false 100000 146.565 ± 2.352 -> 114.205 ± 5.802 ``` This is only topn's performance. You aren't going to see a 20% speed up from something like ``` FROM logs* | WHERE somethingnonindexed | SORT @timestamp DESC | LIMIT 100 ```` Iterating and loading and filtering is quite likely going to be more expensive than topn. But this'll help some. 20% of 20% is still a 4% improvement. I'll take it. Also, I did this to simplify key management for future work. The encoding basically just flips the bits for DESC fields. This lets us run an unsigned comparison across all bytes rather than finding the first mismatch. While I was getting flamegraphs I noticed that checking if a column is in the key took a surprising amount of time. So I cached it up front because that's super easy.
1 parent ad66eb7 commit b0de175

File tree

67 files changed

+1371
-519
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1371
-519
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/TopNBenchmark.java

Lines changed: 85 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,13 @@
4141
import org.openjdk.jmh.annotations.Warmup;
4242

4343
import java.util.ArrayList;
44+
import java.util.Arrays;
45+
import java.util.Comparator;
4446
import java.util.List;
4547
import java.util.Random;
4648
import java.util.concurrent.TimeUnit;
4749
import java.util.stream.IntStream;
50+
import java.util.stream.Stream;
4851

4952
@Warmup(iterations = 5)
5053
@Measurement(iterations = 7)
@@ -53,7 +56,6 @@
5356
@State(Scope.Thread)
5457
@Fork(1)
5558
public class TopNBenchmark {
56-
private static final BigArrays BIG_ARRAYS = BigArrays.NON_RECYCLING_INSTANCE; // TODO real big arrays?
5759
private static final BlockFactory blockFactory = BlockFactory.getInstance(
5860
new NoopCircuitBreaker("noop"),
5961
BigArrays.NON_RECYCLING_INSTANCE
@@ -66,8 +68,11 @@ public class TopNBenchmark {
6668
private static final String DOUBLES = "doubles";
6769
private static final String BOOLEANS = "booleans";
6870
private static final String BYTES_REFS = "bytes_refs";
69-
private static final String TWO_LONGS = "two_" + LONGS;
70-
private static final String LONGS_AND_BYTES_REFS = LONGS + "_and_" + BYTES_REFS;
71+
72+
private static final String ASC = "_asc";
73+
private static final String DESC = "_desc";
74+
75+
private static final String AND = "_and_";
7176

7277
static {
7378
LogConfigurator.configureESLogging();
@@ -89,7 +94,20 @@ static void selfTest() {
8994
}
9095
}
9196

92-
@Param({ LONGS, INTS, DOUBLES, BOOLEANS, BYTES_REFS, TWO_LONGS, LONGS_AND_BYTES_REFS })
97+
@Param(
98+
{
99+
LONGS + ASC,
100+
LONGS + DESC,
101+
INTS + ASC,
102+
DOUBLES + ASC,
103+
BOOLEANS + ASC,
104+
BYTES_REFS + ASC,
105+
LONGS + ASC + AND + LONGS + ASC,
106+
LONGS + ASC + AND + LONGS + DESC,
107+
LONGS + DESC + AND + LONGS + DESC,
108+
LONGS + ASC + AND + BYTES_REFS + ASC,
109+
LONGS + DESC + AND + BYTES_REFS + DESC }
110+
)
93111
public String data;
94112

95113
@Param({ "true", "false" })
@@ -105,28 +123,10 @@ static void selfTest() {
105123
public int topCount;
106124

107125
private static Operator operator(String data, int topCount, boolean sortedInput) {
108-
int count = switch (data) {
109-
case LONGS, INTS, DOUBLES, BOOLEANS, BYTES_REFS -> 1;
110-
case TWO_LONGS, LONGS_AND_BYTES_REFS -> 2;
111-
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
112-
};
113-
List<ElementType> elementTypes = switch (data) {
114-
case LONGS -> List.of(ElementType.LONG);
115-
case INTS -> List.of(ElementType.INT);
116-
case DOUBLES -> List.of(ElementType.DOUBLE);
117-
case BOOLEANS -> List.of(ElementType.BOOLEAN);
118-
case BYTES_REFS -> List.of(ElementType.BYTES_REF);
119-
case TWO_LONGS -> List.of(ElementType.LONG, ElementType.LONG);
120-
case LONGS_AND_BYTES_REFS -> List.of(ElementType.LONG, ElementType.BYTES_REF);
121-
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
122-
};
123-
List<TopNEncoder> encoders = switch (data) {
124-
case LONGS, INTS, DOUBLES, BOOLEANS -> List.of(TopNEncoder.DEFAULT_SORTABLE);
125-
case BYTES_REFS -> List.of(TopNEncoder.UTF8);
126-
case TWO_LONGS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.DEFAULT_SORTABLE);
127-
case LONGS_AND_BYTES_REFS -> List.of(TopNEncoder.DEFAULT_SORTABLE, TopNEncoder.UTF8);
128-
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
129-
};
126+
String[] dataSpec = data.split("_and_");
127+
List<ElementType> elementTypes = Arrays.stream(dataSpec).map(TopNBenchmark::elementType).toList();
128+
List<TopNEncoder> encoders = Arrays.stream(dataSpec).map(TopNBenchmark::encoder).toList();
129+
List<TopNOperator.SortOrder> sortOrders = IntStream.range(0, dataSpec.length).mapToObj(c -> sortOrder(c, dataSpec[c])).toList();
130130
CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(
131131
CircuitBreakerMetrics.NOOP,
132132
Settings.EMPTY,
@@ -139,78 +139,102 @@ private static Operator operator(String data, int topCount, boolean sortedInput)
139139
topCount,
140140
elementTypes,
141141
encoders,
142-
IntStream.range(0, count).mapToObj(c -> new TopNOperator.SortOrder(c, true, false)).toList(),
142+
sortOrders,
143143
8 * 1024,
144144
sortedInput ? TopNOperator.InputOrdering.SORTED : TopNOperator.InputOrdering.NOT_SORTED
145145
);
146146
}
147147

148+
private static ElementType elementType(String data) {
149+
return switch (data.replace(ASC, "").replace(DESC, "")) {
150+
case LONGS -> ElementType.LONG;
151+
case INTS -> ElementType.INT;
152+
case DOUBLES -> ElementType.DOUBLE;
153+
case BOOLEANS -> ElementType.BOOLEAN;
154+
case BYTES_REFS -> ElementType.BYTES_REF;
155+
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
156+
};
157+
}
158+
159+
private static TopNEncoder encoder(String data) {
160+
return switch (data.replace(ASC, "").replace(DESC, "")) {
161+
case LONGS, INTS, DOUBLES, BOOLEANS -> TopNEncoder.DEFAULT_SORTABLE;
162+
case BYTES_REFS -> TopNEncoder.UTF8;
163+
default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
164+
};
165+
}
166+
167+
private static boolean ascDesc(String data) {
168+
if (data.endsWith(ASC)) {
169+
return true;
170+
} else if (data.endsWith(DESC)) {
171+
return false;
172+
} else {
173+
throw new IllegalArgumentException("data neither asc nor desc: " + data);
174+
}
175+
}
176+
177+
private static TopNOperator.SortOrder sortOrder(int channel, String data) {
178+
return new TopNOperator.SortOrder(channel, ascDesc(data), false);
179+
}
180+
148181
private static void checkExpected(int topCount, List<Page> pages) {
149182
if (topCount != pages.stream().mapToLong(Page::getPositionCount).sum()) {
150183
throw new AssertionError("expected [" + topCount + "] but got [" + pages.size() + "]");
151184
}
152185
}
153186

154-
private static Page page(String data) {
155-
return switch (data) {
156-
case TWO_LONGS -> new Page(block(LONGS), block(LONGS));
157-
case LONGS_AND_BYTES_REFS -> new Page(block(LONGS), block(BYTES_REFS));
158-
default -> new Page(block(data));
159-
};
187+
private static Page page(boolean sortedInput, String data) {
188+
String[] dataSpec = data.split("_and_");
189+
return new Page(Arrays.stream(dataSpec).map(d -> block(sortedInput, d)).toArray(Block[]::new));
160190
}
161191

162192
// This creates blocks with uniformly random distributed and sorted data
163-
private static Block block(String data) {
164-
return switch (data) {
193+
private static Block block(boolean sortedInput, String data) {
194+
return switch (data.replace(ASC, "").replace(DESC, "")) {
165195
case LONGS -> {
166196
var builder = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
167-
168-
new Random().longs(BLOCK_LENGTH, 0, Long.MAX_VALUE).sorted().forEachOrdered(builder::appendLong);
169-
197+
maybeSort(sortedInput, data, new Random().longs(BLOCK_LENGTH, 0, Long.MAX_VALUE).boxed()).forEach(builder::appendLong);
170198
yield builder.build();
171199
}
172200
case INTS -> {
173201
var builder = blockFactory.newIntBlockBuilder(BLOCK_LENGTH);
174-
175-
new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE).sorted().forEachOrdered(builder::appendInt);
176-
202+
maybeSort(sortedInput, data, new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE).boxed()).forEach(builder::appendInt);
177203
yield builder.build();
178204
}
179205
case DOUBLES -> {
180206
var builder = blockFactory.newDoubleBlockBuilder(BLOCK_LENGTH);
181-
182-
new Random().doubles(BLOCK_LENGTH, 0, Double.MAX_VALUE).sorted().forEachOrdered(builder::appendDouble);
183-
207+
maybeSort(sortedInput, data, new Random().doubles(BLOCK_LENGTH, 0, Double.MAX_VALUE).boxed()).forEach(
208+
builder::appendDouble
209+
);
184210
yield builder.build();
185211
}
186212
case BOOLEANS -> {
187213
BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(BLOCK_LENGTH);
188-
189-
int falseCount = BLOCK_LENGTH / 2;
190-
int trueCount = BLOCK_LENGTH - falseCount;
191-
192-
for (int i = 0; i < falseCount; i++) {
193-
builder.appendBoolean(false);
194-
}
195-
for (int i = 0; i < trueCount; i++) {
196-
builder.appendBoolean(true);
197-
}
198-
214+
maybeSort(sortedInput, data, new Random().ints(BLOCK_LENGTH, 0, 1).boxed()).forEach(i -> builder.appendBoolean(i == 1));
199215
yield builder.build();
200216
}
201217
case BYTES_REFS -> {
202218
BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(BLOCK_LENGTH);
203-
new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE)
204-
.mapToObj(Integer::toString)
205-
.sorted()
206-
.forEachOrdered(s -> builder.appendBytesRef(new BytesRef(s)));
207-
219+
maybeSort(sortedInput, data, new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE).boxed()).forEach(
220+
i -> builder.appendBytesRef(new BytesRef(i.toString()))
221+
);
208222
yield builder.build();
209223
}
210224
default -> throw new UnsupportedOperationException("unsupported data [" + data + "]");
211225
};
212226
}
213227

228+
private static <T extends Comparable<T>> List<T> maybeSort(boolean sortedInput, String data, Stream<T> randomValues) {
229+
List<T> values = new ArrayList<>();
230+
randomValues.forEachOrdered(values::add);
231+
if (sortedInput) {
232+
values.sort(Comparator.naturalOrder());
233+
return ascDesc(data) ? values : values.reversed();
234+
}
235+
return values;
236+
}
237+
214238
@Benchmark
215239
@OperationsPerInvocation(1024 * BLOCK_LENGTH)
216240
public void run() {
@@ -219,7 +243,7 @@ public void run() {
219243

220244
private static void run(String data, int topCount, boolean sortedInput) {
221245
try (Operator operator = operator(data, topCount, sortedInput)) {
222-
Page page = page(data);
246+
Page page = page(sortedInput, data);
223247
for (int i = 0; i < 1024; i++) {
224248
operator.addInput(page.shallowCopy());
225249
}

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.lucene.tests.util.TestRuleMarkFailure;
4141
import org.apache.lucene.tests.util.TestUtil;
4242
import org.apache.lucene.tests.util.TimeUnits;
43+
import org.apache.lucene.util.BytesRef;
4344
import org.apache.lucene.util.SetOnce;
4445
import org.elasticsearch.ElasticsearchWrapperException;
4546
import org.elasticsearch.ExceptionsHelper;
@@ -3156,4 +3157,23 @@ public static ProjectState projectStateWithEmptyProject() {
31563157
public static ProjectMetadata emptyProject() {
31573158
return ProjectMetadata.builder(randomProjectIdOrDefault()).build();
31583159
}
3160+
3161+
/**
3162+
* Insert random garbage {@link BytesRef}
3163+
*/
3164+
public static BytesRef embedInRandomBytes(BytesRef bytesRef) {
3165+
var offset = randomIntBetween(0, 10);
3166+
var extraLength = randomIntBetween(offset == 0 ? 1 : 0, 10);
3167+
var newBytesArray = randomByteArrayOfLength(bytesRef.length + offset + extraLength);
3168+
3169+
for (int i = 0; i < offset; i++) {
3170+
newBytesArray[i] = randomByte();
3171+
}
3172+
System.arraycopy(bytesRef.bytes, bytesRef.offset, newBytesArray, offset, bytesRef.length);
3173+
for (int i = offset + bytesRef.length; i < newBytesArray.length; i++) {
3174+
newBytesArray[i] = randomByte();
3175+
}
3176+
3177+
return new BytesRef(newBytesArray, offset, bytesRef.length);
3178+
}
31593179
}

test/framework/src/test/java/org/elasticsearch/test/test/ESTestCaseTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import junit.framework.AssertionFailedError;
1313

14+
import org.apache.lucene.util.BytesRef;
1415
import org.apache.lucene.util.Version;
1516
import org.elasticsearch.common.bytes.BytesReference;
1617
import org.elasticsearch.common.time.DateFormatter;
@@ -499,4 +500,11 @@ public void testAssertArrayEqualsPercentElementsAreWithinAbsoluteDelta() {
499500
assertThat(ex.getMessage(), startsWith("arrays first differed at element [3]"));
500501
assertArrayEqualsPercent(expected, actual, 0.001f, 0.02f);
501502
}
503+
504+
public void testEmbedInRandomBytes() {
505+
BytesRef v = new BytesRef(randomByteArrayOfLength(between(1, 10)));
506+
BytesRef withGarbage = embedInRandomBytes(v);
507+
assertThat(withGarbage, equalTo(v));
508+
assertThat(withGarbage.bytes.length, greaterThan(withGarbage.length));
509+
}
502510
}

0 commit comments

Comments
 (0)