Skip to content

Commit 92fe00e

Browse files
KAFKA-19497; Topic replay code does not handle creation and deletion in the same delta (apache#20242)
There is a small logic bug in topic replay. If a topic is created and then removed before the TopicsDelta is applied, we end up with the deleted topic in createdTopics on the delta. Tis issue is fixed by removing the topicName from createTopics when replaying RemoveTopicRecord to make sure the deleted topics won't appear in createTopics. Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu <[email protected]>, Alyssa Huang <[email protected]>
1 parent 0412be9 commit 92fe00e

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public String replay(RemoveTopicRecord record) {
136136
String topicName;
137137
if (topicDelta != null) {
138138
topicName = topicDelta.image().name();
139+
createdTopics.remove(topicName);
139140
if (image.topicsById().containsKey(record.topicId())) {
140141
deletedTopicIds.add(record.topicId());
141142
}

metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,4 +897,15 @@ public void testTopicIdToNameView() {
897897
var result = IMAGE1.topicIdToNameView().get("zar");
898898
assertNull(result);
899899
}
900+
901+
@Test
902+
public void testTopicsDeltaCreateThenDelete() {
903+
TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
904+
delta.replay(new TopicRecord().setName("test").setTopicId(FOO_UUID));
905+
assertEquals(delta.createdTopicIds().contains(FOO_UUID), true);
906+
assertEquals(delta.deletedTopicIds().contains(FOO_UUID), false);
907+
delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID));
908+
assertEquals(delta.deletedTopicIds().contains(FOO_UUID), false);
909+
assertEquals(delta.createdTopicIds().contains(FOO_UUID), false);
910+
}
900911
}

0 commit comments

Comments
 (0)