Skip to content

Commit af6a820

Browse files
committed
Some optimizations for constant blocks
1 parent 89ad24f commit af6a820

File tree

8 files changed

+202
-21
lines changed

8 files changed

+202
-21
lines changed

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,13 @@ private void addRawInput(int positionOffset, IntBigArrayBlock groups, Block valu
154154
* This method is called for count all.
155155
*/
156156
private void addRawInput(IntVector groups) {
157-
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
158-
int groupId = groups.getInt(groupPosition);
159-
state.increment(groupId, 1);
157+
if (groups.isConstant()) {
158+
state.increment(groups.getInt(0), groups.getPositionCount());
159+
} else {
160+
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
161+
int groupId = groups.getInt(groupPosition);
162+
state.increment(groupId, 1);
163+
}
160164
}
161165
}
162166

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregators.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,14 @@ static void addOrdinalInputVector(
178178
IntVector ordinalIds,
179179
IntVector hashIds
180180
) {
181-
for (int p = 0; p < groupIds.getPositionCount(); p++) {
182-
int groupId = groupIds.getInt(p);
183-
int ord = ordinalIds.getInt(p + positionOffset);
184-
state.addValueOrdinal(groupId, hashIds.getInt(ord));
181+
if (groupIds.isConstant() && hashIds.isConstant()) {
182+
state.addValueOrdinal(groupIds.getInt(0), hashIds.getInt(0));
183+
} else {
184+
for (int p = 0; p < groupIds.getPositionCount(); p++) {
185+
int groupId = groupIds.getInt(p);
186+
int ord = ordinalIds.getInt(p + positionOffset);
187+
state.addValueOrdinal(groupId, hashIds.getInt(ord));
188+
}
185189
}
186190
}
187191

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefLongBlockHash.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,23 @@ public void add(IntBlock bytesHashes, LongBlock longsBlock, GroupingAggregatorFu
9898
}
9999

100100
public IntVector add(IntVector bytesHashes, LongVector longsVector) {
101+
if (bytesHashes.isConstant()) {
102+
boolean singleValue = true;
103+
// TODO: Should we also support num_runs? Should we add a constant flag to Vector and perform this check in EVAL instead?
104+
if (longsVector.isConstant() == false) {
105+
long firstValue = longsVector.getLong(0);
106+
for (int i = 1; i < longsVector.getPositionCount(); i++) {
107+
if (firstValue != longsVector.getLong(i)) {
108+
singleValue = false;
109+
break;
110+
}
111+
}
112+
}
113+
if (singleValue) {
114+
int singleOrd = Math.toIntExact(hashOrdToGroup(finalHash.add(bytesHashes.getInt(0), longsVector.getLong(0))));
115+
return blockFactory.newConstantIntVector(singleOrd, longsVector.getPositionCount());
116+
}
117+
}
101118
int positions = bytesHashes.getPositionCount();
102119
final int[] ords = new int[positions];
103120
for (int i = 0; i < positions; i++) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/X-BlockHash.java.st

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ final class $Type$BlockHash extends BlockHash {
109109
*/
110110
IntVector add($Type$Vector vector) {
111111
$if(BytesRef)$
112+
if (vector.isConstant()) {
113+
BytesRef v = vector.getBytesRef(0, new BytesRef());
114+
int groupId = Math.toIntExact(hashOrdToGroupNullReserved(hash.add(v)));
115+
return blockFactory.newConstantIntVector(groupId, vector.getPositionCount());
116+
}
112117
var ordinals = vector.asOrdinals();
113118
if (ordinals != null) {
114119
return addOrdinalsVector(ordinals);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilder.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.compute.data.BytesRefBlock;
1616
import org.elasticsearch.compute.data.BytesRefVector;
1717
import org.elasticsearch.compute.data.IntBlock;
18+
import org.elasticsearch.compute.data.IntVector;
1819
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
1920
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
2021
import org.elasticsearch.core.Releasable;
@@ -64,6 +65,41 @@ public SingletonOrdinalsBuilder endPositionEntry() {
6465
throw new UnsupportedOperationException("should only have one value per doc");
6566
}
6667

68+
BytesRefBlock tryBuildConstantBlock() {
69+
if (minOrd == maxOrd) {
70+
boolean seenNulls = false;
71+
for (int ord : ords) {
72+
if (ord == -1) {
73+
seenNulls = true;
74+
break;
75+
}
76+
}
77+
if (seenNulls == false) {
78+
final BytesRef v;
79+
try {
80+
v = BytesRef.deepCopyOf(docValues.lookupOrd(minOrd));
81+
} catch (IOException e) {
82+
throw new UncheckedIOException("failed to lookup ordinals", e);
83+
}
84+
BytesRefVector bytes = null;
85+
IntVector ordinals = null;
86+
boolean success = false;
87+
try {
88+
bytes = blockFactory.newConstantBytesRefVector(v, 1);
89+
ordinals = blockFactory.newConstantIntVector(0, ords.length);
90+
final var result = new OrdinalBytesRefBlock(ordinals.asBlock(), bytes);
91+
success = true;
92+
return result;
93+
} finally {
94+
if (success == false) {
95+
Releasables.close(bytes, ordinals);
96+
}
97+
}
98+
}
99+
}
100+
return null;
101+
}
102+
67103
BytesRefBlock buildOrdinal() {
68104
int valueCount = maxOrd - minOrd + 1;
69105
long breakerSize = ordsSize(valueCount);
@@ -172,6 +208,10 @@ public long estimatedBytes() {
172208

173209
@Override
174210
public BytesRefBlock build() {
211+
var constantBlock = tryBuildConstantBlock();
212+
if (constantBlock != null) {
213+
return constantBlock;
214+
}
175215
return shouldBuildOrdinalsBlock() ? buildOrdinal() : buildRegularBlock();
176216
}
177217

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,4 +1715,76 @@ private BlockHash buildBlockHash(int emitBatchSize, Block... values) {
17151715
? new PackedValuesBlockHash(specs, blockFactory, emitBatchSize)
17161716
: BlockHash.build(specs, blockFactory, emitBatchSize, true);
17171717
}
1718+
1719+
public void testConstant() {
1720+
try (
1721+
var hash = new BytesRefLongBlockHash(blockFactory, 0, 1, false, randomIntBetween(1, 1000));
1722+
var bytesHash = new BytesRefBlockHash(0, blockFactory)
1723+
) {
1724+
int iters = between(1, 20);
1725+
for (int i = 0; i < iters; i++) {
1726+
final BytesRefBlock bytes;
1727+
final LongBlock longs;
1728+
final boolean constantInput = randomBoolean();
1729+
final int positions = randomIntBetween(1, 100);
1730+
if (constantInput) {
1731+
bytes = blockFactory.newConstantBytesRefBlockWith(new BytesRef(randomAlphaOfLength(10)), positions);
1732+
if (randomBoolean()) {
1733+
try (IntVector hashIds = bytesHash.add(bytes.asVector())) {
1734+
assertTrue(hashIds.isConstant());
1735+
}
1736+
}
1737+
if (randomBoolean()) {
1738+
longs = blockFactory.newConstantLongBlockWith(randomNonNegativeLong(), positions);
1739+
} else {
1740+
long value = randomNonNegativeLong();
1741+
try (var builder = blockFactory.newLongVectorFixedBuilder(positions)) {
1742+
for (int p = 0; p < positions; p++) {
1743+
builder.appendLong(value);
1744+
}
1745+
longs = builder.build().asBlock();
1746+
}
1747+
}
1748+
} else {
1749+
try (var builder = blockFactory.newBytesRefBlockBuilder(positions)) {
1750+
for (int p = 0; p < positions; p++) {
1751+
builder.appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
1752+
}
1753+
bytes = builder.build();
1754+
}
1755+
try (var builder = blockFactory.newLongVectorFixedBuilder(positions)) {
1756+
for (int p = 0; p < positions; p++) {
1757+
builder.appendLong(randomNonNegativeLong());
1758+
}
1759+
longs = builder.build().asBlock();
1760+
}
1761+
}
1762+
try (Page page = new Page(bytes, longs)) {
1763+
hash.add(page, new GroupingAggregatorFunction.AddInput() {
1764+
@Override
1765+
public void add(int positionOffset, IntArrayBlock groupIds) {
1766+
fail("should not call IntArrayBlock");
1767+
}
1768+
1769+
@Override
1770+
public void add(int positionOffset, IntBigArrayBlock groupIds) {
1771+
fail("should not call IntBigArrayBlock");
1772+
}
1773+
1774+
@Override
1775+
public void add(int positionOffset, IntVector groupIds) {
1776+
if (constantInput) {
1777+
assertTrue(groupIds.isConstant());
1778+
}
1779+
}
1780+
1781+
@Override
1782+
public void close() {
1783+
1784+
}
1785+
});
1786+
}
1787+
}
1788+
}
1789+
}
17181790
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/SingletonOrdinalsBuilderTests.java

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,18 @@
1414
import org.apache.lucene.index.IndexWriterConfig;
1515
import org.apache.lucene.index.LeafReaderContext;
1616
import org.apache.lucene.index.SortedDocValues;
17+
import org.apache.lucene.search.Sort;
18+
import org.apache.lucene.search.SortField;
1719
import org.apache.lucene.store.Directory;
1820
import org.apache.lucene.tests.index.RandomIndexWriter;
1921
import org.apache.lucene.util.BytesRef;
2022
import org.elasticsearch.common.breaker.CircuitBreakingException;
2123
import org.elasticsearch.compute.data.BlockFactory;
2224
import org.elasticsearch.compute.data.BytesRefBlock;
25+
import org.elasticsearch.compute.data.BytesRefVector;
2326
import org.elasticsearch.compute.test.ComputeTestCase;
27+
import org.elasticsearch.index.mapper.BlockLoader;
28+
import org.elasticsearch.index.mapper.KeywordFieldMapper;
2429
import org.elasticsearch.indices.CrankyCircuitBreakerService;
2530

2631
import java.io.IOException;
@@ -38,13 +43,13 @@
3843
public class SingletonOrdinalsBuilderTests extends ComputeTestCase {
3944

4045
public void testReader() throws IOException {
41-
testRead(blockFactory());
46+
testRead(blockFactory(), randomBoolean());
4247
}
4348

4449
public void testReadWithCranky() throws IOException {
4550
var factory = crankyBlockFactory();
4651
try {
47-
testRead(factory);
52+
testRead(factory, randomBoolean());
4853
// If we made it this far cranky didn't fail us!
4954
} catch (CircuitBreakingException e) {
5055
logger.info("cranky", e);
@@ -53,30 +58,59 @@ public void testReadWithCranky() throws IOException {
5358
assertThat(factory.breaker().getUsed(), equalTo(0L));
5459
}
5560

56-
private void testRead(BlockFactory factory) throws IOException {
61+
private void testRead(BlockFactory factory, boolean withIndexSorts) throws IOException {
5762
int count = 1000;
58-
try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
63+
var config = new IndexWriterConfig();
64+
if (withIndexSorts) {
65+
config.setIndexSort(new Sort(new SortField("f", SortField.Type.STRING)));
66+
}
67+
try (Directory directory = newDirectory(); RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) {
5968
for (int i = 0; i < count; i++) {
6069
for (BytesRef v : new BytesRef[] { new BytesRef("a"), new BytesRef("b"), new BytesRef("c"), new BytesRef("d") }) {
6170
indexWriter.addDocument(List.of(new SortedDocValuesField("f", v)));
6271
}
6372
}
6473
Map<String, Integer> counts = new HashMap<>();
74+
var keywordField = new KeywordFieldMapper.KeywordFieldType("f");
75+
var blockLoader = keywordField.blockLoader(ValuesSourceReaderOperatorTests.blContext());
76+
var blockFactory = new ComputeBlockLoaderFactory(factory);
6577
try (IndexReader reader = indexWriter.getReader()) {
6678
for (LeafReaderContext ctx : reader.leaves()) {
67-
SortedDocValues docValues = ctx.reader().getSortedDocValues("f");
68-
try (SingletonOrdinalsBuilder builder = new SingletonOrdinalsBuilder(factory, docValues, ctx.reader().numDocs())) {
69-
for (int i = 0; i < ctx.reader().maxDoc(); i++) {
70-
if (ctx.reader().getLiveDocs() == null || ctx.reader().getLiveDocs().get(i)) {
71-
assertThat(docValues.advanceExact(i), equalTo(true));
72-
builder.appendOrd(docValues.ordValue());
79+
int start = 0;
80+
int numDocs = ctx.reader().numDocs();
81+
while (start < numDocs) {
82+
int end = start + randomIntBetween(1, numDocs - start);
83+
BlockLoader.Docs docs = new BlockLoader.Docs() {
84+
@Override
85+
public int count() {
86+
return end;
7387
}
74-
}
75-
try (BytesRefBlock build = buildOrdinalsBuilder(builder)) {
76-
for (int i = 0; i < build.getPositionCount(); i++) {
77-
counts.merge(build.getBytesRef(i, new BytesRef()).utf8ToString(), 1, (lhs, rhs) -> lhs + rhs);
88+
89+
@Override
90+
public int get(int i) {
91+
return i;
92+
}
93+
};
94+
var columnAtATimeReader = blockLoader.columnAtATimeReader(ctx);
95+
try (BlockLoader.Block block = columnAtATimeReader.read(blockFactory, docs, start)) {
96+
BytesRefBlock result = (BytesRefBlock) block;
97+
BytesRef scratch = new BytesRef();
98+
for (int i = 0; i < result.getPositionCount(); i++) {
99+
counts.merge(result.getBytesRef(i, scratch).utf8ToString(), 1, (lhs, rhs) -> lhs + rhs);
100+
}
101+
boolean singleValue = true;
102+
BytesRef first = BytesRef.deepCopyOf(result.getBytesRef(0, scratch));
103+
for (int i = 1; i < result.getPositionCount(); i++) {
104+
if (first.equals(result.getBytesRef(i, scratch)) == false) {
105+
singleValue = false;
106+
break;
107+
}
78108
}
109+
BytesRefVector valuesVector = result.asVector();
110+
assertNotNull(valuesVector);
111+
assertThat(valuesVector.isConstant(), equalTo(singleValue));
79112
}
113+
start = end;
80114
}
81115
}
82116
}

0 commit comments

Comments
 (0)