17
17
18
18
package kafka .server .metadata
19
19
20
+ import com .automq .stream .api .KeyValue
21
+ import com .automq .stream .s3 .metadata .StreamState
22
+ import io .netty .buffer .Unpooled
23
+ import kafka .log .streamaspect .{ElasticLog , ElasticLogManager }
20
24
import kafka .server .{CachedControllerId , KRaftCachedControllerId , MetadataCache }
21
25
import kafka .utils .Logging
22
26
import org .apache .kafka .admin .BrokerMetadata
@@ -31,7 +35,7 @@ import org.apache.kafka.common.message._
31
35
import org .apache .kafka .common .network .ListenerName
32
36
import org .apache .kafka .common .protocol .Errors
33
37
import org .apache .kafka .common .requests .MetadataResponse
34
- import org .apache .kafka .image .MetadataImage
38
+ import org .apache .kafka .image .{ MetadataImage , S3StreamMetadataImage }
35
39
import org .apache .kafka .metadata .{BrokerRegistration , PartitionRegistration , Replicas }
36
40
import org .apache .kafka .server .common .automq .AutoMQVersion
37
41
import org .apache .kafka .server .common .{FinalizedFeatures , KRaftVersion , MetadataVersion }
@@ -87,6 +91,42 @@ class KRaftMetadataCache(
87
91
88
92
def currentImage (): MetadataImage = _currentImage
89
93
94
+
95
+ // AutoMQ inject start
96
+ private def checkFailoverSuccess (topicPartition : TopicPartition , topicId : Uuid , tpRegistration : PartitionRegistration ): Boolean = {
97
+ safeRun((image : MetadataImage ) => {
98
+ val key = ElasticLog .formatStreamKey(ElasticLogManager .NAMESPACE , topicPartition, Some (topicId))
99
+ val buffer = image.kv().getValue(key)
100
+ if (buffer == null ) {
101
+ error(s " topic ${topicPartition} topicId ${topicId.toString} key $key not found " )
102
+ return false
103
+ }
104
+
105
+ val metaStreamId = Unpooled .wrappedBuffer(KeyValue .Value .of(buffer).get).readLong
106
+ val streamMetadata = image.streamsMetadata.getStreamMetadata(metaStreamId)
107
+ if (streamMetadata == null ) {
108
+ false
109
+ } else {
110
+ return topicFailoverSuccess(topicPartition, tpRegistration, streamMetadata)
111
+ }
112
+ })
113
+ }
114
+
115
+ private def topicFailoverSuccess (topicPartition : TopicPartition , tpRegistration : PartitionRegistration , streamMetadata : S3StreamMetadataImage ): Boolean = {
116
+ // the partition leaderEpoch may be different from the stream epoch
117
+ val result = tpRegistration.leaderEpoch >= streamMetadata.getEpoch &&
118
+ streamMetadata.state == StreamState .OPENED &&
119
+ streamMetadata.lastRange().nodeId() == tpRegistration.leader
120
+
121
+ if (! result) {
122
+ debug(s " Failover failed for topicPartition $topicPartition, tpEpoch $tpRegistration, streamMetadata ${streamMetadata}" )
123
+ }
124
+
125
+ result
126
+ }
127
+
128
+ // AutoMQ inject end
129
+
90
130
// errorUnavailableEndpoints exists to support v0 MetadataResponses
91
131
// If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
92
132
// Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
@@ -102,10 +142,17 @@ class KRaftMetadataCache(
102
142
val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
103
143
errorUnavailableEndpoints)
104
144
val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
105
- val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
145
+ // AutoMQ inject start
146
+ val failoverSuccess = checkFailoverSuccess(new TopicPartition (topicName, partitionId), topic.id(), partition)
147
+ val maybeLeader = if (! failoverSuccess) {
148
+ None
149
+ } else {
150
+ getAliveEndpoint(image, partition.leader, listenerName)
151
+ }
152
+ // AutoMQ inject end
106
153
maybeLeader match {
107
154
case None =>
108
- val error = if (! image.cluster().brokers.containsKey(partition.leader)) {
155
+ val error = if (! failoverSuccess || ! image.cluster().brokers.containsKey(partition.leader)) {
109
156
debug(s " Error while fetching metadata for $topicName- $partitionId: leader not available " )
110
157
Errors .LEADER_NOT_AVAILABLE
111
158
} else {
0 commit comments