Commit 3e9ae03

KAFKA-20034: set deliveryCompleteCount to 0 when share partition is initialized with non-negative start offset (#21246)

Currently, when the start offset for a share partition is altered, deliveryCompleteCount is set to -1. As a result, the lag is reported as a hyphen (i.e., unknown) until consumption begins and write states are sent to the persister. However, when the start offset is altered to a non-negative value, the system should be able to calculate the lag from the new position, irrespective of whether consumption has begun. This PR resolves that by setting deliveryCompleteCount to 0 whenever the start offset is changed to a non-negative value.

Test results: https://github.com/user-attachments/assets/35dc8b43-4590-4d80-868e-1764d7ceb2f8

Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield <[email protected]>
1 parent 45a51fe commit 3e9ae03
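
For orientation, here is a minimal, self-contained sketch (illustrative only, not Kafka source) of the rule this commit introduces. The class and method names and the lag formula are assumptions made for the example; the lag arithmetic is inferred from the expectations in the new integration test (105 records, start offset altered to 40, expected lag 65).

// Illustrative sketch of the deliveryCompleteCount rule described in the commit message.
public final class DeliveryCompleteCountSketch {

    static final int UNINITIALIZED_START_OFFSET = -1;            // sentinel the commit adds
    static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;

    // Stays -1 only while the share partition has never been initialized; once the start
    // offset is set to a non-negative value (e.g. via AlterShareGroupOffsets), it becomes 0
    // so that lag can be reported before any consumption happens.
    static int deliveryCompleteCountFor(long startOffset) {
        return startOffset == UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
    }

    // Assumed lag formula, consistent with the integration test expectations below.
    static long lag(long logEndOffset, long startOffset, int deliveryCompleteCount) {
        if (deliveryCompleteCount == UNINITIALIZED_DELIVERY_COMPLETE_COUNT) {
            return -1L; // unknown; rendered as a hyphen by tooling
        }
        return logEndOffset - startOffset - deliveryCompleteCount;
    }

    public static void main(String[] args) {
        System.out.println(deliveryCompleteCountFor(-1L)); // -1: first initialization, lag unknown
        System.out.println(deliveryCompleteCountFor(40L)); //  0: start offset altered, lag computable
        System.out.println(lag(105L, 40L, 0));             // 65, matching the new integration test
    }
}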

File tree

3 files changed: +137 -4 lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 123 additions & 0 deletions
@@ -21,8 +21,10 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
@@ -3411,6 +3413,97 @@ public void testSharePartitionLagOnShareCoordinatorMovement() {
         }
     }
 
+    @ClusterTest
+    public void testSharePartitionLagAfterAlterShareGroupOffsets() {
+        String groupId = "group1";
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Producing 100 records to the topic partition.
+            for (int i = 0; i < 100; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Create a new share consumer. Since the share.auto.offset.reset is not altered, it should be latest by default.
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and subscribes to the topic.
+            waitedPoll(shareConsumer, 2500L, 0, true, groupId, List.of(new TopicPartition(tp.topic(), 0)));
+            // Producing 5 additional records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Polling share consumer to make sure the records are consumed.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the record first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be altered.
+            shareConsumer.close();
+            // Alter the start offset of the share partition to 40.
+            alterShareGroupOffsets(adminClient, groupId, tp, 40L);
+            // After altering, the share partition start offset should be 40.
+            verifySharePartitionStartOffset(adminClient, groupId, tp, 40L);
+            // Verify that the lag is now 65 since the start offset is altered to 40 and there are total 105 records in the partition.
+            verifySharePartitionLag(adminClient, groupId, tp, 65L);
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
+    @ClusterTest
+    public void testSharePartitionLagAfterDeleteShareGroupOffsets() {
+        String groupId = "group1";
+        alterShareAutoOffsetReset(groupId, "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             Admin adminClient = createAdminClient()) {
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "Message".getBytes());
+            // Producing 5 records to the topic partition.
+            for (int i = 0; i < 5; i++) {
+                producer.send(record);
+            }
+            producer.flush();
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Polling share consumer to make sure it joins the group and consumes the produced records.
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer.commitSync();
+            // After accepting, the lag should be 0 because the record is consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer.close();
+            // Delete the share group offsets.
+            deleteShareGroupOffsets(adminClient, groupId, tp.topic());
+            // Verify that the share partition offsets are deleted.
+            verifySharePartitionOffsetsDeleted(adminClient, groupId, tp);
+            // Create a new share consumer.
+            ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Since the offsets are deleted, the share consumer should consume from the beginning (share.auto.offset.reset is earliest).
+            // Thus, the consumer should consume all 5 records again.
+            records = waitedPoll(shareConsumer2, 2500L, 5);
+            assertEquals(5, records.count());
+            // Accept the records first to move the offset forward and register the state with persister.
+            records.forEach(r -> shareConsumer2.acknowledge(r, AcknowledgeType.ACCEPT));
+            shareConsumer2.commitSync();
+            // After accepting, the lag should be 0 because the records are consumed successfully.
+            verifySharePartitionLag(adminClient, groupId, tp, 0L);
+            // Closing the share consumer so that the offsets can be deleted.
+            shareConsumer2.close();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Test failed with exception: " + e.getMessage());
+        }
+    }
+
     @ClusterTest
     public void testFetchWithThrottledDelivery() {
         alterShareAutoOffsetReset("group1", "earliest");
@@ -4095,6 +4188,14 @@ private SharePartitionOffsetInfo sharePartitionOffsetInfo(Admin adminClient, Str
         return partitionResult;
     }
 
+    private void verifySharePartitionStartOffset(Admin adminClient, String groupId, TopicPartition tp, long expectedStartOffset) throws InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
+            return sharePartitionOffsetInfo != null &&
+                sharePartitionOffsetInfo.startOffset() == expectedStartOffset;
+        }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to retrieve share partition lag");
+    }
+
     private void verifySharePartitionLag(Admin adminClient, String groupId, TopicPartition tp, long expectedLag) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             SharePartitionOffsetInfo sharePartitionOffsetInfo = sharePartitionOffsetInfo(adminClient, groupId, tp);
@@ -4104,6 +4205,28 @@ private void verifySharePartitionLag(Admin adminClient, String groupId, TopicPar
         }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "Failed to retrieve share partition lag");
     }
 
+    private void verifySharePartitionOffsetsDeleted(Admin adminClient, String groupId, TopicPartition tp) throws InterruptedException {
+        TestUtils.waitForCondition(
+            () -> sharePartitionOffsetInfo(adminClient, groupId, tp) == null,
+            DEFAULT_MAX_WAIT_MS,
+            DEFAULT_POLL_INTERVAL_MS,
+            () -> "Failed to retrieve share partition lag");
+    }
+
+    private void alterShareGroupOffsets(Admin adminClient, String groupId, TopicPartition topicPartition, Long newOffset) throws InterruptedException, ExecutionException {
+        adminClient.alterShareGroupOffsets(
+            groupId,
+            Map.of(topicPartition, newOffset),
+            new AlterShareGroupOffsetsOptions().timeoutMs(30000)).partitionResult(topicPartition).get();
+    }
+
+    private void deleteShareGroupOffsets(Admin adminClient, String groupId, String topic) throws InterruptedException, ExecutionException {
+        adminClient.deleteShareGroupOffsets(
+            groupId,
+            Set.of(topic),
+            new DeleteShareGroupOffsetsOptions().timeoutMs(30000)).topicResult(topic).get();
+    }
+
     private void alterShareRecordLockDurationMs(String groupId, int newValue) {
         ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();

server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java

Lines changed: 6 additions & 1 deletion
@@ -42,7 +42,12 @@ public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int parti
     }
 
     public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
-        return new PartitionData(partition, stateEpoch, startOffset, UNINITIALIZED_DELIVERY_COMPLETE_COUNT, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
+        // If the start offset is uninitialized (when the share partition is being initialized for the first time), the
+        // consumption hasn't started yet, and lag cannot be calculated. Thus, deliveryCompleteCount is also set as -1.
+        // But, if start offset is a non-negative value (when the start offset is altered), the lag can be calculated
+        // from that point onward. Hence, we set deliveryCompleteCount to 0 in that case.
+        int deliveryCompleteCount = startOffset == UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
+        return new PartitionData(partition, stateEpoch, startOffset, deliveryCompleteCount, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
     }
 
     public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
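
As a quick usage note, the sketch below exercises only the newPartitionStateData signature shown in this diff; the expected deliveryCompleteCount values in the comments follow from the conditional above, and the assumption that PartitionStateData sits alongside PartitionFactory in org.apache.kafka.server.share.persister is mine, not stated in the diff.

import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionStateData;

public class PartitionFactorySketch {
    public static void main(String[] args) {
        // First-time initialization: the start offset is still the uninitialized sentinel (-1),
        // so the factory keeps deliveryCompleteCount at -1 and no lag can be reported yet.
        PartitionStateData firstInit = PartitionFactory.newPartitionStateData(0, 0, -1L);

        // Start offset altered to a non-negative value (e.g. 40 via AlterShareGroupOffsets):
        // the factory now sets deliveryCompleteCount to 0, so lag is computable immediately,
        // before any consumption from the new position has happened.
        PartitionStateData afterAlter = PartitionFactory.newPartitionStateData(0, 0, 40L);
    }
}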

share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java

Lines changed: 8 additions & 3 deletions
@@ -35,6 +35,7 @@ public class ShareGroupOffset {
     public static final int NO_TIMESTAMP = 0;
     public static final int UNINITIALIZED_EPOCH = 0;
     public static final int UNINITIALIZED_DELIVERY_COMPLETE_COUNT = -1;
+    public static final int UNINITIALIZED_START_OFFSET = -1;
     public static final int DEFAULT_EPOCH = 0;
 
     private final int snapshotEpoch;
@@ -160,14 +161,18 @@ public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.
     }
 
     public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int snapshotEpoch, long timestamp) {
-        // This method is invoked during InitializeShareGroupStateRequest. Since the deliveryCompleteCount is not yet
-        // known at this stage, it is initialized to its default value.
+        // This method is invoked during InitializeShareGroupStateRequest. If the start offset is uninitialized (when the
+        // share partition is being initialized for the first time), the consumption hasn't started yet, and lag cannot
+        // be calculated. Thus, deliveryCompleteCount is also set as -1. But, if start offset is a non-negative value (when
+        // the start offset is altered), the lag can be calculated from that point onward. Hence, we set deliveryCompleteCount
+        // to 0 in that case.
+        int deliveryCompleteCount = data.startOffset() == UNINITIALIZED_START_OFFSET ? UNINITIALIZED_DELIVERY_COMPLETE_COUNT : 0;
         return new ShareGroupOffset(
             snapshotEpoch,
             data.stateEpoch(),
             UNINITIALIZED_EPOCH,
             data.startOffset(),
-            UNINITIALIZED_DELIVERY_COMPLETE_COUNT,
+            deliveryCompleteCount,
             List.of(),
             timestamp,
             timestamp
