Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/archiver/s3store/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *historyArchiverSuite) SetupTest() {
s.Assertions = require.New(s.T())
s.container = &archiver.HistoryBootstrapContainer{
Logger: testlogger.New(s.T()),
MetricsClient: metrics.NewClient(scope, metrics.HistoryArchiverScope),
MetricsClient: metrics.NewClient(scope, metrics.History),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a ServiceIdx, not a ScopeIdx. as it's a test, I just updated it to something plausible.

}
}

Expand Down
2 changes: 1 addition & 1 deletion common/archiver/s3store/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *visibilityArchiverSuite) SetupSuite() {

s.container = &archiver.VisibilityBootstrapContainer{
Logger: testlogger.New(s.T()),
MetricsClient: metrics.NewClient(scope, metrics.VisibilityArchiverScope),
MetricsClient: metrics.NewClient(scope, metrics.History),
}
s.setupVisibilityDirectory()
}
Expand Down
4 changes: 2 additions & 2 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ type GetCacheItemSizeFunc func(interface{}) uint64
// DomainMetricsScopeCache represents a interface for mapping domainID and scopeIdx to metricsScope
type DomainMetricsScopeCache interface {
// Get retrieves metrics scope for a domainID and scopeIdx
Get(domainID string, scopeIdx int) (metrics.Scope, bool)
Get(domainID string, scopeIdx metrics.ScopeIdx) (metrics.Scope, bool)
// Put adds metrics scope for a domainID and scopeIdx
Put(domainID string, scopeIdx int, metricsScope metrics.Scope)
Put(domainID string, scopeIdx metrics.ScopeIdx, metricsScope metrics.Scope)

common.Daemon
}
Expand Down
4 changes: 2 additions & 2 deletions common/cache/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions common/cache/metricsScopeCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
const flushBufferedMetricsScopeDuration = 10 * time.Second

