Skip to content

Commit 9be5385

Browse files
committed
Fix concatenating stats constructor
- bytes are now properly accoutned for. - Refactor to use boolean isSteadyState over warmupRecords to track if a Stats object is of steady state
1 parent ce9ce6b commit 9be5385

File tree

1 file changed

+16
-9
lines changed

1 file changed

+16
-9
lines changed

tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ void start(String[] args) throws IOException {
7878
ProducerRecord<byte[], byte[]> record;
7979
if (config.warmupRecords > 0) {
8080
// TODO: Keep this message? Maybe unnecessary
81-
System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary.");
81+
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
8282
warmupStats = new Stats(config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS);
8383
} else {
8484
stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS);
@@ -107,7 +107,7 @@ record = new ProducerRecord<>(config.topicName, payload);
107107
} else {
108108
if (i == config.warmupRecords) {
109109
// Create the steady-state 'stats' object here so its start time is correct
110-
stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords);
110+
stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0);
111111
}
112112
cb = new PerfCallback(sendStartMs, payload.length, stats);
113113
}
@@ -384,13 +384,13 @@ static class Stats {
384384
private long windowTotalLatency;
385385
private long windowBytes;
386386
private long windowStart;
387-
private long warmupRecords;
387+
private final boolean isSteadyState;
388388

389389
public Stats(long numRecords, int reportingInterval) {
390-
this(numRecords, reportingInterval, 0);
390+
this(numRecords, reportingInterval, false);
391391
}
392392

393-
public Stats(long numRecords, int reportingInterval, long warmupRecords) {
393+
public Stats(long numRecords, int reportingInterval, boolean isSteadyState) {
394394
this.start = System.currentTimeMillis();
395395
this.windowStart = System.currentTimeMillis();
396396
this.iteration = 0;
@@ -404,22 +404,26 @@ public Stats(long numRecords, int reportingInterval, long warmupRecords) {
404404
this.windowBytes = 0;
405405
this.totalLatency = 0;
406406
this.reportingInterval = reportingInterval;
407-
this.warmupRecords = warmupRecords;
407+
this.isSteadyState = isSteadyState;
408408
}
409409

410410
Stats(Stats first, Stats second) {
411411
// create a Stats object that's the combination of two disjoint Stats objects
412412
this.start = Math.min(first.start, second.start);
413413
this.iteration = first.iteration + second.iteration;
414414
this.sampling = first.sampling;
415-
this.latencies = Arrays.copyOf(first.latencies, first.index + second.index);
415+
this.index = first.index() + second.index();
416+
this.latencies = Arrays.copyOf(first.latencies, this.index);
416417
System.arraycopy(second.latencies, 0, this.latencies, first.index(), second.index());
417418
this.maxLatency = Math.max(first.maxLatency, second.maxLatency);
418419
this.windowCount = first.windowCount + second.windowCount;
420+
this.windowMaxLatency = 0;
421+
this.windowTotalLatency = 0;
419422
this.totalLatency = first.totalLatency + second.totalLatency;
420423
this.reportingInterval = first.reportingInterval;
421-
this.warmupRecords = 0;
424+
this.isSteadyState = false; // false except in the steady-state case
422425
this.count = first.count + second.count;
426+
this.bytes = first.bytes + second.bytes;
423427
}
424428

425429
public void record(int latency, int bytes, long time) {
@@ -437,6 +441,9 @@ public void record(int latency, int bytes, long time) {
437441
}
438442
/* maybe report the recent perf */
439443
if (time - windowStart >= reportingInterval) {
444+
if (this.isSteadyState && count == windowCount ){
445+
System.out.println("Beginning steady state.
446+
}
440447
printWindow();
441448
newWindow();
442449
}
@@ -489,7 +496,7 @@ public void printTotal() {
489496
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
490497
System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
491498
count,
492-
this.warmupRecords > 0 ? " steady state" : "",
499+
this.isSteadyState ? " steady state" : "",
493500
recsPerSec,
494501
mbPerSec,
495502
totalLatency / (double) count,

0 commit comments

Comments
 (0)