Skip to content

Commit 7c3aef6

Browse files
committed
Add support for MSK clusters with kraft metadata nodes
Since kafka version 3.7.0 MSK has supported using kraft metadata nodes instead of zookeeper. https://aws.amazon.com/about-aws/whats-new/2024/05/amazon-msk-kraft-mode-apache-kafka-clusters/ When running prometheus-msk-discovery against a cluster in kraft node there is a panic like this: ``` 2025/01/30 12:03:36 http: panic serving 10.3.57.169:45616: runtime error: invalid memory address or nil pointer dereference goroutine 18 [running]: net/http.(*conn).serve.func1() /usr/local/go/src/net/http/server.go:1868 +0xb9 panic({0x7675e0?, 0xa8f9c0?}) /usr/local/go/src/runtime/panic.go:920 +0x270 main.getBrokers({0x86e250?, 0xc00013a140}, {0xc00002bf20, 0x5b}) /src/main.go:131 +0x24d main.buildClusterDetails({0x86e250?, 0xc00013a140?}, {0x0, 0xc000268730, 0xc000232b40, 0xc000232a80, 0xc000232a70, 0xc000226930, 0xc000226948, 0xc000232b20, ...}) /src/main.go:140 +0x56 main.GetStaticConfigs({0x86e250, 0xc00013a140}, {0xc00025ba18, 0x1, 0xc0000ef988?}) /src/main.go:199 +0x26b main.httpSD.func1({0x86feb0, 0xc00015a000}, 0xc0000efb18?) /src/main.go:248 +0xbb net/http.HandlerFunc.ServeHTTP(0x440480?, {0x86feb0?, 0xc00015a000?}, 0x6457fa?) /usr/local/go/src/net/http/server.go:2136 +0x29 net/http.(*ServeMux).ServeHTTP(0xace040?, {0x86feb0, 0xc00015a000}, 0xc000154000) /usr/local/go/src/net/http/server.go:2514 +0x142 net/http.serverHandler.ServeHTTP({0xc000150090?}, {0x86feb0?, 0xc00015a000?}, 0x6?) /usr/local/go/src/net/http/server.go:2938 +0x8e net/http.(*conn).serve(0xc00011a1b0, {0x8704c0, 0xc0000a3f80}) /usr/local/go/src/net/http/server.go:2009 +0x5f4 created by net/http.(*Server).Serve in goroutine 1 /usr/local/go/src/net/http/server.go:3086 +0x5cb ``` This is caused because the list nodes api returns records like this: ``` { "AddedToClusterTime": null, "BrokerNodeInfo": null, "ControllerNodeInfo": { "Endpoints": [ "c-10002.foo.xxxxxx.c7.kafka.us-east-1.amazonaws.com" ] }, "InstanceType": null, "NodeARN": null, "NodeType": "CONTROLLER", "ZookeeperNodeInfo": null } ``` which have a nil BrokerNodeInfo This PR fixes this bug, and also adds these controller nodes to the target endpoints. When JMX Exporter and Node Exporter are enabled on the cluster, these nodes only seem to be running JMX Exporter, so we are only adding this endpoint for these nodes.
1 parent c686e6d commit 7c3aef6

File tree

4 files changed

+152
-67
lines changed

4 files changed

+152
-67
lines changed

go.mod

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,22 @@ module github.com/statsbomb/prometheus-msk-discovery
33
go 1.21
44

55
require (
6-
github.com/aws/aws-sdk-go-v2/config v1.1.4
7-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1
6+
github.com/aws/aws-sdk-go-v2/config v1.29.2
7+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13
88
gopkg.in/yaml.v2 v2.4.0
99
)
1010

1111
require (
12-
github.com/aws/aws-sdk-go-v2 v1.3.1 // indirect
13-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4 // indirect
14-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5 // indirect
15-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5 // indirect
16-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4 // indirect
17-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1 // indirect
18-
github.com/aws/smithy-go v1.3.0 // indirect
12+
github.com/aws/aws-sdk-go-v2 v1.34.0 // indirect
13+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55 // indirect
14+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect
15+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect
16+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect
17+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect
18+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect
19+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect
20+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect
21+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect
22+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 // indirect
23+
github.com/aws/smithy-go v1.22.2 // indirect
1924
)

