Skip to content

Commit 0a48361

Browse files
authored
KAFKA-19690: Add epoch check before verification guard check to prevent unexpected fatal error (#20534)
We are seeing cases where a Kafka Streams (KS) thread stalls for ~20 seconds. During this stall, the broker correctly aborts the open transaction (triggered by the 10-second transaction timeout). However, when the KS thread resumes, instead of receiving the expected InvalidProducerEpochException (which we already handle gracefully as part of transaction abort), the client is hit with an InvalidTxnStateException. KS currently treats this as a fatal error, causing the application to fail. To fix this, we've added an epoch check before the verification check so that the broker returns the recoverable InvalidProducerEpochException instead of the fatal InvalidTxnStateException. This helps safeguard both TV1 and TV2 clients. Reviewers: Justine Olshan <[email protected]>
1 parent dbe9d34 commit 0a48361

File tree

2 files changed

+103
-4
lines changed

2 files changed

+103
-4
lines changed

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5065,6 +5065,96 @@ class UnifiedLogTest {
50655065
}
50665066

50675067
case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
5068+
5069+
@Test
5070+
def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
5071+
// Producer epoch gets incremented (coordinator fail over, completed transaction, etc.)
5072+
// and client has stale cached epoch. Fix prevents fatal InvalidTxnStateException.
5073+
5074+
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
5075+
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
5076+
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
5077+
5078+
val producerId = 123L
5079+
val oldEpoch = 5.toShort
5080+
val newEpoch = 6.toShort
5081+
5082+
// Step 1: Simulate a scenario where producer epoch was incremented to fence the producer
5083+
val previousRecords = MemoryRecords.withTransactionalRecords(
5084+
Compression.NONE, producerId, newEpoch, 0,
5085+
new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
5086+
)
5087+
val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch, false) // TV1 = supportsEpochBump = false
5088+
log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, previousGuard)
5089+
5090+
// Complete the transaction normally (commits do update producer state with current epoch)
5091+
val commitMarker = MemoryRecords.withEndTransactionMarker(
5092+
producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
5093+
)
5094+
log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
5095+
5096+
// Step 2: TV1 client tries to write with stale cached epoch (before learning about epoch increment)
5097+
val staleEpochRecords = MemoryRecords.withTransactionalRecords(
5098+
Compression.NONE, producerId, oldEpoch, 0,
5099+
new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
5100+
)
5101+
5102+
// Step 3: Verify our fix - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
5103+
val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
5104+
val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false)
5105+
log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
5106+
})
5107+
5108+
// Verify the error message indicates epoch mismatch
5109+
assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
5110+
assertTrue(exception.getMessage.contains(s"$oldEpoch"))
5111+
assertTrue(exception.getMessage.contains(s"$newEpoch"))
5112+
}
5113+
5114+
@Test
5115+
def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
5116+
// Check producer epoch FIRST - if stale, return recoverable error before verification checks.
5117+
5118+
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
5119+
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
5120+
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
5121+
5122+
val producerId = 456L
5123+
val originalEpoch = 3.toShort
5124+
val bumpedEpoch = 4.toShort
5125+
5126+
// Step 1: Start transaction with epoch 3 (before timeout)
5127+
val initialRecords = MemoryRecords.withTransactionalRecords(
5128+
Compression.NONE, producerId, originalEpoch, 0,
5129+
new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
5130+
)
5131+
val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
5132+
log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)
5133+
5134+
// Step 2: Coordinator times out and aborts transaction
5135+
// TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort marker with epoch 4
5136+
val abortMarker = MemoryRecords.withEndTransactionMarker(
5137+
producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
5138+
)
5139+
log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
5140+
5141+
// Step 3: TV2 transactional producer tries to append with stale epoch (timeout recovery scenario)
5142+
val staleEpochRecords = MemoryRecords.withTransactionalRecords(
5143+
Compression.NONE, producerId, originalEpoch, 0,
5144+
new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
5145+
)
5146+
5147+
// Step 4: Verify our fix works for TV2 - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
5148+
val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
5149+
val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
5150+
log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard)
5151+
})
5152+
5153+
// Verify the error message indicates epoch mismatch (3 < 4)
5154+
assertTrue(exception.getMessage.contains("smaller than the last seen epoch"))
5155+
assertTrue(exception.getMessage.contains(s"$originalEpoch"))
5156+
assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
5157+
}
50685158
}
50695159

50705160
object UnifiedLogTest {

storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,10 +1385,19 @@ private AnalyzeAndValidateProducerStateResult analyzeAndValidateProducerState(Lo
13851385
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
13861386
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
13871387
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
1388-
if (batch.isTransactional()
1389-
&& !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())
1390-
&& batchMissingRequiredVerification(batch, requestVerificationGuard)) {
1391-
throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
1388+
if (batch.isTransactional() && !hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
1389+
// Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
1390+
ProducerStateEntry entry = producerStateManager.activeProducers().get(batch.producerId());
1391+
if (entry != null && batch.producerEpoch() < entry.producerEpoch()) {
1392+
String message = "Epoch of producer " + batch.producerId() + " is " + batch.producerEpoch() +
1393+
", which is smaller than the last seen epoch " + entry.producerEpoch();
1394+
throw new InvalidProducerEpochException(message);
1395+
}
1396+
1397+
// Only check verification if epoch is current
1398+
if (batchMissingRequiredVerification(batch, requestVerificationGuard)) {
1399+
throw new InvalidTxnStateException("Record was not part of an ongoing transaction");
1400+
}
13921401
}
13931402
}
13941403

0 commit comments

Comments
 (0)