Skip to content

Commit 4907b9d

Browse files
Update main to use setup.File interfaces (#21)
* Update main to use setup.File interfaces * Include filename in log message
1 parent b0dcf25 commit 4907b9d

File tree

2 files changed

+132
-91
lines changed

2 files changed

+132
-91
lines changed

cmd/bigquery_exporter/main.go

Lines changed: 56 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,42 @@
66
package main
77

88
import (
9+
"flag"
910
"fmt"
1011
"io/ioutil"
1112
"log"
1213
"path/filepath"
1314
"strings"
15+
"sync"
1416
"time"
1517

18+
"github.com/m-lab/go/flagx"
1619
"github.com/m-lab/go/prometheusx"
20+
"github.com/m-lab/go/rtx"
21+
"github.com/m-lab/prometheus-bigquery-exporter/internal/setup"
1722
"github.com/m-lab/prometheus-bigquery-exporter/query"
1823
"github.com/m-lab/prometheus-bigquery-exporter/sql"
1924

20-
flag "github.com/spf13/pflag"
21-
2225
"cloud.google.com/go/bigquery"
2326
"golang.org/x/net/context"
2427

2528
"github.com/prometheus/client_golang/prometheus"
2629
)
2730

2831
var (
29-
valueTypes = []string{}
30-
querySources = []string{}
31-
project = flag.String("project", "", "GCP project name.")
32-
port = flag.String("port", ":9050", "Exporter port.")
33-
refresh = flag.Duration("refresh", 5*time.Minute, "Interval between updating metrics.")
32+
counterSources = flagx.StringArray{}
33+
gaugeSources = flagx.StringArray{}
34+
project = flag.String("project", "", "GCP project name.")
35+
refresh = flag.Duration("refresh", 5*time.Minute, "Interval between updating metrics.")
3436
)
3537

3638
func init() {
37-
flag.StringArrayVar(&valueTypes, "type", nil, "Name of the prometheus value type, e.g. 'counter' or 'gauge'.")
38-
flag.StringArrayVar(&querySources, "query", nil, "Name of file with query string.")
39+
// TODO: support counter queries.
40+
// flag.Var(&counterSources, "counter-query", "Name of file containing a counter query.")
41+
flag.Var(&gaugeSources, "gauge-query", "Name of file containing a gauge query.")
3942

43+
// Port registered at https://github.com/prometheus/prometheus/wiki/Default-port-allocations
44+
*prometheusx.ListenAddress = ":9348"
4045
log.SetFlags(log.LstdFlags | log.Lshortfile)
4146
}
4247

@@ -53,109 +58,69 @@ func fileToMetric(filename string) string {
5358
return strings.TrimSuffix(fname, filepath.Ext(fname))
5459
}
5560

56-
// createCollector creates a sql.Collector initialized with the BQ query
57-
// contained in filename. The returned collector should be registered with
58-
// prometheus.Register.
59-
func createCollector(client *bigquery.Client, filename, typeName string, vars map[string]string) (*sql.Collector, error) {
61+
// fileToQuery reads the content of the given file and returns the query with template values repalced with those in vars.
62+
func fileToQuery(filename string, vars map[string]string) string {
6063
queryBytes, err := ioutil.ReadFile(filename)
61-
if err != nil {
62-
return nil, err
63-
}
64-
65-
var v prometheus.ValueType
66-
if typeName == "counter" {
67-
v = prometheus.CounterValue
68-
} else if typeName == "gauge" {
69-
v = prometheus.GaugeValue
70-
} else {
71-
v = prometheus.UntypedValue
72-
}
64+
rtx.Must(err, "Failed to open %q", filename)
7365

74-
// TODO: use to text/template
7566
q := string(queryBytes)
7667
q = strings.Replace(q, "UNIX_START_TIME", vars["UNIX_START_TIME"], -1)
7768
q = strings.Replace(q, "REFRESH_RATE_SEC", vars["REFRESH_RATE_SEC"], -1)
78-
79-
c := sql.NewCollector(query.NewBQRunner(client), v, fileToMetric(filename), string(q))
80-
81-
return c, nil
69+
return q
8270
}
8371

84-
// updatePeriodically runs in an infinite loop, and updates registered
85-
// collectors every refresh period.
86-
func updatePeriodically(unregistered chan *sql.Collector, refresh time.Duration) {
87-
var collectors = []*sql.Collector{}
88-
89-
// Attempt to register all unregistered collectors.
90-
if len(unregistered) > 0 {
91-
collectors = append(collectors, tryRegister(unregistered)...)
92-
}
93-
for sleepUntilNext(refresh); ; sleepUntilNext(refresh) {
94-
log.Printf("Starting a new round at: %s", time.Now())
95-
for i := range collectors {
96-
log.Printf("Running query for %s", collectors[i])
97-
collectors[i].Update()
98-
log.Printf("Done")
99-
}
100-
if len(unregistered) > 0 {
101-
collectors = append(collectors, tryRegister(unregistered)...)
102-
}
72+
func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[string]string) {
73+
var wg sync.WaitGroup
74+
for i := range files {
75+
wg.Add(1)
76+
go func(f *setup.File) {
77+
modified, err := f.IsModified()
78+
if modified && err == nil {
79+
c := sql.NewCollector(
80+
newRunner(client), prometheus.GaugeValue,
81+
fileToMetric(f.Name), fileToQuery(f.Name, vars))
82+
83+
log.Println("Registering:", fileToMetric(f.Name))
84+
err = f.Register(c)
85+
} else {
86+
log.Println("Updating:", fileToMetric(f.Name))
87+
err = f.Update()
88+
}
89+
if err != nil {
90+
log.Println("Error:", f.Name, err)
91+
}
92+
wg.Done()
93+
}(&files[i])
10394
}
95+
wg.Wait()
10496
}
10597

106-
// tryRegister attempts to prometheus.Register every sql.Collectors queued in
107-
// unregistered. Any collectors that fail are placed back on the channel. All
108-
// successfully registered collectors are returned.
109-
func tryRegister(unregistered chan *sql.Collector) []*sql.Collector {
110-
var registered = []*sql.Collector{}
111-
count := len(unregistered)
112-
for i := 0; i < count; i++ {
113-
// Take collector off of channel.
114-
c := <-unregistered
115-
116-
// Try to register this collector.
117-
err := prometheus.Register(c)
118-
if err != nil {
119-
// Registration failed, so place collector back on channel.
120-
unregistered <- c
121-
continue
122-
}
123-
log.Printf("Registered %s", c)
124-
registered = append(registered, c)
125-
}
126-
return registered
98+
var mainCtx, mainCancel = context.WithCancel(context.Background())
99+
var newRunner = func(client *bigquery.Client) sql.QueryRunner {
100+
return query.NewBQRunner(client)
127101
}
128102

129103
func main() {
130104
flag.Parse()
105+
rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not get args from env")
131106

132-
if len(querySources) != len(valueTypes) {
133-
log.Fatal("You must provide a --type flag for every --query source.")
134-
}
135-
136-
// Create a channel with capacity for all collectors.
137-
unregistered := make(chan *sql.Collector, len(querySources))
107+
srv := prometheusx.MustServeMetrics()
108+
defer srv.Shutdown(mainCtx)
138109

139-
ctx := context.Background()
140-
client, err := bigquery.NewClient(ctx, *project)
141-
if err != nil {
142-
log.Fatal(err)
110+
files := make([]setup.File, len(gaugeSources))
111+
for i := range files {
112+
files[i].Name = gaugeSources[i]
143113
}
144114

115+
client, err := bigquery.NewClient(mainCtx, *project)
116+
rtx.Must(err, "Failed to allocate a new bigquery.Client")
145117
vars := map[string]string{
146118
"UNIX_START_TIME": fmt.Sprintf("%d", time.Now().UTC().Unix()),
147119
"REFRESH_RATE_SEC": fmt.Sprintf("%d", int(refresh.Seconds())),
148120
}
149-
for i := range querySources {
150-
c, err := createCollector(client, querySources[i], valueTypes[i], vars)
151-
if err != nil {
152-
log.Printf("Failed to create collector %s: %s", querySources[i], err)
153-
continue
154-
}
155-
// Store collector in channel.
156-
unregistered <- c
157-
}
158121

159-
prometheusx.MustStartPrometheus(*port)
160-
updatePeriodically(unregistered, *refresh)
122+
for mainCtx.Err() == nil {
123+
reloadRegisterUpdate(client, files, vars)
124+
sleepUntilNext(*refresh)
125+
}
161126
}

cmd/bigquery_exporter/main_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// bigquery_exporter runs structured bigquery SQL and converts the results into
2+
// prometheus metrics. bigquery_exporter can process multiple queries.
3+
// Because BigQuery queries can have long run times and high cost, Query results
4+
// are cached and updated every refresh interval, not on every scrape of
5+
// prometheus metrics.
6+
package main
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"io/ioutil"
12+
"log"
13+
"os"
14+
"testing"
15+
"time"
16+
17+
"cloud.google.com/go/bigquery"
18+
"github.com/m-lab/go/rtx"
19+
"github.com/m-lab/prometheus-bigquery-exporter/sql"
20+
)
21+
22+
func init() {
23+
log.SetOutput(ioutil.Discard)
24+
}
25+
26+
type fakeRunner struct {
27+
updated int
28+
}
29+
30+
func (f *fakeRunner) Query(query string) ([]sql.Metric, error) {
31+
r := []sql.Metric{
32+
{
33+
LabelKeys: []string{"key"},
34+
LabelValues: []string{"value"},
35+
Values: map[string]float64{
36+
"okay": 1.23,
37+
},
38+
},
39+
}
40+
f.updated++
41+
if f.updated > 1 {
42+
// Simulate an error after one successful query.
43+
return nil, fmt.Errorf("Fake failure for testing")
44+
}
45+
return r, nil
46+
}
47+
48+
func Test_main(t *testing.T) {
49+
tmp, err := ioutil.TempFile("", "empty_query_*")
50+
rtx.Must(err, "Failed to create temp file for main test.")
51+
defer os.Remove(tmp.Name())
52+
53+
// Provide coverage of the original newRunner definition.
54+
newRunner(nil)
55+
56+
// Create a fake runner for the test.
57+
f := &fakeRunner{}
58+
newRunner = func(*bigquery.Client) sql.QueryRunner {
59+
return f
60+
}
61+
62+
// Set the refresh period to a very small delay.
63+
*refresh = time.Second
64+
gaugeSources.Set(tmp.Name())
65+
66+
// Reset mainCtx to timeout after a second.
67+
mainCtx, mainCancel = context.WithTimeout(mainCtx, time.Second)
68+
defer mainCancel()
69+
70+
main()
71+
72+
// Verify that the fakeRunner was called twice.
73+
if f.updated != 2 {
74+
t.Errorf("main() failed to update; got %d, want 2", f.updated)
75+
}
76+
}

0 commit comments

Comments
 (0)