go.sum

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
1-
github.com/aws/aws-sdk-go-v2 v1.3.1 h1:KKstwh6zsuUhQH3GvSor7M3am/+imPqydFOZHzlkTKc=
2-
github.com/aws/aws-sdk-go-v2 v1.3.1/go.mod h1:5SmWRTjN6uTRFNCc7rR69xHsdcUJnthmaRHGDsYhpTE=
3-
github.com/aws/aws-sdk-go-v2/config v1.1.4 h1:2hjdDldmJJjb+rFieQySfOFt4WwxKZJVTEB6RBI74T4=
4-
github.com/aws/aws-sdk-go-v2/config v1.1.4/go.mod h1:op05ummoVoAqctpA80jVt/+hvEtLfuKmDyx0bIuvfbE=
5-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4 h1:whYYw2srG+zUQzUw4LhML83f+xd22Vm7gv0I7aJglc8=
6-
github.com/aws/aws-sdk-go-v2/credentials v1.1.4/go.mod h1:UQwsT2w2XelrWoVV2v/zL2uce1RxmVCiHaZsoKLamZg=
7-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5 h1:5gCrezE41xYQHWDsDkJD9nT22tUH3s+Zrvs4c3v2FGc=
8-
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5/go.mod h1:z/NKNlYxMzphl7TzjV+ctUebHF4CFNGGlSvmV/NKcJU=
9-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5 h1:GbW4bbc1iED64aIL203xcGSfLzWOWuIdnKV0guMcJvg=
10-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5/go.mod h1:MW0O/RpmVpS6MWKn6W03XEJmqXlG7+d3iaYLzkd2fAc=
11-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1 h1:70IRtg4hU1mQ5aAtjEHPQR+KCeVIDwLOpvbofiSEODE=
12-
github.com/aws/aws-sdk-go-v2/service/kafka v1.2.1/go.mod h1:2gy+VDVpxUvxQRdjpFlZNhXSjybrxnYrb9Byuzz855I=
13-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4 h1:Tr/SsFDXWN8rntdzTNrDs/MvuBXRCjY6xvJrPFUPKRM=
14-
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4/go.mod h1:yQayEbOWH75NaKFylsFocBc3yanYEGndlOaH4i/Lvno=
15-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1 h1:1koRvKlZMN+FhTGV5f4q6vRHXNJzeZlPKzbs1/Y32Kg=
16-
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1/go.mod h1:L1LH5nHMXxdkKj057ZUx7Wi50CCrkZ+9jkTnBnY2j/w=
17-
github.com/aws/smithy-go v1.3.0 h1:awbB2OJBZ/Txj+c4q+qhDQs3Ob0sRhBuIIkOD4Aq8yc=
18-
github.com/aws/smithy-go v1.3.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
19-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
20-
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
21-
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
22-
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
23-
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
24-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
25-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
26-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
27-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
1+
github.com/aws/aws-sdk-go-v2 v1.34.0 h1:9iyL+cjifckRGEVpRKZP3eIxVlL06Qk1Tk13vreaVQU=
2+
github.com/aws/aws-sdk-go-v2 v1.34.0/go.mod h1:JgstGg0JjWU1KpVJjD5H0y0yyAIpSdKEq556EI6yOOM=
3+
github.com/aws/aws-sdk-go-v2/config v1.29.2 h1:JuIxOEPcSKpMB0J+khMjznG9LIhIBdmqNiEcPclnwqc=
4+
github.com/aws/aws-sdk-go-v2/config v1.29.2/go.mod h1:HktTHregOZwNSM/e7WTfVSu9RCX+3eOv+6ij27PtaYs=
5+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55 h1:CDhKnDEaGkLA5ZszV/qw5uwN5M8rbv9Cl0JRN+PRsaM=
6+
github.com/aws/aws-sdk-go-v2/credentials v1.17.55/go.mod h1:kPD/vj+RB5MREDUky376+zdnjZpR+WgdBBvwrmnlmKE=
7+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 h1:kU7tmXNaJ07LsyN3BUgGqAmVmQtq0w6duVIHAKfp0/w=
8+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25/go.mod h1:OiC8+OiqrURb1wrwmr/UbOVLFSWEGxjinj5C299VQdo=
9+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 h1:Ej0Rf3GMv50Qh4G4852j2djtoDb7AzQ7MuQeFHa3D70=
10+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29/go.mod h1:oeNTC7PwJNoM5AznVr23wxhLnuJv0ZDe5v7w0wqIs9M=
11+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 h1:6e8a71X+9GfghragVevC5bZqvATtc3mAMgxpSNbgzF0=
12+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29/go.mod h1:c4jkZiQ+BWpNqq7VtrxjwISrLrt/VvPq3XiopkUIolI=
13+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk=
14+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
15+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA=
16+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY=
17+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 h1:hN4yJBGswmFTOVYqmbz1GBs9ZMtQe8SrYxPwrkrlRv8=
18+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10/go.mod h1:TsxON4fEZXyrKY+D+3d2gSTyJkGORexIYab9PTf56DA=
19+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13 h1:PwPoRsGAb4A9XmOUg8MhtjY5UZgt018TDuRP5OtDfxQ=
20+
github.com/aws/aws-sdk-go-v2/service/kafka v1.38.13/go.mod h1:z7Et3O89RoTdBmGi/UN82OSkuYQLKmlvJcRmyorMvhw=
21+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 h1:kznaW4f81mNMlREkU9w3jUuJvU5g/KsqDV43ab7Rp6s=
22+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.12/go.mod h1:bZy9r8e0/s0P7BSDHgMLXK2KvdyRRBIQ2blKlvLt0IU=
23+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 h1:mUwIpAvILeKFnRx4h1dEgGEFGuV8KJ3pEScZWVFYuZA=
24+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11/go.mod h1:JDJtD+b8HNVv71axz8+S5492KM8wTzHRFpMKQbPlYxw=
25+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 h1:g9d+TOsu3ac7SgmY2dUf1qMgu/uJVTlQ4VCbH6hRxSw=
26+
github.com/aws/aws-sdk-go-v2/service/sts v1.33.10/go.mod h1:WZfNmntu92HO44MVZAubQaz3qCuIdeOdog2sADfU6hU=
27+
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
28+
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
2829
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
2930
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
30-
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
3131
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
3232
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=