type (
metricsScopeMap map[string]map[int]metrics.Scope
metricsScopeMap map[string]map[metrics.ScopeIdx]metrics.Scope

buffer struct {
sync.RWMutex
Expand Down Expand Up @@ -81,7 +81,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D
data := c.cache.Load().(metricsScopeMap)
// Copy everything over after atomic load
for key, val := range data {
scopeMap[key] = map[int]metrics.Scope{}
scopeMap[key] = map[metrics.ScopeIdx]metrics.Scope{}
for k, v := range val {
scopeMap[key][k] = v
}
Expand All @@ -90,7 +90,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D
// Copy from buffered array
for key, val := range c.buffer.bufferMap {
if _, ok := scopeMap[key]; !ok {
scopeMap[key] = map[int]metrics.Scope{}
scopeMap[key] = map[metrics.ScopeIdx]metrics.Scope{}
}
for k, v := range val {
scopeMap[key][k] = v
Expand All @@ -109,7 +109,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D
}

// Get retrieves scope for domainID and scopeIdx
func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx int) (metrics.Scope, bool) {
func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx metrics.ScopeIdx) (metrics.Scope, bool) {
data := c.cache.Load().(metricsScopeMap)

if data == nil {
Expand All @@ -126,12 +126,12 @@ func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx int) (metrics.Sc
}

// Put puts map of domainID and scopeIdx to metricsScope
func (c *domainMetricsScopeCache) Put(domainID string, scopeIdx int, scope metrics.Scope) {
func (c *domainMetricsScopeCache) Put(domainID string, scopeIdx metrics.ScopeIdx, scope metrics.Scope) {
c.buffer.Lock()
defer c.buffer.Unlock()

if c.buffer.bufferMap[domainID] == nil {
c.buffer.bufferMap[domainID] = map[int]metrics.Scope{}
c.buffer.bufferMap[domainID] = map[metrics.ScopeIdx]metrics.Scope{}
}
c.buffer.bufferMap[domainID][scopeIdx] = scope
}
Expand Down
14 changes: 7 additions & 7 deletions common/cache/metricsScopeCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *domainMetricsCacheSuite) TestGetMetricsScope() {
var found bool

tests := []struct {
scopeID int
scopeID metrics.ScopeIdx
domainID string
}{
{1, "A"},
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *domainMetricsCacheSuite) TestGetMetricsScopeMultipleFlushLoop() {
var found bool

tests := []struct {
scopeID int
scopeID metrics.ScopeIdx
domainID string
}{
{1, "A"},
Expand Down Expand Up @@ -174,14 +174,14 @@ func (s *domainMetricsCacheSuite) TestConcurrentMetricsScopeAccess() {
for i := 0; i < 1000; i++ {
wg.Add(1)
// concurrent get and put
go func(scopeIdx int) {
go func(scopeIdx metrics.ScopeIdx) {
defer wg.Done()

<-ch

s.metricsCache.Get("test_domain", scopeIdx)
s.metricsCache.Put("test_domain", scopeIdx, s.metricsClient.Scope(scopeIdx%metrics.NumServices))
}(i)
s.metricsCache.Put("test_domain", scopeIdx, s.metricsClient.Scope(metrics.ScopeIdx(int(scopeIdx)%int(metrics.NumServices))))
}(metrics.ScopeIdx(i))
}

close(ch)
Expand All @@ -190,8 +190,8 @@ func (s *domainMetricsCacheSuite) TestConcurrentMetricsScopeAccess() {
time.Sleep(120 * time.Millisecond)

for i := 0; i < 1000; i++ {
metricsScope, found = s.metricsCache.Get("test_domain", i)
testMetricsScope = s.metricsClient.Scope(i % metrics.NumServices)
metricsScope, found = s.metricsCache.Get("test_domain", metrics.ScopeIdx(i))
testMetricsScope = s.metricsClient.Scope(metrics.ScopeIdx(i % int(metrics.NumServices)))

s.Equal(true, found)
s.Equal(testMetricsScope, metricsScope)
Expand Down
40 changes: 20 additions & 20 deletions common/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
type ClientImpl struct {
// parentReporter is the parent scope for the metrics
parentScope tally.Scope
childScopes map[int]tally.Scope
metricDefs map[int]metricDefinition
childScopes map[ScopeIdx]tally.Scope
metricDefs map[MetricIdx]metricDefinition
serviceIdx ServiceIdx
}

Expand All @@ -43,7 +43,7 @@ func NewClient(scope tally.Scope, serviceIdx ServiceIdx) Client {
totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &ClientImpl{
parentScope: scope,
childScopes: make(map[int]tally.Scope, totalScopes),
childScopes: make(map[ScopeIdx]tally.Scope, totalScopes),
metricDefs: getMetricDefs(serviceIdx),
serviceIdx: serviceIdx,
}
Expand All @@ -69,60 +69,60 @@ func NewClient(scope tally.Scope, serviceIdx ServiceIdx) Client {

// IncCounter increments one for a counter and emits
// to metrics backend
func (m *ClientImpl) IncCounter(scopeIdx int, counterIdx int) {
func (m *ClientImpl) IncCounter(scope ScopeIdx, counterIdx MetricIdx) {
name := string(m.metricDefs[counterIdx].metricName)
m.childScopes[scopeIdx].Counter(name).Inc(1)
m.childScopes[scope].Counter(name).Inc(1)
}

// AddCounter adds delta to the counter and
// emits to the metrics backend
func (m *ClientImpl) AddCounter(scopeIdx int, counterIdx int, delta int64) {
func (m *ClientImpl) AddCounter(scope ScopeIdx, counterIdx MetricIdx, delta int64) {
name := string(m.metricDefs[counterIdx].metricName)
m.childScopes[scopeIdx].Counter(name).Inc(delta)
m.childScopes[scope].Counter(name).Inc(delta)
}

// StartTimer starts a timer for the given
// metric name
func (m *ClientImpl) StartTimer(scopeIdx int, timerIdx int) tally.Stopwatch {
func (m *ClientImpl) StartTimer(scope ScopeIdx, timerIdx MetricIdx) tally.Stopwatch {
name := string(m.metricDefs[timerIdx].metricName)
return m.childScopes[scopeIdx].Timer(name).Start()
return m.childScopes[scope].Timer(name).Start()
}

// RecordTimer record and emit a timer for the given
// metric name
func (m *ClientImpl) RecordTimer(scopeIdx int, timerIdx int, d time.Duration) {
func (m *ClientImpl) RecordTimer(scope ScopeIdx, timerIdx MetricIdx, d time.Duration) {
name := string(m.metricDefs[timerIdx].metricName)
m.childScopes[scopeIdx].Timer(name).Record(d)
m.childScopes[scope].Timer(name).Record(d)
}

// RecordHistogramDuration record and emit a duration
func (m *ClientImpl) RecordHistogramDuration(scopeIdx int, timerIdx int, d time.Duration) {
func (m *ClientImpl) RecordHistogramDuration(scope ScopeIdx, timerIdx MetricIdx, d time.Duration) {
name := string(m.metricDefs[timerIdx].metricName)
m.childScopes[scopeIdx].Histogram(name, m.getBuckets(timerIdx)).RecordDuration(d)
m.childScopes[scope].Histogram(name, m.getBuckets(timerIdx)).RecordDuration(d)
}

// UpdateGauge reports Gauge type metric
func (m *ClientImpl) UpdateGauge(scopeIdx int, gaugeIdx int, value float64) {
func (m *ClientImpl) UpdateGauge(scopeIdx ScopeIdx, gaugeIdx MetricIdx, value float64) {
name := string(m.metricDefs[gaugeIdx].metricName)
m.childScopes[scopeIdx].Gauge(name).Update(value)
}

// Scope return a new internal metrics scope that can be used to add additional
// information to the metrics emitted
func (m *ClientImpl) Scope(scopeIdx int, tags ...Tag) Scope {
scope := m.childScopes[scopeIdx]
return newMetricsScope(scope, scope, m.metricDefs, false).Tagged(tags...)
func (m *ClientImpl) Scope(scope ScopeIdx, tags ...Tag) Scope {
sc := m.childScopes[scope]
return newMetricsScope(sc, sc, m.metricDefs, false).Tagged(tags...)
}

func (m *ClientImpl) getBuckets(id int) tally.Buckets {
func (m *ClientImpl) getBuckets(id MetricIdx) tally.Buckets {
if m.metricDefs[id].buckets != nil {
return m.metricDefs[id].buckets
}
return tally.DefaultBuckets
}

func getMetricDefs(serviceIdx ServiceIdx) map[int]metricDefinition {
defs := make(map[int]metricDefinition)
func getMetricDefs(serviceIdx ServiceIdx) map[MetricIdx]metricDefinition {
defs := make(map[MetricIdx]metricDefinition)
for idx, def := range MetricDefs[Common] {
defs[idx] = def
}
Expand Down
17 changes: 12 additions & 5 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type (

// ServiceIdx is an index that uniquely identifies the service
ServiceIdx int

// ScopeIdx is an index that uniquely identifies an operation, which is required to form a new metrics scope
ScopeIdx int

// MetricIdx is an index that uniquely identifies the metric definition
MetricIdx int
)

func (s scopeDefinition) GetOperationString() string {
Expand All @@ -67,12 +73,13 @@ const (

// Service names for all services that emit metrics.
const (
Common = iota
Common ServiceIdx = iota
Frontend
History
Matching
Worker
ShardDistributor

NumServices
)

Expand Down Expand Up @@ -143,7 +150,7 @@ const (
// -- Common Operation scopes --

// PersistenceCreateShardScope tracks CreateShard calls made by service to persistence layer
PersistenceCreateShardScope = iota
PersistenceCreateShardScope ScopeIdx = iota
// PersistenceGetShardScope tracks GetShard calls made by service to persistence layer
PersistenceGetShardScope
// PersistenceUpdateShardScope tracks UpdateShard calls made by service to persistence layer
Expand Down Expand Up @@ -1466,7 +1473,7 @@ const (
)

// ScopeDefs record the scopes for all services
var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
// common scope Names
Common: {
PersistenceCreateShardScope: {operation: "CreateShard"},
Expand Down Expand Up @@ -2141,7 +2148,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{

// Common Metrics enum
const (
CadenceRequests = iota
CadenceRequests MetricIdx = iota
CadenceFailures
CadenceLatency
CadenceErrBadRequestCounter
Expand Down Expand Up @@ -2916,7 +2923,7 @@ const (
)

// MetricDefs record the metrics for all services
var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
Common: {
CadenceRequests: {metricName: "cadence_requests", metricType: Counter},
CadenceFailures: {metricName: "cadence_errors", metricType: Counter},
Expand Down
4 changes: 2 additions & 2 deletions common/metrics/defs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestMetricDefs(t *testing.T) {
// Duplicate indexes with the same operation name are technically fine, but there doesn't seem to be any benefit in allowing it,
// and it trivially ensures that all values have only one operation name.
func TestOperationIndexesAreUnique(t *testing.T) {
seen := make(map[int]bool)
seen := make(map[ScopeIdx]bool)
for serviceIdx, serviceOps := range ScopeDefs {
for idx := range serviceOps {
if seen[idx] {
Expand All @@ -158,7 +158,7 @@ func TestOperationIndexesAreUnique(t *testing.T) {
func TestMetricsAreUnique(t *testing.T) {
// Duplicate indexes is arguably fine, but there doesn't seem to be any benefit in allowing it.
t.Run("indexes", func(t *testing.T) {
seen := make(map[int]bool)
seen := make(map[MetricIdx]bool)
for _, serviceMetrics := range MetricDefs {
for idx := range serviceMetrics {
if seen[idx] {
Expand Down
28 changes: 14 additions & 14 deletions common/metrics/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,43 @@ type (
// Client is the interface used to report metrics tally.
Client interface {
// IncCounter increments a counter metric
IncCounter(scope int, counter int)
IncCounter(scope ScopeIdx, counter MetricIdx)
// AddCounter adds delta to the counter metric
AddCounter(scope int, counter int, delta int64)
AddCounter(scope ScopeIdx, counter MetricIdx, delta int64)
// StartTimer starts a timer for the given
// metric name. Time will be recorded when stopwatch is stopped.
StartTimer(scope int, timer int) tally.Stopwatch
StartTimer(scope ScopeIdx, timer MetricIdx) tally.Stopwatch
// RecordTimer starts a timer for the given
// metric name
RecordTimer(scope int, timer int, d time.Duration)
RecordTimer(scope ScopeIdx, timer MetricIdx, d time.Duration)
// RecordHistogramDuration records a histogram duration value for the given
// metric name
RecordHistogramDuration(scope int, timer int, d time.Duration)
RecordHistogramDuration(scope ScopeIdx, timer MetricIdx, d time.Duration)
// UpdateGauge reports Gauge type absolute value metric
UpdateGauge(scope int, gauge int, value float64)
UpdateGauge(scope ScopeIdx, gauge MetricIdx, value float64)
// Scope return an internal scope that can be used to add additional
// information to metrics
Scope(scope int, tags ...Tag) Scope
Scope(scope ScopeIdx, tags ...Tag) Scope
}

// Scope is an interface for metrics
Scope interface {
// IncCounter increments a counter metric
IncCounter(counter int)
IncCounter(counter MetricIdx)
// AddCounter adds delta to the counter metric
AddCounter(counter int, delta int64)
AddCounter(counter MetricIdx, delta int64)
// StartTimer starts a timer for the given metric name.
// Time will be recorded when stopwatch is stopped.
StartTimer(timer int) Stopwatch
StartTimer(timer MetricIdx) Stopwatch
// RecordTimer starts a timer for the given metric name
RecordTimer(timer int, d time.Duration)
RecordTimer(timer MetricIdx, d time.Duration)
// RecordHistogramDuration records a histogram duration value for the given
// metric name
RecordHistogramDuration(timer int, d time.Duration)
RecordHistogramDuration(timer MetricIdx, d time.Duration)
// RecordHistogramValue records a histogram value for the given metric name
RecordHistogramValue(timer int, value float64)
RecordHistogramValue(timer MetricIdx, value float64)
// UpdateGauge reports Gauge type absolute value metric
UpdateGauge(gauge int, value float64)
UpdateGauge(gauge MetricIdx, value float64)
// Tagged return an internal scope that can be used to add additional
// information to metrics
Tagged(tags ...Tag) Scope
Expand Down
Loading
Loading