diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java index 30f8409356d9c..eb5748d3647fc 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader; @@ -377,6 +378,13 @@ interface Docs { * also a test implementation, but there may be no more other implementations. */ interface BlockFactory { + /** + * Adjust the circuit breaker with the given delta, if the delta is negative, the breaker will + * be adjusted without tripping. + * @throws CircuitBreakingException if the breaker was put above its limit + */ + void adjustBreaker(long delta) throws CircuitBreakingException; + /** * Build a builder to load booleans as loaded from doc values. Doc values * load booleans in sorted order. @@ -486,6 +494,12 @@ interface BlockFactory { */ Block constantBytes(BytesRef value, int count); + /** + * Build a block that contains {@code value} repeated + * {@code count} times. 
+ */ + Block constantInt(int value, int count); + /** * Build a reader for reading {@link SortedDocValues} */ diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java new file mode 100644 index 0000000000000..730ef2a58a6f3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Utf8CodePointsFromOrdsBlockLoader.java @@ -0,0 +1,466 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper.blockloader.docvalues; + +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.lucene.util.UnicodeUtil; + +import java.io.IOException; +import java.util.Arrays; + +import static org.elasticsearch.index.mapper.blockloader.docvalues.Warnings.registerSingleValueWarning; + +/** + * A count of utf-8 code points for {@code keyword} style fields that are stored as a lookup table. + */ +public class Utf8CodePointsFromOrdsBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader { + /** + * When there are fewer than this many unique values we use much more efficient "low cardinality" + * loaders. 
This must be fairly small because we build an untracked int[] with at most this many + * entries for a cache. + */ + static final int LOW_CARDINALITY = 1024; + + private final Warnings warnings; + + private final String fieldName; + + public Utf8CodePointsFromOrdsBlockLoader(Warnings warnings, String fieldName) { + this.fieldName = fieldName; + this.warnings = warnings; + } + + @Override + public IntBuilder builder(BlockFactory factory, int expectedCount) { + return factory.ints(expectedCount); + } + + @Override + public AllReader reader(LeafReaderContext context) throws IOException { + SortedSetDocValues docValues = context.reader().getSortedSetDocValues(fieldName); + if (docValues != null) { + if (docValues.getValueCount() > LOW_CARDINALITY) { + return new ImmediateOrdinals(warnings, docValues); + } + SortedDocValues singleton = DocValues.unwrapSingleton(docValues); + if (singleton != null) { + return new SingletonOrdinals(singleton); + } + return new Ordinals(warnings, docValues); + } + SortedDocValues singleton = context.reader().getSortedDocValues(fieldName); + if (singleton != null) { + if (singleton.getValueCount() > LOW_CARDINALITY) { + return new ImmediateOrdinals(warnings, DocValues.singleton(singleton)); + } + return new SingletonOrdinals(singleton); + } + return new ConstantNullsReader(); + } + + @Override + public String toString() { + return "Utf8CodePointsFromOrds[" + fieldName + "]"; + } + + /** + * Loads low cardinality singleton ordinals in using a cache of code point counts. + *
+ * If we haven't cached the counts for all ordinals then the process looks like: + *
+ *+ * If we have cached the counts for all ordinals we load the + * ordinals and look them up in the cache immediately. + *
+ */ + private static class SingletonOrdinals extends BlockDocValuesReader { + private final SortedDocValues ordinals; + private final int[] cache; + + private int cacheEntriesFilled; + + SingletonOrdinals(SortedDocValues ordinals) { + this.ordinals = ordinals; + + // TODO track this memory. we can't yet because this isn't Closeable + this.cache = new int[ordinals.getValueCount()]; + Arrays.fill(this.cache, -1); + } + + @Override + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + if (docs.count() - offset == 1) { + return blockForSingleDoc(factory, docs.get(offset)); + } + + if (cacheEntriesFilled == cache.length) { + return buildFromFilledCache(factory, docs, offset); + } + + int[] ords = readOrds(factory, docs, offset); + try { + fillCache(factory, ords); + return buildFromCache(factory, cache, ords); + } finally { + factory.adjustBreaker(-RamUsageEstimator.sizeOf(ords)); + } + } + + @Override + public void read(int docId, StoredFields storedFields, Builder builder) throws IOException { + if (ordinals.advanceExact(docId)) { + ((IntBuilder) builder).appendInt(codePointsAtOrd(ordinals.ordValue())); + } else { + builder.appendNull(); + } + } + + @Override + public int docId() { + return ordinals.docID(); + } + + @Override + public String toString() { + return "Utf8CodePointsFromOrds.SingletonOrdinals"; + } + + private Block blockForSingleDoc(BlockFactory factory, int docId) throws IOException { + if (ordinals.advanceExact(docId)) { + return factory.constantInt(codePointsAtOrd(ordinals.ordValue()), 1); + } else { + return factory.constantNulls(1); + } + } + + private int[] readOrds(BlockFactory factory, Docs docs, int offset) throws IOException { + int count = docs.count() - offset; + long size = sizeOfArray(count); + factory.adjustBreaker(size); + int[] ords = null; + try { + ords = new int[count]; + for (int i = offset; i < docs.count(); i++) { + int doc = docs.get(i); + if (ordinals.advanceExact(doc) == 
false) { + ords[i] = -1; + continue; + } + ords[i] = ordinals.ordValue(); + } + int[] result = ords; + ords = null; + return result; + } finally { + if (ords != null) { + factory.adjustBreaker(-size); + } + } + } + + private void fillCache(BlockFactory factory, int[] ords) throws IOException { + factory.adjustBreaker(RamUsageEstimator.sizeOf(ords)); + try { + int[] sortedOrds = ords.clone(); + Arrays.sort(sortedOrds); + int i = 0; + while (i < sortedOrds.length && sortedOrds[i] < 0) { + i++; + } + while (i < sortedOrds.length) { + // Fill the cache. Duplicates will noop quickly. + codePointsAtOrd(sortedOrds[i++]); + } + } finally { + factory.adjustBreaker(-RamUsageEstimator.sizeOf(ords)); + } + } + + private Block buildFromFilledCache(BlockFactory factory, Docs docs, int offset) throws IOException { + int count = docs.count() - offset; + try (IntBuilder builder = factory.ints(count)) { + for (int i = offset; i < docs.count(); i++) { + int doc = docs.get(i); + if (ordinals.advanceExact(doc) == false) { + builder.appendNull(); + continue; + } + builder.appendInt(cache[ordinals.ordValue()]); + } + return builder.build(); + } + } + + private int codePointsAtOrd(int ord) throws IOException { + if (cache[ord] >= 0) { + return cache[ord]; + } + BytesRef v = ordinals.lookupOrd(ord); + int count = UnicodeUtil.codePointCount(v); + cache[ord] = count; + cacheEntriesFilled++; + return count; + } + } + + /** + * Loads low cardinality non-singleton ordinals in using a cache of code point counts. + * See {@link SingletonOrdinals} for the process + */ + private static class Ordinals extends BlockDocValuesReader { + private final Warnings warnings; + private final SortedSetDocValues ordinals; + private final int[] cache; + + private int cacheEntriesFilled; + + Ordinals(Warnings warnings, SortedSetDocValues ordinals) { + this.warnings = warnings; + this.ordinals = ordinals; + + // TODO track this memory. 
we can't yet because this isn't Releasable + this.cache = new int[Math.toIntExact(ordinals.getValueCount())]; + Arrays.fill(this.cache, -1); + } + + @Override + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + if (docs.count() - offset == 1) { + return blockForSingleDoc(factory, docs.get(offset)); + } + + if (cacheEntriesFilled == cache.length) { + return buildFromFilledCache(factory, docs, offset); + } + + int[] ords = readOrds(factory, docs, offset); + try { + fillCache(factory, ords); + return buildFromCache(factory, cache, ords); + } finally { + factory.adjustBreaker(-RamUsageEstimator.shallowSizeOf(ords)); + } + } + + @Override + public void read(int docId, StoredFields storedFields, Builder builder) throws IOException { + if (ordinals.advanceExact(docId) == false) { + builder.appendNull(); + return; + } + if (ordinals.docValueCount() != 1) { + registerSingleValueWarning(warnings); + builder.appendNull(); + return; + } + ((IntBuilder) builder).appendInt(codePointsAtOrd(Math.toIntExact(ordinals.nextOrd()))); + } + + @Override + public int docId() { + return ordinals.docID(); + } + + @Override + public String toString() { + return "Utf8CodePointsFromOrds.Ordinals"; + } + + private Block blockForSingleDoc(BlockFactory factory, int docId) throws IOException { + if (ordinals.advanceExact(docId) == false) { + return factory.constantNulls(1); + } + if (ordinals.docValueCount() == 1) { + return factory.constantInt(codePointsAtOrd(Math.toIntExact(ordinals.nextOrd())), 1); + } + registerSingleValueWarning(warnings); + return factory.constantNulls(1); + } + + private int[] readOrds(BlockFactory factory, Docs docs, int offset) throws IOException { + int count = docs.count() - offset; + long size = sizeOfArray(count); + factory.adjustBreaker(size); + int[] ords = null; + try { + ords = new int[docs.count() - offset]; + for (int i = offset; i < docs.count(); i++) { + int doc = docs.get(i); + if 
(ordinals.advanceExact(doc) == false) { + ords[i] = -1; + continue; + } + if (ordinals.docValueCount() != 1) { + registerSingleValueWarning(warnings); + ords[i] = -1; + continue; + } + ords[i] = Math.toIntExact(ordinals.nextOrd()); + } + int[] result = ords; + ords = null; + return result; + } finally { + if (ords != null) { + factory.adjustBreaker(-size); + } + } + } + + private void fillCache(BlockFactory factory, int[] ords) throws IOException { + factory.adjustBreaker(RamUsageEstimator.sizeOf(ords)); + try { + int[] sortedOrds = ords.clone(); + Arrays.sort(sortedOrds); + int i = 0; + while (i < sortedOrds.length && sortedOrds[i] < 0) { + i++; + } + while (i < sortedOrds.length) { + // Fill the cache. Duplicates will noop quickly. + codePointsAtOrd(sortedOrds[i++]); + } + } finally { + factory.adjustBreaker(-RamUsageEstimator.sizeOf(ords)); + } + } + + private Block buildFromFilledCache(BlockFactory factory, Docs docs, int offset) throws IOException { + int count = docs.count() - offset; + try (IntBuilder builder = factory.ints(count)) { + for (int i = offset; i < docs.count(); i++) { + int doc = docs.get(i); + if (ordinals.advanceExact(doc) == false) { + builder.appendNull(); + continue; + } + if (ordinals.docValueCount() != 1) { + registerSingleValueWarning(warnings); + builder.appendNull(); + continue; + } + builder.appendInt(cache[Math.toIntExact(ordinals.nextOrd())]); + } + return builder.build(); + } + } + + private int codePointsAtOrd(int ord) throws IOException { + if (cache[ord] >= 0) { + return cache[ord]; + } + BytesRef v = ordinals.lookupOrd(ord); + int count = UnicodeUtil.codePointCount(v); + cache[ord] = count; + cacheEntriesFilled++; + return count; + } + } + + /** + * Loads a count of utf-8 code points for each ordinal doc by doc, without a cache. We use this when there + * are many unique doc values and the cache hit rate is unlikely to be high. 
+ */ + private static class ImmediateOrdinals extends BlockDocValuesReader { + private final Warnings warnings; + private final SortedSetDocValues ordinals; + + ImmediateOrdinals(Warnings warnings, SortedSetDocValues ordinals) { + this.ordinals = ordinals; + this.warnings = warnings; + } + + @Override + public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException { + if (docs.count() - offset == 1) { + return blockForSingleDoc(factory, docs.get(offset)); + } + try (IntBuilder builder = factory.ints(docs.count() - offset)) { + for (int i = offset; i < docs.count(); i++) { + read(docs.get(i), builder); + } + return builder.build(); + } + } + + @Override + public void read(int docId, StoredFields storedFields, Builder builder) throws IOException { + read(docId, (IntBuilder) builder); + } + + private void read(int docId, IntBuilder builder) throws IOException { + if (ordinals.advanceExact(docId) == false) { + builder.appendNull(); + return; + } + if (ordinals.docValueCount() != 1) { + registerSingleValueWarning(warnings); + builder.appendNull(); + return; + } + builder.appendInt(codePointsAtOrd(ordinals.nextOrd())); + } + + @Override + public int docId() { + return ordinals.docID(); + } + + @Override + public String toString() { + return "Utf8CodePointsFromOrds.Immediate"; + } + + private Block blockForSingleDoc(BlockFactory factory, int docId) throws IOException { + if (ordinals.advanceExact(docId) == false) { + return factory.constantNulls(1); + } + if (ordinals.docValueCount() == 1) { + return factory.constantInt(codePointsAtOrd(ordinals.nextOrd()), 1); + } + registerSingleValueWarning(warnings); + return factory.constantNulls(1); + } + + private int codePointsAtOrd(long ord) throws IOException { + return UnicodeUtil.codePointCount(ordinals.lookupOrd(ord)); + } + } + + private static Block buildFromCache(BlockFactory factory, int[] cache, int[] ords) { + try (IntBuilder builder = factory.ints(ords.length)) { + for (int ord 
: ords) { + if (ord >= 0) { + builder.appendInt(cache[ord]); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + private static long sizeOfArray(int count) { + return RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) Integer.BYTES * (long) count); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java new file mode 100644 index 0000000000000..67fcb543d48c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/blockloader/docvalues/Warnings.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper.blockloader.docvalues; + +/** + * Warnings returned when loading values for ESQL. These are returned as HTTP 299 headers like so: + *{@code
+ * < Warning: 299 Elasticsearch-${ver} "No limit defined, adding default limit of [1000]"
+ * < Warning: 299 Elasticsearch-${ver} "Line 1:27: evaluation of [a + 1] failed, treating result as null. Only first 20 failures recorded."
+ * < Warning: 299 Elasticsearch-${ver} "Line 1:27: java.lang.IllegalArgumentException: single-value function encountered multi-value"
+ * }
+ */
+interface Warnings {
+ /**
+ * Register a warning. ESQL deduplicates and limits the number of warnings returned so it should
+ * be fine to blast as many warnings into this as you encounter.
+ * @param exceptionClass The class of exception. Pick the same exception you'd use if you were
+ * throwing it back over HTTP.
+ * @param message The message to the caller. ESQL's warnings machinery attaches the location
+ * in the original query and the text so there isn't any need to describe the
+ * function in this message.
+ */
+ void registerException(Class<? extends Exception> exceptionClass, String message);
+
+ /**
+ * Register the canonical warning for when a single-value only function encounters
+ * a multivalued field.
+ */
+ static void registerSingleValueWarning(Warnings warnings) {
+ warnings.registerException(IllegalArgumentException.class, "single-value function encountered multi-value");
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/ForceDocAtATime.java b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/ForceDocAtATime.java
new file mode 100644
index 0000000000000..0a8ffc7ba2941
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/mapper/blockloader/docvalues/ForceDocAtATime.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.mapper.blockloader.docvalues;
+
+import org.elasticsearch.index.mapper.BlockLoader;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+public class ForceDocAtATime implements BlockLoader.AllReader {
+ private final Supplier