Skip to content

Commit 5059819

Browse files
authored
MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against invalid producer epoch when TV2 is used (#20024)
This patch adds an API level integration test for the producer epoch verification when processing transactional offset commit and end txn markers. Reviewers: PoAn Yang <[email protected]>, TengYao Chi <[email protected]>, Sean Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 634e99e commit 5059819

File tree

3 files changed

+338
-31
lines changed

3 files changed

+338
-31
lines changed

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package kafka.server
1919
import kafka.network.SocketServer
2020
import kafka.utils.TestUtils
2121
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
22+
import org.apache.kafka.common.internals.Topic
2223
import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid}
2324
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
2425
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
2526
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
2627
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
27-
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
28+
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
29+
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
2830
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
29-
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse}
31+
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
3032
import org.apache.kafka.common.serialization.StringSerializer
3133
import org.apache.kafka.common.test.ClusterInstance
3234
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -352,6 +354,35 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
352354
assertEquals(expectedError, connectAndReceive[EndTxnResponse](request).error)
353355
}
354356

357+
protected def writeTxnMarkers(
358+
producerId: Long,
359+
producerEpoch: Short,
360+
committed: Boolean,
361+
expectedError: Errors = Errors.NONE,
362+
version: Short = ApiKeys.WRITE_TXN_MARKERS.latestVersion(isUnstableApiEnabled)
363+
): Unit = {
364+
val request = new WriteTxnMarkersRequest.Builder(
365+
new WriteTxnMarkersRequestData()
366+
.setMarkers(List(
367+
new WritableTxnMarker()
368+
.setProducerId(producerId)
369+
.setProducerEpoch(producerEpoch)
370+
.setTransactionResult(committed)
371+
.setTopics(List(
372+
new WritableTxnMarkerTopic()
373+
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
374+
.setPartitionIndexes(List[Integer](0).asJava)
375+
).asJava)
376+
.setCoordinatorEpoch(0)
377+
).asJava)
378+
).build(version)
379+
380+
assertEquals(
381+
expectedError.code,
382+
connectAndReceive[WriteTxnMarkersResponse](request).data.markers.get(0).topics.get(0).partitions.get(0).errorCode
383+
)
384+
}
385+
355386
protected def fetchOffsets(
356387
groups: List[OffsetFetchRequestData.OffsetFetchRequestGroup],
357388
requireStable: Boolean,
@@ -422,6 +453,27 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
422453
groupResponse
423454
}
424455

456+
protected def fetchOffset(
457+
groupId: String,
458+
topic: String,
459+
partition: Int
460+
): Long = {
461+
val groupIdRecord = fetchOffsets(
462+
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
463+
.setGroupId(groupId)
464+
.setTopics(List(
465+
new OffsetFetchRequestData.OffsetFetchRequestTopics()
466+
.setName(topic)
467+
.setPartitionIndexes(List[Integer](partition).asJava)
468+
).asJava),
469+
requireStable = true,
470+
version = 9
471+
)
472+
val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
473+
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head
474+
partitionRecord.committedOffset
475+
}
476+
425477
protected def deleteOffset(
426478
groupId: String,
427479
topic: String,

core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala

Lines changed: 112 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,16 @@
1616
*/
1717
package kafka.server
1818

19-
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
2019
import kafka.utils.TestUtils
2120
import org.apache.kafka.common.errors.UnsupportedVersionException
22-
import org.apache.kafka.common.message.OffsetFetchRequestData
2321
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
24-
import org.apache.kafka.common.requests.JoinGroupRequest
22+
import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
2523
import org.apache.kafka.common.test.ClusterInstance
24+
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
2625
import org.apache.kafka.common.utils.ProducerIdAndEpoch
2726
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
2827
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
29-
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
30-
31-
import scala.jdk.CollectionConverters._
28+
import org.junit.jupiter.api.Assertions.{assertNotEquals, assertThrows}
3229

3330
@ClusterTestDefaults(
3431
types = Array(Type.KRAFT),
@@ -51,6 +48,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
5148
testTxnOffsetCommit(false)
5249
}
5350

51+
@ClusterTest
52+
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol(): Unit = {
53+
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true)
54+
}
55+
56+
@ClusterTest
57+
def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol(): Unit = {
58+
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false)
59+
}
60+
5461
private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
5562
val topic = "topic"
5663
val partition = 0
@@ -65,8 +72,8 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
6572
// Join the consumer group. Note that we don't heartbeat here so we must use
6673
// a session long enough for the duration of the test.
6774
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
68-
assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID)
69-
assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID)
75+
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
76+
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
7077

