Skip to content
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
cloud.google.com/go/bigquery v1.45.0
github.com/googleapis/google-cloud-go-testing v0.0.0-20191008195207-8e1d251e947d
github.com/m-lab/go v0.1.66
github.com/m-lab/go v0.1.67
github.com/prometheus/client_golang v1.11.1
github.com/spf13/afero v1.2.2
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/m-lab/go v0.1.66 h1:adDJILqKBCkd5YeVhCrrjWkjoNRtDzlDr6uizWu5/pE=
github.com/m-lab/go v0.1.66/go.mod h1:O1D/EoVarJ8lZt9foANcqcKtwxHatBzUxXFFyC87aQQ=
github.com/m-lab/go v0.1.67 h1:jT9tLviED+w2GP6tp0qlB3r8c/yXGtjQW885OaSXn6I=
github.com/m-lab/go v0.1.67/go.mod h1:BirARfHWjjXHaCGNyWCm/CKW1OarjuEj8Yn6Z2rc0M4=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
9 changes: 5 additions & 4 deletions internal/setup/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"cloud.google.com/go/bigquery"
"github.com/m-lab/go/rtx"
"github.com/m-lab/prometheus-bigquery-exporter/sql"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -69,8 +70,8 @@ func TestFile_IsModified(t *testing.T) {

type fakeRunner struct{}

func (f *fakeRunner) Query(query string) ([]sql.Metric, error) {
return nil, fmt.Errorf("Fake failure")
func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) {
return nil, nil, fmt.Errorf("Fake failure")
}

func TestFile_Update(t *testing.T) {
Expand Down Expand Up @@ -106,8 +107,8 @@ type fakeRegister struct {
metric sql.Metric
}

func (f *fakeRegister) Query(query string) ([]sql.Metric, error) {
return []sql.Metric{f.metric}, nil
func (f *fakeRegister) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) {
return []sql.Metric{f.metric}, nil, nil
}

func TestFile_Register(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -76,7 +76,7 @@ func fileToMetric(filename string) string {

// fileToQuery reads the content of the given file and returns the query with template values repalced with those in vars.
func fileToQuery(filename string, vars map[string]string) string {
queryBytes, err := ioutil.ReadFile(filename)
queryBytes, err := os.ReadFile(filename)
rtx.Must(err, "Failed to open %q", filename)

q := string(queryBytes)
Expand Down
6 changes: 3 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type fakeRunner struct {
updated int
}

func (f *fakeRunner) Query(query string) ([]sql.Metric, error) {
func (f *fakeRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) {
r := []sql.Metric{
{
LabelKeys: []string{"key"},
Expand All @@ -40,9 +40,9 @@ func (f *fakeRunner) Query(query string) ([]sql.Metric, error) {
f.updated++
if f.updated > 1 {
// Simulate an error after one successful query.
return nil, fmt.Errorf("Fake failure for testing")
return nil, nil, fmt.Errorf("Fake failure for testing")
}
return r, nil
return r, nil, nil
}

func Test_main(t *testing.T) {
Expand Down
52 changes: 41 additions & 11 deletions query/bigquery_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,53 @@ type bigQueryImpl struct {
bqiface.Client
}

func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) error {
func (b *bigQueryImpl) Query(query string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) {
// Create a query job.
q := b.Client.Query(query)
it, err := q.Read(context.Background())
job, err := q.Run(context.Background())

if err != nil {
return err
return nil, err
}

if job == nil {
return nil, nil
}

// Wait for the job to complete.
status, err := job.Wait(context.Background())
if err != nil {
return nil, status.Err()
}
// Now, you can proceed with reading and processing the query results.
var row map[string]bigquery.Value

it, err := job.Read(context.Background())

if err != nil {
return nil, err
}
// Get the query statistics to extract the cost.
queryStatistics := new(bigquery.QueryStatistics)
// Assuming that the LastStatus method returns a string
if job.LastStatus() != nil {
queryStats := job.LastStatus().Statistics.Details.(*bigquery.QueryStatistics)
if queryStats != nil {
queryStatistics = queryStats
}
}

for err = it.Next(&row); err == nil; err = it.Next(&row) {
err2 := visit(row)
if err2 != nil {
return err2
return queryStatistics, err2
}
}

if err != iterator.Done {
return err
return queryStatistics, err
}
return nil
return queryStatistics, nil
}

// BQRunner is a concerete implementation of QueryRunner for BigQuery.
Expand All @@ -45,7 +75,7 @@ type BQRunner struct {

// runner interface allows unit testing of the Query function.
type runner interface {
Query(q string, visit func(row map[string]bigquery.Value) error) error
Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error)
}

// NewBQRunner creates a new QueryRunner instance.
Expand All @@ -60,16 +90,16 @@ func NewBQRunner(client *bigquery.Client) *BQRunner {
// Query executes the given query. Query only supports standard SQL. The
// query must define a column named "value" for the value, and may define
// additional columns, all of which are used as metric labels.
func (qr *BQRunner) Query(query string) ([]sql.Metric, error) {
func (qr *BQRunner) Query(query string) ([]sql.Metric, *bigquery.QueryStatistics, error) {
metrics := []sql.Metric{}
err := qr.runner.Query(query, func(row map[string]bigquery.Value) error {
queryStatistics, err := qr.runner.Query(query, func(row map[string]bigquery.Value) error {
metrics = append(metrics, rowToMetric(row))
return nil
})
if err != nil {
return nil, err
return nil, queryStatistics, err
}
return metrics, nil
return metrics, queryStatistics, nil
}

// valToFloat extracts a float from the bigquery.Value irrespective of the
Expand Down
33 changes: 17 additions & 16 deletions query/bigquery_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"cloud.google.com/go/bigquery"
"github.com/m-lab/prometheus-bigquery-exporter/sql"

"github.com/m-lab/go/cloud/bqfake"
"github.com/m-lab/go/cloudtest/bqfake"
)

func TestRowToMetric(t *testing.T) {
Expand Down Expand Up @@ -101,21 +101,21 @@ func TestRowToMetric(t *testing.T) {
}

type fakeQuery struct {
err error
rows []map[string]bigquery.Value
err error
}

func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) error {
func (f *fakeQuery) Query(q string, visit func(row map[string]bigquery.Value) error) (*bigquery.QueryStatistics, error) {
if f.err != nil {
return f.err
return nil, f.err
}
for i := range f.rows {
err := visit(f.rows[i])
if err != nil {
return err
return nil, err
}
}
return nil
return nil, nil
}

func TestBQRunner_Query(t *testing.T) {
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestBQRunner_Query(t *testing.T) {
qr := &BQRunner{
runner: tt.runner,
}
got, err := qr.Query("select * from `fake-table`")
got, _, err := qr.Query("select * from `fake-table`")
if (err != nil) != tt.wantErr {
t.Errorf("BQRunner.Query() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -170,15 +170,15 @@ func TestNewBQRunner(t *testing.T) {
func TestBigQueryImpl_Query(t *testing.T) {
tests := []struct {
name string
config bqfake.QueryConfig[map[string]bigquery.Value]
config bqfake.QueryConfig
query string
visit func(row map[string]bigquery.Value) error
wantErr bool
}{
{
name: "success-iteration",
config: bqfake.QueryConfig[map[string]bigquery.Value]{
RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{
config: bqfake.QueryConfig{
RowIteratorConfig: bqfake.RowIteratorConfig{
Rows: []map[string]bigquery.Value{{"value": 1.234}},
},
},
Expand All @@ -188,8 +188,8 @@ func TestBigQueryImpl_Query(t *testing.T) {
},
{
name: "visit-error",
config: bqfake.QueryConfig[map[string]bigquery.Value]{
RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{
config: bqfake.QueryConfig{
RowIteratorConfig: bqfake.RowIteratorConfig{
Rows: []map[string]bigquery.Value{{"value": 1.234}},
},
},
Expand All @@ -200,15 +200,15 @@ func TestBigQueryImpl_Query(t *testing.T) {
},
{
name: "read-error",
config: bqfake.QueryConfig[map[string]bigquery.Value]{
config: bqfake.QueryConfig{
ReadErr: fmt.Errorf("This is a fake read error"),
},
wantErr: true,
},
{
name: "iterator-error",
config: bqfake.QueryConfig[map[string]bigquery.Value]{
RowIteratorConfig: bqfake.RowIteratorConfig[map[string]bigquery.Value]{
config: bqfake.QueryConfig{
RowIteratorConfig: bqfake.RowIteratorConfig{
IterErr: fmt.Errorf("This is a fake iterator error"),
},
},
Expand All @@ -221,9 +221,10 @@ func TestBigQueryImpl_Query(t *testing.T) {
b := &bigQueryImpl{
Client: client,
}
if err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr {
if _, err := b.Query(tt.query, tt.visit); (err != nil) != tt.wantErr {
t.Errorf("bigQueryImpl.Query() error = %v, wantErr %v", err, tt.wantErr)
}

})
}
}
42 changes: 31 additions & 11 deletions sql/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"time"

"cloud.google.com/go/bigquery"
"github.com/m-lab/go/logx"

"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -29,7 +29,7 @@ func NewMetric(labelKeys []string, labelValues []string, values map[string]float

// QueryRunner defines the interface used to run a query and return an array of metrics.
type QueryRunner interface {
Query(q string) ([]Metric, error)
Query(q string) ([]Metric, *bigquery.QueryStatistics, error)
}

// Collector manages a prometheus.Collector for queries performed by a QueryRunner.
Expand All @@ -51,21 +51,25 @@ type Collector struct {
metrics []Metric
// mux locks access to types above.
mux sync.Mutex

// Total billed bytes by BigQuery
totalBytesBilled int64
slotMillis int64
// RegisterErr contains any error during registration. This should be considered fatal.
RegisterErr error
}

// NewCollector creates a new BigQuery Collector instance.
func NewCollector(runner QueryRunner, valType prometheus.ValueType, metricName, query string) *Collector {
return &Collector{
runner: runner,
metricName: metricName,
query: query,
valType: valType,
descs: nil,
metrics: nil,
mux: sync.Mutex{},
runner: runner,
metricName: metricName,
query: query,
valType: valType,
descs: nil,
metrics: nil,
totalBytesBilled: 0,
slotMillis: 0,
mux: sync.Mutex{},
}
}

Expand Down Expand Up @@ -106,6 +110,8 @@ func (col *Collector) Collect(ch chan<- prometheus.Metric) {
desc, col.valType, metrics[i].Values[k], metrics[i].LabelValues...)
}
}
setSlotMillis(ch, col.slotMillis, col.metricName)
setTotalBytesBilled(ch, col.totalBytesBilled, col.metricName)
}

// String satisfies the Stringer interface. String returns the metric name.
Expand All @@ -117,7 +123,7 @@ func (col *Collector) String() string {
// Update is called automaticlly after the collector is registered.
func (col *Collector) Update() error {
logx.Debug.Println("Update:", col.metricName)
metrics, err := col.runner.Query(col.query)
metrics, queryStatistics, err := col.runner.Query(col.query)
if err != nil {
logx.Debug.Println("Failed to run query:", err)
return err
Expand All @@ -128,6 +134,10 @@ func (col *Collector) Update() error {
// Replace slice reference with new value returned from Query. References
// to the previous value of col.metrics are not affected.
col.metrics = metrics
if queryStatistics != nil {
col.totalBytesBilled = queryStatistics.TotalBytesBilled
col.slotMillis = queryStatistics.SlotMillis
}
return nil
}

Expand All @@ -140,3 +150,13 @@ func (col *Collector) setDesc() {
}
}
}

func setSlotMillis(ch chan<- prometheus.Metric, slotMillis int64, metricName string) {
desc := prometheus.NewDesc("bqx_slot_seconds_utilized", "slot milliseconds utilized", []string{"filename"}, nil)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(slotMillis)/1000, metricName)
}

func setTotalBytesBilled(ch chan<- prometheus.Metric, totalBytesBilled int64, metricName string) {
desc := prometheus.NewDesc("bqx_total_bytes_billed", "Total billed bytes", []string{"filename"}, nil)
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, float64(totalBytesBilled), metricName)
}
Loading