Skip to content

Commit 4af5183

Browse files
Speedup MultiTermsAggregator (#123220)
Creating (and, more importantly, eventually resizing) a fresh stream output makes up a large chunk of the runtime of this aggregation. Also, recursively calling an inline consumer makes this logic even more confusing and adds overhead, since escape analysis will not be able to remove the allocation of the consumer. Fix: call a method recursively instead and reuse a single output.
1 parent 67293ba commit 4af5183

File tree

1 file changed

+27
-39
lines changed

1 file changed

+27
-39
lines changed

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.common.util.LongArray;
2525
import org.elasticsearch.common.util.ObjectArray;
2626
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
27-
import org.elasticsearch.core.CheckedConsumer;
2827
import org.elasticsearch.core.Releasables;
2928
import org.elasticsearch.index.fielddata.FieldData;
3029
import org.elasticsearch.index.fielddata.NumericDoubleValues;
@@ -168,20 +167,6 @@ static List<List<Object>> docTerms(List<TermValues> termValuesList, int doc) thr
168167
return terms;
169168
}
170169

171-
/**
172-
* Packs a list of terms into ByteRef so we can use BytesKeyedBucketOrds
173-
*
174-
* TODO: this is a temporary solution, we should replace it with a more optimal mechanism instead of relying on BytesKeyedBucketOrds
175-
*/
176-
static BytesRef packKey(List<Object> terms) {
177-
try (BytesStreamOutput output = new BytesStreamOutput()) {
178-
output.writeCollection(terms, StreamOutput::writeGenericValue);
179-
return output.bytes().toBytesRef();
180-
} catch (IOException ex) {
181-
throw ExceptionsHelper.convertToRuntime(ex);
182-
}
183-
}
184-
185170
/**
186171
* Unpacks ByteRef back into a list of terms
187172
*
@@ -198,36 +183,39 @@ static List<Object> unpackTerms(BytesRef termsBytes) {
198183
@Override
199184
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
200185
List<TermValues> termValuesList = termValuesList(aggCtx.getLeafReaderContext());
201-
186+
BytesStreamOutput output = new BytesStreamOutput();
202187
return new LeafBucketCollectorBase(sub, values) {
203188
@Override
204189
public void collect(int doc, long owningBucketOrd) throws IOException {
205190
List<List<Object>> terms = docTerms(termValuesList, doc);
206191
if (terms != null) {
207-
List<Object> path = new ArrayList<>(terms.size());
208-
new CheckedConsumer<Integer, IOException>() {
209-
@Override
210-
public void accept(Integer start) throws IOException {
211-
for (Object term : terms.get(start)) {
212-
if (start == path.size()) {
213-
path.add(term);
214-
} else {
215-
path.set(start, term);
216-
}
217-
if (start < terms.size() - 1) {
218-
this.accept(start + 1);
219-
} else {
220-
long bucketOrd = bucketOrds.add(owningBucketOrd, packKey(path));
221-
if (bucketOrd < 0) { // already seen
222-
bucketOrd = -1 - bucketOrd;
223-
collectExistingBucket(sub, doc, bucketOrd);
224-
} else {
225-
collectBucket(sub, doc, bucketOrd);
226-
}
227-
}
228-
}
192+
doCollect(terms, new ArrayList<>(terms.size()), owningBucketOrd, doc, 0);
193+
}
194+
}
195+
196+
private void doCollect(List<List<Object>> terms, List<Object> path, long owningBucketOrd, int doc, int start)
197+
throws IOException {
198+
for (Object term : terms.get(start)) {
199+
if (start == path.size()) {
200+
path.add(term);
201+
} else {
202+
path.set(start, term);
203+
}
204+
if (start < terms.size() - 1) {
205+
doCollect(terms, path, owningBucketOrd, doc, start + 1);
206+
} else {
207+
// TODO: this is a temporary solution, we should replace it with a more optimal mechanism instead of relying on
208+
// BytesKeyedBucketOrds
209+
output.seek(0L);
210+
output.writeCollection(path, StreamOutput::writeGenericValue);
211+
long bucketOrd = bucketOrds.add(owningBucketOrd, output.bytes().toBytesRef());
212+
if (bucketOrd < 0) { // already seen
213+
bucketOrd = -1 - bucketOrd;
214+
collectExistingBucket(sub, doc, bucketOrd);
215+
} else {
216+
collectBucket(sub, doc, bucketOrd);
229217
}
230-
}.accept(0);
218+
}
231219
}
232220
}
233221
};

0 commit comments

Comments (0)