63
63
public class TestLinearWriteSpeed {
64
64
65
65
public static void main (String [] args ) throws Exception {
66
- var option = createOptions (new OptionParser (), args );
67
- long bytesToWrite = option .options .valueOf (option .bytesOpt );
68
- int bufferSize = option .options .valueOf (option .sizeOpt );
69
- int numFiles = option .options .valueOf (option .filesOpt );
70
- long reportingInterval = option .options .valueOf (option .reportingIntervalOpt );
71
- String dir = option .options .valueOf (option .dirOpt );
72
- long maxThroughputBytes = option .options .valueOf (option .maxThroughputOpt ) * 1024L * 1024L ;
66
+ var parser = new OptionParser ();
67
+ var option = createOptions (parser );
68
+ OptionSet options = parser .parse (args );
69
+ CommandLineUtils .checkRequiredArgs (parser , options , option .bytesOpt , option .sizeOpt );
70
+ long bytesToWrite = options .valueOf (option .bytesOpt );
71
+ int bufferSize = options .valueOf (option .sizeOpt );
72
+ int numFiles = options .valueOf (option .filesOpt );
73
+ long reportingInterval = options .valueOf (option .reportingIntervalOpt );
74
+ String dir = options .valueOf (option .dirOpt );
75
+ long maxThroughputBytes = options .valueOf (option .maxThroughputOpt ) * 1024L * 1024L ;
73
76
ByteBuffer buffer = ByteBuffer .allocate (bufferSize );
74
- int messageSize = option . options .valueOf (option .messageSizeOpt );
75
- long flushInterval = option . options .valueOf (option .flushIntervalOpt );
76
- CompressionType compressionType = CompressionType .forName (option . options .valueOf (option .compressionCodecOpt ));
77
+ int messageSize = options .valueOf (option .messageSizeOpt );
78
+ long flushInterval = options .valueOf (option .flushIntervalOpt );
79
+ CompressionType compressionType = CompressionType .forName (options .valueOf (option .compressionCodecOpt ));
77
80
Compression .Builder <? extends Compression > compressionBuilder = Compression .of (compressionType );
78
- Integer compressionLevel = option . options .valueOf (option .compressionLevelOpt );
81
+ Integer compressionLevel = options .valueOf (option .compressionLevelOpt );
79
82
80
83
if (compressionLevel != null ) setupCompression (compressionType , compressionBuilder , compressionLevel );
81
84
Compression compression = compressionBuilder .build ();
@@ -95,11 +98,11 @@ public static void main(String[] args) throws Exception {
95
98
scheduler .startup ();
96
99
97
100
for (int i = 0 ; i < numFiles ; i ++) {
98
- if (option . options .has (option .mmapOpt )) {
101
+ if (options .has (option .mmapOpt )) {
99
102
writables [i ] = new MmapWritable (new File (dir , "kafka-test-" + i + ".dat" ), bytesToWrite / numFiles , buffer );
100
- } else if (option . options .has (option .channelOpt )) {
103
+ } else if (options .has (option .channelOpt )) {
101
104
writables [i ] = new ChannelWritable (new File (dir , "kafka-test-" + i + ".dat" ), buffer );
102
- } else if (option . options .has (option .logOpt )) {
105
+ } else if (options .has (option .logOpt )) {
103
106
int segmentSize = ThreadLocalRandom .current ().nextInt (512 ) * 1024 * 1024 + 64 * 1024 * 1024 ;
104
107
Properties logProperties = new Properties ();
105
108
logProperties .put (TopicConfig .SEGMENT_BYTES_CONFIG , Integer .toString (segmentSize ));
@@ -294,7 +297,6 @@ private static class Options {
294
297
private final OptionSpec <Void > channelOpt ;
295
298
private final OptionSpec <Void > logOpt ;
296
299
private final OptionSpec <Void > mmapOpt ;
297
- private final OptionSet options ;
298
300
299
301
private Options (
300
302
OptionSpec <String > dirOpt ,
@@ -309,8 +311,7 @@ private Options(
309
311
OptionSpec <Integer > compressionLevelOpt ,
310
312
OptionSpec <Void > channelOpt ,
311
313
OptionSpec <Void > logOpt ,
312
- OptionSpec <Void > mmapOpt ,
313
- OptionSet options
314
+ OptionSpec <Void > mmapOpt
314
315
) {
315
316
this .dirOpt = dirOpt ;
316
317
this .bytesOpt = bytesOpt ;
@@ -325,11 +326,10 @@ private Options(
325
326
this .channelOpt = channelOpt ;
326
327
this .logOpt = logOpt ;
327
328
this .mmapOpt = mmapOpt ;
328
- this .options = options ;
329
329
}
330
330
}
331
331
332
- private static Options createOptions (OptionParser parser , String [] args ) {
332
+ private static Options createOptions (OptionParser parser ) {
333
333
OptionSpec <String > dirOpt = parser .accepts ("dir" , "The directory to write to." )
334
334
.withRequiredArg ()
335
335
.describedAs ("path" )
@@ -390,8 +390,6 @@ private static Options createOptions(OptionParser parser, String[] args) {
390
390
OptionSpec <Void > channelOpt = parser .accepts ("channel" , "Do writes to file channels." );
391
391
OptionSpec <Void > logOpt = parser .accepts ("log" , "Do writes to kafka logs." );
392
392
OptionSpec <Void > mmapOpt = parser .accepts ("mmap" , "Do writes to mmap file." );
393
- OptionSet options = parser .parse (args );
394
- CommandLineUtils .checkRequiredArgs (parser , options , bytesOpt , sizeOpt );
395
393
396
394
return new Options (
397
395
dirOpt ,
@@ -406,8 +404,7 @@ private static Options createOptions(OptionParser parser, String[] args) {
406
404
compressionLevelOpt ,
407
405
channelOpt ,
408
406
logOpt ,
409
- mmapOpt ,
410
- options
407
+ mmapOpt
411
408
);
412
409
}
413
410
}
0 commit comments