7178
createTopic(topic, 1)
7279

@@ -178,7 +185,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
178185
transactionalId = transactionalId
179186
)
180187

181-
val originalOffset = fetchOffset(topic, partition, groupId)
188+
val originalOffset = fetchOffset(groupId, topic, partition)
182189

183190
commitTxnOffset(
184191
groupId = groupId,
@@ -207,31 +214,107 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
207214

208215
TestUtils.waitUntilTrue(() =>
209216
try {
210-
fetchOffset(topic, partition, groupId) == expectedOffset
217+
fetchOffset(groupId, topic, partition) == expectedOffset
211218
} catch {
212219
case _: Throwable => false
213220
}, "txn commit offset validation failed"
214221
)
215222
}
216223

217-
private def fetchOffset(
218-
topic: String,
219-
partition: Int,
220-
groupId: String
221-
): Long = {
222-
val groupIdRecord = fetchOffsets(
223-
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
224-
.setGroupId(groupId)
225-
.setTopics(List(
226-
new OffsetFetchRequestData.OffsetFetchRequestTopics()
227-
.setName(topic)
228-
.setPartitionIndexes(List[Integer](partition).asJava)
229-
).asJava),
230-
requireStable = true,
231-
version = 9
232-
)
233-
val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
234-
val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head
235-
partitionRecord.committedOffset
224+
private def testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean): Unit = {
225+
val topic = "topic"
226+
val partition = 0
227+
val transactionalId = "txn"
228+
val groupId = "group"
229+
val offset = 100L
230+
231+
// Creates the __consumer_offsets and __transaction_state topics because they won't be created automatically
232+
// in this test, which does not use the FindCoordinator API.
233+
createOffsetsTopic()
234+
createTransactionStateTopic()
235+
236+
// Join the consumer group. Note that we don't heartbeat here so we must use
237+
// a session long enough for the duration of the test.
238+
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol)
239+
assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
240+
assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
241+
242+
createTopic(topic, 1)
243+
244+
for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
245+
val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
246+
247+
// Initialize producer. Wait until the coordinator finishes loading.
248+
var producerIdAndEpoch: ProducerIdAndEpoch = null
249+
TestUtils.waitUntilTrue(() =>
250+
try {
251+
producerIdAndEpoch = initProducerId(
252+
transactionalId = transactionalId,
253+
producerIdAndEpoch = ProducerIdAndEpoch.NONE,
254+
expectedError = Errors.NONE
255+
)
256+
true
257+
} catch {
258+
case _: Throwable => false
259+
}, "initProducerId request failed"
260+
)
261+
262+
addOffsetsToTxn(
263+
groupId = groupId,
264+
producerId = producerIdAndEpoch.producerId,
265+
producerEpoch = producerIdAndEpoch.epoch,
266+
transactionalId = transactionalId
267+
)
268+
269+
// Complete the transaction.
270+
endTxn(
271+
producerId = producerIdAndEpoch.producerId,
272+
producerEpoch = producerIdAndEpoch.epoch,
273+
transactionalId = transactionalId,
274+
isTransactionV2Enabled = useTV2,
275+
committed = true,
276+
expectedError = Errors.NONE
277+
)
278+
279+
// Start a new transaction. Wait for the previous transaction to complete.
280+
TestUtils.waitUntilTrue(() =>
281+
try {
282+
addOffsetsToTxn(
283+
groupId = groupId,
284+
producerId = producerIdAndEpoch.producerId,
285+
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
286+
transactionalId = transactionalId
287+
)
288+
true
289+
} catch {
290+
case _: Throwable => false
291+
}, "addOffsetsToTxn request failed"
292+
)
293+
294+
// Committing offset with old epoch succeeds for TV1 and fails for TV2.
295+
commitTxnOffset(
296+
groupId = groupId,
297+
memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID,
298+
generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID,
299+
producerId = producerIdAndEpoch.producerId,
300+
producerEpoch = producerIdAndEpoch.epoch,
301+
transactionalId = transactionalId,
302+
topic = topic,
303+
partition = partition,
304+
offset = offset,
305+
expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE,
306+
version = version.toShort
307+
)
308+
309+
// Complete the transaction.
310+
endTxn(
311+
producerId = producerIdAndEpoch.producerId,
312+
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch,
313+
transactionalId = transactionalId,
314+
isTransactionV2Enabled = useTV2,
315+
committed = true,
316+
expectedError = Errors.NONE
317+
)
318+
}
236319
}
237320
}

0 commit comments

Comments
 (0)