-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Limit Replace function memory usage #127924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a6f883a
821e627
992b11e
833e381
4aef2f9
162c91a
455b1fd
49f05ab
fc82f5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 127924 | ||
| summary: Limit Replace function memory usage | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,9 +28,11 @@ | |
| import java.io.IOException; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.regex.Matcher; | ||
| import java.util.regex.Pattern; | ||
| import java.util.regex.PatternSyntaxException; | ||
|
|
||
| import static org.elasticsearch.common.unit.ByteSizeUnit.MB; | ||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST; | ||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND; | ||
| import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.THIRD; | ||
|
|
@@ -39,6 +41,8 @@ | |
| public class Replace extends EsqlScalarFunction { | ||
| public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Replace", Replace::new); | ||
|
|
||
| static final long MAX_RESULT_LENGTH = MB.toBytes(1); | ||
|
|
||
| private final Expression str; | ||
| private final Expression regex; | ||
| private final Expression newStr; | ||
|
|
@@ -121,15 +125,15 @@ public boolean foldable() { | |
| return str.foldable() && regex.foldable() && newStr.foldable(); | ||
| } | ||
|
|
||
| @Evaluator(extraName = "Constant", warnExceptions = PatternSyntaxException.class) | ||
| @Evaluator(extraName = "Constant", warnExceptions = IllegalArgumentException.class) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| static BytesRef process(BytesRef str, @Fixed Pattern regex, BytesRef newStr) { | ||
| if (str == null || regex == null || newStr == null) { | ||
| return null; | ||
| } | ||
| return new BytesRef(regex.matcher(str.utf8ToString()).replaceAll(newStr.utf8ToString())); | ||
| return safeReplace(str, regex, newStr); | ||
| } | ||
|
|
||
| @Evaluator(warnExceptions = PatternSyntaxException.class) | ||
| @Evaluator(warnExceptions = IllegalArgumentException.class) | ||
| static BytesRef process(BytesRef str, BytesRef regex, BytesRef newStr) { | ||
| if (str == null) { | ||
| return null; | ||
|
|
@@ -138,7 +142,30 @@ static BytesRef process(BytesRef str, BytesRef regex, BytesRef newStr) { | |
| if (regex == null || newStr == null) { | ||
| return str; | ||
| } | ||
| return new BytesRef(str.utf8ToString().replaceAll(regex.utf8ToString(), newStr.utf8ToString())); | ||
| return safeReplace(str, Pattern.compile(regex.utf8ToString()), newStr); | ||
| } | ||
|
|
||
| /** | ||
| * Executes a Replace without surpassing the memory limit. | ||
| */ | ||
| private static BytesRef safeReplace(BytesRef strBytesRef, Pattern regex, BytesRef newStrBytesRef) { | ||
| String str = strBytesRef.utf8ToString(); | ||
| Matcher m = regex.matcher(str); | ||
| if (false == m.find()) { | ||
| return strBytesRef; | ||
| } | ||
| String newStr = newStrBytesRef.utf8ToString(); | ||
| // Initialize the buffer with an approximate size for the first replacement | ||
| StringBuilder result = new StringBuilder(str.length() + newStr.length() + 8); | ||
| do { | ||
| m.appendReplacement(result, newStr); | ||
|
|
||
| if (result.length() > MAX_RESULT_LENGTH) { | ||
|
||
| throw new IllegalArgumentException("Creating strings with more than [" + MAX_RESULT_LENGTH + "] bytes is not supported"); | ||
| } | ||
| } while (m.find()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comparing this with the implementation of |
||
| m.appendTail(result); | ||
| return new BytesRef(result.toString()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| import static org.elasticsearch.common.unit.ByteSizeUnit.GB; | ||
| import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext; | ||
| import static org.hamcrest.Matchers.either; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
|
|
@@ -304,11 +305,17 @@ public final void testEvaluateInManyThreads() throws ExecutionException, Interru | |
| if (testCase.getExpectedBuildEvaluatorWarnings() != null) { | ||
| assertWarnings(testCase.getExpectedBuildEvaluatorWarnings()); | ||
| } | ||
|
|
||
| List<Object> simpleData = testCase.getDataValues(); | ||
| // Ensure we don't run this test with too much data that could take too long to process. | ||
| // The calculation "ramUsed * count" is just a hint of how much data will the function process, | ||
| // and the limit is arbitrary | ||
| assumeTrue("Input data too big", row(simpleData).ramBytesUsedByBlocks() * count < GB.toBytes(1)); | ||
|
||
|
|
||
| ExecutorService exec = Executors.newFixedThreadPool(threads); | ||
| try { | ||
| List<Future<?>> futures = new ArrayList<>(); | ||
| for (int i = 0; i < threads; i++) { | ||
| List<Object> simpleData = testCase.getDataValues(); | ||
| Page page = row(simpleData); | ||
|
|
||
| futures.add(exec.submit(() -> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is based on the same Repeat logic:
elasticsearch/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/Repeat.java
Line 43 in c990377
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to shove this in some common spot. I have no idea what a good one is. And probably javadoc it too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We could make both use the same constant, then future readers can easily see that it's being used in 2 cases which tackle a common problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved it to
ScalarFunction. I don't expect aggs to need this, but we could move it later