diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
index 32c8d87062e46..48c9805b84690 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/TestLinearWriteSpeed.java
@@ -63,83 +63,22 @@ public class TestLinearWriteSpeed {
 
     public static void main(String[] args) throws Exception {
-        OptionParser parser = new OptionParser();
-
-        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
-            .withRequiredArg()
-            .describedAs("path")
-            .ofType(String.class)
-            .defaultsTo(System.getProperty("java.io.tmpdir"));
-
-        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Long.class);
-
-        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class);
-
-        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
-            .withRequiredArg()
-            .describedAs("num_bytes")
-            .ofType(Integer.class)
-            .defaultsTo(1024);
-
-        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
-            .withRequiredArg()
-            .describedAs("num_files")
-            .ofType(Integer.class)
-            .defaultsTo(1);
-
-        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
-            .withRequiredArg()
-            .describedAs("ms")
-            .ofType(Long.class)
-            .defaultsTo(1000L);
-
-        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
-            .withRequiredArg()
-            .describedAs("mb")
-            .ofType(Integer.class)
-            .defaultsTo(Integer.MAX_VALUE);
-
-        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
-            .withRequiredArg()
-            .describedAs("message_count")
-            .ofType(Long.class)
-            .defaultsTo(Long.MAX_VALUE);
-
-        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
-            .withRequiredArg()
-            .describedAs("codec")
-            .ofType(String.class)
-            .defaultsTo(CompressionType.NONE.name);
-
-        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
-            .withRequiredArg()
-            .describedAs("level")
-            .ofType(Integer.class);
-
-        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
-        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
-        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        var parser = new OptionParser();
+        var option = createOptions(parser);
         OptionSet options = parser.parse(args);
-        CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);
-
-        long bytesToWrite = options.valueOf(bytesOpt);
-        int bufferSize = options.valueOf(sizeOpt);
-        int numFiles = options.valueOf(filesOpt);
-        long reportingInterval = options.valueOf(reportingIntervalOpt);
-        String dir = options.valueOf(dirOpt);
-        long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
+        CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt(), option.sizeOpt());
+        long bytesToWrite = options.valueOf(option.bytesOpt());
+        int bufferSize = options.valueOf(option.sizeOpt());
+        int numFiles = options.valueOf(option.filesOpt());
+        long reportingInterval = options.valueOf(option.reportingIntervalOpt());
+        String dir = options.valueOf(option.dirOpt());
+        long maxThroughputBytes = options.valueOf(option.maxThroughputOpt()) * 1024L * 1024L;
         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        int messageSize = options.valueOf(messageSizeOpt);
-        long flushInterval = options.valueOf(flushIntervalOpt);
-        CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
+        int messageSize = options.valueOf(option.messageSizeOpt());
+        long flushInterval = options.valueOf(option.flushIntervalOpt());
+        CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt()));
         Compression.Builder compressionBuilder = Compression.of(compressionType);
-        Integer compressionLevel = options.valueOf(compressionLevelOpt);
+        Integer compressionLevel = options.valueOf(option.compressionLevelOpt());
         if (compressionLevel != null)
             setupCompression(compressionType, compressionBuilder, compressionLevel);
         Compression compression = compressionBuilder.build();
@@ -159,17 +98,17 @@ public static void main(String[] args) throws Exception {
         scheduler.startup();
 
         for (int i = 0; i < numFiles; i++) {
-            if (options.has(mmapOpt)) {
+            if (options.has(option.mmapOpt())) {
                 writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
-            } else if (options.has(channelOpt)) {
+            } else if (options.has(option.channelOpt())) {
                 writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
-            } else if (options.has(logOpt)) {
+            } else if (options.has(option.logOpt())) {
                 int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
                 Properties logProperties = new Properties();
                 logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
                 logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
                 LogConfig logConfig = new LogConfig(logProperties);
-                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
+                writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
             } else {
                 System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
                 Exit.exit(1);
@@ -298,9 +237,13 @@ public void close() throws IOException {
     static class LogWritable implements Writable {
        MemoryRecords messages;
        UnifiedLog log;
+       Compression compression;
+       SimpleRecord[] records;
 
-        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
+        public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
             this.messages = messages;
+            this.compression = compression;
+            this.records = recordsList.toArray(new SimpleRecord[0]);
             Utils.delete(dir);
             this.log = UnifiedLog.create(
                 dir,
@@ -323,6 +266,7 @@ public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecord
         }
 
         public int write() {
+            this.messages = MemoryRecords.withRecords(compression, records);
             log.appendAsLeader(
                 messages,
                 0,
@@ -338,4 +282,91 @@ public void close() throws IOException {
             Utils.delete(log.dir());
         }
     }
-}
+
+    private record Options(OptionSpec<String> dirOpt, OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt,
+                           OptionSpec<Integer> messageSizeOpt, OptionSpec<Integer> filesOpt,
+                           OptionSpec<Long> reportingIntervalOpt, OptionSpec<Integer> maxThroughputOpt,
+                           OptionSpec<Long> flushIntervalOpt, OptionSpec<String> compressionCodecOpt,
+                           OptionSpec<Integer> compressionLevelOpt, OptionSpec<Void> channelOpt,
+                           OptionSpec<Void> logOpt, OptionSpec<Void> mmapOpt) {
+    }
+
+    private static Options createOptions(OptionParser parser) {
+        OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
+            .withRequiredArg()
+            .describedAs("path")
+            .ofType(String.class)
+            .defaultsTo(System.getProperty("java.io.tmpdir"));
+
+        OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Long.class);
+
+        OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class);
+
+        OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
+            .withRequiredArg()
+            .describedAs("num_bytes")
+            .ofType(Integer.class)
+            .defaultsTo(1024);
+
+        OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
+            .withRequiredArg()
+            .describedAs("num_files")
+            .ofType(Integer.class)
+            .defaultsTo(1);
+
+        OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
+            .withRequiredArg()
+            .describedAs("ms")
+            .ofType(Long.class)
+            .defaultsTo(1000L);
+
+        OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
+            .withRequiredArg()
+            .describedAs("mb")
+            .ofType(Integer.class)
+            .defaultsTo(Integer.MAX_VALUE);
+
+        OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
+            .withRequiredArg()
+            .describedAs("message_count")
+            .ofType(Long.class)
+            .defaultsTo(Long.MAX_VALUE);
+
+        OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
+            .withRequiredArg()
+            .describedAs("codec")
+            .ofType(String.class)
+            .defaultsTo(CompressionType.NONE.name);
+
+        OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
+            .withRequiredArg()
+            .describedAs("level")
+            .ofType(Integer.class);
+
+        OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
+        OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
+        OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
+
+        return new Options(
+            dirOpt,
+            bytesOpt,
+            sizeOpt,
+            messageSizeOpt,
+            filesOpt,
+            reportingIntervalOpt,
+            maxThroughputOpt,
+            flushIntervalOpt,
+            compressionCodecOpt,
+            compressionLevelOpt,
+            channelOpt,
+            logOpt,
+            mmapOpt
+        );
+    }
+}
\ No newline at end of file
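For reviewers who want to sanity-check the refactored CLI, a minimal smoke test can drive the benchmark's main() directly. The sketch below is hypothetical: the wrapper class name is invented for illustration, and it assumes the jmh-benchmarks module and its dependencies are on the classpath. The flag names all come from createOptions above, --bytes and --size are the two options enforced by checkRequiredArgs, and --compression/--level exercise the new per-write MemoryRecords.withRecords(compression, records) rebuild in LogWritable.write().

// Hypothetical smoke test, not part of the patch; the class name is invented.
public class TestLinearWriteSpeedSmoke {
    public static void main(String[] args) throws Exception {
        org.apache.kafka.jmh.log.TestLinearWriteSpeed.main(new String[] {
            "--bytes", Long.toString(1024L * 1024 * 1024), // required: total bytes to write (1 GiB)
            "--size", "4096",                              // required: bytes per write
            "--log",                                       // write through UnifiedLog (vs --channel / --mmap)
            "--compression", "lz4",                        // codec name resolved via CompressionType.forName
            "--level", "1"                                 // optional level, applied by setupCompression
        });
    }
}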