Skip to content

Commit 46e843d

Browse files
KAFKA-19383: Handle the deleted topics when applying ClearElrRecord (#20034)
https://issues.apache.org/jira/browse/KAFKA-19383 When applying the ClearElrRecord, it may pick up the topicId in the image without checking if the topic has been deleted. This can cause the creation of a new TopicRecord with an old topic ID. Reviewers: Alyssa Huang <[email protected]>, Artem Livshits <[email protected]>, Colin P. McCabe <[email protected]> No conflicts.
1 parent 7e51a2a commit 46e843d

File tree

2 files changed

+248
-8
lines changed

2 files changed

+248
-8
lines changed

metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,43 @@ public void replay(PartitionChangeRecord record) {
9696
topicDelta.replay(record);
9797
}
9898

99+
private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord record) {
100+
// Only apply the record if the topic is not deleted.
101+
if (!deletedTopicIds.contains(topicId)) {
102+
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
103+
topicDelta.replay(record);
104+
}
105+
}
106+
107+
// When replaying the ClearElrRecord, we need to first find the latest topic ID associated with the topic(s) because
108+
// multiple topic IDs for the same topic in a TopicsDelta is possible in the event of topic deletion and recreation.
109+
// Second, we should not add the topicDelta if the given topic ID has been deleted. So that we don't leak the
110+
// deleted topic ID.
99111
public void replay(ClearElrRecord record) {
100112
if (!record.topicName().isEmpty()) {
101-
Uuid topicId;
102-
if (image.getTopic(record.topicName()) != null) {
103-
topicId = image.getTopic(record.topicName()).id();
104-
} else {
113+
Uuid topicId = null;
114+
// CreatedTopics contains the latest topic IDs. It should be checked first in case the topic is deleted and
115+
// created in the same batch.
116+
if (createdTopics.containsKey(record.topicName())) {
105117
topicId = createdTopics.get(record.topicName());
118+
} else if (image.getTopic(record.topicName()) != null) {
119+
topicId = image.getTopic(record.topicName()).id();
106120
}
121+
107122
if (topicId == null) {
108123
throw new RuntimeException("Unable to clear elr for topic with name " +
109124
record.topicName() + ": no such topic found.");
110125
}
111-
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
112-
topicDelta.replay(record);
126+
127+
maybeReplayClearElrRecord(topicId, record);
113128
} else {
114129
// Update all the existing topics
115130
image.topicsById().forEach((topicId, image) -> {
116-
TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
117-
topicDelta.replay(record);
131+
maybeReplayClearElrRecord(topicId, record);
118132
});
133+
createdTopicIds().forEach((topicId -> {
134+
maybeReplayClearElrRecord(topicId, record);
135+
}));
119136
}
120137
}
121138

metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,229 @@ public void testClearElrRecords() {
429429
assertEquals(0, image.getTopic(barId).partitions().get(0).lastKnownElr.length);
430430
}
431431

432+
@Test
433+
public void testClearElrRecordOnNonExistingTopic() {
434+
TopicsImage image = TopicsImage.EMPTY;
435+
436+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
437+
topicRecords.addAll(List.of(
438+
new ApiMessageAndVersion(
439+
new ClearElrRecord().setTopicName("foo"),
440+
CLEAR_ELR_RECORD.highestSupportedVersion()
441+
))
442+
);
443+
TopicsDelta delta = new TopicsDelta(image);
444+
assertThrows(RuntimeException.class, () -> RecordTestUtils.replayAll(delta, topicRecords));
445+
}
446+
447+
@Test
448+
public void testClearElrRecords_All_ForDeletedTopics() {
449+
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
450+
Uuid fooId2 = Uuid.randomUuid();
451+
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
452+
Uuid barId2 = Uuid.randomUuid();
453+
454+
List<TopicImage> topics = new ArrayList<>();
455+
topics.add(
456+
newTopicImage(
457+
"foo",
458+
fooId,
459+
newPartition(new int[] {0, 1, 2, 3})
460+
)
461+
);
462+
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
463+
newTopicsByNameMap(topics));
464+
465+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
466+
topicRecords.add(
467+
new ApiMessageAndVersion(
468+
new PartitionRecord().setTopicId(fooId).
469+
setPartitionId(0).
470+
setLeader(0).
471+
setIsr(List.of(1, 2, 3)),
472+
PARTITION_RECORD.highestSupportedVersion()
473+
)
474+
);
475+
476+
TopicsDelta delta = new TopicsDelta(image);
477+
RecordTestUtils.replayAll(delta, topicRecords);
478+
image = delta.apply();
479+
480+
topicRecords = new ArrayList<>();
481+
/* Test the following:
482+
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
483+
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
484+
*/
485+
topicRecords.addAll(List.of(
486+
new ApiMessageAndVersion(
487+
new RemoveTopicRecord().setTopicId(fooId),
488+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
489+
),
490+
new ApiMessageAndVersion(
491+
new TopicRecord().setTopicId(fooId2).
492+
setName("foo"),
493+
TOPIC_RECORD.highestSupportedVersion()
494+
),
495+
new ApiMessageAndVersion(
496+
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
497+
setIsr(List.of(0, 1)).
498+
setEligibleLeaderReplicas(List.of(2)).
499+
setLastKnownElr(List.of(3)),
500+
PARTITION_CHANGE_RECORD.highestSupportedVersion()
501+
),
502+
new ApiMessageAndVersion(
503+
new TopicRecord().setTopicId(barId).
504+
setName("bar"),
505+
TOPIC_RECORD.highestSupportedVersion()
506+
),
507+
new ApiMessageAndVersion(
508+
new PartitionRecord().setTopicId(barId).
509+
setPartitionId(0).
510+
setLeader(0).
511+
setIsr(List.of(1, 2, 3)),
512+
PARTITION_RECORD.highestSupportedVersion()
513+
),
514+
new ApiMessageAndVersion(
515+
new RemoveTopicRecord().setTopicId(barId),
516+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
517+
),
518+
new ApiMessageAndVersion(
519+
new TopicRecord().setTopicId(barId2).
520+
setName("bar"),
521+
TOPIC_RECORD.highestSupportedVersion()
522+
),
523+
new ApiMessageAndVersion(
524+
new PartitionRecord().setTopicId(barId2).
525+
setPartitionId(0).
526+
setLeader(0).
527+
setIsr(List.of(1)).
528+
setEligibleLeaderReplicas(List.of(2)).
529+
setLastKnownElr(List.of(3)),
530+
PARTITION_RECORD.highestSupportedVersion()
531+
),
532+
new ApiMessageAndVersion(
533+
new ClearElrRecord(),
534+
CLEAR_ELR_RECORD.highestSupportedVersion()
535+
))
536+
);
537+
delta = new TopicsDelta(image);
538+
RecordTestUtils.replayAll(delta, topicRecords);
539+
image = delta.apply();
540+
assertEquals(2, image.topicsById().size());
541+
assertEquals(2, image.topicsByName().size());
542+
543+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
544+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
545+
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
546+
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
547+
}
548+
549+
@Test
550+
public void testClearElrRecords_Single_ForDeletedTopics() {
551+
Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
552+
Uuid fooId2 = Uuid.randomUuid();
553+
Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
554+
Uuid barId2 = Uuid.randomUuid();
555+
556+
List<TopicImage> topics = new ArrayList<>();
557+
topics.add(
558+
newTopicImage(
559+
"foo",
560+
fooId,
561+
newPartition(new int[] {0, 1, 2, 3})
562+
)
563+
);
564+
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
565+
newTopicsByNameMap(topics));
566+
567+
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
568+
topicRecords.add(
569+
new ApiMessageAndVersion(
570+
new PartitionRecord().setTopicId(fooId).
571+
setPartitionId(0).
572+
setLeader(0).
573+
setIsr(List.of(1, 2, 3)),
574+
PARTITION_RECORD.highestSupportedVersion()
575+
)
576+
);
577+
578+
TopicsDelta delta = new TopicsDelta(image);
579+
RecordTestUtils.replayAll(delta, topicRecords);
580+
image = delta.apply();
581+
582+
topicRecords = new ArrayList<>();
583+
/* Test the following:
584+
1. Topic foo is deleted and created in the same delta, the clear elr applies on the new topic
585+
2. Topic bar is created, deleted, then created in the same delta, the clear elr applies on the new topic
586+
*/
587+
topicRecords.addAll(List.of(
588+
new ApiMessageAndVersion(
589+
new RemoveTopicRecord().setTopicId(fooId),
590+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
591+
),
592+
new ApiMessageAndVersion(
593+
new TopicRecord().setTopicId(fooId2).
594+
setName("foo"),
595+
TOPIC_RECORD.highestSupportedVersion()
596+
),
597+
new ApiMessageAndVersion(
598+
new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
599+
setIsr(List.of(0, 1)).
600+
setEligibleLeaderReplicas(List.of(2)).
601+
setLastKnownElr(List.of(3)),
602+
PARTITION_CHANGE_RECORD.highestSupportedVersion()
603+
),
604+
new ApiMessageAndVersion(
605+
new TopicRecord().setTopicId(barId).
606+
setName("bar"),
607+
TOPIC_RECORD.highestSupportedVersion()
608+
),
609+
new ApiMessageAndVersion(
610+
new PartitionRecord().setTopicId(barId).
611+
setPartitionId(0).
612+
setLeader(0).
613+
setIsr(List.of(1, 2, 3)),
614+
PARTITION_RECORD.highestSupportedVersion()
615+
),
616+
new ApiMessageAndVersion(
617+
new RemoveTopicRecord().setTopicId(barId),
618+
REMOVE_TOPIC_RECORD.highestSupportedVersion()
619+
),
620+
new ApiMessageAndVersion(
621+
new TopicRecord().setTopicId(barId2).
622+
setName("bar"),
623+
TOPIC_RECORD.highestSupportedVersion()
624+
),
625+
new ApiMessageAndVersion(
626+
new PartitionRecord().setTopicId(barId2).
627+
setPartitionId(0).
628+
setLeader(0).
629+
setIsr(List.of(1)).
630+
setEligibleLeaderReplicas(List.of(2)).
631+
setLastKnownElr(List.of(3)),
632+
PARTITION_RECORD.highestSupportedVersion()
633+
),
634+
new ApiMessageAndVersion(
635+
new ClearElrRecord().setTopicName("foo"),
636+
CLEAR_ELR_RECORD.highestSupportedVersion()
637+
),
638+
new ApiMessageAndVersion(
639+
new ClearElrRecord().setTopicName("bar"),
640+
CLEAR_ELR_RECORD.highestSupportedVersion()
641+
))
642+
);
643+
delta = new TopicsDelta(image);
644+
RecordTestUtils.replayAll(delta, topicRecords);
645+
image = delta.apply();
646+
assertEquals(2, image.topicsById().size());
647+
assertEquals(2, image.topicsByName().size());
648+
649+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
650+
assertEquals(0, image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
651+
assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
652+
assertEquals(0, image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
653+
}
654+
432655
@Test
433656
public void testClearElrRecordForNonExistTopic() {
434657
TopicsImage image = new TopicsImage(newTopicsByIdMap(Collections.emptyList()),

0 commit comments

Comments
 (0)