Skip to content

Commit a1a5179

Browse files
committed
fix: Compute counters per-table and not per table per client
1 parent 8ab8841 commit a1a5179

File tree

13 files changed

+179
-172
lines changed

13 files changed

+179
-172
lines changed

scheduler/metrics/metrics.go

Lines changed: 90 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -13,89 +13,79 @@ import (
1313
)
1414

1515
const (
16-
OtelName = "io.cloudquery"
16+
ResourceName = "io.cloudquery"
17+
18+
resourcesMetricName = "sync.table.resources"
19+
errorsMetricName = "sync.table.errors"
20+
panicsMetricName = "sync.table.panics"
21+
durationMetricName = "sync.table.duration"
1722
)
1823

19-
func NewMetrics() *Metrics {
20-
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
24+
func NewMetrics(invocationID string) *Metrics {
25+
resources, err := otel.Meter(ResourceName).Int64Counter(resourcesMetricName,
2126
metric.WithDescription("Number of resources synced for a table"),
2227
metric.WithUnit("/{tot}"),
2328
)
2429
if err != nil {
2530
return nil
2631
}
2732

28-
errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
33+
errors, err := otel.Meter(ResourceName).Int64Counter(errorsMetricName,
2934
metric.WithDescription("Number of errors encountered while syncing a table"),
3035
metric.WithUnit("/{tot}"),
3136
)
3237
if err != nil {
3338
return nil
3439
}
3540

36-
panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
41+
panics, err := otel.Meter(ResourceName).Int64Counter(panicsMetricName,
3742
metric.WithDescription("Number of panics encountered while syncing a table"),
3843
metric.WithUnit("/{tot}"),
3944
)
4045
if err != nil {
4146
return nil
4247
}
4348

44-
startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
45-
metric.WithDescription("Start time of syncing a table"),
49+
duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName,
50+
metric.WithDescription("Duration of syncing a table"),
4651
metric.WithUnit("ns"),
4752
)
4853
if err != nil {
4954
return nil
5055
}
5156

52-
endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
53-
metric.WithDescription("End time of syncing a table"),
54-
metric.WithUnit("ns"),
55-
)
57+
return &Metrics{
58+
invocationID: invocationID,
5659

57-
if err != nil {
58-
return nil
59-
}
60+
resources: resources,
61+
errors: errors,
62+
panics: panics,
63+
duration: duration,
6064

61-
return &Metrics{
62-
TableClient: make(map[string]map[string]*TableClientMetrics),
63-
resources: resources,
64-
errors: errors,
65-
panics: panics,
66-
startTime: startTime,
67-
endTime: endTime,
65+
TableClient: make(map[string]map[string]*tableClientMetrics),
6866
}
6967
}
7068

71-
// Metrics is deprecated as we move toward open telemetry for tracing and metrics
7269
type Metrics struct {
70+
invocationID string
71+
7372
resources metric.Int64Counter
7473
errors metric.Int64Counter
7574
panics metric.Int64Counter
75+
duration metric.Int64Counter
7676

77-
startTime metric.Int64Counter
78-
started bool
79-
startedLock sync.Mutex
80-
81-
endTime metric.Int64Counter
82-
previousEndTime int64
83-
previousEndTimeLock sync.Mutex
84-
85-
TableClient map[string]map[string]*TableClientMetrics
86-
}
87-
88-
type OtelMeters struct {
89-
attributes []attribute.KeyValue
77+
TableClient map[string]map[string]*tableClientMetrics
9078
}
9179