main.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,13 @@ type PrometheusStaticConfig struct {
5555

5656
// clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
5757
type clusterDetails struct {
58-
ClusterName string
59-
ClusterArn string
60-
Brokers []string
58+
ClusterName string
59+
ClusterArn string
60+
Brokers []brokerDetails
61+
}
62+
63+
type brokerDetails struct {
64+
Endpoint string
6165
JmxExporter bool
6266
NodeExporter bool
6367
}
@@ -88,11 +92,11 @@ func (c clusterDetails) StaticConfig() PrometheusStaticConfig {
8892

8993
var targets []string
9094
for _, b := range c.Brokers {
91-
if c.JmxExporter {
92-
targets = append(targets, fmt.Sprintf("%s:%d", b, jmxExporterPort))
95+
if b.JmxExporter {
96+
targets = append(targets, fmt.Sprintf("%s:%d", b.Endpoint, jmxExporterPort))
9397
}
94-
if c.NodeExporter {
95-
targets = append(targets, fmt.Sprintf("%s:%d", b, nodeExporterPort))
98+
if b.NodeExporter {
99+
targets = append(targets, fmt.Sprintf("%s:%d", b.Endpoint, nodeExporterPort))
96100
}
97101
}
98102
ret.Targets = targets
@@ -116,9 +120,9 @@ func getClusters(svc kafkaClient) (*kafka.ListClustersOutput, error) {
116120
}
117121

118122
// getBrokers returns a slice of broker hosts without ports
119-
func getBrokers(svc kafkaClient, arn string) ([]string, error) {
120-
input := kafka.ListNodesInput{ClusterArn: &arn}
121-
var brokers []string
123+
func getBrokers(svc kafkaClient, c types.ClusterInfo) ([]brokerDetails, error) {
124+
input := kafka.ListNodesInput{ClusterArn: c.ClusterArn}
125+
var brokers []brokerDetails
122126

123127
p := kafka.NewListNodesPaginator(svc, &input)
124128
for p.HasMorePages() {
@@ -128,7 +132,20 @@ func getBrokers(svc kafkaClient, arn string) ([]string, error) {
128132
}
129133

130134
for _, b := range page.NodeInfoList {
131-
brokers = append(brokers, b.BrokerNodeInfo.Endpoints...)
135+
if b.BrokerNodeInfo != nil {
136+
details := brokerDetails{
137+
Endpoint: b.BrokerNodeInfo.Endpoints[0],
138+
JmxExporter: *c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
139+
NodeExporter: *c.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker,
140+
}
141+
brokers = append(brokers, details)
142+
} else if b.ControllerNodeInfo != nil {
143+
details := brokerDetails{
144+
Endpoint: b.ControllerNodeInfo.Endpoints[0],
145+
JmxExporter: *c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
146+
}
147+
brokers = append(brokers, details)
148+
}
132149
}
133150
}
134151

@@ -137,18 +154,16 @@ func getBrokers(svc kafkaClient, arn string) ([]string, error) {
137154

138155
// buildClusterDetails extracts the relevant details from a ClusterInfo and returns a ClusterDetails
139156
func buildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails, error) {
140-
brokers, err := getBrokers(svc, *c.ClusterArn)
157+
brokers, err := getBrokers(svc, c)
141158
if err != nil {
142159
fmt.Println(err)
143160
return clusterDetails{}, err
144161
}
145162

146163
cluster := clusterDetails{
147-
ClusterName: *c.ClusterName,
148-
ClusterArn: *c.ClusterArn,
149-
Brokers: brokers,
150-
JmxExporter: c.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker,
151-
NodeExporter: c.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker,
164+
ClusterName: *c.ClusterName,
165+
ClusterArn: *c.ClusterArn,
166+
Brokers: brokers,
152167
}
153168
return cluster, nil
154169
}
@@ -201,9 +216,10 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStatic
201216
return []PrometheusStaticConfig{}, err
202217
}
203218

204-
if !clusterDetails.JmxExporter && !clusterDetails.NodeExporter {
219+
if !*cluster.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker && !*cluster.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker {
205220
continue
206221
}
222+
207223
staticConfigs = append(staticConfigs, clusterDetails.StaticConfig())
208224
}
209225
return staticConfigs, nil

main_test.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type mockCluster struct {
1717
clusterName string
1818
jmxExporter bool
1919
nodeExporter bool
20+
kraft bool
2021
}
2122

2223
type mockKafkaClient struct{ clusters map[string]mockCluster }
@@ -38,10 +39,10 @@ func (m mockKafkaClient) ListClusters(ctx context.Context, params *kafka.ListClu
3839
OpenMonitoring: &types.OpenMonitoring{
3940
Prometheus: &types.Prometheus{
4041
JmxExporter: &types.JmxExporter{
41-
EnabledInBroker: cCluster.jmxExporter,
42+
EnabledInBroker: &cCluster.jmxExporter,
4243
},
4344
NodeExporter: &types.NodeExporter{
44-
EnabledInBroker: cCluster.nodeExporter,
45+
EnabledInBroker: &cCluster.nodeExporter,
4546
},
4647
},
4748
},
@@ -57,6 +58,17 @@ func (m mockKafkaClient) ListNodes(ctx context.Context, params *kafka.ListNodesI
5758
cluster := m.clusters[*params.ClusterArn]
5859
var nodeInfos []types.NodeInfo
5960

61+
if cluster.kraft {
62+
for i := 1; i <= cluster.brokerCount; {
63+
n := types.NodeInfo{
64+
NodeType: "CONTROLLER",
65+
ControllerNodeInfo: &types.ControllerNodeInfo{Endpoints: []string{fmt.Sprintf("c-1000%v.broker.com", i)}},
66+
}
67+
nodeInfos = append(nodeInfos, n)
68+
i++
69+
}
70+
}
71+
6072
for i := 1; i <= cluster.brokerCount; {
6173
n := types.NodeInfo{
6274
NodeType: "BROKER",
@@ -75,12 +87,51 @@ func TestGetStaticConfigs(t *testing.T) {
7587
t.Run("OneClusterTwoBrokersFullMonitoring", func(t *testing.T) {
7688
var client mockKafkaClient
7789
client.clusters = make(map[string]mockCluster)
78-
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", true, true}
90+
client.clusters["arn:::my-cluster"] = mockCluster{
91+
brokerCount: 2,
92+
clusterName: "my-cluster",
93+
jmxExporter: true,
94+
nodeExporter: true,
95+
}
96+
97+
got, _ := GetStaticConfigs(client)
98+
want := []PrometheusStaticConfig{
99+
{
100+
Targets: []string{
101+
"b-1.broker.com:11001",
102+
"b-1.broker.com:11002",
103+
"b-2.broker.com:11001",
104+
"b-2.broker.com:11002",
105+
},
106+
Labels: labels{
107+
Job: "msk-my-cluster",
108+
ClusterName: "my-cluster",
109+
ClusterArn: "arn:::my-cluster",
110+
},
111+
},
112+
}
113+
if !reflect.DeepEqual(got, want) {
114+
t.Errorf("got %s want %s", got, want)
115+
}
116+
})
117+
118+
t.Run("OneClusterTwoBrokersFullMonitoringKraft", func(t *testing.T) {
119+
var client mockKafkaClient
120+
client.clusters = make(map[string]mockCluster)
121+
client.clusters["arn:::my-cluster"] = mockCluster{
122+
brokerCount: 2,
123+
clusterName: "my-cluster",
124+
jmxExporter: true,
125+
nodeExporter: true,
126+
kraft: true,
127+
}
79128

80129
got, _ := GetStaticConfigs(client)
81130
want := []PrometheusStaticConfig{
82131
{
83132
Targets: []string{
133+
"c-10001.broker.com:11001",
134+
"c-10002.broker.com:11001",
84135
"b-1.broker.com:11001",
85136
"b-1.broker.com:11002",
86137
"b-2.broker.com:11001",
@@ -101,8 +152,18 @@ func TestGetStaticConfigs(t *testing.T) {
101152
t.Run("TwoClusterTwoBrokersFullAndLimitedMonitoring", func(t *testing.T) {
102153
var client mockKafkaClient
103154
client.clusters = make(map[string]mockCluster)
104-
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", true, true}
105-
client.clusters["arn:::my-other-cluster"] = mockCluster{2, "my-other-cluster", true, false}
155+
client.clusters["arn:::my-cluster"] = mockCluster{
156+
brokerCount: 2,
157+
clusterName: "my-cluster",
158+
jmxExporter: true,
159+
nodeExporter: true,
160+
}
161+
client.clusters["arn:::my-other-cluster"] = mockCluster{
162+
brokerCount: 2,
163+
clusterName: "my-other-cluster",
164+
jmxExporter: true,
165+
nodeExporter: false,
166+
}
106167

107168
got, _ := GetStaticConfigs(client)
108169
want := []PrometheusStaticConfig{
@@ -139,7 +200,10 @@ func TestGetStaticConfigs(t *testing.T) {
139200
t.Run("NoMonitoringEnabled", func(t *testing.T) {
140201
var client mockKafkaClient
141202
client.clusters = make(map[string]mockCluster)
142-
client.clusters["arn:::my-cluster"] = mockCluster{2, "my-cluster", false, false}
203+
client.clusters["arn:::my-cluster"] = mockCluster{
204+
brokerCount: 2,
205+
clusterName: "my-cluster",
206+
}
143207

144208
got, _ := GetStaticConfigs(client)
145209
want := []PrometheusStaticConfig{}
@@ -183,7 +247,7 @@ func Test_filterClusters(t *testing.T) {
183247
NameFilter: *(regexp.MustCompile(``)),
184248
TagFilter: map[string]string{
185249
"Enviroment": "test",
186-
"SomeOther": "tag",
250+
"SomeOther": "tag",
187251
},
188252
}
189253

@@ -244,14 +308,14 @@ func Test_filterClusters(t *testing.T) {
244308
ClusterName: strPtr("test-cluster"),
245309
Tags: map[string]string{
246310
"Enviroment": "test",
247-
"SomeOther": "DifferentTag",
311+
"SomeOther": "DifferentTag",
248312
},
249313
},
250314
{
251315
ClusterName: strPtr("second-test-cluster"),
252316
Tags: map[string]string{
253317
"Enviroment": "staging",
254-
"SomeOther": "tag",
318+
"SomeOther": "tag",
255319
},
256320
},
257321
{
@@ -265,16 +329,16 @@ func Test_filterClusters(t *testing.T) {
265329
ClusterInfoList: []types.ClusterInfo{
266330
{
267331
ClusterName: strPtr("test-cluster"),
268-
Tags: map[string]string{
332+
Tags: map[string]string{
269333
"Enviroment": "test",
270-
"SomeOther": "DifferentTag",
334+
"SomeOther": "DifferentTag",
271335
},
272336
},
273337
{
274338
ClusterName: strPtr("second-test-cluster"),
275339
Tags: map[string]string{
276340
"Enviroment": "staging",
277-
"SomeOther": "tag",
341+
"SomeOther": "tag",
278342
},
279343
},
280344
},

0 commit comments

Comments
 (0)