Skip to content

Commit fd813fc

Browse files
committed
fix: Remove invocationID from metric point attributes
1 parent 79ebc60 commit fd813fc

File tree

7 files changed

+38
-79
lines changed

7 files changed

+38
-79
lines changed

scheduler/metrics/metrics.go

Lines changed: 32 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"context"
5+
"sync"
56
"sync/atomic"
67
"time"
78

@@ -20,42 +21,38 @@ const (
2021
durationMetricName = "sync.table.duration"
2122
)
2223

23-
func NewMetrics(invocationID string) *Metrics {
24-
resources, err := otel.Meter(ResourceName).Int64Counter(resourcesMetricName,
25-
metric.WithDescription("Number of resources synced for a table"),
26-
metric.WithUnit("/{tot}"),
27-
)
28-
if err != nil {
29-
return nil
30-
}
31-
32-
errors, err := otel.Meter(ResourceName).Int64Counter(errorsMetricName,
33-
metric.WithDescription("Number of errors encountered while syncing a table"),
34-
metric.WithUnit("/{tot}"),
35-
)
36-
if err != nil {
37-
return nil
38-
}
39-
40-
panics, err := otel.Meter(ResourceName).Int64Counter(panicsMetricName,
41-
metric.WithDescription("Number of panics encountered while syncing a table"),
42-
metric.WithUnit("/{tot}"),
43-
)
44-
if err != nil {
45-
return nil
46-
}
24+
var (
25+
resources metric.Int64Counter
26+
errors metric.Int64Counter
27+
panics metric.Int64Counter
28+
duration metric.Int64Counter
29+
once sync.Once
30+
)
4731

48-
duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName,
49-
metric.WithDescription("Duration of syncing a table"),
50-
metric.WithUnit("ms"),
51-
)
52-
if err != nil {
53-
return nil
54-
}
32+
func NewMetrics(invocationID string) *Metrics {
33+
once.Do(func() {
34+
resources, _ = otel.Meter(ResourceName).Int64Counter(resourcesMetricName,
35+
metric.WithDescription("Number of resources synced for a table"),
36+
metric.WithUnit("/{tot}"),
37+
)
38+
39+
errors, _ = otel.Meter(ResourceName).Int64Counter(errorsMetricName,
40+
metric.WithDescription("Number of errors encountered while syncing a table"),
41+
metric.WithUnit("/{tot}"),
42+
)
43+
44+
panics, _ = otel.Meter(ResourceName).Int64Counter(panicsMetricName,
45+
metric.WithDescription("Number of panics encountered while syncing a table"),
46+
metric.WithUnit("/{tot}"),
47+
)
48+
49+
duration, _ = otel.Meter(ResourceName).Int64Counter(durationMetricName,
50+
metric.WithDescription("Duration of syncing a table"),
51+
metric.WithUnit("ms"),
52+
)
53+
})
5554

5655
return &Metrics{
57-
invocationID: invocationID,
58-
5956
resources: resources,
6057
errors: errors,
6158
panics: panics,
@@ -66,8 +63,6 @@ func NewMetrics(invocationID string) *Metrics {
6663
}
6764

6865
type Metrics struct {
69-
invocationID string
70-
7166
resources metric.Int64Counter
7267
errors metric.Int64Counter
7368
panics metric.Int64Counter
@@ -88,59 +83,23 @@ type measurement struct {
8883
duration *durationMeasurement
8984
}
9085

91-
func (m *measurement) Equal(other *measurement) bool {
92-
return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && m.duration == other.duration
93-
}
94-
95-
// Equal compares to stats. Mostly useful in testing
96-
func (m *Metrics) Equal(other *Metrics) bool {
97-
for table, clientStats := range m.measurements {
98-
for client, stats := range clientStats.clients {
99-
if _, ok := other.measurements[table]; !ok {
100-
return false
101-
}
102-
if _, ok := other.measurements[table].clients[client]; !ok {
103-
return false
104-
}
105-
if !stats.Equal(other.measurements[table].clients[client]) {
106-
return false
107-
}
108-
}
109-
}
110-
for table, clientStats := range other.measurements {
111-
for client, stats := range clientStats.clients {
112-
if _, ok := m.measurements[table]; !ok {
113-
return false
114-
}
115-
if _, ok := m.measurements[table].clients[client]; !ok {
116-
return false
117-
}
118-
if !stats.Equal(m.measurements[table].clients[client]) {
119-
return false
120-
}
121-
}
122-
}
123-
return true
124-
}
125-
12686
func (m *Metrics) NewSelector(clientID, tableName string) Selector {
12787
return Selector{
12888
Set: attribute.NewSet(
129-
attribute.Key("sync.invocation.id").String(m.invocationID),
13089
attribute.Key("sync.table.name").String(tableName),
13190
),
13291
clientID: clientID,
13392
tableName: tableName,
13493
}
13594
}
13695

137-
func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) {
96+
func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
13897
m.measurements[table.Name] = tableMeasurements{clients: make(map[string]*measurement), duration: &durationMeasurement{}}
13998
for _, client := range clients {
14099
m.measurements[table.Name].clients[client.ID()] = &measurement{duration: &durationMeasurement{}}
141100
}
142101
for _, relation := range table.Relations {
143-
m.InitWithClients(invocationID, relation, clients)
102+
m.InitWithClients(relation, clients)
144103
}
145104
}
146105

scheduler/queue/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestScheduler(t *testing.T) {
8282
}
8383

8484
for _, tc := range tableClients {
85-
m.InitWithClients(scheduler.invocationID, tc.Table, []schema.ClientMeta{tc.Client})
85+
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client})
8686
}
8787

8888
resolvedResources := make(chan *schema.Resource)

scheduler/scheduler_debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
5050
preInitialisedClients[i] = clients
5151
// we do this here to avoid locks so we initialize the metrics structure once in the main goroutine
5252
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
53-
s.metrics.InitWithClients(s.invocationID, table, clients)
53+
s.metrics.InitWithClients(table, clients)
5454
}
5555

5656
// First interleave the tables like in round-robin

scheduler/scheduler_dfs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
4040
preInitialisedClients[i] = clients
4141
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
4242
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
43-
s.metrics.InitWithClients(s.invocationID, table, clients)
43+
s.metrics.InitWithClients(table, clients)
4444
}
4545

4646
tableClients := make([]tableClient, 0)

scheduler/scheduler_round_robin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(s.invocationID, table, clients)
36+
s.metrics.InitWithClients(table, clients)
3737
}
3838

3939
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

scheduler/scheduler_shuffle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
3333
preInitialisedClients[i] = clients
3434
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
3535
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
36-
s.metrics.InitWithClients(s.invocationID, table, clients)
36+
s.metrics.InitWithClients(table, clients)
3737
}
3838

3939
// First interleave the tables like in round-robin

scheduler/scheduler_shuffle_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
2121
preInitialisedClients[i] = clients
2222
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
2323
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
24-
s.metrics.InitWithClients(s.invocationID, table, clients)
24+
s.metrics.InitWithClients(table, clients)
2525
}
2626

2727
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)

0 commit comments

Comments
 (0)