92-
type TableClientMetrics struct {
93-
Resources uint64
94-
Errors uint64
95-
Panics uint64
96-
Duration atomic.Pointer[time.Duration]
80+
type tableClientMetrics struct {
81+
resources uint64
82+
errors uint64
83+
panics uint64
84+
duration atomic.Pointer[time.Duration]
9785

98-
otelMeters *OtelMeters
86+
startTime time.Time
87+
started bool
88+
startedLock sync.Mutex
9989
}
10090

10191
func durationPointerEqual(a, b *time.Duration) bool {
@@ -105,8 +95,8 @@ func durationPointerEqual(a, b *time.Duration) bool {
10595
return b != nil && *a == *b
10696
}
10797

108-
func (m *TableClientMetrics) Equal(other *TableClientMetrics) bool {
109-
return m.Resources == other.Resources && m.Errors == other.Errors && m.Panics == other.Panics && durationPointerEqual(m.Duration.Load(), other.Duration.Load())
98+
func (m *tableClientMetrics) Equal(other *tableClientMetrics) bool {
99+
return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && durationPointerEqual(m.duration.Load(), other.duration.Load())
110100
}
111101

112102
// Equal compares to stats. Mostly useful in testing
@@ -140,32 +130,32 @@ func (m *Metrics) Equal(other *Metrics) bool {
140130
return true
141131
}
142132

143-
func GetOtelAttributeSet(tableName string, clientID string) []attribute.KeyValue {
144-
return []attribute.KeyValue{
145-
attribute.Key("sync.client.id").String(clientID),
146-
attribute.Key("sync.table.name").String(tableName),
133+
func (m *Metrics) NewSelector(clientID, tableName string) Selector {
134+
return Selector{
135+
Set: attribute.NewSet(
136+
attribute.Key("sync.invocation.id").String(m.invocationID),
137+
attribute.Key("sync.table.name").String(tableName),
138+
),
139+
clientID: clientID,
140+
tableName: tableName,
147141
}
148142
}
149143

150-
func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
151-
m.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
144+
func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) {
145+
m.TableClient[table.Name] = make(map[string]*tableClientMetrics, len(clients))
152146
for _, client := range clients {
153-
tableName := table.Name
154-
clientID := client.ID()
155-
m.TableClient[tableName][clientID] = &TableClientMetrics{
156-
otelMeters: &OtelMeters{attributes: GetOtelAttributeSet(tableName, clientID)},
157-
}
147+
m.TableClient[table.Name][client.ID()] = &tableClientMetrics{}
158148
}
159149
for _, relation := range table.Relations {
160-
m.InitWithClients(relation, clients)
150+
m.InitWithClients(invocationID, relation, clients)
161151
}
162152
}
163153

164154
func (m *Metrics) TotalErrors() uint64 {
165155
var total uint64
166156
for _, clientMetrics := range m.TableClient {
167157
for _, metrics := range clientMetrics {
168-
total += metrics.Errors
158+
total += metrics.errors
169159
}
170160
}
171161
return total
@@ -175,7 +165,7 @@ func (m *Metrics) TotalErrorsAtomic() uint64 {
175165
var total uint64
176166
for _, clientMetrics := range m.TableClient {
177167
for _, metrics := range clientMetrics {
178-
total += atomic.LoadUint64(&metrics.Errors)
168+
total += atomic.LoadUint64(&metrics.errors)
179169
}
180170
}
181171
return total
@@ -185,7 +175,7 @@ func (m *Metrics) TotalPanics() uint64 {
185175
var total uint64
186176
for _, clientMetrics := range m.TableClient {
187177
for _, metrics := range clientMetrics {
188-
total += metrics.Panics
178+
total += metrics.panics
189179
}
190180
}
191181
return total
@@ -195,7 +185,7 @@ func (m *Metrics) TotalPanicsAtomic() uint64 {
195185
var total uint64
196186
for _, clientMetrics := range m.TableClient {
197187
for _, metrics := range clientMetrics {
198-
total += atomic.LoadUint64(&metrics.Panics)
188+
total += atomic.LoadUint64(&metrics.panics)
199189
}
200190
}
201191
return total
@@ -205,7 +195,7 @@ func (m *Metrics) TotalResources() uint64 {
205195
var total uint64
206196
for _, clientMetrics := range m.TableClient {
207197
for _, metrics := range clientMetrics {
208-
total += metrics.Resources
198+
total += metrics.resources
209199
}
210200
}
211201
return total
@@ -215,57 +205,64 @@ func (m *Metrics) TotalResourcesAtomic() uint64 {
215205
var total uint64
216206
for _, clientMetrics := range m.TableClient {
217207
for _, metrics := range clientMetrics {
218-
total += atomic.LoadUint64(&metrics.Resources)
208+
total += atomic.LoadUint64(&metrics.resources)
219209
}
220210
}
221211
return total
222212
}
223213

224-
func (m *Metrics) OtelResourcesAdd(ctx context.Context, count int64, tc *TableClientMetrics) {
225-
m.resources.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...))
226-
atomic.AddUint64(&tc.Resources, uint64(count))
214+
func (m *Metrics) ResourcesAdd(ctx context.Context, count int64, selector Selector) {
215+
m.resources.Add(ctx, count, metric.WithAttributeSet(selector.Set))
216+
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].resources, uint64(count))
227217
}
228218

229-
func (m *Metrics) OtelErrorsAdd(ctx context.Context, count int64, tc *TableClientMetrics) {
230-
m.errors.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...))
231-
atomic.AddUint64(&tc.Errors, uint64(count))
219+
func (m *Metrics) ResourcesGet(selector Selector) uint64 {
220+
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].resources)
232221
}
233222

234-
func (m *Metrics) OtelPanicsAdd(ctx context.Context, count int64, tc *TableClientMetrics) {
235-
m.panics.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...))
236-
atomic.AddUint64(&tc.Panics, uint64(count))
223+
func (m *Metrics) ErrorsAdd(ctx context.Context, count int64, selector Selector) {
224+
m.errors.Add(ctx, count, metric.WithAttributeSet(selector.Set))
225+
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].errors, uint64(count))
237226
}
238227

239-
func (m *Metrics) OtelStartTime(ctx context.Context, start time.Time, tc *TableClientMetrics) {
240-
if m.startTime == nil {
241-
return
242-
}
228+
func (m *Metrics) ErrorsGet(selector Selector) uint64 {
229+
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].errors)
230+
}
243231

244-
// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource)
245-
m.startedLock.Lock()
246-
defer m.startedLock.Unlock()
247-
if m.started {
248-
return
249-
}
232+
func (m *Metrics) PanicsAdd(ctx context.Context, count int64, selector Selector) {
233+
m.panics.Add(ctx, count, metric.WithAttributeSet(selector.Set))
234+
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].panics, uint64(count))
235+
}
250236

