Skip to content

Commit 99eccdf

Browse files
committed
Add support for MSK clusters with kraft metadata nodes
Since kafka version 3.7.0 MSK has supported using kraft metadata nodes instead of zookeeper. https://aws.amazon.com/about-aws/whats-new/2024/05/amazon-msk-kraft-mode-apache-kafka-clusters/ When running prometheus-msk-discovery against a cluster in kraft node there is a panic like this: ``` 2025/01/30 12:03:36 http: panic serving 10.3.57.169:45616: runtime error: invalid memory address or nil pointer dereference goroutine 18 [running]: net/http.(*conn).serve.func1() /usr/local/go/src/net/http/server.go:1868 +0xb9 panic({0x7675e0?, 0xa8f9c0?}) /usr/local/go/src/runtime/panic.go:920 +0x270 main.getBrokers({0x86e250?, 0xc00013a140}, {0xc00002bf20, 0x5b}) /src/main.go:131 +0x24d main.buildClusterDetails({0x86e250?, 0xc00013a140?}, {0x0, 0xc000268730, 0xc000232b40, 0xc000232a80, 0xc000232a70, 0xc000226930, 0xc000226948, 0xc000232b20, ...}) /src/main.go:140 +0x56 main.GetStaticConfigs({0x86e250, 0xc00013a140}, {0xc00025ba18, 0x1, 0xc0000ef988?}) /src/main.go:199 +0x26b main.httpSD.func1({0x86feb0, 0xc00015a000}, 0xc0000efb18?) /src/main.go:248 +0xbb net/http.HandlerFunc.ServeHTTP(0x440480?, {0x86feb0?, 0xc00015a000?}, 0x6457fa?) /usr/local/go/src/net/http/server.go:2136 +0x29 net/http.(*ServeMux).ServeHTTP(0xace040?, {0x86feb0, 0xc00015a000}, 0xc000154000) /usr/local/go/src/net/http/server.go:2514 +0x142 net/http.serverHandler.ServeHTTP({0xc000150090?}, {0x86feb0?, 0xc00015a000?}, 0x6?) /usr/local/go/src/net/http/server.go:2938 +0x8e net/http.(*conn).serve(0xc00011a1b0, {0x8704c0, 0xc0000a3f80}) /usr/local/go/src/net/http/server.go:2009 +0x5f4 created by net/http.(*Server).Serve in goroutine 1 /usr/local/go/src/net/http/server.go:3086 +0x5cb ``` This is caused because the list nodes api returns records like this: ``` { "AddedToClusterTime": null, "BrokerNodeInfo": null, "ControllerNodeInfo": { "Endpoints": [ "c-10002.foo.xxxxxx.c7.kafka.us-east-1.amazonaws.com" ] }, "InstanceType": null, "NodeARN": null, "NodeType": "CONTROLLER", "ZookeeperNodeInfo": null } ``` which have a nil BrokerNodeInfo This PR fixes this bug, and also adds these controller nodes to the target endpoints. When JMX Exporter and Node Exporter are enabled on the cluster, these nodes only seem to be running JMX Exporter, so we are only adding this endpoint for these nodes.
1 parent c686e6d commit 99eccdf

File tree

3 files changed

+74
-57
lines changed

3 files changed

+74
-57
lines changed

go.mod

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,22 @@ module github.com/statsbomb/prometheus-msk-discovery
33
go 1.21
44

55
require (
6-
github.com/aws/aws-sdk-go-v2/config v1.1.4
7-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1
6+
github.com/aws/aws-sdk-go-v2/config v1.29.2
7+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13
88
gopkg.in/yaml.v2 v2.4.0
99
)
1010

1111
require (
12-
github.com/aws/aws-sdk-go-v2 v1.3.1 // indirect
13-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4 // indirect
14-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5 // indirect
15-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5 // indirect
16-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4 // indirect
17-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1 // indirect
18-
github.com/aws/smithy-go v1.3.0 // indirect
12+
github.com/aws/aws-sdk-go-v2 v1.34.0 // indirect
13+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55 // indirect
14+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect
15+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect
16+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect
17+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect
18+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect
19+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect
20+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect
21+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect
22+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 // indirect
23+
github.com/aws/smithy-go v1.22.2 // indirect
1924
)

