
Commit bf56000

jim0987795064 authored and Ubuntu committed
Refactor main function of TestLinearWriteSpeed
1 parent 423330e commit bf56000

File tree: 1 file changed (+153 −81 lines)


jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java

Lines changed: 153 additions & 81 deletions
@@ -63,83 +63,19 @@
 public class TestLinearWriteSpeed {
 
     public static void main(String[] args) throws Exception {
-        OptionParser parser = new OptionParser();
-
-        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
-            .withRequiredArg()
-            .describedAs("path")
-            .ofType(String.class)
-            .defaultsTo(System.getProperty("java.io.tmpdir"));
-
-        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Long.class);
-
-        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class);
-
-        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class)
-            .defaultsTo(1024);
-
-        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
-            .withRequiredArg()
-            .describedAs("num_files")
-            .ofType(Integer.class)
-            .defaultsTo(1);
-
-        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
-            .withRequiredArg()
-            .describedAs("ms")
-            .ofType(Long.class)
-            .defaultsTo(1000L);
-
-        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
-            .withRequiredArg()
-            .describedAs("mb")
-            .ofType(Integer.class)
-            .defaultsTo(Integer.MAX_VALUE);
-
-        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
-            .withRequiredArg()
-            .describedAs("message_count")
-            .ofType(Long.class)
-            .defaultsTo(Long.MAX_VALUE);
-
-        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
-            .withRequiredArg()
-            .describedAs("codec")
-            .ofType(String.class)
-            .defaultsTo(CompressionType.NONE.name);
-
-        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
-            .withRequiredArg()
-            .describedAs("level")
-            .ofType(Integer.class);
-
-        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
-        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
-        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
-        OptionSet options = parser.parse(args);
-        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
-
-        long bytesToWrite = options.valueOf(bytesOpt);
-        int bufferSize = options.valueOf(sizeOpt);
-        int numFiles = options.valueOf(filesOpt);
-        long reportingInterval = options.valueOf(reportingIntervalOpt);
-        String dir = options.valueOf(dirOpt);
-        long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
+        var option = createOptions(new OptionParser(), args);
+        long bytesToWrite = option.options.valueOf(option.bytesOpt);
+        int bufferSize = option.options.valueOf(option.sizeOpt);
+        int numFiles = option.options.valueOf(option.filesOpt);
+        long reportingInterval = option.options.valueOf(option.reportingIntervalOpt);
+        String dir = option.options.valueOf(option.dirOpt);
+        long maxThroughputBytes = option.options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        int messageSize = options.valueOf(messageSizeOpt);
-        long flushInterval = options.valueOf(flushIntervalOpt);
-        CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
+        int messageSize = option.options.valueOf(option.messageSizeOpt);
+        long flushInterval = option.options.valueOf(option.flushIntervalOpt);
+        CompressionType compressionType = CompressionType.forName(option.options.valueOf(option.compressionCodecOpt));
         Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
-        Integer compressionLevel = options.valueOf(compressionLevelOpt);
+        Integer compressionLevel = option.options.valueOf(option.compressionLevelOpt);
 
         if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
         Compression compression = compressionBuilder.build();
@@ -159,17 +95,17 @@ public static void main(String[] args) throws Exception {
         scheduler.startup();
 
         for (int i = 0; i < numFiles; i++) {
-            if (options.has(mmapOpt)) {
+            if (option.options.has(option.mmapOpt)) {
                 writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
-            } else if (options.has(channelOpt)) {
+            } else if (option.options.has(option.channelOpt)) {
                 writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
-            } else if (options.has(logOpt)) {
+            } else if (option.options.has(option.logOpt)) {
                 int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
                 Properties logProperties = new Properties();
                 logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
                 logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
                 LogConfig logConfig = new LogConfig(logProperties);
-                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
+                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
             } else {
                 System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
                 Exit.exit(1);
@@ -298,9 +234,13 @@ public void close() throws IOException {
     static class LogWritable implements Writable {
         MemoryRecords messages;
         UnifiedLog log;
+        Compression compression;
+        List<SimpleRecord> recordsList;
 
-        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
+        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
             this.messages = messages;
+            this.compression = compression;
+            this.recordsList = recordsList;
             Utils.delete(dir);
             this.log = UnifiedLog.create(
                 dir,
@@ -323,6 +263,7 @@ public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecord
         }
 
         public int write() {
+            this.messages = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
             log.appendAsLeader(
                 messages,
                 0,
@@ -338,4 +279,135 @@ public void close() throws IOException {
             Utils.delete(log.dir());
         }
     }
-}
+
+    private static class Options {
+        private final OptionSpec<String> dirOpt;
+        private final OptionSpec<Long> bytesOpt;
+        private final OptionSpec<Integer> sizeOpt;
+        private final OptionSpec<Integer> messageSizeOpt;
+        private final OptionSpec<Integer> filesOpt;
+        private final OptionSpec<Long> reportingIntervalOpt;
+        private final OptionSpec<Integer> maxThroughputOpt;
+        private final OptionSpec<Long> flushIntervalOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> compressionLevelOpt;
+        private final OptionSpec<Void> channelOpt;
+        private final OptionSpec<Void> logOpt;
+        private final OptionSpec<Void> mmapOpt;
+        private final OptionSet options;
+
+        private Options(
+            OptionSpec<String> dirOpt,
+            OptionSpec<Long> bytesOpt,
+            OptionSpec<Integer> sizeOpt,
+            OptionSpec<Integer> messageSizeOpt,
+            OptionSpec<Integer> filesOpt,
+            OptionSpec<Long> reportingIntervalOpt,
+            OptionSpec<Integer> maxThroughputOpt,
+            OptionSpec<Long> flushIntervalOpt,
+            OptionSpec<String> compressionCodecOpt,
+            OptionSpec<Integer> compressionLevelOpt,
+            OptionSpec<Void> channelOpt,
+            OptionSpec<Void> logOpt,
+            OptionSpec<Void> mmapOpt,
+            OptionSet options
+        ) {
+            this.dirOpt = dirOpt;
+            this.bytesOpt = bytesOpt;
+            this.sizeOpt = sizeOpt;
+            this.messageSizeOpt = messageSizeOpt;
+            this.filesOpt = filesOpt;
+            this.reportingIntervalOpt = reportingIntervalOpt;
+            this.maxThroughputOpt = maxThroughputOpt;
+            this.flushIntervalOpt = flushIntervalOpt;
+            this.compressionCodecOpt = compressionCodecOpt;
+            this.compressionLevelOpt = compressionLevelOpt;
+            this.channelOpt = channelOpt;
+            this.logOpt = logOpt;
+            this.mmapOpt = mmapOpt;
+            this.options = options;
+        }
+    }
+
+    private static Options createOptions(OptionParser parser, String[] args) {
+        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
+            .withRequiredArg()
+            .describedAs("path")
+            .ofType(String.class)
+            .defaultsTo(System.getProperty("java.io.tmpdir"));
+
+        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Long.class);
+
+        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class);
+
+        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class)
+            .defaultsTo(1024);
+
+        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
+            .withRequiredArg()
+            .describedAs("num_files")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+            .withRequiredArg()
+            .describedAs("ms")
+            .ofType(Long.class)
+            .defaultsTo(1000L);
+
+        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+            .withRequiredArg()
+            .describedAs("mb")
+            .ofType(Integer.class)
+            .defaultsTo(Integer.MAX_VALUE);
+
+        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+            .withRequiredArg()
+            .describedAs("message_count")
+            .ofType(Long.class)
+            .defaultsTo(Long.MAX_VALUE);
+
+        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+            .withRequiredArg()
+            .describedAs("codec")
+            .ofType(String.class)
+            .defaultsTo(CompressionType.NONE.name);
+
+        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
+            .withRequiredArg()
+            .describedAs("level")
+            .ofType(Integer.class);
+
+        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
+        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");
+        OptionSet options = parser.parse(args);
+        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
+
+        return new Options(
+            dirOpt,
+            bytesOpt,
+            sizeOpt,
+            messageSizeOpt,
+            filesOpt,
+            reportingIntervalOpt,
+            maxThroughputOpt,
+            flushIntervalOpt,
+            compressionCodecOpt,
+            compressionLevelOpt,
+            channelOpt,
+            logOpt,
+            mmapOpt,
+            options
+        );
+    }
+}
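For reference, a hypothetical invocation of the refactored benchmark might look like the example below. The option names (--bytes, --size, --message-size, --files, --log) are the ones defined in createOptions above; the classpath placeholder and the values shown are illustrative assumptions, not part of this commit.

    # hypothetical run; replace <classpath> with the jmh-benchmarks classes and their dependencies
    java -cp <classpath> org.apache.kafka.jmh.log.TestLinearWriteSpeed \
        --bytes 1073741824 --size 4096 --message-size 1024 --files 1 --log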
