Skip to content

Commit d049c44

Browse files
mingyen066chia7712
authored andcommitted
KAFKA-12392 Deprecate '--max-partition-memory-bytes' option in ConsoleProducer (apache#20952)
This patch implements [KIP-1231](https://cwiki.apache.org/confluence/x/xQl3Fw) where we replace `--max-partition-memory-bytes` with existing `--batch-size`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 6d99a0f commit d049c44

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@
279279
<suppress checks="BooleanExpressionComplexity"
280280
files="(StreamsResetter|DefaultMessageFormatter).java"/>
281281
<suppress checks="NPathComplexity"
282-
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
282+
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader|ConsoleProducer).java"/>
283283
<suppress checks="ImportControl"
284284
files="SignalLogger.java"/>
285285
<suppress checks="IllegalImport"

docs/upgrade.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ <h5><a id="upgrade_4_2_0_from" href="#upgrade_4_2_0_from">Upgrading Servers to 4
2525

2626
<h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4.2.0</a></h5>
2727
<ul>
28+
<li>
29+
The <code>--max-partition-memory-bytes</code> option in <code>kafka-console-producer</code>
30+
is deprecated and will be removed in Kafka 5.0. Please use <code>--batch-size</code> instead.
31+
</li>
2832
<li>
2933
Queues for Kafka (<a href="https://cwiki.apache.org/confluence/x/4hA0Dw">KIP-932</a>) is production-ready in Apache Kafka 4.2. This feature introduces a new kind of group called
3034
share groups, as an alternative to consumer groups. Consumers in a share group cooperatively consume records from topics, without assigning each partition to just one consumer.

tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ static final class ConsoleProducerOptions extends CommandDefaultOptions {
126126
private final OptionSpec<Long> metadataExpiryMsOpt;
127127
private final OptionSpec<Long> maxBlockMsOpt;
128128
private final OptionSpec<Long> maxMemoryBytesOpt;
129+
@Deprecated(since = "4.2", forRemoval = true)
129130
private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
130131
private final OptionSpec<String> messageReaderOpt;
131132
private final OptionSpec<Integer> socketBufferSizeOpt;
@@ -156,8 +157,11 @@ public ConsoleProducerOptions(String[] args) {
156157
.withOptionalArg()
157158
.describedAs("compression-codec")
158159
.ofType(String.class);
159-
batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously. " +
160-
"please note that this option will be replaced if max-partition-memory-bytes is also set")
160+
batchSizeOpt = parser.accepts("batch-size", "The buffer size in bytes allocated for a partition. " +
161+
"When records are received which are smaller than this size the producer " +
162+
"will attempt to optimistically group them together until this size is reached. " +
163+
"This is the option to control batch.size in producer configs. " +
164+
"Please note that this option will be replaced if max-partition-memory-bytes is also set.")
161165
.withRequiredArg()
162166
.describedAs("size")
163167
.ofType(Integer.class)
@@ -212,9 +216,11 @@ public ConsoleProducerOptions(String[] args) {
212216
.ofType(Long.class)
213217
.defaultsTo(32 * 1024 * 1024L);
214218
maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
215-
"The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
219+
"(Deprecated) The buffer size in bytes allocated for a partition. " +
220+
"When records are received which are smaller than this size the producer " +
216221
"will attempt to optimistically group them together until this size is reached. " +
217-
"This is the option to control `batch.size` in producer configs.")
222+
"This is the option to control batch.size in producer configs. " +
223+
"This option will be removed in Apache Kafka 5.0. Use --batch-size instead.")
218224
.withRequiredArg()
219225
.describedAs("memory in bytes per partition")
220226
.ofType(Integer.class)
@@ -335,6 +341,10 @@ void checkArgs() {
335341
readerPropertyOpt = propertyOpt;
336342
}
337343

344+
if (options.has(maxPartitionMemoryBytesOpt)) {
345+
System.out.println("Warning: --max-partition-memory-bytes is deprecated and will be removed in Apache Kafka 5.0. Use --batch-size instead.");
346+
}
347+
338348
try {
339349
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
340350
} catch (IllegalArgumentException e) {
@@ -398,7 +408,6 @@ Properties producerProps() throws IOException {
398408
CommandLineUtils.maybeMergeOptions(props, RETRY_BACKOFF_MS_CONFIG, options, retryBackoffMsOpt);
399409
CommandLineUtils.maybeMergeOptions(props, SEND_BUFFER_CONFIG, options, socketBufferSizeOpt);
400410
CommandLineUtils.maybeMergeOptions(props, BUFFER_MEMORY_CONFIG, options, maxMemoryBytesOpt);
401-
// We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
402411
CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG, options, batchSizeOpt);
403412
CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG, options, maxPartitionMemoryBytesOpt);
404413
CommandLineUtils.maybeMergeOptions(props, METADATA_MAX_AGE_CONFIG, options, metadataExpiryMsOpt);

0 commit comments

Comments
 (0)