Skip to content

Commit 3f9d2c2

Browse files
KAFKA-18433: Add BatchSize to ShareFetch request (1/N) (apache#18439)
Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
1 parent b51b31e commit 3f9d2c2

File tree

4 files changed

+9
-5
lines changed

4 files changed

+9
-5
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi
171171

172172
return ShareFetchRequest.Builder.forConsumer(
173173
groupId, nextMetadata, fetchConfig.maxWaitMs,
174-
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize,
174+
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
175175
added, removed, acknowledgementBatches);
176176
}
177177

clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
4949
}
5050

5151
public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
52-
int maxWait, int minBytes, int maxBytes, int fetchSize,
52+
int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
5353
List<TopicIdPartition> send, List<TopicIdPartition> forget,
5454
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
5555
ShareFetchRequestData data = new ShareFetchRequestData();
@@ -67,6 +67,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
6767
data.setMaxWaitMs(maxWait);
6868
data.setMinBytes(minBytes);
6969
data.setMaxBytes(maxBytes);
70+
data.setBatchSize(batchSize);
7071

7172
// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
7273
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();

clients/src/main/resources/common/message/ShareFetchRequest.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
"about": "The minimum bytes to accumulate in the response." },
3838
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
3939
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
40+
{ "name": "BatchSize", "type": "int32", "versions": "0+",
41+
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
4042
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
4143
"about": "The topics to fetch.", "fields": [
4244
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
@@ -45,7 +47,7 @@
4547
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
4648
"about": "The partition index." },
4749
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
48-
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
50+
"about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
4951
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
5052
"about": "Record batches to acknowledge.", "fields": [
5153
{ "name": "FirstOffset", "type": "int64", "versions": "0+",

core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2366,8 +2366,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
23662366
acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]],
23672367
maxWaitMs: Int = MAX_WAIT_MS,
23682368
minBytes: Int = 0,
2369-
maxBytes: Int = Int.MaxValue): ShareFetchRequest = {
2370-
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, send.asJava, forget.asJava, acknowledgementsMap.asJava)
2369+
maxBytes: Int = Int.MaxValue,
2370+
batchSize: Int = 500): ShareFetchRequest = {
2371+
ShareFetchRequest.Builder.forConsumer(groupId, metadata, maxWaitMs, minBytes, maxBytes, maxPartitionBytes, batchSize, send.asJava, forget.asJava, acknowledgementsMap.asJava)
23712372
.build()
23722373
}
23732374

0 commit comments

Comments
 (0)