diff --git a/kafka_exporter.go b/kafka_exporter.go
index 4a1f7c8c..b2c0a3c9 100644
--- a/kafka_exporter.go
+++ b/kafka_exporter.go
@@ -59,6 +59,7 @@ var (
 	consumergroupLagSum       *prometheus.Desc
 	consumergroupLagZookeeper *prometheus.Desc
 	consumergroupMembers      *prometheus.Desc
+	versions                  *prometheus.Desc
 )
 
 // Exporter collects Kafka stats from the given server and exports them using
@@ -81,6 +82,7 @@ type Exporter struct {
 	sgWaitCh              chan struct{}
 	sgChans               []chan<- prometheus.Metric
 	consumerGroupFetchAll bool
+	versions              string
 }
 
 type kafkaOpts struct {
@@ -163,6 +165,50 @@ func canReadFile(path string) bool {
 	return true
 }
 
+func GetInterBrokerProtocolVersion(brokerAddrs []string, config *sarama.Config) (string, error) {
+	admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
+	if err != nil {
+		return "", fmt.Errorf("failed to create cluster admin: %w", err)
+	}
+	defer admin.Close()
+
+	client, err := sarama.NewClient(brokerAddrs, config)
+	if err != nil {
+		return "", fmt.Errorf("failed to create kafka client: %w", err)
+	}
+	defer client.Close()
+
+	brokers := client.Brokers()
+	if len(brokers) == 0 {
+		return "", fmt.Errorf("no brokers found")
+	}
+
+	broker := brokers[0]
+	err = broker.Open(config)
+	if err != nil && err != sarama.ErrAlreadyConnected {
+		return "", fmt.Errorf("failed to open broker connection: %w", err)
+	}
+
+	brokerID := broker.ID()
+	resource := sarama.ConfigResource{
+		Type: sarama.BrokerResource,
+		Name: fmt.Sprintf("%d", brokerID),
+	}
+
+	configEntries, err := admin.DescribeConfig(resource)
+	if err != nil {
+		return "", fmt.Errorf("failed to describe config: %w", err)
+	}
+
+	for _, entry := range configEntries {
+		if entry.Name == "inter.broker.protocol.version" {
+			return entry.Value, nil
+		}
+	}
+
+	return "UNKNOWN", fmt.Errorf("inter.broker.protocol.version not found")
+}
+
 // NewExporter returns an initialized Exporter.
 func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupFilter string, groupExclude string) (*Exporter, error) {
 	var zookeeperClient *kazoo.Kazoo
@@ -278,7 +324,11 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
 	config.Metadata.AllowAutoTopicCreation = opts.allowAutoTopicCreation
 
 	client, err := sarama.NewClient(opts.uri, config)
-
 	if err != nil {
 		return nil, errors.Wrap(err, "Error Init Kafka Client")
 	}
+
+	versions, err := GetInterBrokerProtocolVersion(opts.uri, config)
+	if err != nil {
+		return nil, errors.Wrap(err, "Error getting inter.broker.protocol.version")
+	}
@@ -302,6 +352,7 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
 		sgWaitCh:              nil,
 		sgChans:               []chan<- prometheus.Metric{},
 		consumerGroupFetchAll: config.Version.IsAtLeast(sarama.V2_0_0_0),
+		versions:              versions,
 	}, nil
 }
@@ -391,6 +442,9 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(
 		clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
 	)
+	ch <- prometheus.MustNewConstMetric(
+		versions, prometheus.GaugeValue, 1, e.versions,
+	)
 	for _, b := range e.client.Brokers() {
 		ch <- prometheus.MustNewConstMetric(
 			clusterBrokerInfo, prometheus.GaugeValue, 1, strconv.Itoa(int(b.ID())), b.Addr(),
@@ -657,7 +711,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 				consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
 			)
 			e.mu.Lock()
-			currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
+			currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
 			if currentPartitionOffsetError != nil {
 				klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError)
 			} else {
@@ -673,11 +727,11 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 					lag = currentPartitionOffset - offsetFetchResponseBlock.Offset
 					lagSum += lag
 				}
-
+
 				ch <- prometheus.MustNewConstMetric(
 					consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
 				)
-			}
+			}
 			e.mu.Unlock()
 		}
 		ch <- prometheus.MustNewConstMetric(
@@ -855,6 +909,11 @@ func setup(
 		"Number of Brokers in the Kafka Cluster.",
 		nil, labels,
 	)
+	versions = prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, "", "info"),
+		"Inter-broker protocol version of the Kafka cluster.",
+		[]string{"versions"}, labels,
+	)
 	clusterBrokerInfo = prometheus.NewDesc(
 		prometheus.BuildFQName(namespace, "", "broker_info"),
 		"Information about the Kafka Broker.",
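
For reviewers who want to exercise the new GetInterBrokerProtocolVersion helper in isolation, below is a minimal smoke-test sketch. It is not part of the change: the file name, the bootstrap address localhost:9092, the pinned sarama protocol version, and the github.com/IBM/sarama import path (older checkouts use github.com/Shopify/sarama) are all assumptions. Since DescribeConfigs may be denied by broker ACLs, the sketch skips instead of failing when the lookup does not succeed.

// kafka_exporter_version_test.go (hypothetical file name)
package main

import (
	"testing"

	"github.com/IBM/sarama" // assumption: older trees import github.com/Shopify/sarama instead
)

// TestGetInterBrokerProtocolVersion is a smoke-test sketch for the helper added
// in this diff. The broker address is an assumption; the test skips rather than
// fails when no broker is reachable or DescribeConfigs is not permitted.
func TestGetInterBrokerProtocolVersion(t *testing.T) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // assumed; must be new enough for the DescribeConfigs API

	version, err := GetInterBrokerProtocolVersion([]string{"localhost:9092"}, config)
	if err != nil {
		t.Skipf("skipping: could not read inter.broker.protocol.version: %v", err)
	}
	t.Logf("inter.broker.protocol.version = %q", version)
}

With the exporter running against such a cluster, the new series should show up on /metrics as kafka_info{versions="..."} 1 next to the existing kafka_brokers gauge; the label value is whatever the first broker reports for inter.broker.protocol.version.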