File tree Expand file tree Collapse file tree 2 files changed +18
-1
lines changed
clients/src/main/java/org/apache/kafka/common
core/src/test/scala/unit/kafka/server Expand file tree Collapse file tree 2 files changed +18
-1
lines changed Original file line number Diff line number Diff line change @@ -70,7 +70,8 @@ public Node leader() {
7070 }
7171
7272 /**
73- * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
73+ * The complete set of replicas for this partition regardless of whether they are alive or up-to-date. The preferred replica
74+ * is the head of the list.
7475 */
7576 public Node [] replicas () {
7677 return replicas ;
Original file line number Diff line number Diff line change @@ -204,6 +204,22 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
204204 }
205205 }
206206
207+ @ Test
208+ def testPartitionInfoPreferredReplica (): Unit = {
209+ val replicaAssignment = Map (0 -> Seq (1 , 2 , 0 ))
210+ val topic = " testPartitionInfoPreferredReplicaTopic"
211+ createTopicWithAssignment(topic, replicaAssignment)
212+
213+ val response = sendMetadataRequest(new MetadataRequest .Builder (Seq (topic).asJava, true ).build())
214+ val cluster = response.buildCluster()
215+ val partitionInfos = cluster.partitionsForTopic(topic).asScala
216+ assertEquals(1 , partitionInfos.size)
217+
218+ val partitionInfo = partitionInfos.head
219+ val preferredReplicaId = replicaAssignment(partitionInfo.partition()).head
220+ assertEquals(preferredReplicaId, partitionInfo.replicas().head.id())
221+ }
222+
207223 @ Test
208224 def testReplicaDownResponse (): Unit = {
209225 val replicaDownTopic = " replicaDown"
You can’t perform that action at this time.
0 commit comments