251-
m.started = true
252-
m.startTime.Add(ctx, start.UnixNano(), metric.WithAttributes(tc.otelMeters.attributes...))
237+
func (m *Metrics) PanicsGet(selector Selector) uint64 {
238+
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].panics)
253239
}
254240

255-
func (m *Metrics) OtelEndTime(ctx context.Context, end time.Time, tc *TableClientMetrics) {
256-
if m.endTime == nil {
241+
func (m *Metrics) StartTime(start time.Time, selector Selector) {
242+
tc := m.TableClient[selector.tableName][selector.clientID]
243+
244+
// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource)
245+
tc.startedLock.Lock()
246+
defer tc.startedLock.Unlock()
247+
if tc.started {
257248
return
258249
}
259250

260-
m.previousEndTimeLock.Lock()
261-
defer m.previousEndTimeLock.Unlock()
262-
val := end.UnixNano()
251+
tc.started = true
252+
tc.startTime = start
253+
}
263254

264-
// If we got another end time to report, use the latest value. This can happen for relational tables that are resolved multiple times (per parent resource)
265-
if m.previousEndTime != 0 {
266-
m.endTime.Add(ctx, val-m.previousEndTime, metric.WithAttributes(tc.otelMeters.attributes...))
267-
} else {
268-
m.endTime.Add(ctx, val, metric.WithAttributes(tc.otelMeters.attributes...))
255+
func (m *Metrics) EndTime(ctx context.Context, end time.Time, selector Selector) {
256+
tc := m.TableClient[selector.tableName][selector.clientID]
257+
duration := time.Duration(end.UnixNano() - tc.startTime.UnixNano())
258+
tc.duration.Store(&duration)
259+
m.duration.Add(ctx, duration.Nanoseconds(), metric.WithAttributeSet(selector.Set))
260+
}
261+
262+
func (m *Metrics) DurationGet(selector Selector) *time.Duration {
263+
tc := m.TableClient[selector.tableName][selector.clientID]
264+
if tc == nil {
265+
return nil
269266
}
270-
m.previousEndTime = val
267+
return tc.duration.Load()
271268
}

scheduler/metrics/metrics_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package metrics
33
import "testing"
44

55
func TestMetrics(t *testing.T) {
6-
s := NewMetrics()
7-
s.TableClient["test_table"] = make(map[string]*TableClientMetrics)
8-
s.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{
9-
Resources: 1,
10-
Errors: 2,
11-
Panics: 3,
6+
s := NewMetrics("test_invocation_id")
7+
s.TableClient["test_table"] = make(map[string]*tableClientMetrics)
8+
s.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{
9+
resources: 1,
10+
errors: 2,
11+
panics: 3,
1212
}
1313
if s.TotalResources() != 1 {
1414
t.Fatal("expected 1 resource")
@@ -20,12 +20,12 @@ func TestMetrics(t *testing.T) {
2020
t.Fatal("expected 3 panics")
2121
}
2222

23-
other := NewMetrics()
24-
other.TableClient["test_table"] = make(map[string]*TableClientMetrics)
25-
other.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{
26-
Resources: 1,
27-
Errors: 2,
28-
Panics: 3,
23+
other := NewMetrics("test_invocation_id")
24+
other.TableClient["test_table"] = make(map[string]*tableClientMetrics)
25+
other.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{
26+
resources: 1,
27+
errors: 2,
28+
panics: 3,
2929
}
3030
if !s.Equal(other) {
3131
t.Fatal("expected metrics to be equal")

scheduler/metrics/selector.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package metrics
2+
3+
import "go.opentelemetry.io/otel/attribute"
4+
5+
type Selector struct {
6+
attribute.Set
7+
8+
clientID string
9+
tableName string
10+
}

scheduler/queue/scheduler_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1010
"github.com/cloudquery/plugin-sdk/v4/schema"
1111
"github.com/cloudquery/plugin-sdk/v4/transformers"
12+
"github.com/google/uuid"
1213
"github.com/rs/zerolog"
1314
"github.com/stretchr/testify/require"
1415
)
@@ -44,8 +45,9 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc
4445

4546
func TestScheduler(t *testing.T) {
4647
nopLogger := zerolog.Nop()
47-
m := metrics.NewMetrics()
48-
scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000))
48+
invocationID := uuid.New().String()
49+
m := metrics.NewMetrics(invocationID)
50+
scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000), WithInvocationID(invocationID))
4951
tableClients := []WorkUnit{
5052
{
5153
Table: &schema.Table{
@@ -80,7 +82,7 @@ func TestScheduler(t *testing.T) {
8082
}
8183

8284
for _, tc := range tableClients {
83-
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client})
85+
m.InitWithClients(scheduler.invocationID, tc.Table, []schema.ClientMeta{tc.Client})
8486
}
8587

8688
resolvedResources := make(chan *schema.Resource)

0 commit comments

Comments
 (0)