Skip to content

Commit 6b2ad35

Browse files
Chief-Rishabravisuhag
authored andcommitted
feat: instrument bigquery extractor with OpenTelemetry
1 parent 7019f29 commit 6b2ad35

File tree

3 files changed

+206
-57
lines changed

3 files changed

+206
-57
lines changed

plugins/extractors/bigquery/auditlog/auditlog.go

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@ import (
55
"fmt"
66
"time"
77

8+
"cloud.google.com/go/bigquery"
89
"cloud.google.com/go/logging/logadmin"
910
"github.com/pkg/errors"
11+
"github.com/raystack/meteor/plugins"
1012
"github.com/raystack/salt/log"
13+
"go.opentelemetry.io/otel"
14+
"go.opentelemetry.io/otel/attribute"
15+
"go.opentelemetry.io/otel/metric"
1116
"google.golang.org/api/iterator"
1217
"google.golang.org/api/option"
1318
auditpb "google.golang.org/genproto/googleapis/cloud/audit"
@@ -23,24 +28,38 @@ type Config struct {
2328
UsageProjectIDs []string
2429
}
2530

26-
const advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
27-
`resource.type="bigquery_resource" AND NOT ` +
28-
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
29-
`timestamp >= "%s" AND timestamp < "%s" AND %s`
31+
const (
32+
advancedFilterTemplate = `protoPayload.methodName="jobservice.jobcompleted" AND ` +
33+
`resource.type="bigquery_resource" AND NOT ` +
34+
`protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.query.query:(INFORMATION_SCHEMA OR __TABLES__) AND ` +
35+
`timestamp >= "%s" AND timestamp < "%s" AND %s`
36+
37+
metricTableDurn = "meteor.bq.client.table.duration"
38+
)
3039

3140
type AuditLog struct {
3241
logger log.Logger
3342
client *logadmin.Client
3443
config Config
44+
45+
histogram metric.Int64Histogram
3546
}
3647

3748
func New(logger log.Logger) *AuditLog {
49+
h, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigquery").
50+
Int64Histogram(metricTableDurn, metric.WithUnit("ms"))
51+
if err != nil {
52+
otel.Handle(err)
53+
}
54+
3855
return &AuditLog{
3956
logger: logger,
57+
58+
histogram: h,
4059
}
4160
}
4261

43-
func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
62+
func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) error {
4463
for _, opt := range opts {
4564
opt(l)
4665
}
@@ -50,71 +69,78 @@ func (l *AuditLog) Init(ctx context.Context, opts ...InitOption) (err error) {
5069
}
5170

5271
if l.client == nil {
72+
var err error
5373
l.client, err = l.createClient(ctx)
5474
if err != nil {
55-
err = errors.Wrap(err, "failed to create logadmin client")
56-
return
75+
return fmt.Errorf("create logadmin client: %w", err)
5776
}
5877
}
59-
return
78+
79+
return nil
6080
}
6181

