Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,83 +63,22 @@
public class TestLinearWriteSpeed {

public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();

OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
.withRequiredArg()
.describedAs("path")
.ofType(String.class)
.defaultsTo(System.getProperty("java.io.tmpdir"));

OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Long.class);

OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class);

OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class)
.defaultsTo(1024);

OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
.withRequiredArg()
.describedAs("num_files")
.ofType(Integer.class)
.defaultsTo(1);

OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
.withRequiredArg()
.describedAs("ms")
.ofType(Long.class)
.defaultsTo(1000L);

OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
.withRequiredArg()
.describedAs("mb")
.ofType(Integer.class)
.defaultsTo(Integer.MAX_VALUE);

OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
.withRequiredArg()
.describedAs("message_count")
.ofType(Long.class)
.defaultsTo(Long.MAX_VALUE);

OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
.withRequiredArg()
.describedAs("codec")
.ofType(String.class)
.defaultsTo(CompressionType.NONE.name);

OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
.withRequiredArg()
.describedAs("level")
.ofType(Integer.class);

OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
var parser = new OptionParser();
var option = createOptions(parser);
OptionSet options = parser.parse(args);
CommandLineUtils.checkRequiredArgs(parser, options, bytesOpt, sizeOpt);

long bytesToWrite = options.valueOf(bytesOpt);
int bufferSize = options.valueOf(sizeOpt);
int numFiles = options.valueOf(filesOpt);
long reportingInterval = options.valueOf(reportingIntervalOpt);
String dir = options.valueOf(dirOpt);
long maxThroughputBytes = options.valueOf(maxThroughputOpt) * 1024L * 1024L;
CommandLineUtils.checkRequiredArgs(parser, options, option.bytesOpt, option.sizeOpt);
long bytesToWrite = options.valueOf(option.bytesOpt);
int bufferSize = options.valueOf(option.sizeOpt);
int numFiles = options.valueOf(option.filesOpt);
long reportingInterval = options.valueOf(option.reportingIntervalOpt);
String dir = options.valueOf(option.dirOpt);
long maxThroughputBytes = options.valueOf(option.maxThroughputOpt) * 1024L * 1024L;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
int messageSize = options.valueOf(messageSizeOpt);
long flushInterval = options.valueOf(flushIntervalOpt);
CompressionType compressionType = CompressionType.forName(options.valueOf(compressionCodecOpt));
int messageSize = options.valueOf(option.messageSizeOpt);
long flushInterval = options.valueOf(option.flushIntervalOpt);
CompressionType compressionType = CompressionType.forName(options.valueOf(option.compressionCodecOpt));
Compression.Builder<? extends Compression> compressionBuilder = Compression.of(compressionType);
Integer compressionLevel = options.valueOf(compressionLevelOpt);
Integer compressionLevel = options.valueOf(option.compressionLevelOpt);

