Skip to content

Commit 4dd3512

Browse files
authored
KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (apache#20577)
Cherry-pick of the changes from apache#20534 to the 3.9 branch. Conflicts: (1) storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java — this file is UnifiedLog.scala in 3.9, so the change was applied there; (2) core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala — the cherry-picked version had more changes than the 3.9 version, so only the new test was added and everything else was kept the same. Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 17157ed commit 4dd3512

File tree

2 files changed

+59
-3
lines changed

2 files changed

+59
-3
lines changed

core/src/main/scala/kafka/log/UnifiedLog.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,8 +1067,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
10671067
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
10681068
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
10691069
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
1070-
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
1071-
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
1070+
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId)) {
1071+
// Check epoch first: if producer epoch is stale, throw recoverable InvalidProducerEpochException.
1072+
val entry = producerStateManager.activeProducers.get(batch.producerId)
1073+
if (entry != null && batch.producerEpoch < entry.producerEpoch) {
1074+
val message = s"Epoch of producer ${batch.producerId} is ${batch.producerEpoch}, which is smaller than the last seen epoch ${entry.producerEpoch}"
1075+
throw new InvalidProducerEpochException(message)
1076+
}
1077+
1078+
// Only check verification if epoch is current
1079+
if (batchMissingRequiredVerification(batch, requestVerificationGuard)) {
1080+
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
1081+
}
1082+
}
10721083
}
10731084

10741085
// We cache offset metadata for the start of each transaction. This allows us to

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package kafka.log
1919

2020
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
2121
import kafka.log.remote.RemoteLogManager
22-
import kafka.server.{BrokerTopicStats, KafkaConfig}
22+
import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
2323
import kafka.utils.TestUtils
2424
import org.apache.kafka.common.compress.Compression
2525
import org.apache.kafka.common.config.TopicConfig
@@ -4666,6 +4666,51 @@ class UnifiedLogTest {
46664666

46674667
(log, segmentWithOverflow)
46684668
}
4669+
4670+
/**
 * Verifies that a transactional append whose producer epoch is older than the epoch already
 * recorded in the log for the same producer id is rejected with the recoverable
 * InvalidProducerEpochException instead of the fatal InvalidTxnStateException.
 *
 * Scenario: a transaction is written and committed with `newEpoch`, then a client that still
 * caches `oldEpoch` attempts another transactional append for the same producer id.
 */
@Test
def testStaleProducerEpochReturnsRecoverableError(): Unit = {
  // Producer epoch gets incremented (coordinator fail over, completed transaction, etc.)
  // and client has stale cached epoch. Fix prevents fatal InvalidTxnStateException.

  // NOTE(review): the second constructor argument presumably enables transaction verification —
  // confirm against ProducerStateManagerConfig.
  val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
  val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
  val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)

  val producerId = 123L
  val oldEpoch = 5.toShort
  val newEpoch = 6.toShort

  // Step 1: Simulate a scenario where producer epoch was incremented to fence the producer
  val previousRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, newEpoch, 0,
    new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
  )
  val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch)
  log.appendAsLeader(previousRecords, leaderEpoch = 0, origin = AppendOrigin.CLIENT, requestLocal = RequestLocal.NoCaching, verificationGuard = previousGuard)

  // Complete the transaction normally (commits do update producer state with current epoch)
  val commitMarker = MemoryRecords.withEndTransactionMarker(
    producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
  )
  log.appendAsLeader(commitMarker, leaderEpoch = 0, origin = AppendOrigin.COORDINATOR, requestLocal = RequestLocal.NoCaching, verificationGuard = VerificationGuard.SENTINEL)

  // Step 2: TV1 client tries to write with stale cached epoch (before learning about epoch increment)
  val staleEpochRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE, producerId, oldEpoch, 0,
    new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
  )

  // Obtain the guard outside of assertThrows so that only the append itself is expected to
  // throw; an exception raised while starting verification must fail the test rather than
  // accidentally satisfy the assertion.
  val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch)

  // Step 3: Verify our fix - should get InvalidProducerEpochException (recoverable), not InvalidTxnStateException (fatal)
  val exception = assertThrows(classOf[InvalidProducerEpochException], () => {
    log.appendAsLeader(staleEpochRecords, leaderEpoch = 0, origin = AppendOrigin.CLIENT, requestLocal = RequestLocal.NoCaching, verificationGuard = staleGuard)
  })

  // Verify the error message indicates epoch mismatch. The epoch values are matched together
  // with their surrounding text so that a stray digit elsewhere in the message (for example in
  // the producer id) cannot satisfy the check.
  val message = exception.getMessage
  assertTrue(message.contains("smaller than the last seen epoch"))
  assertTrue(message.contains(s"is $oldEpoch,"))
  assertTrue(message.contains(s"the last seen epoch $newEpoch"))
}
46694714
}
46704715

46714716
object UnifiedLogTest {

0 commit comments

Comments
 (0)