Skip to content

Commit 4cff3fb

Browse files
Chief-Rishabravisuhag
authored andcommitted
feat: instrument bigtable extractor
1 parent d0aa5b1 commit 4cff3fb

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

plugins/extractors/bigtable/bigtable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
110110
if err != nil {
111111
return err
112112
}
113+
114+
client = WithInstanceAdminClientMW(e.config.ProjectID)(client)
115+
113116
e.instanceNames, err = instanceInfoGetter(ctx, client)
114117
if err != nil {
115118
return err
@@ -143,6 +146,9 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) error
143146
if err != nil {
144147
return err
145148
}
149+
150+
adminClient = WithAdminClientMW(e.config.ProjectID, instance)(adminClient)
151+
146152
tables, _ := adminClient.Tables(ctx)
147153
var wg sync.WaitGroup
148154
for _, table := range tables {
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package bigtable
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
8+
"cloud.google.com/go/bigtable"
9+
"github.com/googleapis/gax-go/v2/apierror"
10+
"github.com/raystack/meteor/utils"
11+
"go.opentelemetry.io/otel"
12+
"go.opentelemetry.io/otel/attribute"
13+
"go.opentelemetry.io/otel/metric"
14+
)
15+
16+
type AdminClientMW struct {
17+
tableDuration metric.Int64Histogram
18+
tablesDuration metric.Int64Histogram
19+
next AdminClient
20+
attributes []attribute.KeyValue
21+
}
22+
23+
type InstanceAdminClientMW struct {
24+
instancesDuration metric.Int64Histogram
25+
next InstanceAdminClient
26+
attributes []attribute.KeyValue
27+
}
28+
29+
func WithAdminClientMW(projectID, instanceName string) func(AdminClient) AdminClient {
30+
meter := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigtable")
31+
32+
tablesDuration, err := meter.Int64Histogram("meteor.bigtable.client.tables.duration", metric.WithUnit("ms"))
33+
handleOtelErr(err)
34+
35+
tableDuration, err := meter.Int64Histogram("meteor.bigtable.client.table.duration", metric.WithUnit("ms"))
36+
handleOtelErr(err)
37+
38+
return func(next AdminClient) AdminClient {
39+
return &AdminClientMW{
40+
tableDuration: tableDuration,
41+
tablesDuration: tablesDuration,
42+
next: next,
43+
attributes: []attribute.KeyValue{
44+
attribute.String("bt.project_id", projectID),
45+
attribute.String("bt.instance_name", instanceName),
46+
},
47+
}
48+
}
49+
}
50+
51+
func WithInstanceAdminClientMW(projectID string) func(InstanceAdminClient) InstanceAdminClient {
52+
instancesDuration, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/bigtable").
53+
Int64Histogram("meteor.bigtable.client.instances.duration", metric.WithUnit("ms"))
54+
handleOtelErr(err)
55+
56+
return func(next InstanceAdminClient) InstanceAdminClient {
57+
return &InstanceAdminClientMW{
58+
instancesDuration: instancesDuration,
59+
next: next,
60+
attributes: []attribute.KeyValue{
61+
attribute.String("bt.project_id", projectID),
62+
},
63+
}
64+
}
65+
}
66+
67+
func (o *AdminClientMW) Tables(ctx context.Context) (res []string, err error) {
68+
defer func(start time.Time) {
69+
attrs := o.attributes
70+
if err != nil {
71+
attrs = append(attrs, attribute.String("bt.error_code", apiErrReason(err)))
72+
}
73+
o.tablesDuration.Record(ctx,
74+
time.Since(start).Milliseconds(),
75+
metric.WithAttributes(attrs...))
76+
}(time.Now())
77+
78+
return o.next.Tables(ctx)
79+
}
80+
81+
func (o *AdminClientMW) TableInfo(ctx context.Context, table string) (res *bigtable.TableInfo, err error) {
82+
defer func(start time.Time) {
83+
attrs := append(o.attributes, attribute.String("bt.table_name", table))
84+
if err != nil {
85+
attrs = append(attrs, attribute.String("bt.error_code", apiErrReason(err)))
86+
}
87+
o.tableDuration.Record(ctx,
88+
time.Since(start).Milliseconds(),
89+
metric.WithAttributes(attrs...))
90+
}(time.Now())
91+
return o.next.TableInfo(ctx, table)
92+
}
93+
94+
func (o *InstanceAdminClientMW) Instances(ctx context.Context) (res []*bigtable.InstanceInfo, err error) {
95+
defer func(start time.Time) {
96+
attrs := o.attributes
97+
if err != nil {
98+
attrs = append(o.attributes, attribute.String("bt.error_code", apiErrReason(err)))
99+
}
100+
101+
o.instancesDuration.Record(ctx,
102+
time.Since(start).Milliseconds(),
103+
metric.WithAttributes(attrs...))
104+
}(time.Now())
105+
106+
return o.next.Instances(ctx)
107+
}
108+
109+
func apiErrReason(err error) string {
110+
reason := utils.StatusCode(err).String()
111+
var apiErr *apierror.APIError
112+
if errors.As(err, &apiErr) {
113+
reason = apiErr.Reason()
114+
}
115+
116+
return reason
117+
}
118+
119+
func handleOtelErr(err error) {
120+
if err != nil {
121+
otel.Handle(err)
122+
}
123+
}

0 commit comments

Comments
 (0)