go.sum

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
1-
github.com/aws/aws-sdk-go-v2 v1.3.1 h1:KKstwh6zsuUhQH3GvSor7M3am/+imPqydFOZHzlkTKc=
2-
github.com/aws/aws-sdk-go-v2 v1.3.1/go.mod h1:5SmWRTjN6uTRFNCc7rR69xHsdcUJnthmaRHGDsYhpTE=
3-
github.com/aws/aws-sdk-go-v2/config v1.1.4 h1:2hjdDldmJJjb+rFieQySfOFt4WwxKZJVTEB6RBI74T4=
4-
github.com/aws/aws-sdk-go-v2/config v1.1.4/go.mod h1:op05ummoVoAqctpA80jVt/+hvEtLfuKmDyx0bIuvfbE=
5-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4 h1:whYYw2srG+zUQzUw4LhML83f+xd22Vm7gv0I7aJglc8=
6-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4/go.mod h1:UQwsT2w2XelrWoVV2v/zL2uce1RxmVCiHaZsoKLamZg=
7-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5 h1:5gCrezE41xYQHWDsDkJD9nT22tUH3s+Zrvs4c3v2FGc=
8-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5/go.mod h1:z/NKNlYxMzphl7TzjV+ctUebHF4CFNGGlSvmV/NKcJU=
9-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5 h1:GbW4bbc1iED64aIL203xcGSfLzWOWuIdnKV0guMcJvg=
10-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5/go.mod h1:MW0O/RpmVpS6MWKn6W03XEJmqXlG7+d3iaYLzkd2fAc=
11-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1 h1:70IRtg4hU1mQ5aAtjEHPQR+KCeVIDwLOpvbofiSEODE=
12-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1/go.mod h1:2gy+VDVpxUvxQRdjpFlZNhXSjybrxnYrb9Byuzz855I=
13-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4 h1:Tr/SsFDXWN8rntdzTNrDs/MvuBXRCjY6xvJrPFUPKRM=
14-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4/go.mod h1:yQayEbOWH75NaKFylsFocBc3yanYEGndlOaH4i/Lvno=
15-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1 h1:1koRvKlZMN+FhTGV5f4q6vRHXNJzeZlPKzbs1/Y32Kg=
16-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1/go.mod h1:L1LH5nHMXxdkKj057ZUx7Wi50CCrkZ+9jkTnBnY2j/w=
17-
github.com/aws/smithy-go v1.3.0 h1:awbB2OJBZ/Txj+c4q+qhDQs3Ob0sRhBuIIkOD4Aq8yc=
18-
github.com/aws/smithy-go v1.3.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
19-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
20-
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
21-
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
22-
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
23-
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
24-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
25-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
26-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
27-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
1+
github.com/aws/aws-sdk-go-v2 v1.34.0 h1:9iyL+cjifckRGEVpRKZP3eIxVlL06Qk1Tk13vreaVQU=
2+
github.com/aws/aws-sdk-go-v2 v1.34.0/go.mod h1:JgstGg0JjWU1KpVJjD5H0y0yyAIpSdKEq556EI6yOOM=
3+
github.com/aws/aws-sdk-go-v2/config v1.29.2 h1:JuIxOEPcSKpMB0J+khMjznG9LIhIBdmqNiEcPclnwqc=
4+
github.com/aws/aws-sdk-go-v2/config v1.29.2/go.mod h1:HktTHregOZwNSM/e7WTfVSu9RCX+3eOv+6ij27PtaYs=
5+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55 h1:CDhKnDEaGkLA5ZszV/qw5uwN5M8rbv9Cl0JRN+PRsaM=
6+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55/go.mod h1:kPD/vj+RB5MREDUky376+zdnjZpR+WgdBBvwrmnlmKE=
7+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 h1:kU7tmXNaJ07LsyN3BUgGqAmVmQtq0w6duVIHAKfp0/w=
8+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25/go.mod h1:OiC8+OiqrURb1wrwmr/UbOVLFSWEGxjinj5C299VQdo=
9+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 h1:Ej0Rf3GMv50Qh4G4852j2djtoDb7AzQ7MuQeFHa3D70=
10+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29/go.mod h1:oeNTC7PwJNoM5AznVr23wxhLnuJv0ZDe5v7w0wqIs9M=
11+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 h1:6e8a71X+9GfghragVevC5bZqvATtc3mAMgxpSNbgzF0=
12+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29/go.mod h1:c4jkZiQ+BWpNqq7VtrxjwISrLrt/VvPq3XiopkUIolI=
13+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk=
14+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
15+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA=
16+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY=
17+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 h1:hN4yJBGswmFTOVYqmbz1GBs9ZMtQe8SrYxPwrkrlRv8=
18+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10/go.mod h1:TsxON4fEZXyrKY+D+3d2gSTyJkGORexIYab9PTf56DA=
19+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13 h1:PwPoRsGAb4A9XmOUg8MhtjY5UZgt018TDuRP5OtDfxQ=
20+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13/go.mod h1:z7Et3O89RoTdBmGi/UN82OSkuYQLKmlvJcRmyorMvhw=
21+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 h1:kznaW4f81mNMlREkU9w3jUuJvU5g/KsqDV43ab7Rp6s=
22+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12/go.mod h1:bZy9r8e0/s0P7BSDHgMLXK2KvdyRRBIQ2blKlvLt0IU=
23+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 h1:mUwIpAvILeKFnRx4h1dEgGEFGuV8KJ3pEScZWVFYuZA=
24+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11/go.mod h1:JDJtD+b8HNVv71axz8+S5492KM8wTzHRFpMKQbPlYxw=
25+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 h1:g9d+TOsu3ac7SgmY2dUf1qMgu/uJVTlQ4VCbH6hRxSw=
26+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10/go.mod h1:WZfNmntu92HO44MVZAubQaz3qCuIdeOdog2sADfU6hU=
27+
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
28+
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
2829
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2930
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
30-
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
3131
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
3232
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

