From 421fa9581e369d8078cf0678fce77ccb5f62cb97 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Fri, 22 Sep 2023 10:37:56 +0200 Subject: [PATCH 01/11] - Add metrics for sucessful and failed queries - Add keepAlive flag to not kill the app when the query fail --- main.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index 116a0b7..ce2ef3e 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ import ( "golang.org/x/net/context" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) var ( @@ -33,6 +34,22 @@ var ( gaugeSources = flagx.StringArray{} project = flag.String("project", "", "GCP project name.") refresh = flag.Duration("refresh", 5*time.Minute, "Interval between updating metrics.") + keepAlive = flag.Bool("keepAlive", false, "Keep the process alive even if query fails to execute.") + + successFilesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "success_files_executed_total", + Help: "The total number of successfully executed files", + }, []string{"filename"}) + + failedFilesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "failed_files_executed_total", + Help: "The total number of failed executed files", + }, []string{"filename"}) + updateDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "query_runtime_duration_seconds", + Help: "Duration taken for updating files", + Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600}, + }, []string{"filename"}) ) func init() { @@ -69,7 +86,7 @@ func fileToQuery(filename string, vars map[string]string) string { return q } -func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[string]string) { +func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[string]string, keepAlive bool) { var wg sync.WaitGroup for i := range files { wg.Add(1) @@ -85,14 +102,25 @@ func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[ // uses the same name but changes the metrics reported. Because // this cannot be recovered, we use rtx.Must to exit and allow // the runtime environment to restart. - rtx.Must(f.Register(c), "Failed to register collector: aborting") + if !keepAlive { + rtx.Must(f.Register(c), "Failed to register collector: aborting") + } else { + err = f.Register(c) + } + if err == nil { + successFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() + } } else { start := time.Now() err = f.Update() log.Println("Updating:", fileToMetric(f.Name), time.Since(start)) + updateDuration.WithLabelValues(fileToMetric(f.Name)).Observe(time.Since(start).Seconds()) } if err != nil { log.Println("Error:", f.Name, err) + failedFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() + } else { + successFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() } wg.Done() }(&files[i]) @@ -125,7 +153,7 @@ func main() { } for mainCtx.Err() == nil { - reloadRegisterUpdate(client, files, vars) + reloadRegisterUpdate(client, files, vars, *keepAlive) sleepUntilNext(*refresh) } } From 952effa504af93c4f13c40e60c7cc65c1ddafd2d Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Fri, 27 Oct 2023 15:00:00 +0200 Subject: [PATCH 02/11] add GCP Billed Bytes --- query/bigquery_runner.go | 44 ++++++++++++++++++++++++++++++---------- sql/collector.go | 11 +++++++--- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index e53f53f..89ca82e 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -19,28 +19,50 @@ type bigQueryImpl struct { bqiface.Client } -func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) error { +func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) (int64, error) { + // Create a query job. q := b.Client.Query(query) - it, err := q.Read(context.Background()) + job, err := q.Run(context.Background()) if err != nil { - return err + return 0, err } + + // Wait for the job to complete. + status, err := job.Wait(context.Background()) + if err != nil { + return 0, status.Err() + } + + // Get the query statistics to extract the cost. + queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) + cost := int64(0) + if queryStats != nil { + cost = queryStats.TotalBytesBilled + } + + // Now, you can proceed with reading and processing the query results. var row map[string]bigquery.Value + + it, err := job.Read(context.Background()) + if err != nil { + return cost, err + } for err = it.Next(&row); err == nil; err = it.Next(&row) { err2 := visit(row) if err2 != nil { - return err2 + return cost, err2 } } + if err != iterator.Done { - return err + return cost, err } - return nil + return cost, nil } // BQRunner is a concerete implementation of QueryRunner for BigQuery. type BQRunner struct { - runner runner + runner *bigQueryImpl } // runner interface allows unit testing of the Query function. @@ -60,16 +82,16 @@ func NewBQRunner(client *bigquery.Client) *BQRunner { // Query executes the given query. Query only supports standard SQL. The // query must define a column named "value" for the value, and may define // additional columns, all of which are used as metric labels. -func (qr *BQRunner) Query(query string) ([]sql.Metric, error) { +func (qr *BQRunner) Query(query string) ([]sql.Metric, int64, error) { metrics := []sql.Metric{} - err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { + cost, err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { metrics = append(metrics, rowToMetric(row)) return nil }) if err != nil { - return nil, err + return nil, cost, err } - return metrics, nil + return metrics, cost, nil } // valToFloat extracts a float from the bigquery.Value irrespective of the diff --git a/sql/collector.go b/sql/collector.go index 3f01188..a9cc24d 100644 --- a/sql/collector.go +++ b/sql/collector.go @@ -29,7 +29,7 @@ func NewMetric(labelKeys []string, labelValues []string, values map[string]float // QueryRunner defines the interface used to run a query and return an array of metrics. type QueryRunner interface { - Query(q string) ([]Metric, error) + Query(q string) ([]Metric, int64, error) } // Collector manages a prometheus.Collector for queries performed by a QueryRunner. @@ -51,7 +51,8 @@ type Collector struct { metrics []Metric // mux locks access to types above. mux sync.Mutex - + // Total billed bytes by BigQuery + cost int64 // RegisterErr contains any error during registration. This should be considered fatal. RegisterErr error } @@ -65,6 +66,7 @@ func NewCollector(runner QueryRunner, valType prometheus.ValueType, metricName, valType: valType, descs: nil, metrics: nil, + cost: 0, mux: sync.Mutex{}, } } @@ -106,6 +108,8 @@ func (col *Collector) Collect(ch chan<- prometheus.Metric) { desc, col.valType, metrics[i].Values[k], metrics[i].LabelValues...) } } + desc := prometheus.NewDesc("total_bytes_billed", "Total billed bytes", []string{"file_name"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(col.cost), col.metricName) } // String satisfies the Stringer interface. String returns the metric name. @@ -117,7 +121,7 @@ func (col *Collector) String() string { // Update is called automaticlly after the collector is registered. func (col *Collector) Update() error { logx.Debug.Println("Update:", col.metricName) - metrics, err := col.runner.Query(col.query) + metrics, cost, err := col.runner.Query(col.query) if err != nil { logx.Debug.Println("Failed to run query:", err) return err @@ -128,6 +132,7 @@ func (col *Collector) Update() error { // Replace slice reference with new value returned from Query. References // to the previous value of col.metrics are not affected. col.metrics = metrics + col.cost = cost return nil } From f0491b4698aac61eaf303a22d29a75add82b8acc Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Fri, 27 Oct 2023 15:01:41 +0200 Subject: [PATCH 03/11] Add billed bytes by BigQuery --- main_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/main_test.go b/main_test.go index 05606d0..5a3bc35 100644 --- a/main_test.go +++ b/main_test.go @@ -27,7 +27,7 @@ type fakeRunner struct { updated int } -func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { +func (f *fakeRunner) Query(query string) ([]sql.Metric, int64, error) { r := []sql.Metric{ { LabelKeys: []string{"key"}, @@ -40,9 +40,9 @@ func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { f.updated++ if f.updated > 1 { // Simulate an error after one successful query. - return nil, fmt.Errorf("Fake failure for testing") + return nil, 0, fmt.Errorf("Fake failure for testing") } - return r, nil + return r, 0, nil } func Test_main(t *testing.T) { From ed67ccc0029868a573863282f54ddf679936d158 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Fri, 27 Oct 2023 15:46:37 +0200 Subject: [PATCH 04/11] fix tests --- query/bigquery_runner.go | 4 ++-- query/bigquery_runner_test.go | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index 89ca82e..2110b07 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -62,12 +62,12 @@ func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Va // BQRunner is a concerete implementation of QueryRunner for BigQuery. type BQRunner struct { - runner *bigQueryImpl + runner runner } // runner interface allows unit testing of the Query function. type runner interface { - Query(q string, visit func(row map[string]bigquery.Value) error) error + Query(q string, visit func(row map[string]bigquery.Value) error) (int64, error) } // NewBQRunner creates a new QueryRunner instance. diff --git a/query/bigquery_runner_test.go b/query/bigquery_runner_test.go index 9200cb5..9cbbc5c 100644 --- a/query/bigquery_runner_test.go +++ b/query/bigquery_runner_test.go @@ -103,19 +103,20 @@ func TestRowToMetric(t *testing.T) { type fakeQuery struct { err error rows []map[string]bigquery.Value + cost int64 } -func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) error { +func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (int64, error) { if f.err != nil { - return f.err + return 0, f.err } for i := range f.rows { err := visit(f.rows[i]) if err != nil { - return err + return 0, err } } - return nil + return 0, nil } func TestBQRunner_Query(t *testing.T) { @@ -151,7 +152,7 @@ func TestBQRunner_Query(t *testing.T) { qr := &BQRunner{ runner: tt.runner, } - got, err := qr.Query("select * from `fake-table`") + got, _, err := qr.Query("select * from `fake-table`") if (err != nil) != tt.wantErr { t.Errorf("BQRunner.Query() error = %v, wantErr %v", err, tt.wantErr) return @@ -221,7 +222,7 @@ func TestBigQueryImpl_Query(t *testing.T) { b := &bigQueryImpl{ Client: client, } - if err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { + if _, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { t.Errorf("bigQueryImpl.Query() error = %v, wantErr %v", err, tt.wantErr) } }) From 8d86c9d5ee546904be5d2c9e7da95a823e81aff7 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Mon, 30 Oct 2023 15:31:54 +0100 Subject: [PATCH 05/11] Add Metrics for slot utilization and total bytes: --- main_test.go | 6 ++--- query/bigquery_runner.go | 41 +++++++++++++++++++++-------------- query/bigquery_runner_test.go | 11 +++++----- sql/collector.go | 17 ++++++++++----- sql/collector_test.go | 9 ++++---- sql/live_test.go | 5 ++++- 6 files changed, 55 insertions(+), 34 deletions(-) diff --git a/main_test.go b/main_test.go index 5a3bc35..fc21bd2 100644 --- a/main_test.go +++ b/main_test.go @@ -27,7 +27,7 @@ type fakeRunner struct { updated int } -func (f *fakeRunner) Query(query string) ([]sql.Metric, int64, error) { +func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { r := []sql.Metric{ { LabelKeys: []string{"key"}, @@ -40,9 +40,9 @@ func (f *fakeRunner) Query(query string) ([]sql.Metric, int64, error) { f.updated++ if f.updated > 1 { // Simulate an error after one successful query. - return nil, 0, fmt.Errorf("Fake failure for testing") + return nil, nil, fmt.Errorf("Fake failure for testing") } - return r, 0, nil + return r, nil, nil } func Test_main(t *testing.T) { diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index 2110b07..02ec64b 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -19,25 +19,34 @@ type bigQueryImpl struct { bqiface.Client } -func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) (int64, error) { +func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) { // Create a query job. q := b.Client.Query(query) job, err := q.Run(context.Background()) + if err != nil { - return 0, err + return nil, err + } + if job == nil { + return nil, nil } + if job.LastStatus().Statistics == nil { + return nil, nil + } // Wait for the job to complete. status, err := job.Wait(context.Background()) if err != nil { - return 0, status.Err() + return nil, status.Err() } // Get the query statistics to extract the cost. - queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) - cost := int64(0) - if queryStats != nil { - cost = queryStats.TotalBytesBilled + queryStatistics := new(bigquery.QueryStatistics) + if job.LastStatus().Statistics.Details != nil { + queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) + if queryStats != nil { + queryStatistics = queryStats + } } // Now, you can proceed with reading and processing the query results. @@ -45,19 +54,19 @@ func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Va it, err := job.Read(context.Background()) if err != nil { - return cost, err + return queryStatistics, err } for err = it.Next(&row); err == nil; err = it.Next(&row) { err2 := visit(row) if err2 != nil { - return cost, err2 + return queryStatistics, err2 } } if err != iterator.Done { - return cost, err + return queryStatistics, err } - return cost, nil + return queryStatistics, nil } // BQRunner is a concerete implementation of QueryRunner for BigQuery. @@ -67,7 +76,7 @@ type BQRunner struct { // runner interface allows unit testing of the Query function. type runner interface { - Query(q string, visit func(row map[string]bigquery.Value) error) (int64, error) + Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) } // NewBQRunner creates a new QueryRunner instance. @@ -82,16 +91,16 @@ func NewBQRunner(client *bigquery.Client) *BQRunner { // Query executes the given query. Query only supports standard SQL. The // query must define a column named "value" for the value, and may define // additional columns, all of which are used as metric labels. -func (qr *BQRunner) Query(query string) ([]sql.Metric, int64, error) { +func (qr *BQRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { metrics := []sql.Metric{} - cost, err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { + queryStatistics, err := qr.runner.Query(query, func(row map[string]bigquery.Value) error { metrics = append(metrics, rowToMetric(row)) return nil }) if err != nil { - return nil, cost, err + return nil, queryStatistics, err } - return metrics, cost, nil + return metrics, queryStatistics, nil } // valToFloat extracts a float from the bigquery.Value irrespective of the diff --git a/query/bigquery_runner_test.go b/query/bigquery_runner_test.go index 9cbbc5c..966365b 100644 --- a/query/bigquery_runner_test.go +++ b/query/bigquery_runner_test.go @@ -101,12 +101,12 @@ func TestRowToMetric(t *testing.T) { } type fakeQuery struct { - err error - rows []map[string]bigquery.Value - cost int64 + err error + rows []map[string]bigquery.Value + queryStats *bigquery.QueryStatistics } -func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (int64, error) { +func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) { if f.err != nil { return 0, f.err } @@ -222,8 +222,9 @@ func TestBigQueryImpl_Query(t *testing.T) { b := &bigQueryImpl{ Client: client, } - if _, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { + if cost, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { t.Errorf("bigQueryImpl.Query() error = %v, wantErr %v", err, tt.wantErr) + fmt.Println(cost) } }) } diff --git a/sql/collector.go b/sql/collector.go index a9cc24d..e440ab2 100644 --- a/sql/collector.go +++ b/sql/collector.go @@ -6,8 +6,8 @@ import ( "sync" "time" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/logx" - "github.com/prometheus/client_golang/prometheus" ) @@ -29,7 +29,7 @@ func NewMetric(labelKeys []string, labelValues []string, values map[string]float // QueryRunner defines the interface used to run a query and return an array of metrics. type QueryRunner interface { - Query(q string) ([]Metric, int64, error) + Query(q string) ([]Metric, *bigquery.QueryStatistics, error) } // Collector manages a prometheus.Collector for queries performed by a QueryRunner. @@ -52,7 +52,8 @@ type Collector struct { // mux locks access to types above. mux sync.Mutex // Total billed bytes by BigQuery - cost int64 + cost int64 + slotMillis int64 // RegisterErr contains any error during registration. This should be considered fatal. RegisterErr error } @@ -67,6 +68,7 @@ func NewCollector(runner QueryRunner, valType prometheus.ValueType, metricName, descs: nil, metrics: nil, cost: 0, + slotMillis: 0, mux: sync.Mutex{}, } } @@ -108,6 +110,8 @@ func (col *Collector) Collect(ch chan<- prometheus.Metric) { desc, col.valType, metrics[i].Values[k], metrics[i].LabelValues...) } } + desc2 := prometheus.NewDesc("slot_ms_utilized", "slot milliseconds utilized", []string{"file_name"}, nil) + ch <- prometheus.MustNewConstMetric(desc2, prometheus.GaugeValue, float64(col.slotMillis), col.metricName) desc := prometheus.NewDesc("total_bytes_billed", "Total billed bytes", []string{"file_name"}, nil) ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(col.cost), col.metricName) } @@ -121,7 +125,7 @@ func (col *Collector) String() string { // Update is called automaticlly after the collector is registered. func (col *Collector) Update() error { logx.Debug.Println("Update:", col.metricName) - metrics, cost, err := col.runner.Query(col.query) + metrics, queryStatistics, err := col.runner.Query(col.query) if err != nil { logx.Debug.Println("Failed to run query:", err) return err @@ -132,7 +136,10 @@ func (col *Collector) Update() error { // Replace slice reference with new value returned from Query. References // to the previous value of col.metrics are not affected. col.metrics = metrics - col.cost = cost + if queryStatistics != nil { + col.cost = queryStatistics.TotalBytesBilled + col.slotMillis = queryStatistics.SlotMillis + } return nil } diff --git a/sql/collector_test.go b/sql/collector_test.go index 34c61d7..85f060a 100644 --- a/sql/collector_test.go +++ b/sql/collector_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/prometheusx" "github.com/m-lab/go/prometheusx/promtest" "github.com/prometheus/client_golang/prometheus" @@ -18,17 +19,17 @@ type fakeQueryRunner struct { metrics []Metric } -func (qr *fakeQueryRunner) Query(query string) ([]Metric, error) { - return qr.metrics, nil +func (qr *fakeQueryRunner) Query(query string) ([]Metric, *bigquery.QueryStatistics, error) { + return qr.metrics, nil, nil } type errorQueryRunner struct { count int } -func (qr *errorQueryRunner) Query(query string) ([]Metric, error) { +func (qr *errorQueryRunner) Query(query string) ([]Metric, *bigquery.QueryStatistics, error) { qr.count++ - return nil, fmt.Errorf("Fake query error") + return nil, nil, fmt.Errorf("Fake query error") } func TestCollector(t *testing.T) { diff --git a/sql/live_test.go b/sql/live_test.go index 19061be..57479ce 100644 --- a/sql/live_test.go +++ b/sql/live_test.go @@ -67,10 +67,13 @@ func TestLiveQuery(t *testing.T) { } for _, test := range tests { t.Logf("Live query test: %s", test.name) - metrics, err := qr.Query(test.query) + metrics, queryStatistics, err := qr.Query(test.query) if err != nil { t.Fatal(err) } + if queryStatistics == nil { + t.Error("QueryStatistics is nil") + } if !reflect.DeepEqual(metrics, test.metrics) { t.Errorf("Metrics do not match:\nwant %#v;\n got %#v", test.metrics, metrics) } From cd06fd8e782a5f8a1ae58f4a5c21759d094f674f Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Tue, 31 Oct 2023 15:29:01 +0100 Subject: [PATCH 06/11] Remove travis CI --- .travis.yml | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 829efc9..0000000 --- a/.travis.yml +++ /dev/null @@ -1,14 +0,0 @@ -language: go - -go: -- 1.20 - -install: -- go get -v -t ./... -- go install github.com/mattn/goveralls@latest - -script: -# Run query "unit tests". -- make -- go test -short -v ./... -cover=1 -coverprofile=_c.cov -- $GOPATH/bin/goveralls -service=travis-ci -coverprofile=_c.cov From 9da078738f3bed7f10450a84537a25a3de15b07a Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Wed, 8 Nov 2023 10:48:07 +0100 Subject: [PATCH 07/11] fix test, change metric name and unit to pass the current test --- go.mod | 2 +- go.sum | 2 ++ internal/setup/setup_test.go | 9 ++++---- query/bigquery_runner.go | 23 ++++++++++---------- query/bigquery_runner_test.go | 33 ++++++++++++++--------------- sql/collector.go | 40 +++++++++++++++++++++-------------- sql/collector_test.go | 14 ++++++------ 7 files changed, 68 insertions(+), 55 deletions(-) diff --git a/go.mod b/go.mod index d7690b5..e84e614 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( cloud.google.com/go/bigquery v1.45.0 github.com/googleapis/google-cloud-go-testing v0.0.0-20191008195207-8e1d251e947d - github.com/m-lab/go v0.1.66 + github.com/m-lab/go v0.1.67 github.com/prometheus/client_golang v1.7.1 github.com/spf13/afero v1.2.2 golang.org/x/net v0.0.0-20221014081412-f15817d10f9b diff --git a/go.sum b/go.sum index 7cb7eae..8207e09 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/m-lab/go v0.1.66 h1:adDJILqKBCkd5YeVhCrrjWkjoNRtDzlDr6uizWu5/pE= github.com/m-lab/go v0.1.66/go.mod h1:O1D/EoVarJ8lZt9foANcqcKtwxHatBzUxXFFyC87aQQ= +github.com/m-lab/go v0.1.67 h1:jT9tLviED+w2GP6tp0qlB3r8c/yXGtjQW885OaSXn6I= +github.com/m-lab/go v0.1.67/go.mod h1:BirARfHWjjXHaCGNyWCm/CKW1OarjuEj8Yn6Z2rc0M4= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/internal/setup/setup_test.go b/internal/setup/setup_test.go index a6ba6ec..fe94617 100644 --- a/internal/setup/setup_test.go +++ b/internal/setup/setup_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "cloud.google.com/go/bigquery" "github.com/m-lab/go/rtx" "github.com/m-lab/prometheus-bigquery-exporter/sql" "github.com/prometheus/client_golang/prometheus" @@ -69,8 +70,8 @@ func TestFile_IsModified(t *testing.T) { type fakeRunner struct{} -func (f *fakeRunner) Query(query string) ([]sql.Metric, error) { - return nil, fmt.Errorf("Fake failure") +func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { + return nil, nil, fmt.Errorf("Fake failure") } func TestFile_Update(t *testing.T) { @@ -106,8 +107,8 @@ type fakeRegister struct { metric sql.Metric } -func (f *fakeRegister) Query(query string) ([]sql.Metric, error) { - return []sql.Metric{f.metric}, nil +func (f *fakeRegister) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) { + return []sql.Metric{f.metric}, nil, nil } func TestFile_Register(t *testing.T) { diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index 02ec64b..7f06bb6 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -5,6 +5,7 @@ package query import ( "context" + "fmt" "math" "sort" "strings" @@ -27,35 +28,35 @@ func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Va if err != nil { return nil, err } + if job == nil { return nil, nil } - if job.LastStatus().Statistics == nil { - return nil, nil - } // Wait for the job to complete. status, err := job.Wait(context.Background()) if err != nil { return nil, status.Err() } + // Now, you can proceed with reading and processing the query results. + var row map[string]bigquery.Value + it, err := job.Read(context.Background()) + + if err != nil { + return nil, err + } // Get the query statistics to extract the cost. queryStatistics := new(bigquery.QueryStatistics) - if job.LastStatus().Statistics.Details != nil { + // Assuming that the LastStatus method returns a string + if job.LastStatus() != nil { + fmt.Println(job.LastStatus()) queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) if queryStats != nil { queryStatistics = queryStats } } - // Now, you can proceed with reading and processing the query results. - var row map[string]bigquery.Value - - it, err := job.Read(context.Background()) - if err != nil { - return queryStatistics, err - } for err = it.Next(&row); err == nil; err = it.Next(&row) { err2 := visit(row) if err2 != nil { diff --git a/query/bigquery_runner_test.go b/query/bigquery_runner_test.go index 966365b..70bbb1e 100644 --- a/query/bigquery_runner_test.go +++ b/query/bigquery_runner_test.go @@ -9,7 +9,7 @@ import ( "cloud.google.com/go/bigquery" "github.com/m-lab/prometheus-bigquery-exporter/sql" - "github.com/m-lab/go/cloud/bqfake" + "github.com/m-lab/go/cloudtest/bqfake" ) func TestRowToMetric(t *testing.T) { @@ -101,22 +101,21 @@ func TestRowToMetric(t *testing.T) { } type fakeQuery struct { - err error - rows []map[string]bigquery.Value - queryStats *bigquery.QueryStatistics + rows []map[string]bigquery.Value + err error } func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) { if f.err != nil { - return 0, f.err + return nil, f.err } for i := range f.rows { err := visit(f.rows[i]) if err != nil { - return 0, err + return nil, err } } - return 0, nil + return nil, nil } func TestBQRunner_Query(t *testing.T) { @@ -171,15 +170,15 @@ func TestNewBQRunner(t *testing.T) { func TestBigQueryImpl_Query(t *testing.T) { tests := []struct { name string - config bqfake.QueryConfig[map[string]bigquery.Value] + config bqfake.QueryConfig query string visit func(row map[string]bigquery.Value) error wantErr bool }{ { name: "success-iteration", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ Rows: []map[string]bigquery.Value{{"value": 1.234}}, }, }, @@ -189,8 +188,8 @@ func TestBigQueryImpl_Query(t *testing.T) { }, { name: "visit-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ Rows: []map[string]bigquery.Value{{"value": 1.234}}, }, }, @@ -201,15 +200,15 @@ func TestBigQueryImpl_Query(t *testing.T) { }, { name: "read-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ ReadErr: fmt.Errorf("This is a fake read error"), }, wantErr: true, }, { name: "iterator-error", - config: bqfake.QueryConfig[map[string]bigquery.Value]{ - RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{ + config: bqfake.QueryConfig{ + RowIteratorConfig: bqfake.RowIteratorConfig{ IterErr: fmt.Errorf("This is a fake iterator error"), }, }, @@ -222,10 +221,10 @@ func TestBigQueryImpl_Query(t *testing.T) { b := &bigQueryImpl{ Client: client, } - if cost, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { + if _, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr { t.Errorf("bigQueryImpl.Query() error = %v, wantErr %v", err, tt.wantErr) - fmt.Println(cost) } + }) } } diff --git a/sql/collector.go b/sql/collector.go index e440ab2..b3617aa 100644 --- a/sql/collector.go +++ b/sql/collector.go @@ -52,8 +52,8 @@ type Collector struct { // mux locks access to types above. mux sync.Mutex // Total billed bytes by BigQuery - cost int64 - slotMillis int64 + totalBytesBilled int64 + slotMillis int64 // RegisterErr contains any error during registration. This should be considered fatal. RegisterErr error } @@ -61,15 +61,15 @@ type Collector struct { // NewCollector creates a new BigQuery Collector instance. func NewCollector(runner QueryRunner, valType prometheus.ValueType, metricName, query string) *Collector { return &Collector{ - runner: runner, - metricName: metricName, - query: query, - valType: valType, - descs: nil, - metrics: nil, - cost: 0, - slotMillis: 0, - mux: sync.Mutex{}, + runner: runner, + metricName: metricName, + query: query, + valType: valType, + descs: nil, + metrics: nil, + totalBytesBilled: 0, + slotMillis: 0, + mux: sync.Mutex{}, } } @@ -110,10 +110,8 @@ func (col *Collector) Collect(ch chan<- prometheus.Metric) { desc, col.valType, metrics[i].Values[k], metrics[i].LabelValues...) } } - desc2 := prometheus.NewDesc("slot_ms_utilized", "slot milliseconds utilized", []string{"file_name"}, nil) - ch <- prometheus.MustNewConstMetric(desc2, prometheus.GaugeValue, float64(col.slotMillis), col.metricName) - desc := prometheus.NewDesc("total_bytes_billed", "Total billed bytes", []string{"file_name"}, nil) - ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(col.cost), col.metricName) + setSlotMillis(ch, col.slotMillis, col.metricName) + setTotalBytesBilled(ch, col.totalBytesBilled, col.metricName) } // String satisfies the Stringer interface. String returns the metric name. @@ -137,7 +135,7 @@ func (col *Collector) Update() error { // to the previous value of col.metrics are not affected. col.metrics = metrics if queryStatistics != nil { - col.cost = queryStatistics.TotalBytesBilled + col.totalBytesBilled = queryStatistics.TotalBytesBilled col.slotMillis = queryStatistics.SlotMillis } return nil @@ -152,3 +150,13 @@ func (col *Collector) setDesc() { } } } + +func setSlotMillis(ch chan<- prometheus.Metric, slotMillis int64, metricName string) { + desc := prometheus.NewDesc("bqx_slot_seconds_utilized", "slot milliseconds utilized", []string{"file_name"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(slotMillis)/1000, metricName) +} + +func setTotalBytesBilled(ch chan<- prometheus.Metric, totalBytesBilled int64, metricName string) { + desc := prometheus.NewDesc("bqx_total_bytes_billed", "Total billed bytes", []string{"file_name"}, nil) + ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(totalBytesBilled), metricName) +} diff --git a/sql/collector_test.go b/sql/collector_test.go index 85f060a..42dfaee 100644 --- a/sql/collector_test.go +++ b/sql/collector_test.go @@ -2,7 +2,7 @@ package sql import ( "fmt" - "io/ioutil" + "io" "log" "net/http" "reflect" @@ -48,6 +48,8 @@ func TestCollector(t *testing.T) { expectedMetrics := []string{ `fake_metric{key="thing"} 1.1`, `fake_metric{key="thing2"} 2.1`, + `bqx_slot_seconds_utilized{file_name="fake_metric"} 0`, + `bqx_total_bytes_billed{file_name="fake_metric"} 0`, } c := NewCollector( &fakeQueryRunner{metrics}, prometheus.GaugeValue, "fake_metric", "-- not used") @@ -55,8 +57,8 @@ func TestCollector(t *testing.T) { // NOTE: prometheus.Desc and prometheus.Metric are opaque interfaces that do // not allow introspection. But, we know how many to expect, so check the // counts added to the channels. - chDesc := make(chan *prometheus.Desc, 2) - chCol := make(chan prometheus.Metric, 2) + chDesc := make(chan *prometheus.Desc, 4) + chCol := make(chan prometheus.Metric, 4) c.Describe(chDesc) c.Collect(chCol) @@ -67,8 +69,8 @@ func TestCollector(t *testing.T) { if len(chDesc) != 1 { t.Fatalf("want 1 prometheus.Desc, got %d\n", len(chDesc)) } - if len(chCol) != 2 { - t.Fatalf("want 2 prometheus.Metric, got %d\n", len(chCol)) + if len(chCol) != 4 { + t.Fatalf("want 4 prometheus.Metric, got %d\n", len(chCol)) } // Normally, we use the default registry via prometheus.Register. Using a @@ -88,7 +90,7 @@ func TestCollector(t *testing.T) { if err != nil { log.Fatal(err) } - rawMetrics, err := ioutil.ReadAll(res.Body) + rawMetrics, err := io.ReadAll(res.Body) res.Body.Close() if err != nil { log.Fatal(err) From 6f68c89ca8de3bbb22683f8da997ca502fdcac5c Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Mon, 6 Nov 2023 16:59:04 +0100 Subject: [PATCH 08/11] Add metrics and keep app alive (#50) * - Add metrics for sucessful and failed queries - Add keepAlive flag to not kill the app when the query fail * - Add metric names prefix - Remove duplicated matric calls - Add extra label to update duration metic --- main.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/main.go b/main.go index ce2ef3e..1897305 100644 --- a/main.go +++ b/main.go @@ -30,26 +30,25 @@ import ( ) var ( - counterSources = flagx.StringArray{} - gaugeSources = flagx.StringArray{} - project = flag.String("project", "", "GCP project name.") - refresh = flag.Duration("refresh", 5*time.Minute, "Interval between updating metrics.") - keepAlive = flag.Bool("keepAlive", false, "Keep the process alive even if query fails to execute.") + gaugeSources = flagx.StringArray{} + project = flag.String("project", "", "GCP project name.") + refresh = flag.Duration("refresh", 5*time.Minute, "Interval between updating metrics.") + keepAlive = flag.Bool("keepAlive", false, "Keep the process alive even if query fails to execute.") successFilesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "success_files_executed_total", + Name: "bqx_success_files_executed_total", Help: "The total number of successfully executed files", }, []string{"filename"}) failedFilesCounter = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "failed_files_executed_total", + Name: "bqx_failed_files_executed_total", Help: "The total number of failed executed files", }, []string{"filename"}) updateDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "query_runtime_duration_seconds", + Name: "bqx_query_runtime_duration_seconds", Help: "Duration taken for updating files", Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600}, - }, []string{"filename"}) + }, []string{"filename", "status"}) ) func init() { @@ -92,6 +91,7 @@ func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[ wg.Add(1) go func(f *setup.File) { modified, err := f.IsModified() + start := time.Now() if modified && err == nil { c := sql.NewCollector( newRunner(client), prometheus.GaugeValue, @@ -102,25 +102,21 @@ func reloadRegisterUpdate(client *bigquery.Client, files []setup.File, vars map[ // uses the same name but changes the metrics reported. Because // this cannot be recovered, we use rtx.Must to exit and allow // the runtime environment to restart. + err = f.Register(c) if !keepAlive { rtx.Must(f.Register(c), "Failed to register collector: aborting") - } else { - err = f.Register(c) - } - if err == nil { - successFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() } } else { - start := time.Now() err = f.Update() log.Println("Updating:", fileToMetric(f.Name), time.Since(start)) - updateDuration.WithLabelValues(fileToMetric(f.Name)).Observe(time.Since(start).Seconds()) } if err != nil { - log.Println("Error:", f.Name, err) failedFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() + updateDuration.WithLabelValues(fileToMetric(f.Name), "failed").Observe(time.Since(start).Seconds()) + log.Println("Error:", f.Name, err) } else { successFilesCounter.WithLabelValues(fileToMetric(f.Name)).Inc() + updateDuration.WithLabelValues(fileToMetric(f.Name), "success").Observe(time.Since(start).Seconds()) } wg.Done() }(&files[i]) From b6547f06f7cb5fc0c48064f822b2e267f01150a7 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Wed, 8 Nov 2023 10:51:32 +0100 Subject: [PATCH 09/11] rebase main --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 1897305..0c896c7 100644 --- a/main.go +++ b/main.go @@ -8,8 +8,8 @@ package main import ( "flag" "fmt" - "io/ioutil" "log" + "os" "path/filepath" "strings" "sync" @@ -76,7 +76,7 @@ func fileToMetric(filename string) string { // fileToQuery reads the content of the given file and returns the query with template values repalced with those in vars. func fileToQuery(filename string, vars map[string]string) string { - queryBytes, err := ioutil.ReadFile(filename) + queryBytes, err := os.ReadFile(filename) rtx.Must(err, "Failed to open %q", filename) q := string(queryBytes) From 69fadbddbf8f5422b9cd2f780dff0e0a8eea5379 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Wed, 8 Nov 2023 11:11:11 +0100 Subject: [PATCH 10/11] Set same labels name for metrics --- query/bigquery_runner.go | 2 -- sql/collector.go | 4 ++-- sql/collector_test.go | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/query/bigquery_runner.go b/query/bigquery_runner.go index 7f06bb6..3b052c6 100644 --- a/query/bigquery_runner.go +++ b/query/bigquery_runner.go @@ -5,7 +5,6 @@ package query import ( "context" - "fmt" "math" "sort" "strings" @@ -50,7 +49,6 @@ func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Va queryStatistics := new(bigquery.QueryStatistics) // Assuming that the LastStatus method returns a string if job.LastStatus() != nil { - fmt.Println(job.LastStatus()) queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics) if queryStats != nil { queryStatistics = queryStats diff --git a/sql/collector.go b/sql/collector.go index b3617aa..a164e34 100644 --- a/sql/collector.go +++ b/sql/collector.go @@ -152,11 +152,11 @@ func (col *Collector) setDesc() { } func setSlotMillis(ch chan<- prometheus.Metric, slotMillis int64, metricName string) { - desc := prometheus.NewDesc("bqx_slot_seconds_utilized", "slot milliseconds utilized", []string{"file_name"}, nil) + desc := prometheus.NewDesc("bqx_slot_seconds_utilized", "slot milliseconds utilized", []string{"filename"}, nil) ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(slotMillis)/1000, metricName) } func setTotalBytesBilled(ch chan<- prometheus.Metric, totalBytesBilled int64, metricName string) { - desc := prometheus.NewDesc("bqx_total_bytes_billed", "Total billed bytes", []string{"file_name"}, nil) + desc := prometheus.NewDesc("bqx_total_bytes_billed", "Total billed bytes", []string{"filename"}, nil) ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(totalBytesBilled), metricName) } diff --git a/sql/collector_test.go b/sql/collector_test.go index 42dfaee..0742344 100644 --- a/sql/collector_test.go +++ b/sql/collector_test.go @@ -48,8 +48,8 @@ func TestCollector(t *testing.T) { expectedMetrics := []string{ `fake_metric{key="thing"} 1.1`, `fake_metric{key="thing2"} 2.1`, - `bqx_slot_seconds_utilized{file_name="fake_metric"} 0`, - `bqx_total_bytes_billed{file_name="fake_metric"} 0`, + `bqx_slot_seconds_utilized{filename="fake_metric"} 0`, + `bqx_total_bytes_billed{filename="fake_metric"} 0`, } c := NewCollector( &fakeQueryRunner{metrics}, prometheus.GaugeValue, "fake_metric", "-- not used") From 33dab1e9fb7b410c727c1c63fc4cf9b8f24bbd68 Mon Sep 17 00:00:00 2001 From: Facundo Guerrero Date: Wed, 8 Nov 2023 11:38:29 +0100 Subject: [PATCH 11/11] restore deleted file --- .travis.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..829efc9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,14 @@ +language: go + +go: +- 1.20 + +install: +- go get -v -t ./... +- go install github.com/mattn/goveralls@latest + +script: +# Run query "unit tests". +- make +- go test -short -v ./... -cover=1 -coverprofile=_c.cov +- $GOPATH/bin/goveralls -service=travis-ci -coverprofile=_c.cov