@@ -57,6 +57,17 @@ public static void main(String[] args) throws Exception {
5757 perf .start (args );
5858 }
5959
60+ /**
61+ * Runs the producer-performance test based on command-line arguments, producing records to Kafka and reporting metrics.
62+ *
63+ * Parses the provided CLI arguments, constructs and configures a KafkaProducer, optionally initializes transactions
64+ * and a warmup period, sends the configured number of records while applying throughput throttling, collects latency
65+ * and throughput statistics (including separate steady-state stats when a warmup is configured), and prints final
66+ * summaries and producer metrics as requested.
67+ *
68+ * @param args command-line arguments for this tool (see {@link #argParser()})
69+ * @throws IOException if reading producer configuration or payload files fails
70+ */
6071 void start (String [] args ) throws IOException {
6172 ArgumentParser parser = argParser ();
6273
@@ -158,6 +169,12 @@ record = new ProducerRecord<>(config.topicName, payload);
158169
159170 }
160171
172+ /**
173+ * Create a Kafka producer configured with the supplied properties.
174+ *
175+ * @param props producer configuration properties
176+ * @return a new KafkaProducer<byte[], byte[]> configured with the given properties
177+ */
161178 KafkaProducer <byte [], byte []> createKafkaProducer (Properties props ) {
162179 return new KafkaProducer <>(props );
163180 }
@@ -166,6 +183,24 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
166183 Stats stats ;
167184 Stats steadyStateStats ;
168185
186+ /**
187+ * Selects or generates a message payload according to the provided inputs.
188+ *
189+ * If `payloadByteList` is non-empty a random entry from the list is returned.
190+ * Otherwise, if `recordSize` is non-null the supplied `payload` buffer is filled
191+ * with random uppercase ASCII letters and returned. If `payloadMonotonic` is true
192+ * the UTF-8 bytes of `recordValue` are returned. If none of these sources is
193+ * available an exception is thrown.
194+ *
195+ * @param recordSize fixed payload size to generate; if non-null the method fills and returns `payload`
196+ * @param payloadByteList list of predefined payloads; a random element is chosen when non-empty
197+ * @param payload buffer to fill when `recordSize` is provided; its length should equal `recordSize`
198+ * @param random source of randomness for selecting list entries or generating bytes
199+ * @param payloadMonotonic when true produce a payload from `recordValue` instead of random content
200+ * @param recordValue numeric value converted to UTF-8 bytes when `payloadMonotonic` is true
201+ * @return the payload byte array to send
202+ * @throws IllegalArgumentException if no payload list, record size, or monotonic option is provided
203+ */
169204 static byte [] generateRandomPayload (Integer recordSize , List <byte []> payloadByteList , byte [] payload ,
170205 SplittableRandom random , boolean payloadMonotonic , long recordValue ) {
171206 if (!payloadByteList .isEmpty ()) {
@@ -181,6 +216,19 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
181216 return payload ;
182217 }
183218
219+ /**
220+ * Build producer Properties from an optional properties file and a list of key=value overrides.
221+ *
222+ * Merges properties loaded from the file at producerConfig (if provided) with any producerProps
223+ * entries (each must be "key=value"), ensures key and value serializers are set to
224+ * ByteArraySerializer, and sets a default client id of "perf-producer-client" when none is present.
225+ *
226+ * @param producerProps list of producer properties in "key=value" form; may be null
227+ * @param producerConfig path to a properties file to load; may be null
228+ * @return a Properties instance containing the merged and normalized producer configuration
229+ * @throws IOException if loading properties from producerConfig fails
230+ * @throws IllegalArgumentException if any string in producerProps is not in "key=value" format
231+ */
184232 static Properties readProps (List <String > producerProps , String producerConfig ) throws IOException {
185233 Properties props = new Properties ();
186234 if (producerConfig != null ) {
@@ -222,7 +270,15 @@ static List<byte[]> readPayloadFile(String payloadFilePath, String payloadDelimi
222270 return payloadByteList ;
223271 }
224272
225- /** Get the command-line argument parser. */
273+ /**
274+ * Create and configure the command-line ArgumentParser for the producer-performance tool.
275+ *
276+ * The parser defines options for topic, number of records, payload selection (exactly one of
277+ * --record-size, --payload-file, or --payload-monotonic), payload delimiter, throughput throttling,
278+ * producer properties/config file, metrics printing, transaction settings, and warmup records.
279+ *
280+ * @return the configured ArgumentParser for the producer-performance tool
281+ */
226282 static ArgumentParser argParser () {
227283 ArgumentParser parser = ArgumentParsers
228284 .newArgumentParser ("producer-performance" )
@@ -376,10 +432,26 @@ static class Stats {
376432 private final boolean isSteadyState ;
377433 private boolean steadyStateActive ;
378434
435+ /**
436+ * Creates a Stats collector configured for a non–steady-state run.
437+ *
438+ * @param numRecords the expected total number of records to size internal sampling structures
439+ */
379440 public Stats (long numRecords ) {
380441 this (numRecords , false );
381442 }
382443
444+ /**
445+ * Initialize a Stats accumulator for the given total record count and mode.
446+ *
447+ * Sets timestamps, sampling rate, allocates the latency sample buffer sized for
448+ * the provided record count and sampling, and initializes counters and window
449+ * bookkeeping. Also configures whether this Stats instance represents a steady-state
450+ * measurement and whether steady-state reporting is active.
451+ *
452+ * @param numRecords total number of records that will be measured; used to compute sampling rate and buffer size
453+ * @param isSteadyState true if this Stats instance should operate in steady-state mode (affects reporting and activation)
454+ */
383455 public Stats (long numRecords , boolean isSteadyState ) {
384456 this .start = System .currentTimeMillis ();
385457 this .windowStart = System .currentTimeMillis ();
@@ -398,6 +470,18 @@ public Stats(long numRecords, boolean isSteadyState) {
398470 this .steadyStateActive = isSteadyState ;
399471 }
400472
473+ /**
474+ * Record a single send result into the aggregated and windowed statistics.
475+ *
476+ * Updates total counters (count, bytes, total and max latency), updates the current window
477+ * counters, samples the latency into the latency buffer according to the sampling rate,
478+ * and, when the reporting interval has elapsed, prints the current window (respecting
479+ * steady-state settings) and starts a new window.
480+ *
481+ * @param latency latency of the recorded send in milliseconds
482+ * @param bytes number of payload bytes for the recorded send
483+ * @param time current time in milliseconds used to evaluate the reporting interval
484+ */
401485 public void record (int latency , int bytes , long time ) {
402486 this .count ++;
403487 this .bytes += bytes ;
@@ -463,6 +547,14 @@ public void newWindow() {
463547 this .windowBytes = 0 ;
464548 }
465549
550+ /**
551+ * Prints aggregated performance metrics to standard output, including total records sent, throughput
552+ * (records/sec and MB/sec), average latency, maximum latency, and latency percentiles (50th, 95th,
553+ * 99th, and 99.9th).
554+ *
555+ * If this Stats instance represents steady-state measurements, the output includes the text
556+ * " steady state" after the record count.
557+ */
466558 public void printTotal () {
467559 long elapsed = System .currentTimeMillis () - start ;
468560 double recsPerSec = 1000.0 * count / (double ) elapsed ;
@@ -499,13 +591,31 @@ static final class PerfCallback implements Callback {
499591 private final Stats stats ;
500592 private final Stats steadyStateStats ;
501593
594+ /**
595+ * Create a callback that records send latency and bytes into the provided statistics collectors.
596+ *
597+ * @param start the send start time in milliseconds since the epoch
598+ * @param bytes the number of bytes in the sent payload
599+ * @param stats the main Stats instance to receive latency and byte measurements
600+ * @param steadyStateStats an optional Stats instance to receive steady-state measurements; may be null
601+ */
502602 public PerfCallback (long start , int bytes , Stats stats , Stats steadyStateStats ) {
503603 this .start = start ;
504604 this .stats = stats ;
505605 this .steadyStateStats = steadyStateStats ;
506606 this .bytes = bytes ;
507607 }
508608
609+ /**
610+ * Handle completion of a produce request by updating statistics on success or reporting the error.
611+ *
612+ * When the send succeeds (exception is null), records the measured latency and byte count into the main
613+ * Stats instance and, if present, into the steady-state Stats; also increments their iteration counters.
614+ * If an exception is present, prints its stack trace.
615+ *
616+ * @param metadata metadata for the sent record (may be null if the send failed)
617+ * @param exception the exception that occurred during send, or null on success
618+ */
509619 public void onCompletion (RecordMetadata metadata , Exception exception ) {
510620 long now = System .currentTimeMillis ();
511621 int latency = (int ) (now - start );
@@ -537,6 +647,19 @@ static final class ConfigPostProcessor {
537647 final boolean transactionsEnabled ;
538648 final List <byte []> payloadByteList ;
539649
650+ /**
651+ * Parse and validate command-line arguments, load payload and producer properties,
652+ * and initialize runtime configuration fields for the performance producer (topicName,
653+ * numRecords, warmupRecords, recordSize, throughput, payloadMonotonic, shouldPrintMetrics,
654+ * payloadByteList, producerProps, transactionsEnabled, transactionDurationMs).
655+ *
656+ * @param parser the configured ArgumentParser for the producer-performance CLI
657+ * @param args the command-line arguments to parse
658+ * @throws IOException if reading the payload or producer properties file fails
659+ * @throws ArgumentParserException if required arguments are missing or invalid (for example:
660+ * --num-records <= 0, --warmup-records >= --num-records, missing producer config,
661+ * nonpositive --record-size, or nonpositive --transaction-duration-ms)
662+ */
540663 public ConfigPostProcessor (ArgumentParser parser , String [] args ) throws IOException , ArgumentParserException {
541664 Namespace namespace = parser .parseArgs (args );
542665 this .topicName = namespace .getString ("topic" );
@@ -591,4 +714,4 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
591714 this .transactionDurationMs = transactionDurationMsArg ;
592715 }
593716 }
594- }
717+ }
0 commit comments