if (compressionLevel != null) setupCompression(compressionType, compressionBuilder, compressionLevel);
Compression compression = compressionBuilder.build();
Expand All @@ -159,17 +98,17 @@ public static void main(String[] args) throws Exception {
scheduler.startup();

for (int i = 0; i < numFiles; i++) {
if (options.has(mmapOpt)) {
if (options.has(option.mmapOpt)) {
writables[i] = new MmapWritable(new File(dir, "kafka-test-" + i + ".dat"), bytesToWrite / numFiles, buffer);
} else if (options.has(channelOpt)) {
} else if (options.has(option.channelOpt)) {
writables[i] = new ChannelWritable(new File(dir, "kafka-test-" + i + ".dat"), buffer);
} else if (options.has(logOpt)) {
} else if (options.has(option.logOpt)) {
int segmentSize = ThreadLocalRandom.current().nextInt(512) * 1024 * 1024 + 64 * 1024 * 1024;
Properties logProperties = new Properties();
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(segmentSize));
logProperties.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(flushInterval));
LogConfig logConfig = new LogConfig(logProperties);
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet);
writables[i] = new LogWritable(new File(dir, "kafka-test-" + i), logConfig, scheduler, messageSet, compression, recordsList);
} else {
System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
Exit.exit(1);
Expand Down Expand Up @@ -298,9 +237,13 @@ public void close() throws IOException {
static class LogWritable implements Writable {
MemoryRecords messages;
UnifiedLog log;
Compression compression;
List<SimpleRecord> recordsList;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please convert it to array type to avoid creating many array?


public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages) throws IOException {
public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecords messages, Compression compression, List<SimpleRecord> recordsList) throws IOException {
this.messages = messages;
this.compression = compression;
this.recordsList = recordsList;
Utils.delete(dir);
this.log = UnifiedLog.create(
dir,
Expand All @@ -323,6 +266,7 @@ public LogWritable(File dir, LogConfig config, Scheduler scheduler, MemoryRecord
}

public int write() {
this.messages = MemoryRecords.withRecords(compression, recordsList.toArray(new SimpleRecord[0]));
log.appendAsLeader(
messages,
0,
Expand All @@ -338,4 +282,91 @@ public void close() throws IOException {
Utils.delete(log.dir());
}
}
}

private record Options(OptionSpec<String> dirOpt, OptionSpec<Long> bytesOpt, OptionSpec<Integer> sizeOpt,
OptionSpec<Integer> messageSizeOpt, OptionSpec<Integer> filesOpt,
OptionSpec<Long> reportingIntervalOpt, OptionSpec<Integer> maxThroughputOpt,
OptionSpec<Long> flushIntervalOpt, OptionSpec<String> compressionCodecOpt,
OptionSpec<Integer> compressionLevelOpt, OptionSpec<Void> channelOpt,
OptionSpec<Void> logOpt, OptionSpec<Void> mmapOpt) {
}

private static Options createOptions(OptionParser parser) {
OptionSpec<String> dirOpt = parser.accepts("dir", "The directory to write to.")
.withRequiredArg()
.describedAs("path")
.ofType(String.class)
.defaultsTo(System.getProperty("java.io.tmpdir"));

OptionSpec<Long> bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Long.class);

OptionSpec<Integer> sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class);

OptionSpec<Integer> messageSizeOpt = parser.accepts("message-size", "The size of each message in the message set.")
.withRequiredArg()
.describedAs("num_bytes")
.ofType(Integer.class)
.defaultsTo(1024);

OptionSpec<Integer> filesOpt = parser.accepts("files", "The number of logs or files.")
.withRequiredArg()
.describedAs("num_files")
.ofType(Integer.class)
.defaultsTo(1);

OptionSpec<Long> reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.")
.withRequiredArg()
.describedAs("ms")
.ofType(Long.class)
.defaultsTo(1000L);

OptionSpec<Integer> maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.")
.withRequiredArg()
.describedAs("mb")
.ofType(Integer.class)
.defaultsTo(Integer.MAX_VALUE);

OptionSpec<Long> flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes")
.withRequiredArg()
.describedAs("message_count")
.ofType(Long.class)
.defaultsTo(Long.MAX_VALUE);

OptionSpec<String> compressionCodecOpt = parser.accepts("compression", "The compression codec to use")
.withRequiredArg()
.describedAs("codec")
.ofType(String.class)
.defaultsTo(CompressionType.NONE.name);

OptionSpec<Integer> compressionLevelOpt = parser.accepts("level", "The compression level to use")
.withRequiredArg()
.describedAs("level")
.ofType(Integer.class);

OptionSpec<Void> channelOpt = parser.accepts("channel", "Do writes to file channels.");
OptionSpec<Void> logOpt = parser.accepts("log", "Do writes to kafka logs.");
OptionSpec<Void> mmapOpt = parser.accepts("mmap", "Do writes to mmap file.");

return new Options(
dirOpt,
bytesOpt,
sizeOpt,
messageSizeOpt,
filesOpt,
reportingIntervalOpt,
maxThroughputOpt,
flushIntervalOpt,
compressionCodecOpt,
compressionLevelOpt,
channelOpt,
logOpt,
mmapOpt
);
}
}
Loading