Skip to content

Commit caf6142

Browse files
committed
Refactor main function of TestLinearWriteSpeed
1 parent e018314 commit caf6142

File tree

1 file changed

+156
-98
lines changed

1 file changed

+156
-98
lines changed

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java

Lines changed: 156 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -63,83 +63,19 @@
6363
public class TestLinearWriteSpeed {
6464

6565
public static void main(String[] args) throws Exception {
66-
OptionParser parser = new OptionParser();
67-
68-
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
69-
.withRequiredArg()
70-
.describedAs("path")
71-
.ofType(String.class)
72-
.defaultsTo(System.getProperty("java.io.tmpdir"));
73-
74-
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
75-
.withRequiredArg()
76-
.describedAs("num_bytes")
77-
.ofType(Long.class);
78-
79-
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
80-
.withRequiredArg()
81-
.describedAs("num_bytes")
82-
.ofType(Integer.class);
83-
84-
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
85-
.withRequiredArg()
86-
.describedAs("num_bytes")
87-
.ofType(Integer.class)
88-
.defaultsTo(1024);
89-
90-
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
91-
.withRequiredArg()
92-
.describedAs("num_files")
93-
.ofType(Integer.class)
94-
.defaultsTo(1);
95-
96-
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
97-
.withRequiredArg()
98-
.describedAs("ms")
99-
.ofType(Long.class)
100-
.defaultsTo(1000L);
101-
102-
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
103-
.withRequiredArg()
104-
.describedAs("mb")
105-
.ofType(Integer.class)
106-
.defaultsTo(Integer.MAX_VALUE);
107-
108-
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
109-
.withRequiredArg()
110-
.describedAs("message_count")
111-
.ofType(Long.class)
112-
.defaultsTo(Long.MAX_VALUE);
113-
114-
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
115-
.withRequiredArg()
116-
.describedAs("codec")
117-
.ofType(String.class)
118-
.defaultsTo(CompressionType.NONE.name);
119-
120-
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
121-
.withRequiredArg()
122-
.describedAs("level")
123-
.ofType(Integer.class);
124-
125-
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
126-
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
127-
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
128-
OptionSet options = parser.parse(args);
129-
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
130-
131-
long bytesToWrite = options.valueOf(bytesOpt);
132-
int bufferSize = options.valueOf(sizeOpt);
133-
int numFiles = options.valueOf(filesOpt);
134-
long reportingInterval = options.valueOf(reportingIntervalOpt);
135-
String dir = options.valueOf(dirOpt);
136-
long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
66+
var option = createOptions(new OptionParser(), args);
67+
long bytesToWrite = option.options.valueOf(option.bytesOpt);
68+
int bufferSize = option.options.valueOf(option.sizeOpt);
69+
int numFiles = option.options.valueOf(option.filesOpt);
70+
long reportingInterval = option.options.valueOf(option.reportingIntervalOpt);
71+
String dir = option.options.valueOf(option.dirOpt);
72+
long maxThroughputBytes = option.options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
13773
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
138-
int messageSize = options.valueOf(messageSizeOpt);
139-
long flushInterval = options.valueOf(flushIntervalOpt);
140-
CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
74+
int messageSize = option.options.valueOf(option.messageSizeOpt);
75+
long flushInterval = option.options.valueOf(option.flushIntervalOpt);
76+
CompressionType compressionType = CompressionType.forName(option.options.valueOf(option.compressionCodecOpt));
14177
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
142-
Integer compressionLevel = options.valueOf(compressionLevelOpt);
78+
Integer compressionLevel = option.options.valueOf(option.compressionLevelOpt);
14379

14480
if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
14581
Compression compression = compressionBuilder.build();
@@ -155,22 +91,21 @@ public static void main(String[] args) throws Exception {
15591

15692
MemoryRecords messageSet = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
15793
Writable[] writables = new Writable[numFiles];
158-
LogWritable[] logs = new LogWritable[numFiles];
15994
KafkaScheduler scheduler = new KafkaScheduler(1);
16095
scheduler.startup();
16196

16297
for (int i = 0; i < numFiles; i++) {
163-
if (options.has(mmapOpt)) {
98+
if (option.options.has(option.mmapOpt)) {
16499
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
165-
} else if (options.has(channelOpt)) {
100+
} else if (option.options.has(option.channelOpt)) {
166101
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
167-
} else if (options.has(logOpt)) {
102+
} else if (option.options.has(option.logOpt)) {
168103
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
169104
Properties logProperties = new Properties();
170105
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
171106
logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
172107
LogConfig logConfig = new LogConfig(logProperties);
173-
logs[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
108+
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
174109
} else {
175110
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
176111
Exit.exit(1);
@@ -187,17 +122,10 @@ public static void main(String[] args) throws Exception {
187122
long written = 0L;
188123
long totalWritten = 0L;
189124
long lastReport = beginTest;
190-
int writeSize = 0;
191125

192126
while (totalWritten + bufferSize < bytesToWrite) {
193127
long start = System.nanoTime();
194-
if (options.has(logOpt)) {
195-
messageSet = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
196-
logs[(int) (count % numFiles)].messages = messageSet;
197-
writeSize = logs[(int) (count % numFiles)].write();
198-
} else {
199-
writeSize = writables[(int) (count % numFiles)].write();
200-
}
128+
int writeSize = writables[(int) (count % numFiles)].write();
201129
long elapsed = System.nanoTime() - start;
202130
maxLatency = Math.max(elapsed, maxLatency);
203131
totalLatency += elapsed;
@@ -223,14 +151,8 @@ public static void main(String[] args) throws Exception {
223151
double elapsedSecs = (System.nanoTime() - beginTest) / (1000.0 * 1000.0 * 1000.0);
224152
System.out.println((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs)) + " MB per sec");
225153
scheduler.shutdown();
226-
if (options.has(logOpt)) {
227-
for (LogWritable log : logs) {
228-
log.close();
229-
}
230-
} else {
231-
for (Writable writable : writables) {
232-
writable.close();
233-
}
154+
for (Writable writable : writables) {
155+
writable.close();
234156
}
235157
}
236158

@@ -312,9 +234,13 @@ public void close() throws IOException {
312234
static class LogWritable implements Writable {
313235
MemoryRecords messages;
314236
UnifiedLog log;
237+
Compression compression;
238+
List<SimpleRecord> recordsList;
315239

316-
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
240+
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
317241
this.messages = messages;
242+
this.compression = compression;
243+
this.recordsList = recordsList;
318244
Utils.delete(dir);
319245
this.log = UnifiedLog.create(
320246
dir,
@@ -337,6 +263,7 @@ public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecord
337263
}
338264

339265
public int write() {
266+
this.messages = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
340267
log.appendAsLeader(
341268
messages,
342269
0,
@@ -352,4 +279,135 @@ public void close() throws IOException {
352279
Utils.delete(log.dir());
353280
}
354281
}
355-
}
282+
283+
private static class Options {
284+
private final OptionSpec<String> dirOpt;
285+
private final OptionSpec<Long> bytesOpt;
286+
private final OptionSpec<Integer> sizeOpt;
287+
private final OptionSpec<Integer> messageSizeOpt;
288+
private final OptionSpec<Integer> filesOpt;
289+
private final OptionSpec<Long> reportingIntervalOpt;
290+
private final OptionSpec<Integer> maxThroughputOpt;
291+
private final OptionSpec<Long> flushIntervalOpt;
292+
private final OptionSpec<String> compressionCodecOpt;
293+
private final OptionSpec<Integer> compressionLevelOpt;
294+
private final OptionSpec<Void> channelOpt;
295+
private final OptionSpec<Void> logOpt;
296+
private final OptionSpec<Void> mmapOpt;
297+
private final OptionSet options;
298+
299+
private Options(
300+
OptionSpec<String> dirOpt,
301+
OptionSpec<Long> bytesOpt,
302+
OptionSpec<Integer> sizeOpt,
303+
OptionSpec<Integer> messageSizeOpt,
304+
OptionSpec<Integer> filesOpt,
305+
OptionSpec<Long> reportingIntervalOpt,
306+
OptionSpec<Integer> maxThroughputOpt,
307+
OptionSpec<Long> flushIntervalOpt,
308+
OptionSpec<String> compressionCodecOpt,
309+
OptionSpec<Integer> compressionLevelOpt,
310+
OptionSpec<Void> channelOpt,
311+
OptionSpec<Void> logOpt,
312+
OptionSpec<Void> mmapOpt,
313+
OptionSet options
314+
) {
315+
this.dirOpt = dirOpt;
316+
this.bytesOpt = bytesOpt;
317+
this.sizeOpt = sizeOpt;
318+
this.messageSizeOpt = messageSizeOpt;
319+
this.filesOpt = filesOpt;
320+
this.reportingIntervalOpt = reportingIntervalOpt;
321+
this.maxThroughputOpt = maxThroughputOpt;
322+
this.flushIntervalOpt = flushIntervalOpt;
323+
this.compressionCodecOpt = compressionCodecOpt;
324+
this.compressionLevelOpt = compressionLevelOpt;
325+
this.channelOpt = channelOpt;
326+
this.logOpt = logOpt;
327+
this.mmapOpt = mmapOpt;
328+
this.options = options;
329+
}
330+
}
331+
332+
private static Options createOptions(OptionParser parser, String[] args) {
333+
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
334+
.withRequiredArg()
335+
.describedAs("path")
336+
.ofType(String.class)
337+
.defaultsTo(System.getProperty("java.io.tmpdir"));
338+
339+
OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
340+
.withRequiredArg()
341+
.describedAs("num_bytes")
342+
.ofType(Long.class);
343+
344+
OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
345+
.withRequiredArg()
346+
.describedAs("num_bytes")
347+
.ofType(Integer.class);
348+
349+
OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
350+
.withRequiredArg()
351+
.describedAs("num_bytes")
352+
.ofType(Integer.class)
353+
.defaultsTo(1024);
354+
355+
OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
356+
.withRequiredArg()
357+
.describedAs("num_files")
358+
.ofType(Integer.class)
359+
.defaultsTo(1);
360+
361+
OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
362+
.withRequiredArg()
363+
.describedAs("ms")
364+
.ofType(Long.class)
365+
.defaultsTo(1000L);
366+
367+
OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
368+
.withRequiredArg()
369+
.describedAs("mb")
370+
.ofType(Integer.class)
371+
.defaultsTo(Integer.MAX_VALUE);
372+
373+
OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
374+
.withRequiredArg()
375+
.describedAs("message_count")
376+
.ofType(Long.class)
377+
.defaultsTo(Long.MAX_VALUE);
378+
379+
OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
380+
.withRequiredArg()
381+
.describedAs("codec")
382+
.ofType(String.class)
383+
.defaultsTo(CompressionType.NONE.name);
384+
385+
OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
386+
.withRequiredArg()
387+
.describedAs("level")
388+
.ofType(Integer.class);
389+
390+
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
391+
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
392+
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
393+
OptionSet options = parser.parse(args);
394+
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
395+
396+
return new Options(
397+
dirOpt,
398+
bytesOpt,
399+
sizeOpt,
400+
messageSizeOpt,
401+
filesOpt,
402+
reportingIntervalOpt,
403+
maxThroughputOpt,
404+
flushIntervalOpt,
405+
compressionCodecOpt,
406+
compressionLevelOpt,
407+
channelOpt,
408+
logOpt,
409+
mmapOpt,
410+
options
411+
);
412+
}
413+
}

0 commit comments

Comments (0)