Skip to content

Commit fea222c

Browse files
committed
Support Kafka 2.0 - Metadata Response (Version: 6) , FindCoordinator Response (Version: 2)
1 parent 5429d0a commit fea222c

File tree

3 files changed

+270
-3
lines changed

3 files changed

+270
-3
lines changed

cmd/kafka-proxy/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,4 @@ func TestExternalServersMappingFromEnv(t *testing.T) {
124124
a.Equal(c.Proxy.ExternalServers[1].ListenerAddress, "0.0.0.0:32405")
125125
a.Equal(c.Proxy.ExternalServers[1].AdvertisedAddress, "kafka-5.grepplabs.com:9092")
126126

127-
}
127+
}

proxy/protocol/responses.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ func createMetadataResponseSchemaVersions() []Schema {
110110
&field{name: "controller_id", ty: typeInt32},
111111
&array{name: "topic_metadata", ty: topicMetadataV2},
112112
)
113-
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5}
113+
114+
metadataResponseV6 := metadataResponseV5
115+
116+
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6}
114117
}
115118

116119
func createFindCoordinatorResponseSchemaVersions() []Schema {
@@ -131,7 +134,10 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
131134
&field{name: "error_message", ty: typeNullableStr},
132135
&field{name: coordinatorKeyName, ty: findCoordinatorBrokerV0},
133136
)
134-
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1}
137+
138+
findCoordinatorResponseV2 := findCoordinatorResponseV1
139+
140+
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2}
135141
}
136142

