Skip to content

Commit b8496f9

Browse files
committed
fix: working per-table & per-client duration
1 parent a1a5179 commit b8496f9

File tree

6 files changed

+141
-136
lines changed

6 files changed

+141
-136
lines changed

scheduler/metrics/duration.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package metrics
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
type durationMeasurement struct {
9+
startTime time.Time
10+
started bool
11+
duration time.Duration
12+
sem sync.Mutex
13+
}
14+
15+
func (dm *durationMeasurement) Start(start time.Time) {
16+
// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource)
17+
dm.sem.Lock()
18+
defer dm.sem.Unlock()
19+
if dm.started {
20+
return
21+
}
22+
23+
dm.started = true
24+
dm.startTime = start
25+
}
26+
27+
// End calculates, updates and returns the delta duration for updating OTEL counters.
28+
func (dm *durationMeasurement) End(end time.Time) time.Duration {
29+
var delta time.Duration
30+
newDuration := end.Sub(dm.startTime)
31+
32+
dm.sem.Lock()
33+
defer dm.sem.Unlock()
34+
35+
delta = newDuration - dm.duration
36+
dm.duration = newDuration
37+
return delta
38+
}

scheduler/metrics/metrics.go

Lines changed: 71 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package metrics
22

33
import (
44
"context"
5-
"sync"
65
"sync/atomic"
76
"time"
87

@@ -48,7 +47,7 @@ func NewMetrics(invocationID string) *Metrics {
4847

4948
duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName,
5049
metric.WithDescription("Duration of syncing a table"),
51-
metric.WithUnit("ns"),
50+
metric.WithUnit("ms"),
5251
)
5352
if err != nil {
5453
return nil
@@ -62,7 +61,7 @@ func NewMetrics(invocationID string) *Metrics {
6261
panics: panics,
6362
duration: duration,
6463

65-
TableClient: make(map[string]map[string]*tableClientMetrics),
64+
measurements: make(map[string]tableMeasurements),
6665
}
6766
}
6867

@@ -74,55 +73,49 @@ type Metrics struct {
7473
panics metric.Int64Counter
7574
duration metric.Int64Counter
7675

77-
TableClient map[string]map[string]*tableClientMetrics
76+
measurements map[string]tableMeasurements
7877
}
7978

80-
type tableClientMetrics struct {
79+
type tableMeasurements struct {
80+
duration *durationMeasurement
81+
clients map[string]*measurement
82+
}
83+
84+
type measurement struct {
8185
resources uint64
8286
errors uint64
8387
panics uint64
84-
duration atomic.Pointer[time.Duration]
85-
86-
startTime time.Time
87-
started bool
88-
startedLock sync.Mutex
89-
}
90-
91-
func durationPointerEqual(a, b *time.Duration) bool {
92-
if a == nil {
93-
return b == nil
94-
}
95-
return b != nil && *a == *b
88+
duration *durationMeasurement
9689
}
9790

98-
func (m *tableClientMetrics) Equal(other *tableClientMetrics) bool {
99-
return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && durationPointerEqual(m.duration.Load(), other.duration.Load())
91+
func (m *measurement) Equal(other *measurement) bool {
92+
return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && m.duration == other.duration
10093
}
10194

10295
// Equal compares to stats. Mostly useful in testing
10396
func (m *Metrics) Equal(other *Metrics) bool {
104-
for table, clientStats := range m.TableClient {
105-
for client, stats := range clientStats {
106-
if _, ok := other.TableClient[table]; !ok {
97+
for table, clientStats := range m.measurements {
98+
for client, stats := range clientStats.clients {
99+
if _, ok := other.measurements[table]; !ok {
107100
return false
108101
}
109-
if _, ok := other.TableClient[table][client]; !ok {
102+
if _, ok := other.measurements[table].clients[client]; !ok {
110103
return false
111104
}
112-
if !stats.Equal(other.TableClient[table][client]) {
105+
if !stats.Equal(other.measurements[table].clients[client]) {
113106
return false
114107
}
115108
}
116109
}
117-
for table, clientStats := range other.TableClient {
118-
for client, stats := range clientStats {
119-
if _, ok := m.TableClient[table]; !ok {
110+
for table, clientStats := range other.measurements {
111+
for client, stats := range clientStats.clients {
112+
if _, ok := m.measurements[table]; !ok {
120113
return false
121114
}
122-
if _, ok := m.TableClient[table][client]; !ok {
115+
if _, ok := m.measurements[table].clients[client]; !ok {
123116
return false
124117
}
125-
if !stats.Equal(m.TableClient[table][client]) {
118+
if !stats.Equal(m.measurements[table].clients[client]) {
126119
return false
127120
}
128121
}
@@ -142,9 +135,9 @@ func (m *Metrics) NewSelector(clientID, tableName string) Selector {
142135
}
143136

144137
func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) {
145-
m.TableClient[table.Name] = make(map[string]*tableClientMetrics, len(clients))
138+
m.measurements[table.Name] = tableMeasurements{clients: make(map[string]*measurement), duration: &durationMeasurement{}}
146139
for _, client := range clients {
147-
m.TableClient[table.Name][client.ID()] = &tableClientMetrics{}
140+
m.measurements[table.Name].clients[client.ID()] = &measurement{duration: &durationMeasurement{}}
148141
}
149142
for _, relation := range table.Relations {
150143
m.InitWithClients(invocationID, relation, clients)
@@ -153,116 +146,101 @@ func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clie
153146

154147
func (m *Metrics) TotalErrors() uint64 {
155148
var total uint64
156-
for _, clientMetrics := range m.TableClient {
157-
for _, metrics := range clientMetrics {
158-
total += metrics.errors
149+
for _, clientMetrics := range m.measurements {
150+
for _, metrics := range clientMetrics.clients {
151+
total += atomic.LoadUint64(&metrics.errors)
159152
}
160153
}
161154
return total
162155
}
163156

157+
// Deprecated: Use TotalErrors instead, it provides the same functionality but is more consistent with the naming of other metrics methods.
164158
func (m *Metrics) TotalErrorsAtomic() uint64 {
165-
var total uint64
166-
for _, clientMetrics := range m.TableClient {
167-
for _, metrics := range clientMetrics {
168-
total += atomic.LoadUint64(&metrics.errors)
169-
}
170-
}
171-
return total
159+
return m.TotalErrors()
172160
}
173161

174162
func (m *Metrics) TotalPanics() uint64 {
175163
var total uint64
176-
for _, clientMetrics := range m.TableClient {
177-
for _, metrics := range clientMetrics {
178-
total += metrics.panics
164+
for _, clientMetrics := range m.measurements {
165+
for _, metrics := range clientMetrics.clients {
166+
total += atomic.LoadUint64(&metrics.panics)
179167
}
180168
}
181169
return total
182170
}
183171

172+
// Deprecated: Use TotalPanics instead, it provides the same functionality but is more consistent with the naming of other metrics methods.
184173
func (m *Metrics) TotalPanicsAtomic() uint64 {
185-
var total uint64
186-
for _, clientMetrics := range m.TableClient {
187-
for _, metrics := range clientMetrics {
188-
total += atomic.LoadUint64(&metrics.panics)
189-
}
190-
}
191-
return total
174+
return m.TotalPanics()
192175
}
193176

194177
func (m *Metrics) TotalResources() uint64 {
195178
var total uint64
196-
for _, clientMetrics := range m.TableClient {
197-
for _, metrics := range clientMetrics {
198-
total += metrics.resources
179+
for _, clientMetrics := range m.measurements {
180+
for _, metrics := range clientMetrics.clients {
181+
total += atomic.LoadUint64(&metrics.resources)
199182
}
200183
}
201184
return total
202185
}
203186

187+
// Deprecated: Use TotalResources instead, it provides the same functionality but is more consistent with the naming of other metrics methods.
204188
func (m *Metrics) TotalResourcesAtomic() uint64 {
205-
var total uint64
206-
for _, clientMetrics := range m.TableClient {
207-
for _, metrics := range clientMetrics {
208-
total += atomic.LoadUint64(&metrics.resources)
209-
}
210-
}
211-
return total
189+
return m.TotalResources()
212190
}
213191

214-
func (m *Metrics) ResourcesAdd(ctx context.Context, count int64, selector Selector) {
192+
func (m *Metrics) TableDuration(tableName string) time.Duration {
193+
tc := m.measurements[tableName]
194+
return tc.duration.duration
195+
}
196+
197+
func (m *Metrics) AddResources(ctx context.Context, count int64, selector Selector) {
215198
m.resources.Add(ctx, count, metric.WithAttributeSet(selector.Set))
216-
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].resources, uint64(count))
199+
atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources, uint64(count))
217200
}
218201

219-
func (m *Metrics) ResourcesGet(selector Selector) uint64 {
220-
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].resources)
202+
func (m *Metrics) GetResources(selector Selector) uint64 {
203+
return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources)
221204
}
222205

223-
func (m *Metrics) ErrorsAdd(ctx context.Context, count int64, selector Selector) {
206+
func (m *Metrics) AddErrors(ctx context.Context, count int64, selector Selector) {
224207
m.errors.Add(ctx, count, metric.WithAttributeSet(selector.Set))
225-
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].errors, uint64(count))
208+
atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors, uint64(count))
226209
}
227210

228-
func (m *Metrics) ErrorsGet(selector Selector) uint64 {
229-
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].errors)
211+
func (m *Metrics) GetErrors(selector Selector) uint64 {
212+
return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors)
230213
}
231214

232-
func (m *Metrics) PanicsAdd(ctx context.Context, count int64, selector Selector) {
215+
func (m *Metrics) AddPanics(ctx context.Context, count int64, selector Selector) {
233216
m.panics.Add(ctx, count, metric.WithAttributeSet(selector.Set))
234-
atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].panics, uint64(count))
217+
atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics, uint64(count))
235218
}
236219

237-
func (m *Metrics) PanicsGet(selector Selector) uint64 {
238-
return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].panics)
220+
func (m *Metrics) GetPanics(selector Selector) uint64 {
221+
return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics)
239222
}
240223

241224
func (m *Metrics) StartTime(start time.Time, selector Selector) {
242-
tc := m.TableClient[selector.tableName][selector.clientID]
243-
244-
// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource)
245-
tc.startedLock.Lock()
246-
defer tc.startedLock.Unlock()
247-
if tc.started {
248-
return
249-
}
225+
t := m.measurements[selector.tableName]
226+
tc := t.clients[selector.clientID]
250227

251-
tc.started = true
252-
tc.startTime = start
228+
tc.duration.Start(start)
229+
t.duration.Start(start)
253230
}
254231

255232
func (m *Metrics) EndTime(ctx context.Context, end time.Time, selector Selector) {
256-
tc := m.TableClient[selector.tableName][selector.clientID]
257-
duration := time.Duration(end.UnixNano() - tc.startTime.UnixNano())
258-
tc.duration.Store(&duration)
259-
m.duration.Add(ctx, duration.Nanoseconds(), metric.WithAttributeSet(selector.Set))
233+
t := m.measurements[selector.tableName]
234+
tc := t.clients[selector.clientID]
235+
236+
_ = tc.duration.End(end)
237+
delta := t.duration.End(end)
238+
239+
// only compute and add the total duration for per-table measurements (and not per-client)
240+
m.duration.Add(ctx, delta.Milliseconds(), metric.WithAttributeSet(selector.Set))
260241
}
261242

262-
func (m *Metrics) DurationGet(selector Selector) *time.Duration {
263-
tc := m.TableClient[selector.tableName][selector.clientID]
264-
if tc == nil {
265-
return nil
266-
}
267-
return tc.duration.Load()
243+
func (m *Metrics) GetDuration(selector Selector) time.Duration {
244+
tc := m.measurements[selector.tableName].clients[selector.clientID]
245+
return tc.duration.duration
268246
}

scheduler/queue/worker.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
8888
selector := w.metrics.NewSelector(clientName, table.Name)
8989
defer func() {
9090
span.AddEvent("sync.finish.stats", trace.WithAttributes(
91-
attribute.Key("sync.resources").Int64(int64(w.metrics.ResourcesGet(selector))),
92-
attribute.Key("sync.errors").Int64(int64(w.metrics.ErrorsGet(selector))),
93-
attribute.Key("sync.panics").Int64(int64(w.metrics.PanicsGet(selector))),
91+
attribute.Key("sync.resources").Int64(int64(w.metrics.GetResources(selector))),
92+
attribute.Key("sync.errors").Int64(int64(w.metrics.GetErrors(selector))),
93+
attribute.Key("sync.panics").Int64(int64(w.metrics.GetPanics(selector))),
9494
))
9595
}()
9696
w.metrics.StartTime(startTime, selector)
@@ -101,13 +101,13 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
101101
if err := recover(); err != nil {
102102
stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack()))
103103
logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic")
104-
w.metrics.PanicsAdd(ctx, 1, selector)
104+
w.metrics.AddPanics(ctx, 1, selector)
105105
}
106106
close(res)
107107
}()
108108
if err := table.Resolver(ctx, client, parent, res); err != nil {
109109
logger.Error().Err(err).Msg("table resolver finished with error")
110-
w.metrics.ErrorsAdd(ctx, 1, selector)
110+
w.metrics.AddErrors(ctx, 1, selector)
111111
// Send SyncError message
112112
syncErrorMsg := &message.SyncError{
113113
TableName: table.Name,
@@ -124,12 +124,8 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
124124

125125
endTime := time.Now()
126126
w.metrics.EndTime(ctx, endTime, selector)
127-
duration := w.metrics.DurationGet(selector)
128-
if duration == nil {
129-
duration = new(time.Duration)
130-
}
131127
if parent == nil {
132-
logger.Info().Uint64("resources", w.metrics.ResourcesGet(selector)).Uint64("errors", w.metrics.ErrorsGet(selector)).Dur("duration_ms", *duration).Msg("table sync finished")
128+
logger.Info().Uint64("resources", w.metrics.GetResources(selector)).Uint64("errors", w.metrics.GetErrors(selector)).Dur("duration_ms", w.metrics.GetDuration(selector)).Msg("table sync finished")
133129
}
134130
}
135131

@@ -156,7 +152,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
156152

157153
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
158154
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
159-
w.metrics.ErrorsAdd(ctx, 1, selector)
155+
w.metrics.AddErrors(ctx, 1, selector)
160156
return
161157
}
162158
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
@@ -166,7 +162,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
166162
switch err.(type) {
167163
case *schema.PKError:
168164
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
169-
w.metrics.ErrorsAdd(ctx, 1, selector)
165+
w.metrics.AddErrors(ctx, 1, selector)
170166
return
171167
case *schema.PKComponentError:
172168
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")

0 commit comments

Comments
 (0)