Skip to content

Commit 94a1bfb

Browse files
authored
KAFKA-18575: Transaction Version 2 doesn't correctly handle race condition with completing and new transaction(apache#18604)
There is a subtle race condition with transactions V2 if a transaction is still completing when checking if we need to add a partition, but it completes when the request reaches the coordinator. One approach was to remove the verification for TV2 and just check the epoch on write, but a simpler one is to simply return concurrent transactions from the partition leader (before attempting to add the partition). I've done this and added a test for this behavior. Locally, I reproduced the race but adding a 1 second sleep when handling the WriteTxnMarkersRequest and a 2 second delay before adding the partition to the AddPartitionsToTxnManager. Without this change, the race happened on every second transaction as the first one completed. With this change, the error went away. As a followup, we may want to clean up some of the code and comments with respect to verification as the code is used by both TV0 + verification and TV2. But that doesn't need to complete for 4.0. This does :) Reviewers: Jeff Kim <[email protected]>, Artem Livshits <[email protected]>, Calvin Liu <[email protected]>
1 parent 410065a commit 94a1bfb

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

core/src/main/scala/kafka/log/UnifiedLog.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
600600
*/
601601
def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
602602
val entry = producerStateManager.activeProducers.get(producerId)
603+
// With transactions V2, if we see a future epoch, we are likely in the process of completing the previous transaction.
604+
// Return early with ConcurrentTransactionsException until the transaction completes.
605+
if (entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() < producerEpoch)
606+
throw new ConcurrentTransactionsException("The producer attempted to update a transaction " +
607+
"while another concurrent operation on the same transaction was ongoing.")
603608
entry != null && entry.currentTxnFirstOffset.isPresent && entry.producerEpoch() == producerEpoch
604609
}
605610

@@ -1030,7 +1035,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
10301035
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
10311036
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
10321037
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
1033-
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
1038+
if (batch.isTransactional && !batch.isControlBatch && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
10341039
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
10351040
}
10361041

@@ -1051,7 +1056,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
10511056
}
10521057

10531058
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
1054-
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
1059+
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
10551060
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
10561061
}
10571062

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4020,6 +4020,42 @@ class UnifiedLogTest {
40204020
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
40214021
}
40224022

4023+
@Test
4024+
def testPreviousTransactionOngoing(): Unit = {
4025+
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
4026+
4027+
val producerId = 23L
4028+
val producerEpoch = 1.toShort
4029+
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
4030+
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
4031+
4032+
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
4033+
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
4034+
4035+
val transactionalRecords = MemoryRecords.withTransactionalRecords(
4036+
Compression.NONE,
4037+
producerId,
4038+
producerEpoch,
4039+
0,
4040+
new SimpleRecord("1".getBytes),
4041+
new SimpleRecord("2".getBytes)
4042+
)
4043+
log.appendAsLeader(transactionalRecords, origin = AppendOrigin.CLIENT, leaderEpoch = 0, verificationGuard = verificationGuard)
4044+
4045+
assertThrows(classOf[ConcurrentTransactionsException], () => log.maybeStartTransactionVerification(producerId, 0, (producerEpoch + 1).toShort))
4046+
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
4047+
4048+
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
4049+
producerId,
4050+
producerEpoch,
4051+
new EndTransactionMarker(ControlRecordType.COMMIT, 0)
4052+
)
4053+
4054+
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
4055+
val verificationGuard2 = log.maybeStartTransactionVerification(producerId, 0, (producerEpoch + 1).toShort)
4056+
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard2)
4057+
}
4058+
40234059
@Test
40244060
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
40254061
val logConfig = LogTestUtils.createLogConfig()

0 commit comments

Comments
 (0)