|
8 | 8 | "net" |
9 | 9 |
|
10 | 10 | "github.com/scholzj/go-kafka-protocol/api/apiversions" |
| 11 | + "github.com/scholzj/go-kafka-protocol/api/describecluster" |
11 | 12 | "github.com/scholzj/go-kafka-protocol/api/findcoordinator" |
12 | 13 | "github.com/scholzj/go-kafka-protocol/api/metadata" |
13 | 14 | "github.com/scholzj/go-kafka-protocol/protocol" |
@@ -66,7 +67,7 @@ func (p *Proksy) BrokerToClient(client net.Conn, broker httpstream.Stream, shutd |
66 | 67 | } |
67 | 68 |
|
68 | 69 | for i := range *metadataResponse.Brokers { |
69 | | - (*metadataResponse.Brokers)[i].Host = "localhost" |
| 70 | + (*metadataResponse.Brokers)[i].Host = ptr.To("localhost") |
70 | 71 | (*metadataResponse.Brokers)[i].Port = int32(p.PortMapping[(*metadataResponse.Brokers)[i].NodeId]) |
71 | 72 | } |
72 | 73 |
|
@@ -119,6 +120,28 @@ func (p *Proksy) BrokerToClient(client net.Conn, broker httpstream.Stream, shutd |
119 | 120 | slog.Error("<- Failed to re-encode ApiVersions response", "node", p.NodeId, "error", err) |
120 | 121 | } |
121 | 122 | response.Body = buf |
| 123 | + } else if response.ApiKey == 60 { |
| 124 | + describeCluster := describecluster.DescribeClusterResponse{} |
| 125 | + err := describeCluster.Read(response) |
| 126 | + if err != nil { |
| 127 | + slog.Error("<- Failed to decode DescribeCluster response", "node", p.NodeId, "error", err) |
| 128 | + } |
| 129 | + |
| 130 | + if describeCluster.EndpointType == 1 && describeCluster.Brokers != nil { |
| 131 | + for i := range *describeCluster.Brokers { |
| 132 | + (*describeCluster.Brokers)[i].Host = ptr.To("localhost") |
| 133 | + (*describeCluster.Brokers)[i].Port = int32(p.PortMapping[(*describeCluster.Brokers)[i].BrokerId]) |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + slog.Log(context.Background(), TraceLevel, describeCluster.PrettyPrint()) |
| 138 | + |
| 139 | + buf := bytes.NewBuffer(make([]byte, 0)) |
| 140 | + err = describeCluster.Write(buf) |
| 141 | + if err != nil { |
| 142 | + slog.Error("<- Failed to re-encode DescribeCluster response", "node", p.NodeId, "error", err) |
| 143 | + } |
| 144 | + response.Body = buf |
122 | 145 | } |
123 | 146 |
|
124 | 147 | slog.Debug("<- Proxying response from remote", "node", p.NodeId) |
|
0 commit comments