@@ -1518,6 +1518,206 @@ func TestMetadataResponseV6(t *testing.T) {
15181518 }
15191519 a .Equal (expected , dc .AttrValues ())
15201520}
1521+
1522+ func TestMetadataResponseV7 (t * testing.T ) {
1523+ /*
1524+ Metadata Response (Version: 7) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
1525+ throttle_time_ms => INT32
1526+ brokers => node_id host port rack
1527+ node_id => INT32
1528+ host => STRING
1529+ port => INT32
1530+ rack => NULLABLE_STRING
1531+ cluster_id => NULLABLE_STRING
1532+ controller_id => INT32
1533+ topic_metadata => error_code topic is_internal [partition_metadata]
1534+ error_code => INT16
1535+ topic => STRING
1536+ is_internal => BOOLEAN
1537+ partition_metadata => error_code partition leader leader_epoch [replicas] [isr] [offline_replicas]
1538+ error_code => INT16
1539+ partition => INT32
1540+ leader => INT32
1541+ leader_epoch => INT32
1542+ replicas => INT32
1543+ isr => INT32
1544+ offline_replicas => INT32
1545+
1546+ */
1547+
1548+ apiVersion := int16 (7 )
1549+
1550+ bytes := []byte {
1551+ // throttle_time_ms
1552+ 0x00 , 0x00 , 0x00 , 0x01 , // 1
1553+ // brokers
1554+ 0x00 , 0x00 , 0x00 , 0x03 ,
1555+ // brokers[0]
1556+ 0x00 , 0x00 , 0xab , 0xff , // 44031
1557+ 0x00 , 0x09 , 'l' , 'o' , 'c' , 'a' , 'l' , 'h' , 'o' , 's' , 't' ,
1558+ 0x00 , 0x00 , 0x00 , 0x33 , // 51
1559+ 0x00 , 0x00 , // ''
1560+ // brokers[1]
1561+ 0x00 , 0x01 , 0x02 , 0x03 , // 66051
1562+ 0x00 , 0x0a , 'g' , 'o' , 'o' , 'g' , 'l' , 'e' , '.' , 'c' , 'o' , 'm' ,
1563+ 0x00 , 0x00 , 0x01 , 0x11 , // 273
1564+ 0x00 , 0x07 , 'e' , 'u' , 'w' , 'e' , 's' , 't' , '1' ,
1565+ // brokers[2]
1566+ 0x00 , 0x00 , 0x00 , 0x02 , // 2
1567+ 0x00 , 0x09 , 'k' , 'a' , 'f' , 'k' , 'a' , '.' , 'o' , 'r' , 'g' ,
1568+ 0x00 , 0x00 , 0xd0 , 0xff , // 53503
1569+ 0xff , 0xff , // -1 is nil'
1570+
1571+ // cluster_id
1572+ 0x00 , 0x07 , 'm' , 'y' , 'k' , 'a' , 'f' , 'k' , 'a' ,
1573+
1574+ // controller_id
1575+ 0x00 , 0x00 , 0xe1 , 0xb2 , // 57778
1576+
1577+ // topic_metadata
1578+ 0x00 , 0x00 , 0x00 , 0x02 ,
1579+
1580+ // topic_metadata[0]
1581+ 0x00 , 0x00 ,
1582+ 0x00 , 0x03 , 'f' , 'o' , 'o' ,
1583+ 0x01 , // true
1584+ // partition_metadata
1585+ 0x00 , 0x00 , 0x00 , 0x01 ,
1586+ 0x00 , 0x04 ,
1587+ 0x00 , 0x00 , 0x00 , 0x01 ,
1588+ 0x00 , 0x00 , 0x00 , 0x07 ,
1589+ 0x00 , 0x00 , 0x00 , 0x08 ,
1590+ 0x00 , 0x00 , 0x00 , 0x03 , 0x00 , 0x00 , 0x00 , 0x01 , 0x00 , 0x00 , 0x00 , 0x02 , 0x00 , 0x00 , 0x00 , 0x03 ,
1591+ 0x00 , 0x00 , 0x00 , 0x02 , 0x00 , 0x00 , 0x00 , 0x03 , 0x00 , 0x00 , 0x00 , 0x02 ,
1592+ 0x00 , 0x00 , 0x00 , 0x03 , 0x00 , 0x00 , 0x00 , 0x05 , 0x00 , 0x00 , 0x00 , 0x06 , 0x00 , 0x00 , 0x00 , 0x07 ,
1593+ // topic_metadata[1]
1594+ 0x00 , 0x00 ,
1595+ 0x00 , 0x03 , 'b' , 'a' , 'r' ,
1596+ 0x00 , // false
1597+ // partition_metadata
1598+ 0x00 , 0x00 , 0x00 , 0x00 }
1599+
1600+ a := assert .New (t )
1601+
1602+ schema := metadataResponseSchemaVersions [apiVersion ]
1603+
1604+ s , err := DecodeSchema (bytes , schema )
1605+ a .Nil (err )
1606+
1607+ dc := NewDecodeCheck ()
1608+ dc .Traverse (s )
1609+
1610+ expected := []string {
1611+ "throttle_time_ms int32 1" ,
1612+ "[brokers]" ,
1613+ "brokers struct" ,
1614+ "node_id int32 44031" ,
1615+ "host string localhost" ,
1616+ "port int32 51" ,
1617+ "rack *string " ,
1618+ "brokers struct" ,
1619+ "node_id int32 66051" ,
1620+ "host string google.com" ,
1621+ "port int32 273" ,
1622+ "rack *string euwest1" ,
1623+ "brokers struct" ,
1624+ "node_id int32 2" ,
1625+ "host string kafka.org" ,
1626+ "port int32 53503" ,
1627+ "rack *string <nil>" ,
1628+ "cluster_id *string mykafka" ,
1629+ "controller_id int32 57778" ,
1630+ "[topic_metadata]" ,
1631+ "topic_metadata struct" ,
1632+ "error_code int16 0" ,
1633+ "topic string foo" ,
1634+ "is_internal bool true" ,
1635+ "[partition_metadata]" ,
1636+ "partition_metadata struct" ,
1637+ "error_code int16 4" ,
1638+ "partition int32 1" ,
1639+ "leader int32 7" ,
1640+ "leader_epoch int32 8" ,
1641+ "[replicas]" ,
1642+ "replicas int32 1" ,
1643+ "replicas int32 2" ,
1644+ "replicas int32 3" ,
1645+ "[isr]" ,
1646+ "isr int32 3" ,
1647+ "isr int32 2" ,
1648+ "[offline_replicas]" ,
1649+ "offline_replicas int32 5" ,
1650+ "offline_replicas int32 6" ,
1651+ "offline_replicas int32 7" ,
1652+ "topic_metadata struct" ,
1653+ "error_code int16 0" ,
1654+ "topic string bar" ,
1655+ "is_internal bool false" ,
1656+ "[partition_metadata]" ,
1657+ }
1658+ a .Equal (expected , dc .AttrValues ())
1659+ resp , err := EncodeSchema (s , schema )
1660+ a .Nil (err )
1661+ a .Equal (bytes , resp )
1662+
1663+ modifier , err := GetResponseModifier (apiKeyMetadata , apiVersion , testResponseModifier )
1664+ a .Nil (err )
1665+ resp , err = modifier .Apply (resp )
1666+ a .Nil (err )
1667+ s , err = DecodeSchema (resp , schema )
1668+ a .Nil (err )
1669+ dc = NewDecodeCheck ()
1670+ dc .Traverse (s )
1671+ expected = []string {
1672+ "throttle_time_ms int32 1" ,
1673+ "[brokers]" ,
1674+ "brokers struct" ,
1675+ "node_id int32 44031" ,
1676+ "host string myhost1" , // replaced
1677+ "port int32 34001" , // replaced
1678+ "rack *string " ,
1679+ "brokers struct" ,
1680+ "node_id int32 66051" ,
1681+ "host string myhost2" , // replaced
1682+ "port int32 34002" , // replaced
1683+ "rack *string euwest1" ,
1684+ "brokers struct" ,
1685+ "node_id int32 2" ,
1686+ "host string myhost3" , // replaced
1687+ "port int32 34003" , // replaced
1688+ "rack *string <nil>" ,
1689+ "cluster_id *string mykafka" ,
1690+ "controller_id int32 57778" ,
1691+ "[topic_metadata]" ,
1692+ "topic_metadata struct" ,
1693+ "error_code int16 0" ,
1694+ "topic string foo" ,
1695+ "is_internal bool true" ,
1696+ "[partition_metadata]" ,
1697+ "partition_metadata struct" ,
1698+ "error_code int16 4" ,
1699+ "partition int32 1" ,
1700+ "leader int32 7" ,
1701+ "leader_epoch int32 8" ,
1702+ "[replicas]" ,
1703+ "replicas int32 1" ,
1704+ "replicas int32 2" ,
1705+ "replicas int32 3" ,
1706+ "[isr]" ,
1707+ "isr int32 3" ,
1708+ "isr int32 2" ,
1709+ "[offline_replicas]" ,
1710+ "offline_replicas int32 5" ,
1711+ "offline_replicas int32 6" ,
1712+ "offline_replicas int32 7" ,
1713+ "topic_metadata struct" ,
1714+ "error_code int16 0" ,
1715+ "topic string bar" ,
1716+ "is_internal bool false" ,
1717+ "[partition_metadata]" ,
1718+ }
1719+ a .Equal (expected , dc .AttrValues ())
1720+ }
15211721func TestFindCoordinatorResponseV0 (t * testing.T ) {
15221722 /*
15231723 FindCoordinator Response (Version: 0) => error_code coordinator
0 commit comments