main.go

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,13 @@ type PrometheusStaticConfig struct {
5555

5656
// clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
5757
type clusterDetails struct {
58-
ClusterName string
59-
ClusterArn string
60-
Brokers []string
58+
ClusterName string
59+
ClusterArn string
60+
Brokers []brokerDetails
61+
}
62+
63+
type brokerDetails struct {
64+
Endpoint string
6165
JmxExporter bool
6266
NodeExporter bool
6367
}
@@ -88,11 +92,11 @@ func (c clusterDetails) StaticConfig() PrometheusStaticConfig {
8892

8993
var targets []string
9094
for _, b := range c.Brokers {
91-
if c.JmxExporter {
92-
targets = append(targets, fmt.Sprintf("%s:%d", b, jmxExporterPort))
95+
if b.JmxExporter {
96+
targets = append(targets, fmt.Sprintf("%s:%d", b.Endpoint, jmxExporterPort))
9397
}
94-
if c.NodeExporter {
95-
targets = append(targets, fmt.Sprintf("%s:%d", b, nodeExporterPort))
98+
if b.NodeExporter {
99+
targets = append(targets, fmt.Sprintf("%s:%d", b.Endpoint, nodeExporterPort))
96100
}
97101
}
98102
ret.Targets = targets
@@ -116,9 +120,9 @@ func getClusters(svc kafkaClient) (*kafka.ListClustersOutput, error) {
116120
}
117121

118122
// getBrokers returns a slice of broker hosts without ports
119-
func getBrokers(svc kafkaClient, arn string) ([]string, error) {
120-
input := kafka.ListNodesInput{ClusterArn: &arn}
121-
var brokers []string
123+
func getBrokers(svc kafkaClient, c types.ClusterInfo) ([]brokerDetails, error) {
124+
input := kafka.ListNodesInput{ClusterArn: c.ClusterArn}
125+
var brokers []brokerDetails
122126

123127
p := kafka.NewListNodesPaginator(svc, &input)
124128
for p.HasMorePages() {
@@ -128,7 +132,20 @@ func getBrokers(svc kafkaClient, arn string) ([]string, error) {
128132
}
129133

130134
for _, b := range page.NodeInfoList {
131-
brokers = append(brokers, b.BrokerNodeInfo.Endpoints...)
135+
if b.BrokerNodeInfo != nil {
136+
details := brokerDetails{
137+
Endpoint: b.BrokerNodeInfo.Endpoints[0],
138+
JmxExporter: *c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
139+
NodeExporter: *c.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker,
140+
}
141+
brokers = append(brokers, details)
142+
} else if b.ControllerNodeInfo != nil {
143+
details := brokerDetails{
144+
Endpoint: b.ControllerNodeInfo.Endpoints[0],
145+
JmxExporter: *c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
146+
}
147+
brokers = append(brokers, details)
148+
}
132149
}
133150
}
134151

@@ -137,18 +154,16 @@ func getBrokers(svc kafkaClient, arn string) ([]string, error) {
137154

138155
// buildClusterDetails extracts the relevant details from a ClusterInfo and returns a ClusterDetails
139156
func buildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails, error) {
140-
brokers, err := getBrokers(svc, *c.ClusterArn)
157+
brokers, err := getBrokers(svc, c)
141158
if err != nil {
142159
fmt.Println(err)
143160
return clusterDetails{}, err
144161
}
145162

146163
cluster := clusterDetails{
147-
ClusterName: *c.ClusterName,
148-
ClusterArn: *c.ClusterArn,
149-
Brokers: brokers,
150-
JmxExporter: c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
151-
NodeExporter: c.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker,
164+
ClusterName: *c.ClusterName,
165+
ClusterArn: *c.ClusterArn,
166+
Brokers: brokers,
152167
}
153168
return cluster, nil
154169
}
@@ -201,9 +216,6 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStatic
201216
return []PrometheusStaticConfig{}, err
202217
}
203218

204-
if !clusterDetails.JmxExporter && !clusterDetails.NodeExporter {
205-
continue
206-
}
207219
staticConfigs = append(staticConfigs, clusterDetails.StaticConfig())
208220
}
209221
return staticConfigs, nil

0 commit comments

Comments
 (0)