Skip to content

Commit 6956417

Browse files
MINOR: Updated name from messages to records for consistency in share partition (apache#20416)
Minor PR to update name of maxInFlightMessages to maxInFlightRecords to maintain consistency in share partition related classes. Reviewers: Andrew Schofield <[email protected]>
1 parent 2cc66f1 commit 6956417

File tree

3 files changed

+58
-58
lines changed

3 files changed

+58
-58
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ enum SharePartitionState {
175175
private final AtomicReference<Uuid> fetchLock;
176176

177177
/**
178-
* The max in-flight messages is used to limit the number of records that can be in-flight at any
179-
* given time. The max in-flight messages is used to prevent the consumer from fetching too many
178+
* The max in-flight records is used to limit the number of records that can be in-flight at any
179+
* given time. The max in-flight records is used to prevent the consumer from fetching too many
180180
* records from the leader and running out of memory.
181181
*/
182-
private final int maxInFlightMessages;
182+
private final int maxInFlightRecords;
183183

184184
/**
185185
* The max delivery count is used to limit the number of times a record can be delivered to the
@@ -305,7 +305,7 @@ enum SharePartitionState {
305305
String groupId,
306306
TopicIdPartition topicIdPartition,
307307
int leaderEpoch,
308-
int maxInFlightMessages,
308+
int maxInFlightRecords,
309309
int maxDeliveryCount,
310310
int defaultRecordLockDurationMs,
311311
Timer timer,
@@ -315,7 +315,7 @@ enum SharePartitionState {
315315
GroupConfigManager groupConfigManager,
316316
SharePartitionListener listener
317317
) {
318-
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
318+
this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords, maxDeliveryCount, defaultRecordLockDurationMs,
319319
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener,
320320
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()));
321321
}
@@ -326,7 +326,7 @@ enum SharePartitionState {
326326
String groupId,
327327
TopicIdPartition topicIdPartition,
328328
int leaderEpoch,
329-
int maxInFlightMessages,
329+
int maxInFlightRecords,
330330
int maxDeliveryCount,
331331
int defaultRecordLockDurationMs,
332332
Timer timer,
@@ -341,7 +341,7 @@ enum SharePartitionState {
341341
this.groupId = groupId;
342342
this.topicIdPartition = topicIdPartition;
343343
this.leaderEpoch = leaderEpoch;
344-
this.maxInFlightMessages = maxInFlightMessages;
344+
this.maxInFlightRecords = maxInFlightRecords;
345345
this.maxDeliveryCount = maxDeliveryCount;
346346
this.cachedState = new ConcurrentSkipListMap<>();
347347
this.lock = new ReentrantReadWriteLock();
@@ -1302,7 +1302,7 @@ private boolean archiveCompleteBatch(InFlightBatch inFlightBatch, RecordState in
13021302

13031303
/**
13041304
* Checks if the records can be acquired for the share partition. The records can be acquired if
1305-
* the number of records in-flight is less than the max in-flight messages. Or if the fetch is
1305+
* the number of records in-flight is less than the max in-flight records. Or if the fetch is
13061306
* to happen somewhere in between the record states cached in the share partition i.e. re-acquire
13071307
* the records that are already fetched before.
13081308
*
@@ -1312,7 +1312,7 @@ boolean canAcquireRecords() {
13121312
if (nextFetchOffset() != endOffset() + 1) {
13131313
return true;
13141314
}
1315-
return numInFlightRecords() < maxInFlightMessages;
1315+
return numInFlightRecords() < maxInFlightRecords;
13161316
}
13171317

13181318
/**
@@ -1492,24 +1492,24 @@ private void maybeUpdatePersisterGapWindowStartOffset(long offset) {
14921492

14931493
/**
14941494
* The method calculates the last offset and maximum records to acquire. The adjustment is needed
1495-
* to ensure that the records acquired do not exceed the maximum in-flight messages limit.
1495+
* to ensure that the records acquired do not exceed the maximum in-flight records limit.
14961496
*
14971497
* @param fetchOffset The offset from which the records are fetched.
14981498
* @param maxFetchRecords The maximum number of records to acquire.
14991499
* @param lastOffset The last offset to acquire records to, which is the last offset of the fetched batch.
15001500
* @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire.
15011501
*/
15021502
private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
1503-
// There can always be records fetched exceeding the max in-flight messages limit. Hence,
1504-
// we need to check if the share partition has reached the max in-flight messages limit
1503+
// There can always be records fetched exceeding the max in-flight records limit. Hence,
1504+
// we need to check if the share partition has reached the max in-flight records limit
15051505
// and only acquire limited records.
15061506
int maxRecordsToAcquire;
15071507
long lastOffsetToAcquire = lastOffset;
15081508
lock.readLock().lock();
15091509
try {
15101510
int inFlightRecordsCount = numInFlightRecords();
1511-
// Take minimum of maxFetchRecords and remaining capacity to fill max in-flight messages limit.
1512-
maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);
1511+
// Take minimum of maxFetchRecords and remaining capacity to fill max in-flight records limit.
1512+
maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightRecords - inFlightRecordsCount);
15131513
// If the maxRecordsToAcquire is less than or equal to 0, then ideally (check exists to not
15141514
// fetch records for share partitions which are at capacity) the fetch must be happening
15151515
// in-between the in-flight batches i.e. some in-flight records have been released (marked
@@ -1522,15 +1522,15 @@ private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffse
15221522
if (maxRecordsToAcquire <= 0) {
15231523
if (fetchOffset <= endOffset()) {
15241524
// Adjust the max records to acquire to the capacity available to fill the max
1525-
// in-flight messages limit. This can happen when the fetch is happening in-between
1526-
// the in-flight batches and the share partition has reached the max in-flight messages limit.
1525+
// in-flight records limit. This can happen when the fetch is happening in-between
1526+
// the in-flight batches and the share partition has reached the max in-flight records limit.
15271527
maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
15281528
// Adjust the last offset to acquire to the endOffset of the share partition.
15291529
lastOffsetToAcquire = endOffset();
15301530
} else {
1531-
// The share partition is already at max in-flight messages, hence cannot acquire more records.
1532-
log.debug("Share partition {}-{} has reached max in-flight messages limit: {}. Cannot acquire more records, inflight records count: {}",
1533-
groupId, topicIdPartition, maxInFlightMessages, inFlightRecordsCount);
1531+
// The share partition is already at max in-flight records, hence cannot acquire more records.
1532+
log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
1533+
groupId, topicIdPartition, maxInFlightRecords, inFlightRecordsCount);
15341534
}
15351535
}
15361536
} finally {
@@ -1558,13 +1558,13 @@ private ShareAcquiredRecords acquireNewBatchRecords(
15581558
firstAcquiredOffset = endOffset;
15591559
}
15601560

1561-
// Check how many messages can be acquired from the batch.
1561+
// Check how many records can be acquired from the batch.
15621562
long lastAcquiredOffset = lastOffset;
15631563
if (maxFetchRecords < lastAcquiredOffset - firstAcquiredOffset + 1) {
1564-
// The max messages to acquire is less than the complete available batches hence
1564+
// The max records to acquire is less than the complete available batches hence
15651565
// limit the acquired records. The last offset shall be the batches last offset
1566-
// which falls under the max messages limit. As the max fetch records is the soft
1567-
// limit, the last offset can be higher than the max messages.
1566+
// which falls under the max records limit. As the max fetch records is the soft
1567+
// limit, the last offset can be higher than the max records.
15681568
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
15691569
// If the initial read gap offset window is active then it's not guaranteed that the
15701570
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
@@ -2193,7 +2193,7 @@ private boolean maybeUpdateCachedStateAndOffsets() {
21932193
a) Only full batches can be removed from the cachedState, For example if there is batch (0-99)
21942194
and 0-49 records are acknowledged (ACCEPT or REJECT), the first 50 records will not be removed
21952195
from the cachedState. Instead, the startOffset will be moved to 50, but the batch will only
2196-
be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
2196+
be removed once all the records (0-99) are acknowledged (ACCEPT or REJECT).
21972197
*/
21982198

21992199
// Since only a subMap will be removed, we need to find the first and last keys of that subMap

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,9 @@ public class SharePartitionManager implements AutoCloseable {
122122
private final Timer timer;
123123

124124
/**
125-
* The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition.
125+
* The max in flight records is the maximum number of records that can be in flight at any one time per share-partition.
126126
*/
127-
private final int maxInFlightMessages;
127+
private final int maxInFlightRecords;
128128

129129
/**
130130
* The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived.
@@ -156,7 +156,7 @@ public SharePartitionManager(
156156
ShareSessionCache cache,
157157
int defaultRecordLockDurationMs,
158158
int maxDeliveryCount,
159-
int maxInFlightMessages,
159+
int maxInFlightRecords,
160160
long remoteFetchMaxWaitMs,
161161
Persister persister,
162162
GroupConfigManager groupConfigManager,
@@ -168,7 +168,7 @@ public SharePartitionManager(
168168
new SharePartitionCache(),
169169
defaultRecordLockDurationMs,
170170
maxDeliveryCount,
171-
maxInFlightMessages,
171+
maxInFlightRecords,
172172
remoteFetchMaxWaitMs,
173173
persister,
174174
groupConfigManager,
@@ -184,7 +184,7 @@ private SharePartitionManager(
184184
SharePartitionCache partitionCache,
185185
int defaultRecordLockDurationMs,
186186
int maxDeliveryCount,
187-
int maxInFlightMessages,
187+
int maxInFlightRecords,
188188
long remoteFetchMaxWaitMs,
189189
Persister persister,
190190
GroupConfigManager groupConfigManager,
@@ -199,7 +199,7 @@ private SharePartitionManager(
199199
new SystemTimerReaper("share-group-lock-timeout-reaper",
200200
new SystemTimer("share-group-lock-timeout")),
201201
maxDeliveryCount,
202-
maxInFlightMessages,
202+
maxInFlightRecords,
203203
remoteFetchMaxWaitMs,
204204
persister,
205205
groupConfigManager,
@@ -217,7 +217,7 @@ private SharePartitionManager(
217217
int defaultRecordLockDurationMs,
218218
Timer timer,
219219
int maxDeliveryCount,
220-
int maxInFlightMessages,
220+
int maxInFlightRecords,
221221
long remoteFetchMaxWaitMs,
222222
Persister persister,
223223
GroupConfigManager groupConfigManager,
@@ -231,7 +231,7 @@ private SharePartitionManager(
231231
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
232232
this.timer = timer;
233233
this.maxDeliveryCount = maxDeliveryCount;
234-
this.maxInFlightMessages = maxInFlightMessages;
234+
this.maxInFlightRecords = maxInFlightRecords;
235235
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
236236
this.persister = persister;
237237
this.groupConfigManager = groupConfigManager;
@@ -710,7 +710,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
710710
sharePartitionKey.groupId(),
711711
sharePartitionKey.topicIdPartition(),
712712
leaderEpoch,
713-
maxInFlightMessages,
713+
maxInFlightRecords,
714714
maxDeliveryCount,
715715
defaultRecordLockDurationMs,
716716
timer,

0 commit comments

Comments
 (0)