Skip to content

Commit 574c4c6

Browse files
authored
Merge pull request #21 from grepplabs/kafka-2.1.0
Add Metadata Response (Version: 7) for Kafka 2.1.0
2 parents ac9f4c8 + 3b941b6 commit 574c4c6

File tree

4 files changed

+229
-4
lines changed

4 files changed

+229
-4
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
88
GOPKGS = $(shell go list ./... | grep -v /vendor/)
99
BUILD_FLAGS ?=
1010
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
11-
TAG ?= "v0.1.0"
11+
TAG ?= "v0.1.1"
1212
GOARCH ?= amd64
1313
GOOS ?= linux
1414

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ See:
3434

3535
Linux
3636

37-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_linux_amd64.tar.gz | tar xz
37+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.1/kafka-proxy_0.1.1_linux_amd64.tar.gz | tar xz
3838

3939
macOS
4040

41-
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.0/kafka-proxy_0.1.0_darwin_amd64.tar.gz | tar xz
41+
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.1.1/kafka-proxy_0.1.1_darwin_amd64.tar.gz | tar xz
4242

4343
2. Move the binary in to your PATH.
4444

proxy/protocol/responses.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,16 @@ func createMetadataResponseSchemaVersions() []Schema {
6666
&array{name: "offline_replicas", ty: typeInt32},
6767
)
6868

69+
partitionMetadataV7 := NewSchema("partition_metadata_v7",
70+
&field{name: "error_code", ty: typeInt16},
71+
&field{name: "partition", ty: typeInt32},
72+
&field{name: "leader", ty: typeInt32},
73+
&field{name: "leader_epoch", ty: typeInt32},
74+
&array{name: "replicas", ty: typeInt32},
75+
&array{name: "isr", ty: typeInt32},
76+
&array{name: "offline_replicas", ty: typeInt32},
77+
)
78+
6979
topicMetadataV1 := NewSchema("topic_metadata_v1",
7080
&field{name: "error_code", ty: typeInt16},
7181
&field{name: "topic", ty: typeStr},
@@ -80,6 +90,13 @@ func createMetadataResponseSchemaVersions() []Schema {
8090
&array{name: "partition_metadata", ty: partitionMetadataV2},
8191
)
8292

93+
topicMetadataV7 := NewSchema("topic_metadata_v7",
94+
&field{name: "error_code", ty: typeInt16},
95+
&field{name: "topic", ty: typeStr},
96+
&field{name: "is_internal", ty: typeBool},
97+
&array{name: "partition_metadata", ty: partitionMetadataV7},
98+
)
99+
83100
metadataResponseV1 := NewSchema("metadata_response_v1",
84101
&array{name: brokersKeyName, ty: metadataBrokerV1},
85102
&field{name: "controller_id", ty: typeInt32},
@@ -113,7 +130,15 @@ func createMetadataResponseSchemaVersions() []Schema {
113130

114131
metadataResponseV6 := metadataResponseV5
115132

116-
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6}
133+
metadataResponseV7 := NewSchema("metadata_response_v7",
134+
&field{name: "throttle_time_ms", ty: typeInt32},
135+
&array{name: brokersKeyName, ty: metadataBrokerV1},
136+
&field{name: "cluster_id", ty: typeNullableStr},
137+
&field{name: "controller_id", ty: typeInt32},
138+
&array{name: "topic_metadata", ty: topicMetadataV7},
139+
)
140+
141+
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7}
117142
}
118143

