Skip to content

Commit a57554b

Browse files
committed
Refactor for redundant stats objects
Stats objects now represent the whole test with 'stats' and the steady state portion of the test with 'steadyStateStats'. Remove merging of Stats latency arrays at end of test.
1 parent 440329b commit a57554b

File tree

2 files changed

+29
-20
lines changed

2 files changed

+29
-20
lines changed

tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,9 @@ void start(String[] args) throws IOException {
7777
SplittableRandom random = new SplittableRandom(0);
7878
ProducerRecord<byte[], byte[]> record;
7979
if (config.warmupRecords > 0) {
80-
// TODO: Keep this message? Maybe unnecessary
8180
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
82-
warmupStats = new Stats(config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS);
83-
} else {
84-
stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS);
8581
}
82+
stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS);
8683
long startMs = System.currentTimeMillis();
8784

8885
ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
@@ -101,15 +98,14 @@ void start(String[] args) throws IOException {
10198
record = new ProducerRecord<>(config.topicName, payload);
10299

103100
long sendStartMs = System.currentTimeMillis();
104-
if (warmupStats != null) {
101+
if (config.warmupRecords > 0) {
105102
if (i < config.warmupRecords) {
106-
cb = new PerfCallback(sendStartMs, payload.length, warmupStats);
103+
cb = new PerfCallback(sendStartMs, payload.length, stats);
107104
} else {
108105
if (i == config.warmupRecords) {
109-
// Create the steady-state 'stats' object here so its start time is correct
110-
stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0);
106+
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0);
111107
}
112-
cb = new PerfCallback(sendStartMs, payload.length, stats);
108+
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
113109
}
114110
} else {
115111
cb = new PerfCallback(sendStartMs, payload.length, stats);
@@ -133,24 +129,24 @@ record = new ProducerRecord<>(config.topicName, payload);
133129
if (!config.shouldPrintMetrics) {
134130
producer.close();
135131

136-
/* print warmup stats if relevant */
137-
if (warmupStats != null) {
138-
new Stats(warmupStats, stats).printTotal();
139-
}
140132
/* print final results */
141133
stats.printTotal();
134+
/* print steady-state stats if relevant */
135+
if (steadyStateStats != null) {
136+
steadyStateStats.printTotal();
137+
}
142138
} else {
143139
// Make sure all messages are sent before printing out the stats and the metrics
144140
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
145141
// expects this class to work with older versions of the client jar that don't support flush().
146142
producer.flush();
147143

148-
/* print warmup stats if relevant */
149-
if (warmupStats != null) {
150-
new Stats(warmupStats, stats).printTotal();
151-
}
152144
/* print final results */
153145
stats.printTotal();
146+
/* print steady-state stats if relevant */
147+
if (steadyStateStats != null) {
148+
steadyStateStats.printTotal();
149+
}
154150

155151
/* print out metrics */
156152
ToolsUtils.printMetrics(producer.metrics());
@@ -175,7 +171,7 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
175171
Callback cb;
176172

177173
Stats stats;
178-
Stats warmupStats;
174+
Stats steadyStateStats;
179175

180176
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
181177
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
@@ -523,10 +519,19 @@ static final class PerfCallback implements Callback {
523519
private final long start;
524520
private final int bytes;
525521
private final Stats stats;
522+
private final Stats steadyStateStats;
526523

527524
public PerfCallback(long start, int bytes, Stats stats) {
528525
this.start = start;
529526
this.stats = stats;
527+
this.steadyStateStats = null;
528+
this.bytes = bytes;
529+
}
530+
531+
public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
532+
this.start = start;
533+
this.stats = stats;
534+
this.steadyStateStats = steadyStateStats;
530535
this.bytes = bytes;
531536
}
532537

@@ -538,6 +543,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
538543
if (exception == null) {
539544
this.stats.record(latency, bytes, now);
540545
this.stats.iteration++;
546+
if (steadyStateStats != null) {
547+
this.steadyStateStats.record(latency, bytes, now);
548+
this.steadyStateStats.iteration++;
549+
}
541550
}
542551
if (exception != null)
543552
exception.printStackTrace();

tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,8 @@ public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException {
537537
producerPerformanceSpy.start(args);
538538

539539
verify(producerMock, times(10)).send(any(), any());
540-
assertEquals(2, producerPerformanceSpy.warmupStats.totalCount());
541-
assertEquals(8, producerPerformanceSpy.stats.totalCount());
540+
assertEquals(10, producerPerformanceSpy.stats.totalCount());
541+
assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount());
542542
verify(producerMock, times(1)).close();
543543
}
544544

0 commit comments

Comments
 (0)