From 3fbd067432d43acdeec83151a8079d744bb7a34f Mon Sep 17 00:00:00 2001 From: Julien Limoges Date: Tue, 13 May 2025 12:19:20 +0200 Subject: [PATCH 1/9] fix: Add host rewrite for DescribeClusterResponse --- README.md | 3 +- proxy/protocol/responses.go | 108 ++++++++++++++++++++++++++++++++++++ proxy/protocol/schema.go | 32 ++++++++++- 3 files changed, 140 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a82e3c54..550b28a1 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,7 @@ The Kafka Proxy is based on idea of [Cloud SQL Proxy](https://github.com/GoogleC It allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates. It works by opening tcp sockets on the local machine and proxying connections to the associated Kafka brokers -when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata) -and [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) +when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata), [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) & [DescribeCluster](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeClusterResponse.json) responses received from the brokers are replaced by local counterparts. For discovered brokers (not configured as the boostrap servers), local listeners are started on random ports. The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided. diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 774ff90d..6ea30dd5 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -10,8 +10,10 @@ import ( const ( apiKeyMetadata = 3 apiKeyFindCoordinator = 10 + apiKeyDescribeCluster = 60 brokersKeyName = "brokers" + brokerKeyName = "broker_id" hostKeyName = "host" portKeyName = "port" nodeKeyName = "node_id" @@ -23,6 +25,7 @@ const ( var ( metadataResponseSchemaVersions = createMetadataResponseSchemaVersions() findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions() + describeClusterResponseSchemaVersions = createDescribeClusterResponseSchemaVersions() ) func createMetadataResponseSchemaVersions() []Schema { @@ -325,6 +328,59 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6} } +func createDescribeClusterResponseSchemaVersions() []Schema { + describeClusterBrokerV0 := NewSchema("describe_cluster_broker_v0", + &Mfield{Name: brokerKeyName, Ty: TypeInt32}, + &Mfield{Name: hostKeyName, Ty: TypeStr}, + &Mfield{Name: portKeyName, Ty: TypeInt32}, + &Mfield{Name: "rack", Ty: TypeNullableStr}, + ) + + describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2", + &Mfield{Name: brokerKeyName, Ty: TypeInt32}, + &Mfield{Name: hostKeyName, Ty: TypeStr}, + &Mfield{Name: portKeyName, Ty: TypeInt32}, + &Mfield{Name: "rack", Ty: TypeNullableStr}, + &Mfield{Name: "is_fenced", Ty: TypeBool}, + ) + + describeClusterV0 := NewSchema("describe_cluster_response_v0", + &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeNullableStr}, + // &MField{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + ) + + describeClusterV1 := NewSchema("describe_cluster_response_v1", + &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeNullableStr}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, + // &MField{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + ) + + describeClusterV2 := NewSchema("describe_cluster_response_v2", + &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeNullableStr}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, + // &MField{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + ) + return []Schema{describeClusterV0, describeClusterV1, describeClusterV2} +} + func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { if decodedStruct == nil { return errors.New("decoded struct must not be nil") @@ -441,6 +497,56 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e return nil } +func modifyDescribeClusterResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { + if decodedStruct == nil { + return errors.New("decoded struct must not be nil") + } + if fn == nil { + return errors.New("net address mapper must not be nil") + } + brokersArray, ok := decodedStruct.Get(brokersKeyName).([]interface{}) + if !ok { + return errors.New("brokers list not found") + } + for _, brokerElement := range brokersArray { + broker := brokerElement.(*Struct) + host, ok := broker.Get(hostKeyName).(string) + if !ok { + return errors.New("broker.host not found") + } + port, ok := broker.Get(portKeyName).(int32) + if !ok { + return errors.New("broker.port not found") + } + brokerId, ok := broker.Get(brokerKeyName).(int32) + if !ok { + return errors.New("broker.broker_id not found") + } + + if host == "" && port <= 0 { + continue + } + + newHost, newPort, err := fn(host, port, brokerId) + if err != nil { + return err + } + if host != newHost { + err := broker.Replace(hostKeyName, newHost) + if err != nil { + return err + } + } + if port != newPort { + err = broker.Replace(portKeyName, newPort) + if err != nil { + return err + } + } + } + return nil +} + type ResponseModifier interface { Apply(resp []byte) ([]byte, error) } @@ -471,6 +577,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse) case apiKeyFindCoordinator: return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse) + case apiKeyDescribeCluster: + return newResponseModifier(apiKey, apiVersion, addressMappingFunc, describeClusterResponseSchemaVersions, modifyDescribeClusterResponse) default: return nil, nil } diff --git a/proxy/protocol/schema.go b/proxy/protocol/schema.go index 6810916c..aa824669 100644 --- a/proxy/protocol/schema.go +++ b/proxy/protocol/schema.go @@ -3,14 +3,16 @@ package protocol import ( "bytes" "fmt" - "github.com/google/uuid" "reflect" + "github.com/google/uuid" + "github.com/pkg/errors" ) var ( TypeBool = &Bool{} + TypeInt8 = &Int8{} TypeInt16 = &Int16{} TypeInt32 = &Int32{} TypeStr = &Str{} @@ -103,6 +105,34 @@ func (f *Bool) GetName() string { return "bool" } +// Field int8 + +type Int8 struct{} + +func (f *Int8) decode(pd packetDecoder) (interface{}, error) { + return pd.getInt8() +} +func (f *Int8) encode(pe packetEncoder, value interface{}) error { + in, ok := value.(int8) + if !ok { + return SchemaEncodingError{fmt.Sprintf("value %T not a int8", value)} + } + pe.putInt8(in) + return nil +} + +func (f *Int8) GetFields() []boundField { + return nil +} + +func (f *Int8) GetFieldsByName() map[string]*boundField { + return nil +} + +func (f *Int8) GetName() string { + return "int8" +} + // Field int16 type Int16 struct{} From c0d7d5eb60b94b5b07d8ae3ddce92e45f4f34ef7 Mon Sep 17 00:00:00 2001 From: Julien Limoges Date: Tue, 13 May 2025 12:33:22 +0200 Subject: [PATCH 2/9] fix: missing endpoint type --- proxy/protocol/responses.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 6ea30dd5..2a534a1e 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -348,7 +348,6 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, - // &MField{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, @@ -360,7 +359,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - // &MField{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, @@ -372,7 +371,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - // &MField{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, From 18aedc7c569fdd17687a8a4cbf54f6463bc29cf8 Mon Sep 17 00:00:00 2001 From: Julien Limoges Date: Tue, 13 May 2025 12:58:25 +0200 Subject: [PATCH 3/9] fix: field type --- proxy/protocol/responses.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 2a534a1e..a96eae26 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -345,36 +345,34 @@ func createDescribeClusterResponseSchemaVersions() []Schema { ) describeClusterV0 := NewSchema("describe_cluster_response_v0", - &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, ) describeClusterV1 := NewSchema("describe_cluster_response_v1", - &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, ) describeClusterV2 := NewSchema("describe_cluster_response_v2", - &Mfield{Name: "throttle_time-ms", Ty: TypeInt32}, + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, &Mfield{Name: "error_message", Ty: TypeNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, + &Array{Name: brokersKeyName, Ty: describeClusterBrokerV2}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, ) return []Schema{describeClusterV0, describeClusterV1, describeClusterV2} From a503c1766403aea2e3906e40660e8831a5c40a92 Mon Sep 17 00:00:00 2001 From: Julien Limoges Date: Tue, 13 May 2025 13:03:58 +0200 Subject: [PATCH 4/9] fix: add tagged fields --- proxy/protocol/responses.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index a96eae26..abb9f6e6 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -352,6 +352,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "controller_id", Ty: TypeInt32}, &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, ) describeClusterV1 := NewSchema("describe_cluster_response_v1", @@ -363,6 +364,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "controller_id", Ty: TypeInt32}, &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, ) describeClusterV2 := NewSchema("describe_cluster_response_v2", @@ -374,6 +376,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "controller_id", Ty: TypeInt32}, &Array{Name: brokersKeyName, Ty: describeClusterBrokerV2}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, ) return []Schema{describeClusterV0, describeClusterV1, describeClusterV2} } From 31b07664259fe60727eed46fafd1c0132f86bd29 Mon Sep 17 00:00:00 2001 From: Julien Limoges Date: Wed, 14 May 2025 14:09:21 +0200 Subject: [PATCH 5/9] update --- proxy/protocol/real_decoder.go | 3 +- proxy/protocol/responses.go | 14 +++--- proxy/protocol/responses_test.go | 76 +++++++++++++++++++++++++++++--- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/proxy/protocol/real_decoder.go b/proxy/protocol/real_decoder.go index d87b8dd4..8e7479e3 100644 --- a/proxy/protocol/real_decoder.go +++ b/proxy/protocol/real_decoder.go @@ -2,8 +2,9 @@ package protocol import ( "encoding/binary" - "github.com/google/uuid" "math" + + "github.com/google/uuid" ) var errInvalidArrayLength = PacketDecodingError{"invalid array length"} diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index abb9f6e6..4eac8b6f 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -331,9 +331,9 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { func createDescribeClusterResponseSchemaVersions() []Schema { describeClusterBrokerV0 := NewSchema("describe_cluster_broker_v0", &Mfield{Name: brokerKeyName, Ty: TypeInt32}, - &Mfield{Name: hostKeyName, Ty: TypeStr}, + &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, - &Mfield{Name: "rack", Ty: TypeNullableStr}, + &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, ) describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2", @@ -347,10 +347,10 @@ func createDescribeClusterResponseSchemaVersions() []Schema { describeClusterV0 := NewSchema("describe_cluster_response_v0", &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, - &Mfield{Name: "error_message", Ty: TypeNullableStr}, - &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, &SchemaTaggedFields{Name: "response_tagged_fields"}, ) @@ -362,7 +362,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &Array{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, &SchemaTaggedFields{Name: "response_tagged_fields"}, ) @@ -374,7 +374,7 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "endpoint_type", Ty: TypeInt8}, &Mfield{Name: "cluster_id", Ty: TypeStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, - &Array{Name: brokersKeyName, Ty: describeClusterBrokerV2}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, &SchemaTaggedFields{Name: "response_tagged_fields"}, ) diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 48e16474..3290838c 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -2573,11 +2573,77 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect if err != nil { t.Fatal(err) } - /* - for _, av := range dc.AttrValues() { - fmt.Printf("\"%s\",\n", av) - } - */ + a.Equal(expectedInput, dc.AttrValues()) + resp, err := EncodeSchema(s, schema) + a.Nil(err) + a.Equal(bytes, resp) + + modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier2) + if err != nil { + t.Fatal(err) + } + a.Nil(err) + resp, err = modifier.Apply(resp) + a.Nil(err) + s, err = DecodeSchema(resp, schema) + a.Nil(err) + dc = newDecodeCheck() + err = dc.Traverse(s) + if err != nil { + t.Fatal(err) + } + a.Equal(expectedModified, dc.AttrValues()) +} + +func TestDescribeClusterV0(t *testing.T) { + apiVersion := int16(0) + payload := "0000000004000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a40000000000030a6c6f63616c686f7374000098b4000000ffffffff040000135f5f636f6e73756d65725f6f6666736574730000000000000000000000000000000001020000000000010000000100000005040000000100000002000000030400000001000000020000000301008000000000000507746f706963323293e5edee894e2cb924edb932be3d6e00018000000000000007746f70696333000000000000000000000000000000000002000500000000ffffffffffffffff04000000010000000200000003040000000100000002000000030100800000000000" + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 0", + "host string localhost", + "port int32 9092", + "rack *string ", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 0", + "host string localhost", + "port int32 9092", + "rack *string ", + } + testDescribeClusterResponse(t, apiVersion, payload, expectedInput, expectedModified) +} + +func testDescribeClusterResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) { + bytes, err := hex.DecodeString(payload) + if err != nil { + t.Fatal(err) + } + a := assert.New(t) + + schema := describeClusterResponseSchemaVersions[apiVersion] + + s, err := DecodeSchema(bytes, schema) + a.Nil(err) + + dc := newDecodeCheck() + err = dc.Traverse(s) + if err != nil { + t.Fatal(err) + } a.Equal(expectedInput, dc.AttrValues()) resp, err := EncodeSchema(s, schema) a.Nil(err) From 93b12afaac8bb7c7c5fc05acee489bb722163c61 Mon Sep 17 00:00:00 2001 From: Maksim Zhylinski Date: Wed, 14 May 2025 16:28:50 +0200 Subject: [PATCH 6/9] Fix describe cluster message schema --- proxy/protocol/responses.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 4eac8b6f..3c0948c1 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -334,6 +334,10 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, + // by the spec it should not be here. However, in kafka itself they don't check API version + // and put this field starting from v0 + // See https://github.com/tinaselenge/kafka/blob/814212f103c980f080593544079c4507f2557b08/core/src/main/scala/kafka/server/KafkaApis.scala#L3607 + &Mfield{Name: "is_fenced", Ty: TypeBool}, ) describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2", @@ -358,9 +362,9 @@ func createDescribeClusterResponseSchemaVersions() []Schema { describeClusterV1 := NewSchema("describe_cluster_response_v1", &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, - &Mfield{Name: "error_message", Ty: TypeNullableStr}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, From 98aba68d098c506c8e0959b4e8e71ef37b3f494d Mon Sep 17 00:00:00 2001 From: Maksim Zhylinski Date: Wed, 25 Jun 2025 11:57:59 +0200 Subject: [PATCH 7/9] Fix describe cluster, cover with tests --- proxy/protocol/responses.go | 17 ++-- proxy/protocol/responses_test.go | 152 +++++++++++++++++++++++++++++-- 2 files changed, 148 insertions(+), 21 deletions(-) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 3c0948c1..34929b74 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -334,18 +334,15 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, - // by the spec it should not be here. However, in kafka itself they don't check API version - // and put this field starting from v0 - // See https://github.com/tinaselenge/kafka/blob/814212f103c980f080593544079c4507f2557b08/core/src/main/scala/kafka/server/KafkaApis.scala#L3607 - &Mfield{Name: "is_fenced", Ty: TypeBool}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2", &Mfield{Name: brokerKeyName, Ty: TypeInt32}, - &Mfield{Name: hostKeyName, Ty: TypeStr}, + &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, - &Mfield{Name: "rack", Ty: TypeNullableStr}, + &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, &Mfield{Name: "is_fenced", Ty: TypeBool}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, ) describeClusterV0 := NewSchema("describe_cluster_response_v0", @@ -358,7 +355,6 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - describeClusterV1 := NewSchema("describe_cluster_response_v1", &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, @@ -370,13 +366,12 @@ func createDescribeClusterResponseSchemaVersions() []Schema { &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - describeClusterV2 := NewSchema("describe_cluster_response_v2", &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, &Mfield{Name: "error_code", Ty: TypeInt16}, - &Mfield{Name: "error_message", Ty: TypeNullableStr}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, &Mfield{Name: "endpoint_type", Ty: TypeInt8}, - &Mfield{Name: "cluster_id", Ty: TypeStr}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, &Mfield{Name: "controller_id", Ty: TypeInt32}, &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 3290838c..04ebe80c 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -9,9 +9,10 @@ import ( "github.com/google/uuid" - "github.com/grepplabs/kafka-proxy/config" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + + "github.com/grepplabs/kafka-proxy/config" ) var ( @@ -2595,9 +2596,9 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect a.Equal(expectedModified, dc.AttrValues()) } -func TestDescribeClusterV0(t *testing.T) { - apiVersion := int16(0) - payload := "0000000004000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a40000000000030a6c6f63616c686f7374000098b4000000ffffffff040000135f5f636f6e73756d65725f6f6666736574730000000000000000000000000000000001020000000000010000000100000005040000000100000002000000030400000001000000020000000301008000000000000507746f706963323293e5edee894e2cb924edb932be3d6e00018000000000000007746f70696333000000000000000000000000000000000002000500000000ffffffffffffffff04000000010000000200000003040000000100000002000000030100800000000000" +func TestDescribeClusterResponseV0(t *testing.T) { + payload := "000000000000000b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000" + expectedInput := []string{ "throttle_time_ms int32 0", "error_code int16 0", @@ -2606,10 +2607,19 @@ func TestDescribeClusterV0(t *testing.T) { "controller_id int32 0", "[brokers]", "brokers struct", - "broker_id int32 0", + "broker_id int32 1", "host string localhost", - "port int32 9092", + "port int32 19092", "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", } expectedModified := []string{ "throttle_time_ms int32 0", @@ -2619,12 +2629,132 @@ func TestDescribeClusterV0(t *testing.T) { "controller_id int32 0", "[brokers]", "brokers struct", - "broker_id int32 0", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 0, payload, expectedInput, expectedModified) +} + +func TestDescribeClusterResponseV1(t *testing.T) { + payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000" + + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", "host string localhost", - "port int32 9092", + "port int32 19092", + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 1, payload, expectedInput, expectedModified) +} + +func TestDescribeClusterResponseV2(t *testing.T) { + payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a94000000000000020a6c6f63616c686f7374000071a40000000000000000" + + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string localhost", + "port int32 19092", + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", } - testDescribeClusterResponse(t, apiVersion, payload, expectedInput, expectedModified) + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 2, payload, expectedInput, expectedModified) } func testDescribeClusterResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) { @@ -2649,7 +2779,7 @@ func testDescribeClusterResponse(t *testing.T, apiVersion int16, payload string, a.Nil(err) a.Equal(bytes, resp) - modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier2) + modifier, err := GetResponseModifier(apiKeyDescribeCluster, apiVersion, testResponseModifier2) if err != nil { t.Fatal(err) } @@ -3228,6 +3358,8 @@ func (t *decodeCheck) value(s *Struct, arg interface{}, sindex int) error { switch v := arg.(type) { case bool: t.append(name, "bool", v) + case int8: + t.append(name, "int8", v) case int16: t.append(name, "int16", v) case int32: From 2b922f4baa227d15f53330fbeb44a42a15b97ddb Mon Sep 17 00:00:00 2001 From: Maksim Zhylinski Date: Mon, 30 Jun 2025 10:30:23 +0200 Subject: [PATCH 8/9] Return commented out code --- proxy/protocol/responses_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 04ebe80c..146c6438 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -2574,6 +2574,11 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect if err != nil { t.Fatal(err) } + /* + for _, av := range dc.AttrValues() { + fmt.Printf("\"%s\",\n", av) + } + */ a.Equal(expectedInput, dc.AttrValues()) resp, err := EncodeSchema(s, schema) a.Nil(err) From 058a86de893d75ece5c84d1d6e3d3b7a1e040ebd Mon Sep 17 00:00:00 2001 From: Maksim Zhylinski Date: Mon, 30 Jun 2025 10:36:19 +0200 Subject: [PATCH 9/9] Add a newline --- proxy/protocol/schema.go | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy/protocol/schema.go b/proxy/protocol/schema.go index aa824669..8b9e8c5d 100644 --- a/proxy/protocol/schema.go +++ b/proxy/protocol/schema.go @@ -112,6 +112,7 @@ type Int8 struct{} func (f *Int8) decode(pd packetDecoder) (interface{}, error) { return pd.getInt8() } + func (f *Int8) encode(pe packetEncoder, value interface{}) error { in, ok := value.(int8) if !ok {