Skip to content

Commit af76dd8

Browse files
Merge branch 'main' into 2025/04/15/ShardRoutingState
2 parents 47a2a8a + 62c0629 commit af76dd8

File tree

25 files changed

+487
-86
lines changed

25 files changed

+487
-86
lines changed

distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/ServerCli.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,7 @@ protected ServerProcess startServer(Terminal terminal, ProcessInfo processInfo,
271271
.withProcessInfo(processInfo)
272272
.withServerArgs(args)
273273
.withTempDir(tempDir)
274-
.withJvmOptions(jvmOptions)
275-
.withWorkingDir(args.logsDir());
274+
.withJvmOptions(jvmOptions);
276275
return serverProcessBuilder.start();
277276
}
278277

distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/ServerProcessBuilder.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.server.cli;
1111

1212
import org.elasticsearch.bootstrap.ServerArgs;
13+
import org.elasticsearch.cli.ExitCodes;
1314
import org.elasticsearch.cli.ProcessInfo;
1415
import org.elasticsearch.cli.Terminal;
1516
import org.elasticsearch.cli.UserException;
@@ -21,6 +22,8 @@
2122
import java.io.IOException;
2223
import java.io.OutputStream;
2324
import java.io.UncheckedIOException;
25+
import java.nio.file.FileAlreadyExistsException;
26+
import java.nio.file.Files;
2427
import java.nio.file.Path;
2528
import java.util.HashMap;
2629
import java.util.List;
@@ -44,7 +47,6 @@ public class ServerProcessBuilder {
4447
private ServerArgs serverArgs;
4548
private ProcessInfo processInfo;
4649
private List<String> jvmOptions;
47-
private Path workingDir;
4850
private Terminal terminal;
4951

5052
// this allows mocking the process building by tests
@@ -84,11 +86,6 @@ public ServerProcessBuilder withJvmOptions(List<String> jvmOptions) {
8486
return this;
8587
}
8688

87-
public ServerProcessBuilder withWorkingDir(Path workingDir) {
88-
this.workingDir = workingDir;
89-
return this;
90-
}
91-
9289
/**
9390
* Specifies the {@link Terminal} to use for reading input and writing output from/to the cli console
9491
*/
@@ -141,6 +138,17 @@ public ServerProcess start() throws UserException {
141138
return start(ProcessBuilder::start);
142139
}
143140

141+
private void ensureWorkingDirExists() throws UserException {
142+
Path workingDir = serverArgs.logsDir();
143+
try {
144+
Files.createDirectories(workingDir);
145+
} catch (FileAlreadyExistsException e) {
146+
throw new UserException(ExitCodes.CONFIG, "Logs dir [" + workingDir + "] exists but is not a directory", e);
147+
} catch (IOException e) {
148+
throw new UserException(ExitCodes.CONFIG, "Unable to create logs dir [" + workingDir + "]", e);
149+
}
150+
}
151+
144152
private static void checkRequiredArgument(Object argument, String argumentName) {
145153
if (argument == null) {
146154
throw new IllegalStateException(
@@ -157,12 +165,14 @@ ServerProcess start(ProcessStarter processStarter) throws UserException {
157165
checkRequiredArgument(jvmOptions, "jvmOptions");
158166
checkRequiredArgument(terminal, "terminal");
159167

168+
ensureWorkingDirExists();
169+
160170
Process jvmProcess = null;
161171
ErrorPumpThread errorPump;
162172

163173
boolean success = false;
164174
try {
165-
jvmProcess = createProcess(getCommand(), getJvmArgs(), jvmOptions, getEnvironment(), workingDir, processStarter);
175+
jvmProcess = createProcess(getCommand(), getJvmArgs(), jvmOptions, getEnvironment(), serverArgs.logsDir(), processStarter);
166176
errorPump = new ErrorPumpThread(terminal, jvmProcess.getErrorStream());
167177
errorPump.start();
168178
sendArgs(serverArgs, jvmProcess.getOutputStream());

distribution/tools/server-cli/src/test/java/org/elasticsearch/server/cli/ServerProcessTests.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class ServerProcessTests extends ESTestCase {
6565
protected final Map<String, String> sysprops = new HashMap<>();
6666
protected final Map<String, String> envVars = new HashMap<>();
6767
Path esHomeDir;
68-
Path workingDir;
68+
Path logsDir;
6969
Settings.Builder nodeSettings;
7070
ProcessValidator processValidator;
7171
MainMethod mainCallback;
@@ -94,8 +94,8 @@ public void resetEnv() {
9494
sysprops.put("os.name", "Linux");
9595
sysprops.put("java.home", "javahome");
9696
sysprops.put("es.path.home", esHomeDir.toString());
97+
logsDir = esHomeDir.resolve("logs");
9798
envVars.clear();
98-
workingDir = createTempDir();
9999
nodeSettings = Settings.builder();
100100
processValidator = null;
101101
mainCallback = null;
@@ -207,15 +207,7 @@ ProcessInfo createProcessInfo() {
207207
}
208208

209209
ServerArgs createServerArgs(boolean daemonize, boolean quiet) {
210-
return new ServerArgs(
211-
daemonize,
212-
quiet,
213-
null,
214-
secrets,
215-
nodeSettings.build(),
216-
esHomeDir.resolve("config"),
217-
esHomeDir.resolve("logs")
218-
);
210+
return new ServerArgs(daemonize, quiet, null, secrets, nodeSettings.build(), esHomeDir.resolve("config"), logsDir);
219211
}
220212

221213
ServerProcess startProcess(boolean daemonize, boolean quiet) throws Exception {
@@ -231,8 +223,7 @@ ServerProcess startProcess(boolean daemonize, boolean quiet) throws Exception {
231223
.withProcessInfo(pinfo)
232224
.withServerArgs(createServerArgs(daemonize, quiet))
233225
.withJvmOptions(List.of())
234-
.withTempDir(ServerProcessUtils.setupTempDir(pinfo))
235-
.withWorkingDir(workingDir);
226+
.withTempDir(ServerProcessUtils.setupTempDir(pinfo));
236227
return serverProcessBuilder.start(starter);
237228
}
238229

@@ -241,7 +232,7 @@ public void testProcessBuilder() throws Exception {
241232
assertThat(pb.redirectInput(), equalTo(ProcessBuilder.Redirect.PIPE));
242233
assertThat(pb.redirectOutput(), equalTo(ProcessBuilder.Redirect.INHERIT));
243234
assertThat(pb.redirectError(), equalTo(ProcessBuilder.Redirect.PIPE));
244-
assertThat(String.valueOf(pb.directory()), equalTo(workingDir.toString())); // leave default, which is working directory
235+
assertThat(String.valueOf(pb.directory()), equalTo(esHomeDir.resolve("logs").toString()));
245236
};
246237
mainCallback = (args, stdin, stderr, exitCode) -> {
247238
try (PrintStream err = new PrintStream(stderr, true, StandardCharsets.UTF_8)) {
@@ -315,8 +306,7 @@ public void testCommandLineSysprops() throws Exception {
315306
.withProcessInfo(createProcessInfo())
316307
.withServerArgs(createServerArgs(false, false))
317308
.withJvmOptions(List.of("-Dfoo1=bar", "-Dfoo2=baz"))
318-
.withTempDir(Path.of("."))
319-
.withWorkingDir(workingDir);
309+
.withTempDir(Path.of("."));
320310
serverProcessBuilder.start(starter).waitFor();
321311
}
322312

@@ -433,4 +423,26 @@ public void testProcessDies() throws Exception {
433423
int exitCode = server.waitFor();
434424
assertThat(exitCode, equalTo(-9));
435425
}
426+
427+
public void testLogsDirIsFile() throws Exception {
428+
Files.createFile(logsDir);
429+
var e = expectThrows(UserException.class, this::runForeground);
430+
assertThat(e.getMessage(), containsString("exists but is not a directory"));
431+
}
432+
433+
public void testLogsDirCreateParents() throws Exception {
434+
Path testDir = createTempDir();
435+
logsDir = testDir.resolve("subdir/logs");
436+
processValidator = pb -> assertThat(String.valueOf(pb.directory()), equalTo(logsDir.toString()));
437+
runForeground();
438+
}
439+
440+
public void testLogsCreateFailure() throws Exception {
441+
Path testDir = createTempDir();
442+
Path parentFile = testDir.resolve("exists");
443+
Files.createFile(parentFile);
444+
logsDir = parentFile.resolve("logs");
445+
var e = expectThrows(UserException.class, this::runForeground);
446+
assertThat(e.getMessage(), containsString("Unable to create logs dir"));
447+
}
436448
}

docs/reference/elasticsearch/configuration-reference/thread-pool-settings.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,7 @@ The following are the types of thread pools and their respective parameters:
9494
9595
### `fixed` [fixed-thread-pool]
9696

97-
The `fixed` thread pool holds a fixed size of threads to handle the requests with a queue (optionally bounded) for pending requests that have no threads to service them.
98-
99-
The `size` parameter controls the number of threads.
100-
101-
The `queue_size` allows to control the size of the queue of pending requests that have no threads to execute them. By default, it is set to `-1` which means its unbounded. When a request comes in and the queue is full, it will abort the request.
97+
A `fixed` thread pool holds a fixed number of threads as determined by the `size` parameter. If a task is submitted to a `fixed` thread pool and there are fewer than `size` busy threads in the pool then the task will execute immediately. If all the threads are busy when a task is submitted then it will be held in a queue for later execution. The `queue_size` parameter controls the maximum size of this queue. A `queue_size` of `-1` means that the queue is unbounded, but most `fixed` thread pools specify a bound on their queue size by default. If a bounded queue is full then it will reject further work, which typically causes the corresponding requests to fail.
10298

10399
```yaml
104100
thread_pool:
@@ -114,6 +110,8 @@ The `scaling` thread pool holds a dynamic number of threads. This number is prop
114110

115111
The `keep_alive` parameter determines how long a thread should be kept around in the thread pool without it doing any work.
116112

113+
If a task is submitted to a `scaling` thread pool when its maximum number of threads are already busy with other tasks, the new task will be held in a queue for later execution. The queue in a `scaling` thread pool is always unbounded.
114+
117115
```yaml
118116
thread_pool:
119117
warmer:

muted-tests.yml

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -408,30 +408,12 @@ tests:
408408
- class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests
409409
method: testStdinWithMultipleValues
410410
issue: https://github.com/elastic/elasticsearch/issues/126882
411-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
412-
method: testDeleteCancelRunningTask
413-
issue: https://github.com/elastic/elasticsearch/issues/126994
414-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
415-
method: testMaxResponseSize
416-
issue: https://github.com/elastic/elasticsearch/issues/126995
417-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
418-
method: testRemoveAsyncIndex
419-
issue: https://github.com/elastic/elasticsearch/issues/126975
420-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
421-
method: testCleanupOnFailure
422-
issue: https://github.com/elastic/elasticsearch/issues/126999
423-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
424-
method: testUpdateStoreKeepAlive
425-
issue: https://github.com/elastic/elasticsearch/issues/127001
426-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
427-
method: testRestartAfterCompletion
428-
issue: https://github.com/elastic/elasticsearch/issues/126974
429-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
430-
method: testDeleteCleanupIndex
431-
issue: https://github.com/elastic/elasticsearch/issues/127008
432411
- class: org.elasticsearch.packaging.test.DockerTests
433412
method: test024InstallPluginFromArchiveUsingConfigFile
434413
issue: https://github.com/elastic/elasticsearch/issues/126936
414+
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.RepositoryAnalysisFailureIT
415+
method: testFailsOnReadError
416+
issue: https://github.com/elastic/elasticsearch/issues/127029
435417

436418
# Examples:
437419
#

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.core.Releasables;
2930
import org.elasticsearch.core.Tuple;
3031
import org.elasticsearch.search.SearchPhaseResult;
3132
import org.elasticsearch.search.SearchService;
@@ -162,7 +163,7 @@ public void consumeResult(SearchPhaseResult result, Runnable next) {
162163
consume(querySearchResult, next);
163164
}
164165

165-
private final List<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayList<>();
166+
private final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayDeque<>();
166167

167168
/**
168169
* Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node.
@@ -214,7 +215,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
214215
buffer.sort(RESULT_COMPARATOR);
215216
final TopDocsStats topDocsStats = this.topDocsStats;
216217
var mergeResult = this.mergeResult;
217-
final List<Tuple<TopDocsStats, MergeResult>> batchedResults;
218+
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
218219
synchronized (this.batchedResults) {
219220
batchedResults = this.batchedResults;
220221
}
@@ -226,8 +227,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
226227
if (mergeResult != null) {
227228
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
228229
}
229-
for (int i = 0; i < batchedResults.size(); i++) {
230-
Tuple<TopDocsStats, MergeResult> batchedResult = batchedResults.set(i, null);
230+
Tuple<TopDocsStats, MergeResult> batchedResult;
231+
while ((batchedResult = batchedResults.poll()) != null) {
231232
topDocsStats.add(batchedResult.v1());
232233
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
233234
}
@@ -528,6 +529,12 @@ private void releaseBuffer() {
528529
querySearchResult.releaseAggs();
529530
}
530531
}
532+
synchronized (this.batchedResults) {
533+
Tuple<TopDocsStats, MergeResult> batchedResult;
534+
while ((batchedResult = batchedResults.poll()) != null) {
535+
Releasables.close(batchedResult.v2().reducedAggs());
536+
}
537+
}
531538
}
532539

533540
private synchronized void onMergeFailure(Exception exc) {

test/framework/build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ dependencies {
3232

3333
api "org.elasticsearch:mocksocket:${versions.mocksocket}"
3434

35+
testImplementation project(":modules:mapper-extras")
36+
testImplementation project(':x-pack:plugin:core')
3537
testImplementation project(':x-pack:plugin:mapper-unsigned-long')
3638
testImplementation project(':x-pack:plugin:mapper-counted-keyword')
37-
testImplementation project(":modules:mapper-extras")
39+
testImplementation project(':x-pack:plugin:mapper-constant-keyword')
40+
testImplementation project(':x-pack:plugin:wildcard')
3841
}
3942

4043
sourceSets {

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/FieldType.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.logsdb.datageneration.datasource.DataSource;
1313
import org.elasticsearch.logsdb.datageneration.fields.leaf.BooleanFieldDataGenerator;
1414
import org.elasticsearch.logsdb.datageneration.fields.leaf.ByteFieldDataGenerator;
15+
import org.elasticsearch.logsdb.datageneration.fields.leaf.ConstantKeywordFieldDataGenerator;
1516
import org.elasticsearch.logsdb.datageneration.fields.leaf.CountedKeywordFieldDataGenerator;
1617
import org.elasticsearch.logsdb.datageneration.fields.leaf.DateFieldDataGenerator;
1718
import org.elasticsearch.logsdb.datageneration.fields.leaf.DoubleFieldDataGenerator;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.logsdb.datageneration.fields.leaf.ShortFieldDataGenerator;
2728
import org.elasticsearch.logsdb.datageneration.fields.leaf.TextFieldDataGenerator;
2829
import org.elasticsearch.logsdb.datageneration.fields.leaf.UnsignedLongFieldDataGenerator;
30+
import org.elasticsearch.logsdb.datageneration.fields.leaf.WildcardFieldDataGenerator;
2931

3032
/**
3133
* Lists all leaf field types that are supported for data generation by default.
@@ -46,7 +48,9 @@ public enum FieldType {
4648
DATE("date"),
4749
GEO_POINT("geo_point"),
4850
TEXT("text"),
49-
IP("ip");
51+
IP("ip"),
52+
CONSTANT_KEYWORD("constant_keyword"),
53+
WILDCARD("wildcard");
5054

5155
private final String name;
5256

@@ -56,7 +60,7 @@ public enum FieldType {
5660

5761
public FieldDataGenerator generator(String fieldName, DataSource dataSource) {
5862
return switch (this) {
59-
case KEYWORD -> new KeywordFieldDataGenerator(fieldName, dataSource);
63+
case KEYWORD -> new KeywordFieldDataGenerator(dataSource);
6064
case LONG -> new LongFieldDataGenerator(fieldName, dataSource);
6165
case UNSIGNED_LONG -> new UnsignedLongFieldDataGenerator(fieldName, dataSource);
6266
case INTEGER -> new IntegerFieldDataGenerator(fieldName, dataSource);
@@ -72,6 +76,8 @@ public FieldDataGenerator generator(String fieldName, DataSource dataSource) {
7276
case GEO_POINT -> new GeoPointFieldDataGenerator(dataSource);
7377
case TEXT -> new TextFieldDataGenerator(dataSource);
7478
case IP -> new IpFieldDataGenerator(dataSource);
79+
case CONSTANT_KEYWORD -> new ConstantKeywordFieldDataGenerator();
80+
case WILDCARD -> new WildcardFieldDataGenerator(dataSource);
7581
};
7682
}
7783

@@ -93,6 +99,8 @@ public static FieldType tryParse(String name) {
9399
case "geo_point" -> FieldType.GEO_POINT;
94100
case "text" -> FieldType.TEXT;
95101
case "ip" -> FieldType.IP;
102+
case "constant_keyword" -> FieldType.CONSTANT_KEYWORD;
103+
case "wildcard" -> FieldType.WILDCARD;
96104
default -> null;
97105
};
98106
}

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DataSourceHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ default DataSourceResponse.IpGenerator handle(DataSourceRequest.IpGenerator requ
7878
return null;
7979
}
8080

81+
default DataSourceResponse.VersionStringGenerator handle(DataSourceRequest.VersionStringGenerator request) {
82+
return null;
83+
}
84+
8185
default DataSourceResponse.NullWrapper handle(DataSourceRequest.NullWrapper request) {
8286
return null;
8387
}

test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DataSourceRequest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ public DataSourceResponse.IpGenerator accept(DataSourceHandler handler) {
126126
}
127127
}
128128

129+
record VersionStringGenerator() implements DataSourceRequest<DataSourceResponse.VersionStringGenerator> {
130+
public DataSourceResponse.VersionStringGenerator accept(DataSourceHandler handler) {
131+
return handler.handle(this);
132+
}
133+
}
134+
129135
record NullWrapper() implements DataSourceRequest<DataSourceResponse.NullWrapper> {
130136
public DataSourceResponse.NullWrapper accept(DataSourceHandler handler) {
131137
return handler.handle(this);

0 commit comments

Comments
 (0)