@@ -155,6 +155,7 @@ public static void main(String[] args) throws Exception {
155
155
156
156
MemoryRecords messageSet = MemoryRecords .withRecords (compression , recordsList .toArray (new SimpleRecord [0 ]));
157
157
Writable [] writables = new Writable [numFiles ];
158
+ LogWritable [] logs = new LogWritable [numFiles ];
158
159
KafkaScheduler scheduler = new KafkaScheduler (1 );
159
160
scheduler .startup ();
160
161
@@ -169,7 +170,7 @@ public static void main(String[] args) throws Exception {
169
170
logProperties .put (TopicConfig .SEGMENT_BYTES_CONFIG , Integer .toString (segmentSize ));
170
171
logProperties .put (TopicConfig .FLUSH_MESSAGES_INTERVAL_CONFIG , Long .toString (flushInterval ));
171
172
LogConfig logConfig = new LogConfig (logProperties );
172
- writables [i ] = new LogWritable (new File (dir , "kafka-test-" + i ), logConfig , scheduler , messageSet );
173
+ logs [i ] = new LogWritable (new File (dir , "kafka-test-" + i ), logConfig , scheduler , messageSet );
173
174
} else {
174
175
System .err .println ("Must specify what to write to with one of --log, --channel, or --mmap" );
175
176
Exit .exit (1 );
@@ -186,10 +187,17 @@ public static void main(String[] args) throws Exception {
186
187
long written = 0L ;
187
188
long totalWritten = 0L ;
188
189
long lastReport = beginTest ;
190
+ int writeSize = 0 ;
189
191
190
192
while (totalWritten + bufferSize < bytesToWrite ) {
191
193
long start = System .nanoTime ();
192
- int writeSize = writables [(int ) (count % numFiles )].write ();
194
+ if (options .has (logOpt )) {
195
+ messageSet = MemoryRecords .withRecords (compression , recordsList .toArray (new SimpleRecord [0 ]));
196
+ logs [(int ) (count % numFiles )].messages = messageSet ;
197
+ writeSize = logs [(int ) (count % numFiles )].write ();
198
+ } else {
199
+ writeSize = writables [(int ) (count % numFiles )].write ();
200
+ }
193
201
long elapsed = System .nanoTime () - start ;
194
202
maxLatency = Math .max (elapsed , maxLatency );
195
203
totalLatency += elapsed ;
@@ -215,8 +223,14 @@ public static void main(String[] args) throws Exception {
215
223
double elapsedSecs = (System .nanoTime () - beginTest ) / (1000.0 * 1000.0 * 1000.0 );
216
224
System .out .println ((bytesToWrite / (1024.0 * 1024.0 * elapsedSecs )) + " MB per sec" );
217
225
scheduler .shutdown ();
218
- for (Writable writable : writables ) {
219
- writable .close ();
226
+ if (options .has (logOpt )) {
227
+ for (LogWritable log : logs ) {
228
+ log .close ();
229
+ }
230
+ } else {
231
+ for (Writable writable : writables ) {
232
+ writable .close ();
233
+ }
220
234
}
221
235
}
222
236
0 commit comments