Skip to content

Commit 692cd28

Browse files
authored
Log metrics and do not panic when the channel receives a corrupted signal (#513)
1 parent 97e1a89 commit 692cd28

File tree

10 files changed

+544
-83
lines changed

10 files changed

+544
-83
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package metrics
22+
23+
import (
24+
"io"
25+
"time"
26+
"github.com/uber-go/tally"
27+
)
28+
29+
func NewMetricsScope(isReplay *bool) (tally.Scope, io.Closer, *CapturingStatsReporter) {
30+
reporter := &CapturingStatsReporter{}
31+
opts := tally.ScopeOptions{Reporter: reporter}
32+
scope, closer := tally.NewRootScope(opts, time.Second)
33+
return WrapScope(isReplay, scope, &RealClock{}), closer, reporter
34+
}
35+
36+
func NewTaggedMetricsScope() (*TaggedScope, io.Closer, *CapturingStatsReporter) {
37+
isReplay := false
38+
scope, closer, reporter := NewMetricsScope(&isReplay)
39+
return &TaggedScope{Scope: scope}, closer, reporter
40+
}
41+
42+
type RealClock struct {
43+
}
44+
45+
func (c *RealClock) Now() time.Time {
46+
return time.Now()
47+
}
48+
49+
// capturingStatsReporter is a reporter used by tests to capture the metric so we can verify our tests.
50+
type CapturingStatsReporter struct {
51+
counts []CapturedCount
52+
gauges []CapturedGauge
53+
timers []CapturedTimer
54+
histogramValueSamples []CapturedHistogramValueSamples
55+
histogramDurationSamples []CapturedHistogramDurationSamples
56+
capabilities int
57+
flush int
58+
}
59+
60+
func (c *CapturingStatsReporter) HistogramDurationSamples() []CapturedHistogramDurationSamples {
61+
return c.histogramDurationSamples
62+
}
63+
64+
func (c *CapturingStatsReporter) HistogramValueSamples() []CapturedHistogramValueSamples {
65+
return c.histogramValueSamples
66+
}
67+
68+
func (c *CapturingStatsReporter) Timers() []CapturedTimer {
69+
return c.timers
70+
}
71+
72+
func (c *CapturingStatsReporter) Gauges() []CapturedGauge {
73+
return c.gauges
74+
}
75+
76+
func (c *CapturingStatsReporter) Counts() []CapturedCount {
77+
return c.counts
78+
}
79+
80+
type CapturedCount struct {
81+
name string
82+
tags map[string]string
83+
value int64
84+
}
85+
86+
func (c *CapturedCount) Value() int64 {
87+
return c.value
88+
}
89+
90+
func (c *CapturedCount) Tags() map[string]string {
91+
return c.tags
92+
}
93+
94+
func (c *CapturedCount) Name() string {
95+
return c.name
96+
}
97+
98+
type CapturedGauge struct {
99+
name string
100+
tags map[string]string
101+
value float64
102+
}
103+
104+
func (c *CapturedGauge) Value() float64 {
105+
return c.value
106+
}
107+
108+
func (c *CapturedGauge) Tags() map[string]string {
109+
return c.tags
110+
}
111+
112+
func (c *CapturedGauge) Name() string {
113+
return c.name
114+
}
115+
116+
type CapturedTimer struct {
117+
name string
118+
tags map[string]string
119+
value time.Duration
120+
}
121+
122+
func (c *CapturedTimer) Value() time.Duration {
123+
return c.value
124+
}
125+
126+
func (c *CapturedTimer) Tags() map[string]string {
127+
return c.tags
128+
}
129+
130+
func (c *CapturedTimer) Name() string {
131+
return c.name
132+
}
133+
134+
type CapturedHistogramValueSamples struct {
135+
name string
136+
tags map[string]string
137+
bucketLowerBound float64
138+
bucketUpperBound float64
139+
samples int64
140+
}
141+
142+
type CapturedHistogramDurationSamples struct {
143+
name string
144+
tags map[string]string
145+
bucketLowerBound time.Duration
146+
bucketUpperBound time.Duration
147+
samples int64
148+
}
149+
150+
func (r *CapturingStatsReporter) ReportCounter(
151+
name string,
152+
tags map[string]string,
153+
value int64,
154+
) {
155+
r.counts = append(r.counts, CapturedCount{name, tags, value})
156+
}
157+
158+
func (r *CapturingStatsReporter) ReportGauge(
159+
name string,
160+
tags map[string]string,
161+
value float64,
162+
) {
163+
r.gauges = append(r.gauges, CapturedGauge{name, tags, value})
164+
}
165+
166+
func (r *CapturingStatsReporter) ReportTimer(
167+
name string,
168+
tags map[string]string,
169+
value time.Duration,
170+
) {
171+
r.timers = append(r.timers, CapturedTimer{name, tags, value})
172+
}
173+
174+
func (r *CapturingStatsReporter) ReportHistogramValueSamples(
175+
name string,
176+
tags map[string]string,
177+
buckets tally.Buckets,
178+
bucketLowerBound,
179+
bucketUpperBound float64,
180+
samples int64,
181+
) {
182+
elem := CapturedHistogramValueSamples{name, tags,
183+
bucketLowerBound, bucketUpperBound, samples}
184+
r.histogramValueSamples = append(r.histogramValueSamples, elem)
185+
}
186+
187+
func (r *CapturingStatsReporter) ReportHistogramDurationSamples(
188+
name string,
189+
tags map[string]string,
190+
buckets tally.Buckets,
191+
bucketLowerBound,
192+
bucketUpperBound time.Duration,
193+
samples int64,
194+
) {
195+
elem := CapturedHistogramDurationSamples{name, tags,
196+
bucketLowerBound, bucketUpperBound, samples}
197+
r.histogramDurationSamples = append(r.histogramDurationSamples, elem)
198+
}
199+
200+
func (r *CapturingStatsReporter) Capabilities() tally.Capabilities {
201+
r.capabilities++
202+
return r
203+
}
204+
205+
func (r *CapturingStatsReporter) Reporting() bool {
206+
return true
207+
}
208+
209+
func (r *CapturingStatsReporter) Tagging() bool {
210+
return true
211+
}
212+
213+
func (r *CapturingStatsReporter) Flush() {
214+
r.flush++
215+
}
216+

internal/common/metrics/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ const (
7979
TaskListQueueLatency = CadenceMetricsPrefix + "tasklist-queue-latency"
8080

8181
UnhandledSignalsCounter = CadenceMetricsPrefix + "unhandled-signals"
82+
CorruptedSignalsCounter = CadenceMetricsPrefix + "corrupted-signals"
8283

8384
WorkerStartCounter = CadenceMetricsPrefix + "worker-start"
8485
PollerStartCounter = CadenceMetricsPrefix + "poller-start"

internal/common/metrics/scope_test.go

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,134 +21,134 @@
2121
package metrics
2222

2323
import (
24-
"io"
2524
"testing"
2625
"time"
2726

2827
"github.com/stretchr/testify/require"
2928
"github.com/uber-go/tally"
3029
"sync"
30+
"io"
3131
)
3232

3333
func Test_Counter(t *testing.T) {
3434
isReplay := true
35-
scope, closer, reporter := newMetricsScope(&isReplay)
35+
scope, closer, reporter := NewMetricsScope(&isReplay)
3636
scope.Counter("test-name").Inc(1)
3737
closer.Close()
38-
require.Equal(t, 0, len(reporter.counts))
38+
require.Equal(t, 0, len(reporter.Counts()))
3939

4040
isReplay = false
41-
scope, closer, reporter = newMetricsScope(&isReplay)
41+
scope, closer, reporter = NewMetricsScope(&isReplay)
4242
scope.Counter("test-name").Inc(3)
4343
closer.Close()
44-
require.Equal(t, 1, len(reporter.counts))
45-
require.Equal(t, int64(3), reporter.counts[0].value)
44+
require.Equal(t, 1, len(reporter.Counts()))
45+
require.Equal(t, int64(3), reporter.Counts()[0].Value())
4646
}
4747

4848
func Test_Gauge(t *testing.T) {
4949
isReplay := true
50-
scope, closer, reporter := newMetricsScope(&isReplay)
50+
scope, closer, reporter := NewMetricsScope(&isReplay)
5151
scope.Gauge("test-name").Update(1)
5252
closer.Close()
53-
require.Equal(t, 0, len(reporter.gauges))
53+
require.Equal(t, 0, len(reporter.Gauges()))
5454

5555
isReplay = false
56-
scope, closer, reporter = newMetricsScope(&isReplay)
56+
scope, closer, reporter = NewMetricsScope(&isReplay)
5757
scope.Gauge("test-name").Update(3)
5858
closer.Close()
59-
require.Equal(t, 1, len(reporter.gauges))
60-
require.Equal(t, float64(3), reporter.gauges[0].value)
59+
require.Equal(t, 1, len(reporter.Gauges()))
60+
require.Equal(t, float64(3), reporter.Gauges()[0].Value())
6161
}
6262

6363
func Test_Timer(t *testing.T) {
6464
isReplay := true
65-
scope, closer, reporter := newMetricsScope(&isReplay)
65+
scope, closer, reporter := NewMetricsScope(&isReplay)
6666
scope.Timer("test-name").Record(time.Second)
6767
sw := scope.Timer("test-stopwatch").Start()
6868
sw.Stop()
6969
closer.Close()
70-
require.Equal(t, 0, len(reporter.timers))
70+
require.Equal(t, 0, len(reporter.Timers()))
7171

7272
isReplay = false
73-
scope, closer, reporter = newMetricsScope(&isReplay)
73+
scope, closer, reporter = NewMetricsScope(&isReplay)
7474
scope.Timer("test-name").Record(time.Second)
7575
sw = scope.Timer("test-stopwatch").Start()
7676
sw.Stop()
7777
closer.Close()
78-
require.Equal(t, 2, len(reporter.timers))
79-
require.Equal(t, time.Second, reporter.timers[0].value)
78+
require.Equal(t, 2, len(reporter.Timers()))
79+
require.Equal(t, time.Second, reporter.Timers()[0].Value())
8080
}
8181

8282
func Test_Histogram(t *testing.T) {
8383
isReplay := true
84-
scope, closer, reporter := newMetricsScope(&isReplay)
84+
scope, closer, reporter := NewMetricsScope(&isReplay)
8585
valueBuckets := tally.MustMakeLinearValueBuckets(0, 10, 10)
8686
scope.Histogram("test-hist-1", valueBuckets).RecordValue(5)
8787
scope.Histogram("test-hist-2", valueBuckets).RecordValue(15)
8888
closer.Close()
89-
require.Equal(t, 0, len(reporter.histogramValueSamples))
90-
scope, closer, reporter = newMetricsScope(&isReplay)
89+
require.Equal(t, 0, len(reporter.HistogramValueSamples()))
90+
scope, closer, reporter = NewMetricsScope(&isReplay)
9191
durationBuckets := tally.MustMakeLinearDurationBuckets(0, time.Hour, 10)
9292
scope.Histogram("test-hist-1", durationBuckets).RecordDuration(time.Minute)
9393
scope.Histogram("test-hist-2", durationBuckets).RecordDuration(time.Minute * 61)
9494
sw := scope.Histogram("test-hist-3", durationBuckets).Start()
9595
sw.Stop()
9696
closer.Close()
97-
require.Equal(t, 0, len(reporter.histogramDurationSamples))
97+
require.Equal(t, 0, len(reporter.HistogramDurationSamples()))
9898

9999
isReplay = false
100-
scope, closer, reporter = newMetricsScope(&isReplay)
100+
scope, closer, reporter = NewMetricsScope(&isReplay)
101101
valueBuckets = tally.MustMakeLinearValueBuckets(0, 10, 10)
102102
scope.Histogram("test-hist-1", valueBuckets).RecordValue(5)
103103
scope.Histogram("test-hist-2", valueBuckets).RecordValue(15)
104104
closer.Close()
105-
require.Equal(t, 2, len(reporter.histogramValueSamples))
105+
require.Equal(t, 2, len(reporter.HistogramValueSamples()))
106106

107-
scope, closer, reporter = newMetricsScope(&isReplay)
107+
scope, closer, reporter = NewMetricsScope(&isReplay)
108108
durationBuckets = tally.MustMakeLinearDurationBuckets(0, time.Hour, 10)
109109
scope.Histogram("test-hist-1", durationBuckets).RecordDuration(time.Minute)
110110
scope.Histogram("test-hist-2", durationBuckets).RecordDuration(time.Minute * 61)
111111
sw = scope.Histogram("test-hist-3", durationBuckets).Start()
112112
sw.Stop()
113113
closer.Close()
114-
require.Equal(t, 3, len(reporter.histogramDurationSamples))
114+
require.Equal(t, 3, len(reporter.HistogramDurationSamples()))
115115
}
116116

117117
func Test_ScopeCoverage(t *testing.T) {
118118
isReplay := false
119-
scope, closer, reporter := newMetricsScope(&isReplay)
119+
scope, closer, reporter := NewMetricsScope(&isReplay)
120120
caps := scope.Capabilities()
121121
require.Equal(t, true, caps.Reporting())
122122
require.Equal(t, true, caps.Tagging())
123123
subScope := scope.SubScope("test")
124124
taggedScope := subScope.Tagged(make(map[string]string))
125125
taggedScope.Counter("test-counter").Inc(1)
126126
closer.Close()
127-
require.Equal(t, 1, len(reporter.counts))
127+
require.Equal(t, 1, len(reporter.Counts()))
128128
}
129129

130130
func Test_TaggedScope(t *testing.T) {
131-
taggedScope, closer, reporter := newTaggedMetricsScope()
131+
taggedScope, closer, reporter := NewTaggedMetricsScope()
132132
scope := taggedScope.GetTaggedScope("tag1", "val1")
133133
scope.Counter("test-name").Inc(3)
134134
closer.Close()
135-
require.Equal(t, 1, len(reporter.counts))
136-
require.Equal(t, int64(3), reporter.counts[0].value)
135+
require.Equal(t, 1, len(reporter.Counts()))
136+
require.Equal(t, int64(3), reporter.Counts()[0].Value())
137137

138138
m := &sync.Map{}
139-
taggedScope, closer, reporter = newTaggedMetricsScope()
139+
taggedScope, closer, reporter = NewTaggedMetricsScope()
140140
taggedScope.Map = m
141141
scope = taggedScope.GetTaggedScope("tag2", "val1")
142142
scope.Counter("test-name").Inc(2)
143-
taggedScope, closer2, reporter2 := newTaggedMetricsScope()
143+
taggedScope, closer2, reporter2 := NewTaggedMetricsScope()
144144
taggedScope.Map = m
145145
scope = taggedScope.GetTaggedScope("tag2", "val1")
146146
scope.Counter("test-name").Inc(1)
147147
closer2.Close()
148-
require.Equal(t, 0, len(reporter2.counts))
148+
require.Equal(t, 0, len(reporter2.Counts()))
149149
closer.Close()
150-
require.Equal(t, 1, len(reporter.counts))
151-
require.Equal(t, int64(3), reporter.counts[0].value)
150+
require.Equal(t, 1, len(reporter.Counts()))
151+
require.Equal(t, int64(3), reporter.Counts()[0].Value())
152152
}
153153

154154
func Test_TaggedScope_WithMultiTags(t *testing.T) {

0 commit comments

Comments
 (0)