diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index d4546ac7d168f..16ab0541deb4a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -57,6 +57,17 @@ public static void main(String[] args) throws Exception { perf.start(args); } + /** + * Runs the producer-performance test based on command-line arguments, producing records to Kafka and reporting metrics. + * + * Parses the provided CLI arguments, constructs and configures a KafkaProducer, optionally initializes transactions + * and a warmup period, sends the configured number of records while applying throughput throttling, collects latency + * and throughput statistics (including separate steady-state stats when a warmup is configured), and prints final + * summaries and producer metrics as requested. + * + * @param args command-line arguments for this tool (see {@link #argParser()}) + * @throws IOException if reading producer configuration or payload files fails + */ void start(String[] args) throws IOException { ArgumentParser parser = argParser(); @@ -158,6 +169,12 @@ record = new ProducerRecord<>(config.topicName, payload); } + /** + * Create a Kafka producer configured with the supplied properties. + * + * @param props producer configuration properties + * @return a new KafkaProducer configured with the given properties + */ KafkaProducer createKafkaProducer(Properties props) { return new KafkaProducer<>(props); } @@ -166,6 +183,24 @@ KafkaProducer createKafkaProducer(Properties props) { Stats stats; Stats steadyStateStats; + /** + * Selects or generates a message payload according to the provided inputs. + * + * If `payloadByteList` is non-empty a random entry from the list is returned. + * Otherwise, if `recordSize` is non-null the supplied `payload` buffer is filled + * with random uppercase ASCII letters and returned. If `payloadMonotonic` is true + * the UTF-8 bytes of `recordValue` are returned. If none of these sources is + * available an exception is thrown. + * + * @param recordSize fixed payload size to generate; if non-null the method fills and returns `payload` + * @param payloadByteList list of predefined payloads; a random element is chosen when non-empty + * @param payload buffer to fill when `recordSize` is provided; its length should equal `recordSize` + * @param random source of randomness for selecting list entries or generating bytes + * @param payloadMonotonic when true produce a payload from `recordValue` instead of random content + * @param recordValue numeric value converted to UTF-8 bytes when `payloadMonotonic` is true + * @return the payload byte array to send + * @throws IllegalArgumentException if no payload list, record size, or monotonic option is provided + */ static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { if (!payloadByteList.isEmpty()) { @@ -181,6 +216,19 @@ static byte[] generateRandomPayload(Integer recordSize, List payloadByte return payload; } + /** + * Build producer Properties from an optional properties file and a list of key=value overrides. + * + * Merges properties loaded from the file at producerConfig (if provided) with any producerProps + * entries (each must be "key=value"), ensures key and value serializers are set to + * ByteArraySerializer, and sets a default client id of "perf-producer-client" when none is present. + * + * @param producerProps list of producer properties in "key=value" form; may be null + * @param producerConfig path to a properties file to load; may be null + * @return a Properties instance containing the merged and normalized producer configuration + * @throws IOException if loading properties from producerConfig fails + * @throws IllegalArgumentException if any string in producerProps is not in "key=value" format + */ static Properties readProps(List producerProps, String producerConfig) throws IOException { Properties props = new Properties(); if (producerConfig != null) { @@ -222,7 +270,15 @@ static List readPayloadFile(String payloadFilePath, String payloadDelimi return payloadByteList; } - /** Get the command-line argument parser. */ + /** + * Create and configure the command-line ArgumentParser for the producer-performance tool. + * + * The parser defines options for topic, number of records, payload selection (exactly one of + * --record-size, --payload-file, or --payload-monotonic), payload delimiter, throughput throttling, + * producer properties/config file, metrics printing, transaction settings, and warmup records. + * + * @return the configured ArgumentParser for the producer-performance tool + */ static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers .newArgumentParser("producer-performance") @@ -376,10 +432,26 @@ static class Stats { private final boolean isSteadyState; private boolean steadyStateActive; + /** + * Creates a Stats collector configured for a non–steady-state run. + * + * @param numRecords the expected total number of records to size internal sampling structures + */ public Stats(long numRecords) { this(numRecords, false); } + /** + * Initialize a Stats accumulator for the given total record count and mode. + * + * Sets timestamps, sampling rate, allocates the latency sample buffer sized for + * the provided record count and sampling, and initializes counters and window + * bookkeeping. Also configures whether this Stats instance represents a steady-state + * measurement and whether steady-state reporting is active. + * + * @param numRecords total number of records that will be measured; used to compute sampling rate and buffer size + * @param isSteadyState true if this Stats instance should operate in steady-state mode (affects reporting and activation) + */ public Stats(long numRecords, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); @@ -398,6 +470,18 @@ public Stats(long numRecords, boolean isSteadyState) { this.steadyStateActive = isSteadyState; } + /** + * Record a single send result into the aggregated and windowed statistics. + * + * Updates total counters (count, bytes, total and max latency), updates the current window + * counters, samples the latency into the latency buffer according to the sampling rate, + * and, when the reporting interval has elapsed, prints the current window (respecting + * steady-state settings) and starts a new window. + * + * @param latency latency of the recorded send in milliseconds + * @param bytes number of payload bytes for the recorded send + * @param time current time in milliseconds used to evaluate the reporting interval + */ public void record(int latency, int bytes, long time) { this.count++; this.bytes += bytes; @@ -463,6 +547,14 @@ public void newWindow() { this.windowBytes = 0; } + /** + * Prints aggregated performance metrics to standard output, including total records sent, throughput + * (records/sec and MB/sec), average latency, maximum latency, and latency percentiles (50th, 95th, + * 99th, and 99.9th). + * + * If this Stats instance represents steady-state measurements, the output includes the text + * " steady state" after the record count. + */ public void printTotal() { long elapsed = System.currentTimeMillis() - start; double recsPerSec = 1000.0 * count / (double) elapsed; @@ -499,6 +591,14 @@ static final class PerfCallback implements Callback { private final Stats stats; private final Stats steadyStateStats; + /** + * Create a callback that records send latency and bytes into the provided statistics collectors. + * + * @param start the send start time in milliseconds since the epoch + * @param bytes the number of bytes in the sent payload + * @param stats the main Stats instance to receive latency and byte measurements + * @param steadyStateStats an optional Stats instance to receive steady-state measurements; may be null + */ public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { this.start = start; this.stats = stats; @@ -506,6 +606,16 @@ public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) this.bytes = bytes; } + /** + * Handle completion of a produce request by updating statistics on success or reporting the error. + * + * When the send succeeds (exception is null), records the measured latency and byte count into the main + * Stats instance and, if present, into the steady-state Stats; also increments their iteration counters. + * If an exception is present, prints its stack trace. + * + * @param metadata metadata for the sent record (may be null if the send failed) + * @param exception the exception that occurred during send, or null on success + */ public void onCompletion(RecordMetadata metadata, Exception exception) { long now = System.currentTimeMillis(); int latency = (int) (now - start); @@ -537,6 +647,19 @@ static final class ConfigPostProcessor { final boolean transactionsEnabled; final List payloadByteList; + /** + * Parse and validate command-line arguments, load payload and producer properties, + * and initialize runtime configuration fields for the performance producer (topicName, + * numRecords, warmupRecords, recordSize, throughput, payloadMonotonic, shouldPrintMetrics, + * payloadByteList, producerProps, transactionsEnabled, transactionDurationMs). + * + * @param parser the configured ArgumentParser for the producer-performance CLI + * @param args the command-line arguments to parse + * @throws IOException if reading the payload or producer properties file fails + * @throws ArgumentParserException if required arguments are missing or invalid (for example: + * --num-records <= 0, --warmup-records >= --num-records, missing producer config, + * nonpositive --record-size, or nonpositive --transaction-duration-ms) + */ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException { Namespace namespace = parser.parseArgs(args); this.topicName = namespace.getString("topic"); @@ -591,4 +714,4 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept this.transactionDurationMs = transactionDurationMsArg; } } -} +} \ No newline at end of file