Skip to content

Commit ada76f3

Browse files
committed
Merge remote-tracking branch 'upstream/main' into mp-rest-tests
2 parents fbb6989 + a81c449 commit ada76f3

File tree

130 files changed

+5349
-883
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

130 files changed

+5349
-883
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 145 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.openjdk.jmh.annotations.Scope;
4040
import org.openjdk.jmh.annotations.Setup;
4141
import org.openjdk.jmh.annotations.State;
42-
import org.openjdk.jmh.annotations.TearDown;
4342
import org.openjdk.jmh.annotations.Threads;
4443
import org.openjdk.jmh.annotations.Warmup;
4544
import org.openjdk.jmh.profile.AsyncProfiler;
@@ -51,9 +50,8 @@
5150
import java.io.IOException;
5251
import java.nio.file.Files;
5352
import java.util.Random;
54-
import java.util.concurrent.ExecutorService;
55-
import java.util.concurrent.Executors;
5653
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Supplier;
5755

5856
@BenchmarkMode(Mode.SingleShotTime)
5957
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -71,23 +69,10 @@ public class TSDBDocValuesMergeBenchmark {
7169
LogConfigurator.setNodeName("test");
7270
}
7371

74-
@Param("20431204")
75-
private int nDocs;
76-
77-
@Param("1000")
78-
private int deltaTime;
79-
80-
@Param("42")
81-
private int seed;
82-
8372
private static final String TIMESTAMP_FIELD = "@timestamp";
8473
private static final String HOSTNAME_FIELD = "host.name";
8574
private static final long BASE_TIMESTAMP = 1704067200000L;
8675