137143
func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {

proxy/protocol/responses_test.go

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,6 +1323,201 @@ func TestMetadataResponseV5(t *testing.T) {
13231323
a.Equal(expected, dc.AttrValues())
13241324
}
13251325

1326+
func TestMetadataResponseV6(t *testing.T) {
1327+
/*
1328+
Metadata Response (Version: 6) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
1329+
throttle_time_ms => INT32
1330+
brokers => node_id host port rack
1331+
node_id => INT32
1332+
host => STRING
1333+
port => INT32
1334+
rack => NULLABLE_STRING
1335+
cluster_id => NULLABLE_STRING
1336+
controller_id => INT32
1337+
topic_metadata => error_code topic is_internal [partition_metadata]
1338+
error_code => INT16
1339+
topic => STRING
1340+
is_internal => BOOLEAN
1341+
partition_metadata => error_code partition leader [replicas] [isr] [offline_replicas]
1342+
error_code => INT16
1343+
partition => INT32
1344+
leader => INT32
1345+
replicas => INT32
1346+
isr => INT32
1347+
offline_replicas => INT32
1348+
1349+
*/
1350+
1351+
apiVersion := int16(6)
1352+
1353+
bytes := []byte{
1354+
// throttle_time_ms
1355+
0x00, 0x00, 0x00, 0x01, // 1
1356+
// brokers
1357+
0x00, 0x00, 0x00, 0x03,
1358+
// brokers[0]
1359+
0x00, 0x00, 0xab, 0xff, // 44031
1360+
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
1361+
0x00, 0x00, 0x00, 0x33, // 51
1362+
0x00, 0x00, // ''
1363+
// brokers[1]
1364+
0x00, 0x01, 0x02, 0x03, // 66051
1365+
0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm',
1366+
0x00, 0x00, 0x01, 0x11, // 273
1367+
0x00, 0x07, 'e', 'u', 'w', 'e', 's', 't', '1',
1368+
// brokers[2]
1369+
0x00, 0x00, 0x00, 0x02, // 2
1370+
0x00, 0x09, 'k', 'a', 'f', 'k', 'a', '.', 'o', 'r', 'g',
1371+
0x00, 0x00, 0xd0, 0xff, // 53503
1372+
0xff, 0xff, // -1 is nil'
1373+
1374+
// cluster_id
1375+
0x00, 0x07, 'm', 'y', 'k', 'a', 'f', 'k', 'a',
1376+
1377+
// controller_id
1378+
0x00, 0x00, 0xe1, 0xb2, // 57778
1379+
1380+
// topic_metadata
1381+
0x00, 0x00, 0x00, 0x02,
1382+
1383+
// topic_metadata[0]
1384+
0x00, 0x00,
1385+
0x00, 0x03, 'f', 'o', 'o',
1386+
0x01, // true
1387+
// partition_metadata
1388+
0x00, 0x00, 0x00, 0x01,
1389+
0x00, 0x04,
1390+
0x00, 0x00, 0x00, 0x01,
1391+
0x00, 0x00, 0x00, 0x07,
1392+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
1393+
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x02,
1394+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x07,
1395+
// topic_metadata[1]
1396+
0x00, 0x00,
1397+
0x00, 0x03, 'b', 'a', 'r',
1398+
0x00, // false
1399+
// partition_metadata
1400+
0x00, 0x00, 0x00, 0x00}
1401+
1402+
a := assert.New(t)
1403+
1404+
schema := metadataResponseSchemaVersions[apiVersion]
1405+
1406+
s, err := DecodeSchema(bytes, schema)
1407+
a.Nil(err)
1408+
1409+
dc := NewDecodeCheck()
1410+
dc.Traverse(s)
1411+
1412+
expected := []string{
1413+
"throttle_time_ms int32 1",
1414+
"[brokers]",
1415+
"brokers struct",
1416+
"node_id int32 44031",
1417+
"host string localhost",
1418+
"port int32 51",
1419+
"rack *string ",
1420+
"brokers struct",
1421+
"node_id int32 66051",
1422+
"host string google.com",
1423+
"port int32 273",
1424+
"rack *string euwest1",
1425+
"brokers struct",
1426+
"node_id int32 2",
1427+
"host string kafka.org",
1428+
"port int32 53503",
1429+
"rack *string <nil>",
1430+
"cluster_id *string mykafka",
1431+
"controller_id int32 57778",
1432+
"[topic_metadata]",
1433+
"topic_metadata struct",
1434+
"error_code int16 0",
1435+
"topic string foo",
1436+
"is_internal bool true",
1437+
"[partition_metadata]",
1438+
"partition_metadata struct",
1439+
"error_code int16 4",
1440+
"partition int32 1",
1441+
"leader int32 7",
1442+
"[replicas]",
1443+
"replicas int32 1",
1444+
"replicas int32 2",
1445+
"replicas int32 3",
1446+
"[isr]",
1447+
"isr int32 3",
1448+
"isr int32 2",
1449+
"[offline_replicas]",
1450+
"offline_replicas int32 5",
1451+
"offline_replicas int32 6",
1452+
"offline_replicas int32 7",
1453+
"topic_metadata struct",
1454+
"error_code int16 0",
1455+
"topic string bar",
1456+
"is_internal bool false",
1457+
"[partition_metadata]",
1458+
}
1459+
a.Equal(expected, dc.AttrValues())
1460+
resp, err := EncodeSchema(s, schema)
1461+
a.Nil(err)
1462+
a.Equal(bytes, resp)
1463+
1464+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier)
1465+
a.Nil(err)
1466+
resp, err = modifier.Apply(resp)
1467+
a.Nil(err)
1468+
s, err = DecodeSchema(resp, schema)
1469+
a.Nil(err)
1470+
dc = NewDecodeCheck()
1471+
dc.Traverse(s)
1472+
expected = []string{
1473+
"throttle_time_ms int32 1",
1474+
"[brokers]",
1475+
"brokers struct",
1476+
"node_id int32 44031",
1477+
"host string myhost1", // replaced
1478+
"port int32 34001", // replaced
1479+
"rack *string ",
1480+
"brokers struct",
1481+
"node_id int32 66051",
1482+
"host string myhost2", // replaced
1483+
"port int32 34002", // replaced
1484+
"rack *string euwest1",
1485+
"brokers struct",
1486+
"node_id int32 2",
1487+
"host string myhost3", // replaced
1488+
"port int32 34003", // replaced
1489+
"rack *string <nil>",
1490+
"cluster_id *string mykafka",
1491+
"controller_id int32 57778",
1492+
"[topic_metadata]",
1493+
"topic_metadata struct",
1494+
"error_code int16 0",
1495+
"topic string foo",
1496+
"is_internal bool true",
1497+
"[partition_metadata]",
1498+
"partition_metadata struct",
1499+
"error_code int16 4",
1500+
"partition int32 1",
1501+
"leader int32 7",
1502+
"[replicas]",
1503+
"replicas int32 1",
1504+
"replicas int32 2",
1505+
"replicas int32 3",
1506+
"[isr]",
1507+
"isr int32 3",
1508+
"isr int32 2",
1509+
"[offline_replicas]",
1510+
"offline_replicas int32 5",
1511+
"offline_replicas int32 6",
1512+
"offline_replicas int32 7",
1513+
"topic_metadata struct",
1514+
"error_code int16 0",
1515+
"topic string bar",
1516+
"is_internal bool false",
1517+
"[partition_metadata]",
1518+
}
1519+
a.Equal(expected, dc.AttrValues())
1520+
}
13261521
func TestFindCoordinatorResponseV0(t *testing.T) {
13271522
/*
13281523
FindCoordinator Response (Version: 0) => error_code coordinator
@@ -1446,6 +1641,72 @@ func TestFindCoordinatorResponseV1(t *testing.T) {
14461641
a.Equal(expected, dc.AttrValues())
14471642
}
14481643

1644+
func TestFindCoordinatorResponseV2(t *testing.T) {
1645+
/*
1646+
FindCoordinator Response (Version: 2) => throttle_time_ms error_code error_message coordinator
1647+
throttle_time_ms => INT32
1648+
error_code => INT16
1649+
error_message => NULLABLE_STRING
1650+
coordinator => node_id host port
1651+
node_id => INT32
1652+
host => STRING
1653+
port => INT32
1654+
*/
1655+
apiVersion := int16(2)
1656+
1657+
bytes := []byte{
1658+
// throttle_time_ms
1659+
0x00, 0x00, 0x00, 0x01, // 1
1660+
0x00, 0x00,
1661+
0xff, 0xff,
1662+
// coordinator
1663+
0x00, 0x00, 0x00, 0xAB,
1664+
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
1665+
0x00, 0x00, 0x00, 0x33, // 51
1666+
}
1667+
a := assert.New(t)
1668+
1669+
schema := findCoordinatorResponseSchemaVersions[apiVersion]
1670+
1671+
s, err := DecodeSchema(bytes, schema)
1672+
a.Nil(err)
1673+
dc := NewDecodeCheck()
1674+
dc.Traverse(s)
1675+
1676+
expected := []string{
1677+
"throttle_time_ms int32 1",
1678+
"error_code int16 0",
1679+
"error_message *string <nil>",
1680+
"coordinator struct",
1681+
"node_id int32 171",
1682+
"host string localhost",
1683+
"port int32 51",
1684+
}
1685+
a.Equal(expected, dc.AttrValues())
1686+
resp, err := EncodeSchema(s, schema)
1687+
a.Nil(err)
1688+
a.Equal(bytes, resp)
1689+
1690+
modifier, err := GetResponseModifier(apiKeyFindCoordinator, apiVersion, testResponseModifier)
1691+
a.Nil(err)
1692+
resp, err = modifier.Apply(resp)
1693+
a.Nil(err)
1694+
s, err = DecodeSchema(resp, schema)
1695+
a.Nil(err)
1696+
dc = NewDecodeCheck()
1697+
dc.Traverse(s)
1698+
expected = []string{
1699+
"throttle_time_ms int32 1",
1700+
"error_code int16 0",
1701+
"error_message *string <nil>",
1702+
"coordinator struct",
1703+
"node_id int32 171",
1704+
"host string myhost1", // replaced
1705+
"port int32 34001", // replaced
1706+
}
1707+
a.Equal(expected, dc.AttrValues())
1708+
}
1709+
14491710
type decodeCheck struct {
14501711
attrValues []string
14511712
}

0 commit comments

Comments
 (0)