62-
func (l *AuditLog) createClient(ctx context.Context) (client *logadmin.Client, err error) {
82+
func (l *AuditLog) createClient(ctx context.Context) (*logadmin.Client, error) {
6383
if l.config.ServiceAccountJSON == "" {
6484
l.logger.Info("credentials are not specified, creating logadmin using default credentials...")
65-
client, err = logadmin.NewClient(ctx, l.config.ProjectID)
66-
return
85+
return logadmin.NewClient(ctx, l.config.ProjectID)
6786
}
6887

69-
client, err = logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
70-
if err != nil {
71-
err = errors.New("client is nil, failed initiating client")
72-
}
73-
return
88+
return logadmin.NewClient(ctx, l.config.ProjectID, option.WithCredentialsJSON([]byte(l.config.ServiceAccountJSON)))
7489
}
7590

76-
func (l *AuditLog) Collect(ctx context.Context, tableID string) (tableStats *TableStats, err error) {
91+
func (l *AuditLog) Collect(ctx context.Context, tbl *bigquery.Table) (stats *TableStats, err error) {
92+
defer func(start time.Time) {
93+
attrs := []attribute.KeyValue{
94+
attribute.String("bq.operation", "table.audit_logs"),
95+
attribute.String("bq.project_id", tbl.ProjectID),
96+
attribute.String("bq.dataset_id", tbl.DatasetID),
97+
}
98+
if err != nil {
99+
attrs = append(attrs, attribute.String("bq.error_code", plugins.BQErrReason(err)))
100+
}
101+
102+
l.histogram.Record(
103+
ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attrs...),
104+
)
105+
}(time.Now())
106+
77107
if l.client == nil {
78-
err = errors.New("auditlog client is nil")
79-
return
108+
return nil, errors.New("auditlog client is nil")
80109
}
81110

82-
tableStats = NewTableStats()
83-
84-
filter := l.buildFilter(tableID)
111+
filter := l.buildFilter(tbl.TableID)
85112
it := l.client.Entries(ctx,
86113
logadmin.ProjectIDs(l.config.UsageProjectIDs),
87114
logadmin.Filter(filter))
88115

89116
l.logger.Info("getting logs in these projects", "projects", l.config.UsageProjectIDs)
90117
l.logger.Info("getting logs with the filter", "filter", filter)
91118

119+
stats = NewTableStats()
92120
for {
93-
entry, errF := it.Next()
94-
if errF == iterator.Done {
121+
entry, err := it.Next()
122+
if errors.Is(err, iterator.Done) {
95123
break
96124
}
97-
if errF != nil {
98-
err = errors.Wrap(errF, "error iterating logEntries")
99-
break
125+
if err != nil {
126+
return nil, fmt.Errorf("error iterating logEntries: %w", err)
100127
}
101128

102-
logData, errF := parsePayload(entry.Payload)
103-
if errF != nil {
104-
l.logger.Warn("error parsing LogEntry payload", "err", errF)
129+
logData, err := parsePayload(entry.Payload)
130+
if err != nil {
131+
l.logger.Warn("error parsing LogEntry payload", "err", err)
105132
continue
106133
}
107134

108-
if errF := tableStats.Populate(logData); errF != nil {
135+
if errF := stats.Populate(logData); errF != nil {
109136
l.logger.Warn("error populating logdata", "err", errF)
110137
continue
111138
}
112139
}
113-
return
140+
return stats, nil
114141
}
115142

116143
func (l *AuditLog) buildFilter(tableID string) string {
117-
118144
timeNow := time.Now().UTC()
119145
dayDuration := time.Duration(24*l.config.UsagePeriodInDay) * time.Hour
120146
timeFrom := timeNow.Add(-1 * dayDuration)
@@ -125,23 +151,23 @@ func (l *AuditLog) buildFilter(tableID string) string {
125151
return fmt.Sprintf(advancedFilterTemplate, timeFromFormatted, timeNowFormatted, tableID)
126152
}
127153

128-
func parsePayload(payload interface{}) (ld *LogData, err error) {
129-
130-
ad := &loggingpb.AuditData{}
154+
func parsePayload(payload interface{}) (*LogData, error) {
131155
pl, ok := payload.(*auditpb.AuditLog)
132156
if !ok {
133-
err = errors.New("cannot parse payload to AuditLog")
134-
return
157+
return nil, errors.New("parse payload to AuditLog")
135158
}
136159

137-
if errPB := getAuditData(pl, ad); errPB != nil {
138-
err = errors.Wrap(errPB, "failed to get audit data from metadata")
139-
return
160+
var ad loggingpb.AuditData
161+
if err := getAuditData(pl, &ad); err != nil {
162+
return nil, fmt.Errorf("get audit data from metadata: %w", err)
140163
}
141164

142-
ld = &LogData{ad}
143-
err = ld.validateAuditData()
144-
return
165+
ld := &LogData{&ad}
166+
if err := ld.validateAuditData(); err != nil {
167+
return nil, err
168+
}
169+
170+
return ld, nil
145171
}
146172

147173
func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
@@ -160,24 +186,23 @@ func getAuditData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
160186
func getAuditDataFromServiceData(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
161187
//nolint:staticcheck
162188
if err := pl.GetServiceData().UnmarshalTo(ad); err != nil {
163-
return errors.Wrap(err, "failed to marshal service data to audit data")
189+
return fmt.Errorf("marshal service data to audit data: %w", err)
164190
}
165191
return nil
166192
}
167193

168194
func getAuditDataFromMetadata(pl *auditpb.AuditLog, ad *loggingpb.AuditData) error {
169-
170195
if pl.GetMetadata() == nil {
171196
return errors.New("metadata field is nil")
172197
}
173198

174199
mdJSON, err := pl.GetMetadata().MarshalJSON()
175200
if err != nil {
176-
return errors.Wrap(err, "cannot marshal payload metadata")
201+
return fmt.Errorf("marshal payload metadata: %w", err)
177202
}
178203

179204
if err := protojson.Unmarshal(mdJSON, ad); err != nil {
180-
return errors.Wrap(err, "cannot parse service data to Audit")
205+
return fmt.Errorf("parse service data to Audit: %w", err)
181206
}
182207

183208
return nil

0 commit comments

Comments
 (0)