87-
private IndexWriter indexWriterWithoutOptimizedMerge;
88-
private IndexWriter indexWriterWithOptimizedMerge;
89-
private ExecutorService executorService;
90-
9176
public static void main(String[] args) throws RunnerException {
9277
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
9378
.addProfiler(AsyncProfiler.class)
@@ -96,78 +81,168 @@ public static void main(String[] args) throws RunnerException {
9681
new Runner(options).run();
9782
}
9883

99-
@Setup(Level.Trial)
100-
public void setup() throws IOException {
101-
executorService = Executors.newSingleThreadExecutor();
84+
@State(Scope.Benchmark)
85+
public static class StateDenseWithoutOptimizeMerge {
10286

103-
final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
104-
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
87+
@Param("20431204")
88+
private int nDocs;
89+
90+
@Param("1000")
91+
private int deltaTime;
92+
93+
@Param("42")
94+
private int seed;
95+
96+
private Directory directory;
97+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);
98+
99+
@Setup(Level.Trial)
100+
public void setup() throws IOException {
101+
directory = FSDirectory.open(Files.createTempDirectory("temp2-"));
102+
createIndex(directory, iwc.get(), false, nDocs, deltaTime, seed);
103+
}
105104

106-
indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
107-
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
108105
}
109106

110-
private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
111-
final var iwc = createIndexWriterConfig(optimizedMergeEnabled);
112-
long counter1 = 0;
113-
long counter2 = 10_000_000;
114-
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
115-
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
116-
int numHosts = 1000;
117-
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
107+
@Benchmark
108+
public void forceMergeDenseWithoutOptimizedMerge(StateDenseWithoutOptimizeMerge state) throws IOException {
109+
forceMerge(state.directory, state.iwc.get());
110+
}
118111

119-
final Random random = new Random(seed);
120-
IndexWriter indexWriter = new IndexWriter(directory, iwc);
121-
for (int i = 0; i < nDocs; i++) {
122-
final Document doc = new Document();
123-
124-
final int batchIndex = i / numHosts;
125-
final String hostName = "host-" + batchIndex;
126-
// Slightly vary the timestamp in each document
127-
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
128-
129-
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
130-
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
131-
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
132-
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
133-
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
134-
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
135-
int numTags = tags.length % (i + 1);
136-
for (int j = 0; j < numTags; j++) {
137-
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
138-
}
112+
@State(Scope.Benchmark)
113+
public static class StateDenseWithOptimizeMerge {
114+
115+
@Param("20431204")
116+
private int nDocs;
117+
118+
@Param("1000")
119+
private int deltaTime;
120+
121+
@Param("42")
122+
private int seed;
123+
124+
private Directory directory;
125+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);
126+
127+
@Setup(Level.Trial)
128+
public void setup() throws IOException {
129+
directory = FSDirectory.open(Files.createTempDirectory("temp1-"));
130+
createIndex(directory, iwc.get(), false, nDocs, deltaTime, seed);
131+
}
132+
133+
}
134+
135+
@Benchmark
136+
public void forceMergeDenseWithOptimizedMerge(StateDenseWithOptimizeMerge state) throws IOException {
137+
forceMerge(state.directory, state.iwc.get());
138+
}
139+
140+
@State(Scope.Benchmark)
141+
public static class StateSparseWithoutOptimizeMerge {
139142

140-
indexWriter.addDocument(doc);
143+
@Param("20431204")
144+
private int nDocs;
145+
146+
@Param("1000")
147+
private int deltaTime;
148+
149+
@Param("42")
150+
private int seed;
151+
152+
private Directory directory;
153+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(false);
154+
155+
@Setup(Level.Trial)
156+
public void setup() throws IOException {
157+
directory = FSDirectory.open(Files.createTempDirectory("temp4-"));
158+
createIndex(directory, iwc.get(), true, nDocs, deltaTime, seed);
141159
}
142-
indexWriter.commit();
143-
return indexWriter;
160+
144161
}
145162

146163
@Benchmark
147-
public void forceMergeWithoutOptimizedMerge() throws IOException {
148-
forceMerge(indexWriterWithoutOptimizedMerge);
164+
public void forceMergeSparseWithoutOptimizedMerge(StateSparseWithoutOptimizeMerge state) throws IOException {
165+
forceMerge(state.directory, state.iwc.get());
166+
}
167+
168+
@State(Scope.Benchmark)
169+
public static class StateSparseWithOptimizeMerge {
170+
171+
@Param("20431204")
172+
private int nDocs;
173+
174+
@Param("1000")
175+
private int deltaTime;
176+
177+
@Param("42")
178+
private int seed;
179+
180+
private Directory directory;
181+
private final Supplier<IndexWriterConfig> iwc = () -> createIndexWriterConfig(true);
182+
183+
@Setup(Level.Trial)
184+
public void setup() throws IOException {
185+
directory = FSDirectory.open(Files.createTempDirectory("temp3-"));
186+
createIndex(directory, iwc.get(), true, nDocs, deltaTime, seed);
187+
}
188+
149189
}
150190

151191
@Benchmark
152-
public void forceMergeWithOptimizedMerge() throws IOException {
153-
forceMerge(indexWriterWithOptimizedMerge);
192+
public void forceMergeSparseWithOptimizedMerge(StateSparseWithOptimizeMerge state) throws IOException {
193+
forceMerge(state.directory, state.iwc.get());
154194
}
155195

156-
private void forceMerge(final IndexWriter indexWriter) throws IOException {
157-
indexWriter.forceMerge(1);
196+
private void forceMerge(Directory directory, IndexWriterConfig config) throws IOException {
197+
try (var indexWriter = new IndexWriter(directory, config)) {
198+
indexWriter.forceMerge(1);
199+
}
158200
}
159201

160-
@TearDown(Level.Trial)
161-
public void tearDown() {
162-
if (executorService != null) {
163-
executorService.shutdown();
164-
try {
165-
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
166-
executorService.shutdownNow();
202+
static void createIndex(Directory directory, IndexWriterConfig iwc, boolean sparse, int nDocs, int deltaTime, int seed)
203+
throws IOException {
204+
long counter1 = 0;
205+
long counter2 = 10_000_000;
206+
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
207+
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
208+
int numHosts = 10000;
209+
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };
210+
211+
final Random random = new Random(seed);
212+
try (var indexWriter = new IndexWriter(directory, iwc)) {
213+
for (int i = 0; i < nDocs; i++) {
214+
final Document doc = new Document();
215+
216+
final int batchIndex = i % numHosts;
217+
final String hostName = "host-" + batchIndex;
218+
// Slightly vary the timestamp in each document
219+
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
220+
221+
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
222+
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
223+
if (sparse == false || random.nextBoolean()) {
224+
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
225+
}
226+
if (sparse == false || random.nextBoolean()) {
227+
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
228+
}
229+
if (sparse == false || random.nextBoolean()) {
230+
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
231+
}
232+
if (sparse == false || random.nextBoolean()) {
233+
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
234+
}
235+
if (sparse == false || random.nextBoolean()) {
236+
int numTags = tags.length % (i + 1);
237+
for (int j = 0; j < numTags; j++) {
238+
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
239+
}
240+
}
241+
indexWriter.addDocument(doc);
242+
243+
if (i % 10000 == 0) {
244+
indexWriter.commit();
167245
}
168-
} catch (InterruptedException e) {
169-
executorService.shutdownNow();
170-
Thread.currentThread().interrupt();
171246
}
172247
}
173248
}

distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/ServerCli.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,7 @@ protected ServerProcess startServer(Terminal terminal, ProcessInfo processInfo,
271271
.withProcessInfo(processInfo)
272272
.withServerArgs(args)
273273
.withTempDir(tempDir)
274-
.withJvmOptions(jvmOptions)
275-
.withWorkingDir(args.logsDir());
274+
.withJvmOptions(jvmOptions);
276275
return serverProcessBuilder.start();
277276
}
278277

distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/ServerProcessBuilder.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.server.cli;
1111

1212
import org.elasticsearch.bootstrap.ServerArgs;
13+
import org.elasticsearch.cli.ExitCodes;
1314
import org.elasticsearch.cli.ProcessInfo;
1415
import org.elasticsearch.cli.Terminal;
1516
import org.elasticsearch.cli.UserException;
@@ -21,6 +22,8 @@
2122
import java.io.IOException;
2223
import java.io.OutputStream;
2324
import java.io.UncheckedIOException;
25+
import java.nio.file.FileAlreadyExistsException;
26+
import java.nio.file.Files;
2427
import java.nio.file.Path;
2528
import java.util.HashMap;
2629
import java.util.List;
@@ -44,7 +47,6 @@ public class ServerProcessBuilder {
4447
private ServerArgs serverArgs;
4548
private ProcessInfo processInfo;
4649
private List<String> jvmOptions;
47-
private Path workingDir;
4850
private Terminal terminal;
4951

5052
// this allows mocking the process building by tests
@@ -84,11 +86,6 @@ public ServerProcessBuilder withJvmOptions(List<String> jvmOptions) {
8486
return this;
8587
}
8688

87-
public ServerProcessBuilder withWorkingDir(Path workingDir) {
88-
this.workingDir = workingDir;
89-
return this;
90-
}
91-
9289
/**
9390
* Specifies the {@link Terminal} to use for reading input and writing output from/to the cli console
9491
*/
@@ -141,6 +138,17 @@ public ServerProcess start() throws UserException {
141138
return start(ProcessBuilder::start);
142139
}
143140

141+
private void ensureWorkingDirExists() throws UserException {
142+
Path workingDir = serverArgs.logsDir();
143+
try {
144+
Files.createDirectories(workingDir);
145+
} catch (FileAlreadyExistsException e) {
146+
throw new UserException(ExitCodes.CONFIG, "Logs dir [" + workingDir + "] exists but is not a directory", e);
147+
} catch (IOException e) {
148+
throw new UserException(ExitCodes.CONFIG, "Unable to create logs dir [" + workingDir + "]", e);
149+
}
150+
}
151+
144152
private static void checkRequiredArgument(Object argument, String argumentName) {
145153
if (argument == null) {
146154
throw new IllegalStateException(
@@ -157,12 +165,14 @@ ServerProcess start(ProcessStarter processStarter) throws UserException {
157165
checkRequiredArgument(jvmOptions, "jvmOptions");
158166
checkRequiredArgument(terminal, "terminal");
159167

168+
ensureWorkingDirExists();
169+
160170
Process jvmProcess = null;
161171
ErrorPumpThread errorPump;
162172

163173
boolean success = false;
164174
try {
165-
jvmProcess = createProcess(getCommand(), getJvmArgs(), jvmOptions, getEnvironment(), workingDir, processStarter);
175+
jvmProcess = createProcess(getCommand(), getJvmArgs(), jvmOptions, getEnvironment(), serverArgs.logsDir(), processStarter);
166176
errorPump = new ErrorPumpThread(terminal, jvmProcess.getErrorStream());
167177
errorPump.start();
168178
sendArgs(serverArgs, jvmProcess.getOutputStream());

0 commit comments

Comments
 (0)