Skip to content

Commit eb03188

Browse files
committed
Merge branch 'main' into esql_pragma_load_source
2 parents 7f99566 + 760b231 commit eb03188

File tree

20 files changed

+377
-26
lines changed

20 files changed

+377
-26
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111

1212
import org.apache.lucene.util.BytesRef;
1313
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
14+
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.common.util.BigArrays;
1516
import org.elasticsearch.compute.data.Block;
1617
import org.elasticsearch.compute.data.BlockFactory;
1718
import org.elasticsearch.compute.data.BooleanBlock;
1819
import org.elasticsearch.compute.data.BooleanVector;
20+
import org.elasticsearch.compute.data.BytesRefBlock;
21+
import org.elasticsearch.compute.data.BytesRefVector;
1922
import org.elasticsearch.compute.data.DoubleBlock;
2023
import org.elasticsearch.compute.data.DoubleVector;
2124
import org.elasticsearch.compute.data.LongBlock;
@@ -40,9 +43,13 @@
4043
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
4144
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
4245
import org.elasticsearch.xpack.esql.expression.function.scalar.string.RLike;
46+
import org.elasticsearch.xpack.esql.expression.function.scalar.string.ToLower;
47+
import org.elasticsearch.xpack.esql.expression.function.scalar.string.ToUpper;
4348
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add;
4449
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
4550
import org.elasticsearch.xpack.esql.planner.Layout;
51+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
52+
import org.elasticsearch.xpack.esql.session.Configuration;
4653
import org.openjdk.jmh.annotations.Benchmark;
4754
import org.openjdk.jmh.annotations.BenchmarkMode;
4855
import org.openjdk.jmh.annotations.Fork;
@@ -56,8 +63,10 @@
5663
import org.openjdk.jmh.annotations.Warmup;
5764

5865
import java.time.Duration;
66+
import java.time.ZoneOffset;
5967
import java.util.Arrays;
6068
import java.util.List;
69+
import java.util.Locale;
6170
import java.util.Map;
6271
import java.util.concurrent.TimeUnit;
6372

@@ -106,7 +115,9 @@ public class EvalBenchmark {
106115
"long_equal_to_int",
107116
"mv_min",
108117
"mv_min_ascending",
109-
"rlike" }
118+
"rlike",
119+
"to_lower",
120+
"to_upper" }
110121
)
111122
public String operation;
112123

@@ -169,7 +180,7 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
169180
new Coalesce(Source.EMPTY, lhs, List.of(f2)),
170181
layout(f1, f2)
171182
).get(driverContext);
172-
String desc = operation.endsWith("lazy") ? "CoalesceLazyEvaluator" : "CoalesceEagerEvaluator";
183+
String desc = operation.endsWith("lazy") ? "CoalesceLongLazyEvaluator" : "CoalesceLongEagerEvaluator";
173184
if (evaluator.toString().contains(desc) == false) {
174185
throw new IllegalArgumentException("Evaluator was [" + evaluator + "] but expected one containing [" + desc + "]");
175186
}
@@ -214,6 +225,16 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
214225
RLike rlike = new RLike(Source.EMPTY, keywordField, new RLikePattern(".ar"));
215226
yield EvalMapper.toEvaluator(FOLD_CONTEXT, rlike, layout(keywordField)).get(driverContext);
216227
}
228+
case "to_lower" -> {
229+
FieldAttribute keywordField = keywordField();
230+
ToLower toLower = new ToLower(Source.EMPTY, keywordField, configuration());
231+
yield EvalMapper.toEvaluator(FOLD_CONTEXT, toLower, layout(keywordField)).get(driverContext);
232+
}
233+
case "to_upper" -> {
234+
FieldAttribute keywordField = keywordField();
235+
ToUpper toUpper = new ToUpper(Source.EMPTY, keywordField, configuration());
236+
yield EvalMapper.toEvaluator(FOLD_CONTEXT, toUpper, layout(keywordField)).get(driverContext);
237+
}
217238
default -> throw new UnsupportedOperationException();
218239
};
219240
}
@@ -234,6 +255,23 @@ private static FieldAttribute keywordField() {
234255
return new FieldAttribute(Source.EMPTY, "keyword", new EsField("keyword", DataType.KEYWORD, Map.of(), true));
235256
}
236257

