Skip to content

Commit c686e6d

Browse files
errmjoshm91
andauthored
HTTP service discovery (#16)
* Extract fileSD function * Add option for http service discovery * Update docs * Fix code block in readme * Clarify scrape interval parameter --------- Co-authored-by: Josh Mills <josh.mills@statsbomb.com>
1 parent aaea76a commit c686e6d

File tree

2 files changed

+96
-36
lines changed

2 files changed

+96
-36
lines changed

README.md

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Service discovery for [AWS MSK](https://aws.amazon.com/msk/), compatible with [P
88

99
## How it works
1010

11-
This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) mechanism.
11+
This service gets a list of MSK clusters in an AWS account and exports each broker to a Prometheus-compatible static config to be used with the [`file_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config) or [`http_sd_config`](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config) mechanism.
1212

1313
## Pre-requisites
1414

@@ -44,12 +44,20 @@ When using AWS credentials or IAM Roles, the following policy needs to be attach
4444
Usage of ./prometheus-msk-discovery:
4545
-filter string
4646
a regex pattern to filter cluster names from the results
47+
-http-sd
48+
expose http_sd interface rather than writing a file
4749
-job-prefix string
4850
string with which to prefix each job label (default "msk")
51+
-listen-address string
52+
Address to listen on for http service discovery (default ":8080")
4953
-output string
5054
path of the file to write MSK discovery information to (default "msk_file_sd.yml")
55+
-region string
56+
the aws region in which to scan for MSK clusters
5157
-scrape-interval duration
52-
interval at which to scrape the AWS API for MSK cluster information (default 5m0s)
58+
interval at which to scrape the AWS API for MSK cluster information when in file_sd mode (default 5m0s)
59+
-tag value
60+
A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.
5361
```
5462

5563
### Example output:
@@ -61,6 +69,23 @@ $ ./prometheus-msk-discovery -scrape-interval 10s -filter 'primary'
6169

6270
An example output file can be found [here](examples/msk_file_sd.yml)
6371

72+
### http_sd
73+
74+
```
75+
$ ./prometheus-msk-discovery -http-sd -listen-address :8989 -filter 'primary'
76+
```
77+
78+
```
79+
$ curl localhost:8989
80+
[{"targets":["b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-1.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11001","b-2.primary-kafka.tffs8g.c2.kafka.eu-west-2.amazonaws.com:11002"],"labels":{"job":"msk-primary-kafka","cluster_name":"primary-kafka","cluster_arn":"arn:aws:kafka:eu-west-2:111111111111:cluster/primary-kafka/522d90ab-d400-4ea0-b8fd-bbf3576425d4-2"}}]
81+
```
82+
83+
```yaml
84+
http_sd_configs:
85+
- url: http://localhost:8989
86+
refresh_interval: 30s
87+
```
88+
6489
## Region Precedence
6590
When no region is specified with the `-region` flag the process first attempts to load the default SDK configuration checking for an `AWS_REGION` environment variable or reading any region specified in the standard [configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). If no region is found it will attempt to retrieve it from the EC2 Instance Metadata Service.
6691

main.go

Lines changed: 69 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"flag"
67
"fmt"
78
"io/ioutil"
89
"log"
10+
"net/http"
911
"regexp"
1012
"strings"
1113
"time"
@@ -24,11 +26,13 @@ const (
2426
type tags map[string]string
2527

2628
var (
27-
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
28-
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information")
29-
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
30-
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
31-
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
29+
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
30+
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information when in file_sd mode")
31+
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
32+
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
33+
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
34+
httpSDEnabled = flag.Bool("http-sd", false, "expose http_sd interface rather than writing a file")
35+
listenAddress = flag.String("listen-address", ":8080", "Address to listen on for http service discovery")
3236
)
3337

3438
type kafkaClient interface {
@@ -37,16 +41,16 @@ type kafkaClient interface {
3741
}
3842

3943
type labels struct {
40-
Job string `yaml:"job"`
41-
ClusterName string `yaml:"cluster_name"`
42-
ClusterArn string `yaml:"cluster_arn"`
44+
Job string `yaml:"job" json:"job"`
45+
ClusterName string `yaml:"cluster_name" json:"cluster_name"`
46+
ClusterArn string `yaml:"cluster_arn" json:"cluster_arn"`
4347
}
4448

4549
// PrometheusStaticConfig is the final structure of a single static config that
46-
// will be outputted to the Prometheus file service discovery config
50+
// will be outputted to the Prometheus file/http service discovery config
4751
type PrometheusStaticConfig struct {
48-
Targets []string `yaml:"targets"`
49-
Labels labels `yaml:"labels"`
52+
Targets []string `yaml:"targets" json:"targets"`
53+
Labels labels `yaml:"labels" json:"labels"`
5054
}
5155

5256
// clusterDetails holds details of cluster, each broker, and which OpenMetrics endpoints are enabled
@@ -205,30 +209,8 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStatic
205209
return staticConfigs, nil
206210
}
207211

208-
func main() {
209-
var tagFilters tags = make(tags)
210-
flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.")
211-
flag.Parse()
212-
213-
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion())
214-
if err != nil {
215-
fmt.Println(err)
216-
return
217-
}
218-
219-
client := kafka.NewFromConfig(cfg)
220-
212+
func fileSD(client *kafka.Client, filter Filter) {
221213
work := func() {
222-
regexpFilter, err := regexp.Compile(*clusterFilter)
223-
if err != nil {
224-
fmt.Println(err)
225-
return
226-
}
227-
228-
filter := Filter{
229-
NameFilter: *regexpFilter,
230-
TagFilter: tagFilters,
231-
}
232214

233215
staticConfigs, err := GetStaticConfigs(client, filter)
234216
if err != nil {
@@ -260,3 +242,56 @@ func main() {
260242
work()
261243
}
262244
}
245+
246+
func httpSD(client *kafka.Client, filter Filter) {
247+
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
248+
staticConfigs, err := GetStaticConfigs(client, filter)
249+
if err != nil {
250+
log.Println(err)
251+
http.Error(w, "Internal Server Error", 500)
252+
return
253+
}
254+
m, err := json.Marshal(staticConfigs)
255+
if err != nil {
256+
log.Println(err)
257+
http.Error(w, "Internal Server Error", 500)
258+
return
259+
}
260+
w.Header().Set("Content-Type", "application/json")
261+
w.Write(m)
262+
return
263+
})
264+
265+
log.Fatal(http.ListenAndServe(*listenAddress, nil))
266+
}
267+
268+
func main() {
269+
var tagFilters tags = make(tags)
270+
flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.")
271+
flag.Parse()
272+
273+
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion())
274+
if err != nil {
275+
fmt.Println(err)
276+
return
277+
}
278+
279+
client := kafka.NewFromConfig(cfg)
280+
281+
regexpFilter, err := regexp.Compile(*clusterFilter)
282+
if err != nil {
283+
fmt.Println(err)
284+
return
285+
}
286+
287+
filter := Filter{
288+
NameFilter: *regexpFilter,
289+
TagFilter: tagFilters,
290+
}
291+
292+
if *httpSDEnabled {
293+
httpSD(client, filter)
294+
} else {
295+
fileSD(client, filter)
296+
}
297+
}

0 commit comments

Comments
 (0)