@@ -81,13 +81,13 @@ public static void main(String[] args) throws Exception {
8181 .describedAs ("num_bytes" )
8282 .ofType (Integer .class );
8383
84- OptionSpec <Integer > messageSizeOpt = parser .accepts ("message-size" , "REQUIRED: The size of each message in the message set." )
84+ OptionSpec <Integer > messageSizeOpt = parser .accepts ("message-size" , "The size of each message in the message set." )
8585 .withRequiredArg ()
8686 .describedAs ("num_bytes" )
8787 .ofType (Integer .class )
8888 .defaultsTo (1024 );
8989
90- OptionSpec <Integer > filesOpt = parser .accepts ("files" , "REQUIRED: The number of logs or files." )
90+ OptionSpec <Integer > filesOpt = parser .accepts ("files" , "The number of logs or files." )
9191 .withRequiredArg ()
9292 .describedAs ("num_files" )
9393 .ofType (Integer .class )
@@ -120,14 +120,13 @@ public static void main(String[] args) throws Exception {
120120 OptionSpec <Integer > compressionLevelOpt = parser .accepts ("level" , "The compression level to use" )
121121 .withRequiredArg ()
122122 .describedAs ("level" )
123- .ofType (Integer .class )
124- .defaultsTo (0 );
123+ .ofType (Integer .class );
125124
126125 OptionSpec <Void > mmapOpt = parser .accepts ("mmap" , "Do writes to memory-mapped files." );
127126 OptionSpec <Void > channelOpt = parser .accepts ("channel" , "Do writes to file channels." );
128127 OptionSpec <Void > logOpt = parser .accepts ("log" , "Do writes to kafka logs." );
129128 OptionSet options = parser .parse (args );
130- CommandLineUtils .checkRequiredArgs (parser , options , bytesOpt , sizeOpt , filesOpt );
129+ CommandLineUtils .checkRequiredArgs (parser , options , bytesOpt , sizeOpt );
131130
132131 long bytesToWrite = options .valueOf (bytesOpt );
133132 int bufferSize = options .valueOf (sizeOpt );
@@ -140,9 +139,10 @@ public static void main(String[] args) throws Exception {
140139 long flushInterval = options .valueOf (flushIntervalOpt );
141140 CompressionType compressionType = CompressionType .forName (options .valueOf (compressionCodecOpt ));
142141 Compression .Builder <? extends Compression > compressionBuilder = Compression .of (compressionType );
143- int compressionLevel = options .valueOf (compressionLevelOpt );
142+ Integer compressionLevel = options .valueOf (compressionLevelOpt );
144143
145- setupCompression (compressionType , compressionBuilder , compressionLevel );
144+ if (compressionLevel != null ) setupCompression (compressionType , compressionBuilder , compressionLevel );
145+ Compression compression = compressionBuilder .build ();
146146
147147 ThreadLocalRandom .current ().nextBytes (buffer .array ());
148148 int numMessages = bufferSize / (messageSize + Records .LOG_OVERHEAD );
@@ -153,7 +153,7 @@ public static void main(String[] args) throws Exception {
153153 recordsList .add (new SimpleRecord (createTime , null , new byte [messageSize ]));
154154 }
155155
156- MemoryRecords messageSet = MemoryRecords .withRecords (Compression . NONE , recordsList .toArray (new SimpleRecord [0 ]));
156+ MemoryRecords messageSet = MemoryRecords .withRecords (compression , recordsList .toArray (new SimpleRecord [0 ]));
157157 Writable [] writables = new Writable [numFiles ];
158158 KafkaScheduler scheduler = new KafkaScheduler (1 );
159159 scheduler .startup ();
@@ -222,7 +222,7 @@ public static void main(String[] args) throws Exception {
222222
223223 private static void setupCompression (CompressionType compressionType ,
224224 Compression .Builder <? extends Compression > compressionBuilder ,
225- int compressionLevel ) {
225+ Integer compressionLevel ) {
226226 switch (compressionType ) {
227227 case GZIP :
228228 ((GzipCompression .Builder ) compressionBuilder ).level (compressionLevel );
0 commit comments