119144
func createFindCoordinatorResponseSchemaVersions() []Schema {

proxy/protocol/responses_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,6 +1518,206 @@ func TestMetadataResponseV6(t *testing.T) {
15181518
}
15191519
a.Equal(expected, dc.AttrValues())
15201520
}
1521+
1522+
func TestMetadataResponseV7(t *testing.T) {
1523+
/*
1524+
Metadata Response (Version: 7) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
1525+
throttle_time_ms => INT32
1526+
brokers => node_id host port rack
1527+
node_id => INT32
1528+
host => STRING
1529+
port => INT32
1530+
rack => NULLABLE_STRING
1531+
cluster_id => NULLABLE_STRING
1532+
controller_id => INT32
1533+
topic_metadata => error_code topic is_internal [partition_metadata]
1534+
error_code => INT16
1535+
topic => STRING
1536+
is_internal => BOOLEAN
1537+
partition_metadata => error_code partition leader leader_epoch [replicas] [isr] [offline_replicas]
1538+
error_code => INT16
1539+
partition => INT32
1540+
leader => INT32
1541+
leader_epoch => INT32
1542+
replicas => INT32
1543+
isr => INT32
1544+
offline_replicas => INT32
1545+
1546+
*/
1547+
1548+
apiVersion := int16(7)
1549+
1550+
bytes := []byte{
1551+
// throttle_time_ms
1552+
0x00, 0x00, 0x00, 0x01, // 1
1553+
// brokers
1554+
0x00, 0x00, 0x00, 0x03,
1555+
// brokers[0]
1556+
0x00, 0x00, 0xab, 0xff, // 44031
1557+
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
1558+
0x00, 0x00, 0x00, 0x33, // 51
1559+
0x00, 0x00, // ''
1560+
// brokers[1]
1561+
0x00, 0x01, 0x02, 0x03, // 66051
1562+
0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm',
1563+
0x00, 0x00, 0x01, 0x11, // 273
1564+
0x00, 0x07, 'e', 'u', 'w', 'e', 's', 't', '1',
1565+
// brokers[2]
1566+
0x00, 0x00, 0x00, 0x02, // 2
1567+
0x00, 0x09, 'k', 'a', 'f', 'k', 'a', '.', 'o', 'r', 'g',
1568+
0x00, 0x00, 0xd0, 0xff, // 53503
1569+
0xff, 0xff, // -1 is nil'
1570+
1571+
// cluster_id
1572+
0x00, 0x07, 'm', 'y', 'k', 'a', 'f', 'k', 'a',
1573+
1574+
// controller_id
1575+
0x00, 0x00, 0xe1, 0xb2, // 57778
1576+
1577+
// topic_metadata
1578+
0x00, 0x00, 0x00, 0x02,
1579+
1580+
// topic_metadata[0]
1581+
0x00, 0x00,
1582+
0x00, 0x03, 'f', 'o', 'o',
1583+
0x01, // true
1584+
// partition_metadata
1585+
0x00, 0x00, 0x00, 0x01,
1586+
0x00, 0x04,
1587+
0x00, 0x00, 0x00, 0x01,
1588+
0x00, 0x00, 0x00, 0x07,
1589+
0x00, 0x00, 0x00, 0x08,
1590+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
1591+
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x02,
1592+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x07,
1593+
// topic_metadata[1]
1594+
0x00, 0x00,
1595+
0x00, 0x03, 'b', 'a', 'r',
1596+
0x00, // false
1597+
// partition_metadata
1598+
0x00, 0x00, 0x00, 0x00}
1599+
1600+
a := assert.New(t)
1601+
1602+
schema := metadataResponseSchemaVersions[apiVersion]
1603+
1604+
s, err := DecodeSchema(bytes, schema)
1605+
a.Nil(err)
1606+
1607+
dc := NewDecodeCheck()
1608+
dc.Traverse(s)
1609+
1610+
expected := []string{
1611+
"throttle_time_ms int32 1",
1612+
"[brokers]",
1613+
"brokers struct",
1614+
"node_id int32 44031",
1615+
"host string localhost",
1616+
"port int32 51",
1617+
"rack *string ",
1618+
"brokers struct",
1619+
"node_id int32 66051",
1620+
"host string google.com",
1621+
"port int32 273",
1622+
"rack *string euwest1",
1623+
"brokers struct",
1624+
"node_id int32 2",
1625+
"host string kafka.org",
1626+
"port int32 53503",
1627+
"rack *string <nil>",
1628+
"cluster_id *string mykafka",
1629+
"controller_id int32 57778",
1630+
"[topic_metadata]",
1631+
"topic_metadata struct",
1632+
"error_code int16 0",
1633+
"topic string foo",
1634+
"is_internal bool true",
1635+
"[partition_metadata]",
1636+
"partition_metadata struct",
1637+
"error_code int16 4",
1638+
"partition int32 1",
1639+
"leader int32 7",
1640+
"leader_epoch int32 8",
1641+
"[replicas]",
1642+
"replicas int32 1",
1643+
"replicas int32 2",
1644+
"replicas int32 3",
1645+
"[isr]",
1646+
"isr int32 3",
1647+
"isr int32 2",
1648+
"[offline_replicas]",
1649+
"offline_replicas int32 5",
1650+
"offline_replicas int32 6",
1651+
"offline_replicas int32 7",
1652+
"topic_metadata struct",
1653+
"error_code int16 0",
1654+
"topic string bar",
1655+
"is_internal bool false",
1656+
"[partition_metadata]",
1657+
}
1658+
a.Equal(expected, dc.AttrValues())
1659+
resp, err := EncodeSchema(s, schema)
1660+
a.Nil(err)
1661+
a.Equal(bytes, resp)
1662+
1663+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier)
1664+
a.Nil(err)
1665+
resp, err = modifier.Apply(resp)
1666+
a.Nil(err)
1667+
s, err = DecodeSchema(resp, schema)
1668+
a.Nil(err)
1669+
dc = NewDecodeCheck()
1670+
dc.Traverse(s)
1671+
expected = []string{
1672+
"throttle_time_ms int32 1",
1673+
"[brokers]",
1674+
"brokers struct",
1675+
"node_id int32 44031",
1676+
"host string myhost1", // replaced
1677+
"port int32 34001", // replaced
1678+
"rack *string ",
1679+
"brokers struct",
1680+
"node_id int32 66051",
1681+
"host string myhost2", // replaced
1682+
"port int32 34002", // replaced
1683+
"rack *string euwest1",
1684+
"brokers struct",
1685+
"node_id int32 2",
1686+
"host string myhost3", // replaced
1687+
"port int32 34003", // replaced
1688+
"rack *string <nil>",
1689+
"cluster_id *string mykafka",
1690+
"controller_id int32 57778",
1691+
"[topic_metadata]",
1692+
"topic_metadata struct",
1693+
"error_code int16 0",
1694+
"topic string foo",
1695+
"is_internal bool true",
1696+
"[partition_metadata]",
1697+
"partition_metadata struct",
1698+
"error_code int16 4",
1699+
"partition int32 1",
1700+
"leader int32 7",
1701+
"leader_epoch int32 8",
1702+
"[replicas]",
1703+
"replicas int32 1",
1704+
"replicas int32 2",
1705+
"replicas int32 3",
1706+
"[isr]",
1707+
"isr int32 3",
1708+
"isr int32 2",
1709+
"[offline_replicas]",
1710+
"offline_replicas int32 5",
1711+
"offline_replicas int32 6",
1712+
"offline_replicas int32 7",
1713+
"topic_metadata struct",
1714+
"error_code int16 0",
1715+
"topic string bar",
1716+
"is_internal bool false",
1717+
"[partition_metadata]",
1718+
}
1719+
a.Equal(expected, dc.AttrValues())
1720+
}
15211721
func TestFindCoordinatorResponseV0(t *testing.T) {
15221722
/*
15231723
FindCoordinator Response (Version: 0) => error_code coordinator

0 commit comments

Comments
 (0)