Skip to content

Commit 309dee3

Browse files
authored
feat(issues1842): cleanup metastream kv after deleting topic (#1843)
Signed-off-by: Robin Han <[email protected]>
1 parent 9b740d2 commit 309dee3

File tree

4 files changed

+44
-7
lines changed

4 files changed

+44
-7
lines changed

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2157,7 +2157,7 @@ private QuorumController(
21572157
this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext,
21582158
this.s3ObjectControlManager, clusterControl, featureControl, replicationControl);
21592159
this.kvControlManager = new KVControlManager(snapshotRegistry, logContext);
2160-
this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager);
2160+
this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager);
21612161
this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoGetter(clusterControl, streamControlManager));
21622162
this.extension = extension.apply(this);
21632163

metadata/src/main/java/org/apache/kafka/controller/stream/TopicDeletionManager.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111

1212
package org.apache.kafka.controller.stream;
1313

14+
import com.automq.stream.s3.metadata.ObjectUtils;
1415
import com.automq.stream.s3.metadata.StreamState;
1516
import com.automq.stream.utils.Threads;
1617
import java.nio.charset.StandardCharsets;
18+
import java.util.ArrayList;
1719
import java.util.Collections;
1820
import java.util.LinkedList;
1921
import java.util.List;
@@ -50,17 +52,20 @@ public class TopicDeletionManager {
5052

5153
private final Controller quorumController;
5254
private final StreamControlManager streamControlManager;
55+
private final KVControlManager kvControlManager;
5356

5457
private CompletableFuture<Void> lastDeletion = CompletableFuture.completedFuture(null);
5558

5659
public TopicDeletionManager(
5760
SnapshotRegistry registry,
5861
Controller quorumController,
59-
StreamControlManager streamControlManager
62+
StreamControlManager streamControlManager,
63+
KVControlManager kvControlManager
6064
) {
6165
this.waitingCleanupTopics = new TimelineHashMap<>(registry, 0);
6266
this.quorumController = quorumController;
6367
this.streamControlManager = streamControlManager;
68+
this.kvControlManager = kvControlManager;
6469
Threads.COMMON_SCHEDULER.scheduleWithFixedDelay(this::cleanupDeletedTopics, 5, 5, TimeUnit.SECONDS);
6570
}
6671

@@ -98,7 +103,16 @@ public ControllerResult<Void> cleanupDeletedTopics0() {
98103
}
99104
}
100105
if (streamsToDelete.isEmpty()) {
101-
List<ApiMessageAndVersion> records = List.of(new ApiMessageAndVersion(new RemoveKVRecord().setKeys(List.of(TopicDeletion.TOPIC_DELETION_PREFIX + topicId)), (short) 0));
106+
List<ApiMessageAndVersion> records = new ArrayList<>();
107+
List<String> metaStreamKvList = new ArrayList<>();
108+
String metaStreamKvPrefix = ObjectUtils.genMetaStreamKvPrefix(topicId);
109+
kvControlManager.kv().forEach((k, v) -> {
110+
if (k.startsWith(metaStreamKvPrefix)) {
111+
metaStreamKvList.add(k);
112+
}
113+
});
114+
records.add(new ApiMessageAndVersion(new RemoveKVRecord().setKeys(metaStreamKvList), (short) 0));
115+
records.add(new ApiMessageAndVersion(new RemoveKVRecord().setKeys(List.of(TopicDeletion.TOPIC_DELETION_PREFIX + topicId)), (short) 0));
102116
LOGGER.info("Topic clean up completed for topic {}", topicId);
103117
if (waitingCleanupTopics.size(lastStableOffset) > 1) {
104118
// there are more topics to clean up, fast trigger next round cleanup

metadata/src/test/java/org/apache/kafka/controller/stream/TopicDeletionManagerTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
package org.apache.kafka.controller.stream;
1313

14+
import com.automq.stream.s3.metadata.ObjectUtils;
1415
import com.automq.stream.s3.metadata.StreamState;
16+
import java.nio.ByteBuffer;
1517
import java.nio.charset.StandardCharsets;
1618
import java.util.Collections;
1719
import java.util.HashMap;
@@ -51,8 +53,13 @@ public class TopicDeletionManagerTest {
5153
final Uuid topicId = new Uuid(1, 2);
5254
SnapshotRegistry registry;
5355
Controller quorumController;
56+
5457
TimelineHashMap<Long, StreamRuntimeMetadata> streams;
5558
StreamControlManager streamControlManager;
59+
60+
TimelineHashMap<String, ByteBuffer> kvs;
61+
KVControlManager kvControlManager;
62+
5663
TopicDeletionManager topicDeletionManager;
5764

5865
@BeforeEach
@@ -63,7 +70,12 @@ public void setup() {
6370
streams = new TimelineHashMap<>(registry, 0);
6471
streamControlManager = mock(StreamControlManager.class);
6572
when(streamControlManager.streamsMetadata()).thenReturn(streams);
66-
topicDeletionManager = new TopicDeletionManager(registry, quorumController, streamControlManager);
73+
74+
kvs = new TimelineHashMap<>(registry, 0);
75+
kvControlManager = mock(KVControlManager.class);
76+
when(kvControlManager.kv()).thenReturn(kvs);
77+
78+
topicDeletionManager = new TopicDeletionManager(registry, quorumController, streamControlManager, kvControlManager);
6779
}
6880

6981
@Test
@@ -75,6 +87,13 @@ public void testCleanup() {
7587
when(quorumController.isActive()).thenReturn(true);
7688
when(quorumController.appendWriteEvent(any(), any(), any())).thenReturn(CompletableFuture.completedFuture(null));
7789

90+
List<String> metadataKvList = List.of(
91+
ObjectUtils.genMetaStreamKvPrefix(topicId.toString()) + 1,
92+
ObjectUtils.genMetaStreamKvPrefix(topicId.toString()) + 2
93+
);
94+
metadataKvList.forEach(str -> kvs.put(str, ByteBuffer.wrap(new byte[0])));
95+
kvs.put(ObjectUtils.genMetaStreamKvPrefix(topicId + "others") + 1, ByteBuffer.wrap(new byte[0]));
96+
7897
streams.put(1L, new StreamRuntimeMetadata(1L, 0, 0, 0,
7998
StreamState.CLOSED, Collections.emptyMap(), registry));
8099
Map<String, String> tags = new HashMap<>();
@@ -122,7 +141,11 @@ public void testCleanup() {
122141
registry.getOrCreateSnapshot(4L);
123142
when(quorumController.lastStableOffset()).thenReturn(4L);
124143

125-
replay(topicDeletionManager.cleanupDeletedTopics0(), topicDeletionManager);
144+
ControllerResult<Void> rst = topicDeletionManager.cleanupDeletedTopics0();
145+
assertEquals(2, rst.records().size());
146+
assertEquals(metadataKvList.stream().sorted().collect(Collectors.toList()), ((RemoveKVRecord) rst.records().get(0).message()).keys().stream().sorted().collect(Collectors.toList()));
147+
assertEquals(List.of(TopicDeletion.TOPIC_DELETION_PREFIX + topicId), ((RemoveKVRecord) rst.records().get(1).message()).keys());
148+
replay(rst, topicDeletionManager);
126149
verify(quorumController, times(2)).deleteStreams(any(), any());
127150
assertEquals(0, topicDeletionManager.waitingCleanupTopics.size());
128151
}

s3stream/src/main/java/com/automq/stream/s3/metadata/ObjectUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public static void setNamespace(String namespace) {
2828
ObjectUtils.namespace = namespace;
2929
}
3030

31-
public static void main(String[] args) {
32-
System.out.printf("%s%n", genKey(0, 11154));
31+
public static String genMetaStreamKvPrefix(String topic) {
32+
return namespace + "/" + topic + "/";
3333
}
3434

3535
public static String genIndexKey(int version, long nodeId) {

0 commit comments

Comments
 (0)