258+
private static Configuration configuration() {
259+
return new Configuration(
260+
ZoneOffset.UTC,
261+
Locale.ROOT,
262+
null,
263+
null,
264+
null,
265+
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(Settings.EMPTY),
266+
EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.get(Settings.EMPTY),
267+
null,
268+
false,
269+
Map.of(),
270+
0,
271+
false
272+
);
273+
}
274+
237275
private static Layout layout(FieldAttribute... fields) {
238276
Layout.Builder layout = new Layout.Builder();
239277
layout.append(Arrays.asList(fields));
@@ -366,10 +404,24 @@ private static void checkExpected(String operation, Page actual) {
366404
}
367405
}
368406
}
407+
case "to_lower" -> checkBytes(operation, actual, new BytesRef[] { new BytesRef("foo"), new BytesRef("bar") });
408+
case "to_upper" -> checkBytes(operation, actual, new BytesRef[] { new BytesRef("FOO"), new BytesRef("BAR") });
369409
default -> throw new UnsupportedOperationException(operation);
370410
}
371411
}
372412

413+
private static void checkBytes(String operation, Page actual, BytesRef[] expectedVals) {
414+
BytesRef scratch = new BytesRef();
415+
BytesRefVector v = actual.<BytesRefBlock>getBlock(1).asVector();
416+
for (int i = 0; i < BLOCK_LENGTH; i++) {
417+
BytesRef expected = expectedVals[i % 2];
418+
BytesRef b = v.getBytesRef(i, scratch);
419+
if (b.equals(expected) == false) {
420+
throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + b + "]");
421+
}
422+
}
423+
}
424+
373425
private static Page page(String operation) {
374426
return switch (operation) {
375427
case "abs", "add", "date_trunc", "equal_to_const" -> {
@@ -440,7 +492,7 @@ private static Page page(String operation) {
440492
}
441493
yield new Page(builder.build());
442494
}
443-
case "rlike" -> {
495+
case "rlike", "to_lower", "to_upper" -> {
444496
var builder = blockFactory.newBytesRefVectorBuilder(BLOCK_LENGTH);
445497
BytesRef[] values = new BytesRef[] { new BytesRef("foo"), new BytesRef("bar") };
446498
for (int i = 0; i < BLOCK_LENGTH; i++) {

docs/changelog/121920.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 121920
2+
summary: Account for the `SearchHit` source in circuit breaker
3+
area: Search
4+
type: enhancement
5+
issues:
6+
- 89656

docs/changelog/123105.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123105
2+
summary: fix stale data in synthetic source for string stored field
3+
area: Mapping
4+
type: bug
5+
issues:
6+
- 123110

rest-api-spec/src/main/resources/rest-api-spec/api/ml.start_trained_model_deployment.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@
7575
"options": ["starting", "started", "fully_allocated"],
7676
"default": "started"
7777
}
78+
},
79+
"body":{
80+
"description": "The settings for the trained model deployment",
81+
"required": false
7882
}
7983
}
8084
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ public void apply(Settings value, Settings current, Settings previous) {
535535
SearchModule.SCRIPTED_METRICS_AGG_ALLOWED_STORED_SCRIPTS,
536536
SearchService.SEARCH_WORKER_THREADS_ENABLED,
537537
SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED,
538+
SearchService.MEMORY_ACCOUNTING_BUFFER_SIZE,
538539
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
539540
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
540541
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,10 @@ public ScriptService getScriptService() {
834834
return scriptService;
835835
}
836836

837+
public CircuitBreakerService breakerService() {
838+
return circuitBreakerService;
839+
}
840+
837841
List<IndexingOperationListener> getIndexOperationListeners() { // pkg private for testing
838842
return indexingOperationListeners;
839843
}

server/src/main/java/org/elasticsearch/index/mapper/StringStoredFieldFieldLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public final void write(XContentBuilder b) throws IOException {
5454
case 1:
5555
b.field(simpleName);
5656
write(b, values.get(0));
57-
return;
57+
break;
5858
default:
5959
b.startArray(simpleName);
6060
for (Object value : values) {

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.lucene.util.NumericUtils;
2424
import org.elasticsearch.action.search.SearchType;
2525
import org.elasticsearch.cluster.routing.IndexRouting;
26+
import org.elasticsearch.common.breaker.CircuitBreaker;
2627
import org.elasticsearch.common.lucene.search.Queries;
2728
import org.elasticsearch.core.Nullable;
2829
import org.elasticsearch.core.Releasable;
@@ -103,6 +104,7 @@ final class DefaultSearchContext extends SearchContext {
103104
private final IndexShard indexShard;
104105
private final IndexService indexService;
105106
private final ContextIndexSearcher searcher;
107+
private final long memoryAccountingBufferSize;
106108
private DfsSearchResult dfsResult;
107109
private QuerySearchResult queryResult;
108110
private RankFeatureResult rankFeatureResult;
@@ -168,7 +170,8 @@ final class DefaultSearchContext extends SearchContext {
168170
Executor executor,
169171
SearchService.ResultsType resultsType,
170172
boolean enableQueryPhaseParallelCollection,
171-
int minimumDocsPerSlice
173+
int minimumDocsPerSlice,
174+
long memoryAccountingBufferSize
172175
) throws IOException {
173176
this.readerContext = readerContext;
174177
this.request = request;
@@ -179,6 +182,7 @@ final class DefaultSearchContext extends SearchContext {
179182
this.shardTarget = shardTarget;
180183
this.indexService = readerContext.indexService();
181184
this.indexShard = readerContext.indexShard();
185+
this.memoryAccountingBufferSize = memoryAccountingBufferSize;
182186

183187
Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
184188
int maximumNumberOfSlices = determineMaximumNumberOfSlices(
@@ -902,6 +906,16 @@ public Profilers getProfilers() {
902906
return profilers;
903907
}
904908

909+
@Override
910+
public CircuitBreaker circuitBreaker() {
911+
return indexService.breakerService().getBreaker(CircuitBreaker.REQUEST);
912+
}
913+
914+
@Override
915+
public long memAccountingBufferSize() {
916+
return memoryAccountingBufferSize;
917+
}
918+
905919
public void setProfilers(Profilers profilers) {
906920
this.profilers = profilers;
907921
}

server/src/main/java/org/elasticsearch/search/SearchHit.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ public final class SearchHit implements Writeable, ToXContentObject, RefCounted
111111

112112
private final RefCounted refCounted;
113113

114-
// used only in tests
115114
public SearchHit(int docId) {
116115
this(docId, null);
117116
}
@@ -293,6 +292,7 @@ public static SearchHit unpooled(int docId, String id) {
293292
}
294293

295294
public static SearchHit unpooled(int nestedTopDocId, String id, NestedIdentity nestedIdentity) {
295+
// always referenced search hits do NOT call #deallocate
296296
return new SearchHit(nestedTopDocId, id, nestedIdentity, ALWAYS_REFERENCED);
297297
}
298298

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
274274
Property.NodeScope
275275
);
276276

277+
/**
278+
* The size of the buffer used for memory accounting.
279+
* This buffer is used to locally track the memory accumulated during the execution of
280+
* a search request before submitting the accumulated value to the circuit breaker.
281+
*/
282+
public static final Setting<ByteSizeValue> MEMORY_ACCOUNTING_BUFFER_SIZE = Setting.byteSizeSetting(
283+
"search.memory_accounting_buffer_size",
284+
ByteSizeValue.of(1, ByteSizeUnit.MB),
285+
ByteSizeValue.of(1, ByteSizeUnit.MB),
286+
ByteSizeValue.ofBytes(Long.MAX_VALUE),
287+
Property.Dynamic,
288+
Property.NodeScope
289+
);
290+
277291
public static final int DEFAULT_SIZE = 10;
278292
public static final int DEFAULT_FROM = 0;
279293
private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0];
@@ -291,6 +305,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
291305
private final BigArrays bigArrays;
292306

293307
private final FetchPhase fetchPhase;
308+
private final CircuitBreaker circuitBreaker;
294309
private volatile Executor searchExecutor;
295310
private volatile boolean enableQueryPhaseParallelCollection;
296311

@@ -310,6 +325,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
310325

311326
private volatile boolean enableRewriteAggsToFilterByFilter;
312327

328+
private volatile long memoryAccountingBufferSize;
329+
313330
private final Cancellable keepAliveReaper;
314331

315332
private final AtomicLong idGenerator = new AtomicLong();
@@ -341,11 +358,8 @@ public SearchService(
341358
this.scriptService = scriptService;
342359
this.bigArrays = bigArrays;
343360
this.fetchPhase = fetchPhase;
344-
this.multiBucketConsumerService = new MultiBucketConsumerService(
345-
clusterService,
346-
settings,
347-
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
348-
);
361+
circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
362+
this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings, circuitBreaker);
349363
this.executorSelector = executorSelector;
350364
this.tracer = tracer;
351365

@@ -390,6 +404,10 @@ public SearchService(
390404
enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
391405
clusterService.getClusterSettings()
392406
.addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
407+
408+
memoryAccountingBufferSize = MEMORY_ACCOUNTING_BUFFER_SIZE.get(settings).getBytes();
409+
clusterService.getClusterSettings()
410+
.addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
393411
}
394412

395413
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
@@ -1195,7 +1213,8 @@ private DefaultSearchContext createSearchContext(
11951213
searchExecutor,
11961214
resultsType,
11971215
enableQueryPhaseParallelCollection,
1198-
minimumDocsPerSlice
1216+
minimumDocsPerSlice,
1217+
memoryAccountingBufferSize
11991218
);
12001219
// we clone the query shard context here just for rewriting otherwise we
12011220
// might end up with incorrect state since we are using now() or script services

0 commit comments

Comments
 (0)