From 6b15c7e81bfe2c9cbb46fd58887b94a881ee4b44 Mon Sep 17 00:00:00 2001 From: Steven L Date: Wed, 3 Sep 2025 20:16:54 -0500 Subject: [PATCH] Create unique types for service/scope/metric indexes Surprisingly easy to change all these actually, I should've done this a long time ago. Unsurprisingly we were *almost* perfect in our use, but not completely. --- .../archiver/s3store/historyArchiver_test.go | 2 +- .../s3store/visibilityArchiver_test.go | 2 +- common/cache/cache.go | 4 +- common/cache/interface_mock.go | 4 +- common/cache/metricsScopeCache.go | 12 +++--- common/cache/metricsScopeCache_test.go | 14 +++---- common/metrics/client.go | 40 +++++++++---------- common/metrics/defs.go | 17 +++++--- common/metrics/defs_test.go | 4 +- common/metrics/interfaces.go | 28 ++++++------- common/metrics/mocks/Client.go | 18 ++++----- common/metrics/mocks/Scope.go | 16 ++++---- common/metrics/nop.go | 28 ++++++------- common/metrics/scope.go | 20 +++++----- .../es_visibility_metric_clients.go | 4 +- .../pinot/pinot_visibility_metric_clients.go | 4 +- common/persistence/wrappers/metered/base.go | 22 +++++----- common/util.go | 4 +- common/util_test.go | 2 +- service/frontend/admin/handler.go | 2 +- service/frontend/api/handler.go | 2 +- service/frontend/api/producer_manager.go | 2 +- .../accesscontrolled/access_controlled.go | 2 +- .../clusterredirection/callwrappers.go | 2 +- service/history/decision/checker.go | 20 +++++----- .../engineimpl/start_workflow_execution.go | 6 +-- service/history/execution/cache.go | 2 +- .../history/execution/context_util_test.go | 2 +- service/history/handler/handler.go | 2 +- service/history/queue/processor_base.go | 2 +- service/history/queue/processor_options.go | 3 +- .../history/replication/dlq_handler_test.go | 5 ++- service/history/replication/task_executor.go | 6 +-- .../history/replication/task_executor_mock.go | 5 ++- service/history/replication/task_processor.go | 2 +- .../replication/task_processor_test.go | 6 +-- service/history/task/task.go | 2 +- service/history/task/task_util.go | 4 +- service/history/task/task_util_test.go | 4 +- .../task/timer_active_task_executor.go | 2 +- service/history/workflowcache/cache.go | 2 +- service/history/workflowcache/metrics.go | 2 +- service/matching/handler/context.go | 2 +- service/matching/handler/handler.go | 2 +- .../store/wrappers/metered/base.go | 8 ++-- .../store/wrappers/metered/metered_test.go | 4 +- .../worker/archiver/replay_metrics_client.go | 28 ++++++------- service/worker/indexer/esProcessor_test.go | 2 +- service/worker/scanner/executor/executor.go | 4 +- 49 files changed, 196 insertions(+), 186 deletions(-) diff --git a/common/archiver/s3store/historyArchiver_test.go b/common/archiver/s3store/historyArchiver_test.go index 39e66cc3073..87a52b60886 100644 --- a/common/archiver/s3store/historyArchiver_test.go +++ b/common/archiver/s3store/historyArchiver_test.go @@ -99,7 +99,7 @@ func (s *historyArchiverSuite) SetupTest() { s.Assertions = require.New(s.T()) s.container = &archiver.HistoryBootstrapContainer{ Logger: testlogger.New(s.T()), - MetricsClient: metrics.NewClient(scope, metrics.HistoryArchiverScope), + MetricsClient: metrics.NewClient(scope, metrics.History), } } diff --git a/common/archiver/s3store/visibilityArchiver_test.go b/common/archiver/s3store/visibilityArchiver_test.go index fe7f3400e91..c9983ba0e0b 100644 --- a/common/archiver/s3store/visibilityArchiver_test.go +++ b/common/archiver/s3store/visibilityArchiver_test.go @@ -124,7 +124,7 @@ func (s *visibilityArchiverSuite) SetupSuite() { s.container = &archiver.VisibilityBootstrapContainer{ Logger: testlogger.New(s.T()), - MetricsClient: metrics.NewClient(scope, metrics.VisibilityArchiverScope), + MetricsClient: metrics.NewClient(scope, metrics.History), } s.setupVisibilityDirectory() } diff --git a/common/cache/cache.go b/common/cache/cache.go index cefbcc6aa33..d10310f2f3a 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -156,9 +156,9 @@ type GetCacheItemSizeFunc func(interface{}) uint64 // DomainMetricsScopeCache represents a interface for mapping domainID and scopeIdx to metricsScope type DomainMetricsScopeCache interface { // Get retrieves metrics scope for a domainID and scopeIdx - Get(domainID string, scopeIdx int) (metrics.Scope, bool) + Get(domainID string, scopeIdx metrics.ScopeIdx) (metrics.Scope, bool) // Put adds metrics scope for a domainID and scopeIdx - Put(domainID string, scopeIdx int, metricsScope metrics.Scope) + Put(domainID string, scopeIdx metrics.ScopeIdx, metricsScope metrics.Scope) common.Daemon } diff --git a/common/cache/interface_mock.go b/common/cache/interface_mock.go index c4246b999e5..64f740e8f80 100644 --- a/common/cache/interface_mock.go +++ b/common/cache/interface_mock.go @@ -292,7 +292,7 @@ func (m *MockDomainMetricsScopeCache) EXPECT() *MockDomainMetricsScopeCacheMockR } // Get mocks base method. -func (m *MockDomainMetricsScopeCache) Get(domainID string, scopeIdx int) (metrics.Scope, bool) { +func (m *MockDomainMetricsScopeCache) Get(domainID string, scopeIdx metrics.ScopeIdx) (metrics.Scope, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", domainID, scopeIdx) ret0, _ := ret[0].(metrics.Scope) @@ -307,7 +307,7 @@ func (mr *MockDomainMetricsScopeCacheMockRecorder) Get(domainID, scopeIdx any) * } // Put mocks base method. -func (m *MockDomainMetricsScopeCache) Put(domainID string, scopeIdx int, metricsScope metrics.Scope) { +func (m *MockDomainMetricsScopeCache) Put(domainID string, scopeIdx metrics.ScopeIdx, metricsScope metrics.Scope) { m.ctrl.T.Helper() m.ctrl.Call(m, "Put", domainID, scopeIdx, metricsScope) } diff --git a/common/cache/metricsScopeCache.go b/common/cache/metricsScopeCache.go index 82b3bd6332f..793ebcde73b 100644 --- a/common/cache/metricsScopeCache.go +++ b/common/cache/metricsScopeCache.go @@ -35,7 +35,7 @@ import ( const flushBufferedMetricsScopeDuration = 10 * time.Second type ( - metricsScopeMap map[string]map[int]metrics.Scope + metricsScopeMap map[string]map[metrics.ScopeIdx]metrics.Scope buffer struct { sync.RWMutex @@ -81,7 +81,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D data := c.cache.Load().(metricsScopeMap) // Copy everything over after atomic load for key, val := range data { - scopeMap[key] = map[int]metrics.Scope{} + scopeMap[key] = map[metrics.ScopeIdx]metrics.Scope{} for k, v := range val { scopeMap[key][k] = v } @@ -90,7 +90,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D // Copy from buffered array for key, val := range c.buffer.bufferMap { if _, ok := scopeMap[key]; !ok { - scopeMap[key] = map[int]metrics.Scope{} + scopeMap[key] = map[metrics.ScopeIdx]metrics.Scope{} } for k, v := range val { scopeMap[key][k] = v @@ -109,7 +109,7 @@ func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.D } // Get retrieves scope for domainID and scopeIdx -func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx int) (metrics.Scope, bool) { +func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx metrics.ScopeIdx) (metrics.Scope, bool) { data := c.cache.Load().(metricsScopeMap) if data == nil { @@ -126,12 +126,12 @@ func (c *domainMetricsScopeCache) Get(domainID string, scopeIdx int) (metrics.Sc } // Put puts map of domainID and scopeIdx to metricsScope -func (c *domainMetricsScopeCache) Put(domainID string, scopeIdx int, scope metrics.Scope) { +func (c *domainMetricsScopeCache) Put(domainID string, scopeIdx metrics.ScopeIdx, scope metrics.Scope) { c.buffer.Lock() defer c.buffer.Unlock() if c.buffer.bufferMap[domainID] == nil { - c.buffer.bufferMap[domainID] = map[int]metrics.Scope{} + c.buffer.bufferMap[domainID] = map[metrics.ScopeIdx]metrics.Scope{} } c.buffer.bufferMap[domainID][scopeIdx] = scope } diff --git a/common/cache/metricsScopeCache_test.go b/common/cache/metricsScopeCache_test.go index 6a5db9b582d..6dab8819de5 100644 --- a/common/cache/metricsScopeCache_test.go +++ b/common/cache/metricsScopeCache_test.go @@ -66,7 +66,7 @@ func (s *domainMetricsCacheSuite) TestGetMetricsScope() { var found bool tests := []struct { - scopeID int + scopeID metrics.ScopeIdx domainID string }{ {1, "A"}, @@ -104,7 +104,7 @@ func (s *domainMetricsCacheSuite) TestGetMetricsScopeMultipleFlushLoop() { var found bool tests := []struct { - scopeID int + scopeID metrics.ScopeIdx domainID string }{ {1, "A"}, @@ -174,14 +174,14 @@ func (s *domainMetricsCacheSuite) TestConcurrentMetricsScopeAccess() { for i := 0; i < 1000; i++ { wg.Add(1) // concurrent get and put - go func(scopeIdx int) { + go func(scopeIdx metrics.ScopeIdx) { defer wg.Done() <-ch s.metricsCache.Get("test_domain", scopeIdx) - s.metricsCache.Put("test_domain", scopeIdx, s.metricsClient.Scope(scopeIdx%metrics.NumServices)) - }(i) + s.metricsCache.Put("test_domain", scopeIdx, s.metricsClient.Scope(metrics.ScopeIdx(int(scopeIdx)%int(metrics.NumServices)))) + }(metrics.ScopeIdx(i)) } close(ch) @@ -190,8 +190,8 @@ func (s *domainMetricsCacheSuite) TestConcurrentMetricsScopeAccess() { time.Sleep(120 * time.Millisecond) for i := 0; i < 1000; i++ { - metricsScope, found = s.metricsCache.Get("test_domain", i) - testMetricsScope = s.metricsClient.Scope(i % metrics.NumServices) + metricsScope, found = s.metricsCache.Get("test_domain", metrics.ScopeIdx(i)) + testMetricsScope = s.metricsClient.Scope(metrics.ScopeIdx(i % int(metrics.NumServices))) s.Equal(true, found) s.Equal(testMetricsScope, metricsScope) diff --git a/common/metrics/client.go b/common/metrics/client.go index 4485cb7b71c..945afaf2825 100644 --- a/common/metrics/client.go +++ b/common/metrics/client.go @@ -30,8 +30,8 @@ import ( type ClientImpl struct { // parentReporter is the parent scope for the metrics parentScope tally.Scope - childScopes map[int]tally.Scope - metricDefs map[int]metricDefinition + childScopes map[ScopeIdx]tally.Scope + metricDefs map[MetricIdx]metricDefinition serviceIdx ServiceIdx } @@ -43,7 +43,7 @@ func NewClient(scope tally.Scope, serviceIdx ServiceIdx) Client { totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx]) metricsClient := &ClientImpl{ parentScope: scope, - childScopes: make(map[int]tally.Scope, totalScopes), + childScopes: make(map[ScopeIdx]tally.Scope, totalScopes), metricDefs: getMetricDefs(serviceIdx), serviceIdx: serviceIdx, } @@ -69,60 +69,60 @@ func NewClient(scope tally.Scope, serviceIdx ServiceIdx) Client { // IncCounter increments one for a counter and emits // to metrics backend -func (m *ClientImpl) IncCounter(scopeIdx int, counterIdx int) { +func (m *ClientImpl) IncCounter(scope ScopeIdx, counterIdx MetricIdx) { name := string(m.metricDefs[counterIdx].metricName) - m.childScopes[scopeIdx].Counter(name).Inc(1) + m.childScopes[scope].Counter(name).Inc(1) } // AddCounter adds delta to the counter and // emits to the metrics backend -func (m *ClientImpl) AddCounter(scopeIdx int, counterIdx int, delta int64) { +func (m *ClientImpl) AddCounter(scope ScopeIdx, counterIdx MetricIdx, delta int64) { name := string(m.metricDefs[counterIdx].metricName) - m.childScopes[scopeIdx].Counter(name).Inc(delta) + m.childScopes[scope].Counter(name).Inc(delta) } // StartTimer starts a timer for the given // metric name -func (m *ClientImpl) StartTimer(scopeIdx int, timerIdx int) tally.Stopwatch { +func (m *ClientImpl) StartTimer(scope ScopeIdx, timerIdx MetricIdx) tally.Stopwatch { name := string(m.metricDefs[timerIdx].metricName) - return m.childScopes[scopeIdx].Timer(name).Start() + return m.childScopes[scope].Timer(name).Start() } // RecordTimer record and emit a timer for the given // metric name -func (m *ClientImpl) RecordTimer(scopeIdx int, timerIdx int, d time.Duration) { +func (m *ClientImpl) RecordTimer(scope ScopeIdx, timerIdx MetricIdx, d time.Duration) { name := string(m.metricDefs[timerIdx].metricName) - m.childScopes[scopeIdx].Timer(name).Record(d) + m.childScopes[scope].Timer(name).Record(d) } // RecordHistogramDuration record and emit a duration -func (m *ClientImpl) RecordHistogramDuration(scopeIdx int, timerIdx int, d time.Duration) { +func (m *ClientImpl) RecordHistogramDuration(scope ScopeIdx, timerIdx MetricIdx, d time.Duration) { name := string(m.metricDefs[timerIdx].metricName) - m.childScopes[scopeIdx].Histogram(name, m.getBuckets(timerIdx)).RecordDuration(d) + m.childScopes[scope].Histogram(name, m.getBuckets(timerIdx)).RecordDuration(d) } // UpdateGauge reports Gauge type metric -func (m *ClientImpl) UpdateGauge(scopeIdx int, gaugeIdx int, value float64) { +func (m *ClientImpl) UpdateGauge(scopeIdx ScopeIdx, gaugeIdx MetricIdx, value float64) { name := string(m.metricDefs[gaugeIdx].metricName) m.childScopes[scopeIdx].Gauge(name).Update(value) } // Scope return a new internal metrics scope that can be used to add additional // information to the metrics emitted -func (m *ClientImpl) Scope(scopeIdx int, tags ...Tag) Scope { - scope := m.childScopes[scopeIdx] - return newMetricsScope(scope, scope, m.metricDefs, false).Tagged(tags...) +func (m *ClientImpl) Scope(scope ScopeIdx, tags ...Tag) Scope { + sc := m.childScopes[scope] + return newMetricsScope(sc, sc, m.metricDefs, false).Tagged(tags...) } -func (m *ClientImpl) getBuckets(id int) tally.Buckets { +func (m *ClientImpl) getBuckets(id MetricIdx) tally.Buckets { if m.metricDefs[id].buckets != nil { return m.metricDefs[id].buckets } return tally.DefaultBuckets } -func getMetricDefs(serviceIdx ServiceIdx) map[int]metricDefinition { - defs := make(map[int]metricDefinition) +func getMetricDefs(serviceIdx ServiceIdx) map[MetricIdx]metricDefinition { + defs := make(map[MetricIdx]metricDefinition) for idx, def := range MetricDefs[Common] { defs[idx] = def } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index d0ebacb1c15..bcd12447f5c 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -51,6 +51,12 @@ type ( // ServiceIdx is an index that uniquely identifies the service ServiceIdx int + + // ScopeIdx is an index that uniquely identifies an operation, which is required to form a new metrics scope + ScopeIdx int + + // MetricIdx is an index that uniquely identifies the metric definition + MetricIdx int ) func (s scopeDefinition) GetOperationString() string { @@ -67,12 +73,13 @@ const ( // Service names for all services that emit metrics. const ( - Common = iota + Common ServiceIdx = iota Frontend History Matching Worker ShardDistributor + NumServices ) @@ -143,7 +150,7 @@ const ( // -- Common Operation scopes -- // PersistenceCreateShardScope tracks CreateShard calls made by service to persistence layer - PersistenceCreateShardScope = iota + PersistenceCreateShardScope ScopeIdx = iota // PersistenceGetShardScope tracks GetShard calls made by service to persistence layer PersistenceGetShardScope // PersistenceUpdateShardScope tracks UpdateShard calls made by service to persistence layer @@ -1466,7 +1473,7 @@ const ( ) // ScopeDefs record the scopes for all services -var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ +var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{ // common scope Names Common: { PersistenceCreateShardScope: {operation: "CreateShard"}, @@ -2141,7 +2148,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ // Common Metrics enum const ( - CadenceRequests = iota + CadenceRequests MetricIdx = iota CadenceFailures CadenceLatency CadenceErrBadRequestCounter @@ -2916,7 +2923,7 @@ const ( ) // MetricDefs record the metrics for all services -var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ +var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ Common: { CadenceRequests: {metricName: "cadence_requests", metricType: Counter}, CadenceFailures: {metricName: "cadence_errors", metricType: Counter}, diff --git a/common/metrics/defs_test.go b/common/metrics/defs_test.go index 5b6203fc894..0bdf79e70e3 100644 --- a/common/metrics/defs_test.go +++ b/common/metrics/defs_test.go @@ -134,7 +134,7 @@ func TestMetricDefs(t *testing.T) { // Duplicate indexes with the same operation name are technically fine, but there doesn't seem to be any benefit in allowing it, // and it trivially ensures that all values have only one operation name. func TestOperationIndexesAreUnique(t *testing.T) { - seen := make(map[int]bool) + seen := make(map[ScopeIdx]bool) for serviceIdx, serviceOps := range ScopeDefs { for idx := range serviceOps { if seen[idx] { @@ -158,7 +158,7 @@ func TestOperationIndexesAreUnique(t *testing.T) { func TestMetricsAreUnique(t *testing.T) { // Duplicate indexes is arguably fine, but there doesn't seem to be any benefit in allowing it. t.Run("indexes", func(t *testing.T) { - seen := make(map[int]bool) + seen := make(map[MetricIdx]bool) for _, serviceMetrics := range MetricDefs { for idx := range serviceMetrics { if seen[idx] { diff --git a/common/metrics/interfaces.go b/common/metrics/interfaces.go index f36993e5f4a..6efba4b596a 100644 --- a/common/metrics/interfaces.go +++ b/common/metrics/interfaces.go @@ -30,43 +30,43 @@ type ( // Client is the interface used to report metrics tally. Client interface { // IncCounter increments a counter metric - IncCounter(scope int, counter int) + IncCounter(scope ScopeIdx, counter MetricIdx) // AddCounter adds delta to the counter metric - AddCounter(scope int, counter int, delta int64) + AddCounter(scope ScopeIdx, counter MetricIdx, delta int64) // StartTimer starts a timer for the given // metric name. Time will be recorded when stopwatch is stopped. - StartTimer(scope int, timer int) tally.Stopwatch + StartTimer(scope ScopeIdx, timer MetricIdx) tally.Stopwatch // RecordTimer starts a timer for the given // metric name - RecordTimer(scope int, timer int, d time.Duration) + RecordTimer(scope ScopeIdx, timer MetricIdx, d time.Duration) // RecordHistogramDuration records a histogram duration value for the given // metric name - RecordHistogramDuration(scope int, timer int, d time.Duration) + RecordHistogramDuration(scope ScopeIdx, timer MetricIdx, d time.Duration) // UpdateGauge reports Gauge type absolute value metric - UpdateGauge(scope int, gauge int, value float64) + UpdateGauge(scope ScopeIdx, gauge MetricIdx, value float64) // Scope return an internal scope that can be used to add additional // information to metrics - Scope(scope int, tags ...Tag) Scope + Scope(scope ScopeIdx, tags ...Tag) Scope } // Scope is an interface for metrics Scope interface { // IncCounter increments a counter metric - IncCounter(counter int) + IncCounter(counter MetricIdx) // AddCounter adds delta to the counter metric - AddCounter(counter int, delta int64) + AddCounter(counter MetricIdx, delta int64) // StartTimer starts a timer for the given metric name. // Time will be recorded when stopwatch is stopped. - StartTimer(timer int) Stopwatch + StartTimer(timer MetricIdx) Stopwatch // RecordTimer starts a timer for the given metric name - RecordTimer(timer int, d time.Duration) + RecordTimer(timer MetricIdx, d time.Duration) // RecordHistogramDuration records a histogram duration value for the given // metric name - RecordHistogramDuration(timer int, d time.Duration) + RecordHistogramDuration(timer MetricIdx, d time.Duration) // RecordHistogramValue records a histogram value for the given metric name - RecordHistogramValue(timer int, value float64) + RecordHistogramValue(timer MetricIdx, value float64) // UpdateGauge reports Gauge type absolute value metric - UpdateGauge(gauge int, value float64) + UpdateGauge(gauge MetricIdx, value float64) // Tagged return an internal scope that can be used to add additional // information to metrics Tagged(tags ...Tag) Scope diff --git a/common/metrics/mocks/Client.go b/common/metrics/mocks/Client.go index fa72d7e19e7..bfbc3a00ca5 100644 --- a/common/metrics/mocks/Client.go +++ b/common/metrics/mocks/Client.go @@ -36,27 +36,27 @@ type Client struct { } // AddCounter provides a mock function with given fields: scope, counter, delta -func (_m *Client) AddCounter(scope int, counter int, delta int64) { +func (_m *Client) AddCounter(scope metrics.ScopeIdx, counter metrics.MetricIdx, delta int64) { _m.Called(scope, counter, delta) } // IncCounter provides a mock function with given fields: scope, counter -func (_m *Client) IncCounter(scope int, counter int) { +func (_m *Client) IncCounter(scope metrics.ScopeIdx, counter metrics.MetricIdx) { _m.Called(scope, counter) } // RecordHistogramDuration provides a mock function with given fields: scope, timer, d -func (_m *Client) RecordHistogramDuration(scope int, timer int, d time.Duration) { +func (_m *Client) RecordHistogramDuration(scope metrics.ScopeIdx, timer metrics.MetricIdx, d time.Duration) { _m.Called(scope, timer, d) } // RecordTimer provides a mock function with given fields: scope, timer, d -func (_m *Client) RecordTimer(scope int, timer int, d time.Duration) { +func (_m *Client) RecordTimer(scope metrics.ScopeIdx, timer metrics.MetricIdx, d time.Duration) { _m.Called(scope, timer, d) } // Scope provides a mock function with given fields: scope, tags -func (_m *Client) Scope(scope int, tags ...metrics.Tag) metrics.Scope { +func (_m *Client) Scope(scope metrics.ScopeIdx, tags ...metrics.Tag) metrics.Scope { _va := make([]interface{}, len(tags)) for _i := range tags { _va[_i] = tags[_i] @@ -67,7 +67,7 @@ func (_m *Client) Scope(scope int, tags ...metrics.Tag) metrics.Scope { ret := _m.Called(_ca...) var r0 metrics.Scope - if rf, ok := ret.Get(0).(func(int, ...metrics.Tag) metrics.Scope); ok { + if rf, ok := ret.Get(0).(func(metrics.ScopeIdx, ...metrics.Tag) metrics.Scope); ok { r0 = rf(scope, tags...) } else { if ret.Get(0) != nil { @@ -79,11 +79,11 @@ func (_m *Client) Scope(scope int, tags ...metrics.Tag) metrics.Scope { } // StartTimer provides a mock function with given fields: scope, timer -func (_m *Client) StartTimer(scope int, timer int) tally.Stopwatch { +func (_m *Client) StartTimer(scope metrics.ScopeIdx, timer metrics.MetricIdx) tally.Stopwatch { ret := _m.Called(scope, timer) var r0 tally.Stopwatch - if rf, ok := ret.Get(0).(func(int, int) tally.Stopwatch); ok { + if rf, ok := ret.Get(0).(func(metrics.ScopeIdx, metrics.MetricIdx) tally.Stopwatch); ok { r0 = rf(scope, timer) } else { r0 = ret.Get(0).(tally.Stopwatch) @@ -93,6 +93,6 @@ func (_m *Client) StartTimer(scope int, timer int) tally.Stopwatch { } // UpdateGauge provides a mock function with given fields: scope, gauge, value -func (_m *Client) UpdateGauge(scope int, gauge int, value float64) { +func (_m *Client) UpdateGauge(scope metrics.ScopeIdx, gauge metrics.MetricIdx, value float64) { _m.Called(scope, gauge, value) } diff --git a/common/metrics/mocks/Scope.go b/common/metrics/mocks/Scope.go index 8456725404c..d67661b35c9 100644 --- a/common/metrics/mocks/Scope.go +++ b/common/metrics/mocks/Scope.go @@ -35,36 +35,36 @@ type Scope struct { } // AddCounter provides a mock function with given fields: counter, delta -func (_m *Scope) AddCounter(counter int, delta int64) { +func (_m *Scope) AddCounter(counter metrics.MetricIdx, delta int64) { _m.Called(counter, delta) } // IncCounter provides a mock function with given fields: counter -func (_m *Scope) IncCounter(counter int) { +func (_m *Scope) IncCounter(counter metrics.MetricIdx) { _m.Called(counter) } // RecordHistogramDuration provides a mock function with given fields: timer, d -func (_m *Scope) RecordHistogramDuration(timer int, d time.Duration) { +func (_m *Scope) RecordHistogramDuration(timer metrics.MetricIdx, d time.Duration) { _m.Called(timer, d) } // RecordHistogramValue provides a mock function with given fields: timer, value -func (_m *Scope) RecordHistogramValue(timer int, value float64) { +func (_m *Scope) RecordHistogramValue(timer metrics.MetricIdx, value float64) { _m.Called(timer, value) } // RecordTimer provides a mock function with given fields: timer, d -func (_m *Scope) RecordTimer(timer int, d time.Duration) { +func (_m *Scope) RecordTimer(timer metrics.MetricIdx, d time.Duration) { _m.Called(timer, d) } // StartTimer provides a mock function with given fields: timer -func (_m *Scope) StartTimer(timer int) metrics.Stopwatch { +func (_m *Scope) StartTimer(timer metrics.MetricIdx) metrics.Stopwatch { ret := _m.Called(timer) var r0 metrics.Stopwatch - if rf, ok := ret.Get(0).(func(int) metrics.Stopwatch); ok { + if rf, ok := ret.Get(0).(func(idx metrics.MetricIdx) metrics.Stopwatch); ok { r0 = rf(timer) } else { r0 = ret.Get(0).(metrics.Stopwatch) @@ -96,6 +96,6 @@ func (_m *Scope) Tagged(tags ...metrics.Tag) metrics.Scope { } // UpdateGauge provides a mock function with given fields: gauge, value -func (_m *Scope) UpdateGauge(gauge int, value float64) { +func (_m *Scope) UpdateGauge(gauge metrics.MetricIdx, value float64) { _m.Called(gauge, value) } diff --git a/common/metrics/nop.go b/common/metrics/nop.go index e37c6b50e3f..631f9f4261a 100644 --- a/common/metrics/nop.go +++ b/common/metrics/nop.go @@ -44,26 +44,26 @@ func NopStopwatch() tally.Stopwatch { type noopClientImpl struct{} -func (n noopClientImpl) IncCounter(scope int, counter int) { +func (n noopClientImpl) IncCounter(scope ScopeIdx, counter MetricIdx) { } -func (n noopClientImpl) AddCounter(scope int, counter int, delta int64) { +func (n noopClientImpl) AddCounter(scope ScopeIdx, counter MetricIdx, delta int64) { } -func (n noopClientImpl) StartTimer(scope int, timer int) tally.Stopwatch { +func (n noopClientImpl) StartTimer(scope ScopeIdx, timer MetricIdx) tally.Stopwatch { return NoopStopwatch } -func (n noopClientImpl) RecordTimer(scope int, timer int, d time.Duration) { +func (n noopClientImpl) RecordTimer(scope ScopeIdx, timer MetricIdx, d time.Duration) { } -func (n *noopClientImpl) RecordHistogramDuration(scope int, timer int, d time.Duration) { +func (n *noopClientImpl) RecordHistogramDuration(scope ScopeIdx, timer MetricIdx, d time.Duration) { } -func (n noopClientImpl) UpdateGauge(scope int, gauge int, value float64) { +func (n noopClientImpl) UpdateGauge(scope ScopeIdx, gauge MetricIdx, value float64) { } -func (n noopClientImpl) Scope(scope int, tags ...Tag) Scope { +func (n noopClientImpl) Scope(scope ScopeIdx, tags ...Tag) Scope { return NoopScope } @@ -74,26 +74,26 @@ func NewNoopMetricsClient() Client { type noopScopeImpl struct{} -func (n *noopScopeImpl) IncCounter(counter int) { +func (n *noopScopeImpl) IncCounter(counter MetricIdx) { } -func (n *noopScopeImpl) AddCounter(counter int, delta int64) { +func (n *noopScopeImpl) AddCounter(counter MetricIdx, delta int64) { } -func (n *noopScopeImpl) StartTimer(timer int) Stopwatch { +func (n *noopScopeImpl) StartTimer(timer MetricIdx) Stopwatch { return NewTestStopwatch() } -func (n *noopScopeImpl) RecordTimer(timer int, d time.Duration) { +func (n *noopScopeImpl) RecordTimer(timer MetricIdx, d time.Duration) { } -func (n *noopScopeImpl) RecordHistogramDuration(timer int, d time.Duration) { +func (n *noopScopeImpl) RecordHistogramDuration(timer MetricIdx, d time.Duration) { } -func (n *noopScopeImpl) RecordHistogramValue(timer int, value float64) { +func (n *noopScopeImpl) RecordHistogramValue(timer MetricIdx, value float64) { } -func (n *noopScopeImpl) UpdateGauge(gauge int, value float64) { +func (n *noopScopeImpl) UpdateGauge(gauge MetricIdx, value float64) { } func (n *noopScopeImpl) Tagged(tags ...Tag) Scope { diff --git a/common/metrics/scope.go b/common/metrics/scope.go index 13e1a351795..4b91381fab7 100644 --- a/common/metrics/scope.go +++ b/common/metrics/scope.go @@ -29,14 +29,14 @@ import ( type metricsScope struct { scope tally.Scope rootScope tally.Scope - defs map[int]metricDefinition + defs map[MetricIdx]metricDefinition isDomainTagged bool } func newMetricsScope( rootScope tally.Scope, scope tally.Scope, - defs map[int]metricDefinition, + defs map[MetricIdx]metricDefinition, isDomain bool, ) Scope { return &metricsScope{ @@ -47,11 +47,11 @@ func newMetricsScope( } } -func (m *metricsScope) IncCounter(id int) { +func (m *metricsScope) IncCounter(id MetricIdx) { m.AddCounter(id, 1) } -func (m *metricsScope) AddCounter(id int, delta int64) { +func (m *metricsScope) AddCounter(id MetricIdx, delta int64) { def := m.defs[id] m.scope.Counter(def.metricName.String()).Inc(delta) if !def.metricRollupName.Empty() { @@ -59,7 +59,7 @@ func (m *metricsScope) AddCounter(id int, delta int64) { } } -func (m *metricsScope) UpdateGauge(id int, value float64) { +func (m *metricsScope) UpdateGauge(id MetricIdx, value float64) { def := m.defs[id] m.scope.Gauge(def.metricName.String()).Update(value) if !def.metricRollupName.Empty() { @@ -67,7 +67,7 @@ func (m *metricsScope) UpdateGauge(id int, value float64) { } } -func (m *metricsScope) StartTimer(id int) Stopwatch { +func (m *metricsScope) StartTimer(id MetricIdx) Stopwatch { def := m.defs[id] timer := m.scope.Timer(def.metricName.String()) switch { @@ -81,7 +81,7 @@ func (m *metricsScope) StartTimer(id int) Stopwatch { } } -func (m *metricsScope) RecordTimer(id int, d time.Duration) { +func (m *metricsScope) RecordTimer(id MetricIdx, d time.Duration) { def := m.defs[id] m.scope.Timer(def.metricName.String()).Record(d) switch { @@ -94,7 +94,7 @@ func (m *metricsScope) RecordTimer(id int, d time.Duration) { } } -func (m *metricsScope) RecordHistogramDuration(id int, value time.Duration) { +func (m *metricsScope) RecordHistogramDuration(id MetricIdx, value time.Duration) { def := m.defs[id] m.scope.Histogram(def.metricName.String(), m.getBuckets(id)).RecordDuration(value) if !def.metricRollupName.Empty() { @@ -102,7 +102,7 @@ func (m *metricsScope) RecordHistogramDuration(id int, value time.Duration) { } } -func (m *metricsScope) RecordHistogramValue(id int, value float64) { +func (m *metricsScope) RecordHistogramValue(id MetricIdx, value float64) { def := m.defs[id] m.scope.Histogram(def.metricName.String(), m.getBuckets(id)).RecordValue(value) if !def.metricRollupName.Empty() { @@ -122,7 +122,7 @@ func (m *metricsScope) Tagged(tags ...Tag) Scope { return newMetricsScope(m.rootScope, m.scope.Tagged(tagMap), m.defs, domainTagged) } -func (m *metricsScope) getBuckets(id int) tally.Buckets { +func (m *metricsScope) getBuckets(id MetricIdx) tally.Buckets { if m.defs[id].buckets != nil { return m.defs[id].buckets } diff --git a/common/persistence/elasticsearch/es_visibility_metric_clients.go b/common/persistence/elasticsearch/es_visibility_metric_clients.go index d5d7687b0bb..77a2a24a663 100644 --- a/common/persistence/elasticsearch/es_visibility_metric_clients.go +++ b/common/persistence/elasticsearch/es_visibility_metric_clients.go @@ -368,7 +368,7 @@ func (p *visibilityMetricsClient) DeleteUninitializedWorkflowExecution( return err } -func (p *visibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.Scope, scope int, err error) { +func (p *visibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.Scope, scope metrics.ScopeIdx, err error) { switch err.(type) { case *types.BadRequestError: @@ -379,7 +379,7 @@ func (p *visibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.S scopeWithDomainTag.IncCounter(metrics.ElasticsearchErrBusyCounterPerDomain) scopeWithDomainTag.IncCounter(metrics.ElasticsearchFailuresPerDomain) default: - p.logger.Error("Operation failed with internal error.", tag.MetricScope(scope), tag.Error(err)) + p.logger.Error("Operation failed with internal error.", tag.MetricScope(int(scope)), tag.Error(err)) scopeWithDomainTag.IncCounter(metrics.ElasticsearchFailuresPerDomain) } } diff --git a/common/persistence/pinot/pinot_visibility_metric_clients.go b/common/persistence/pinot/pinot_visibility_metric_clients.go index 44f15d469d7..98011ea737c 100644 --- a/common/persistence/pinot/pinot_visibility_metric_clients.go +++ b/common/persistence/pinot/pinot_visibility_metric_clients.go @@ -364,7 +364,7 @@ func (p *pinotVisibilityMetricsClient) DeleteUninitializedWorkflowExecution( return err } -func (p *pinotVisibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.Scope, scope int, err error) { +func (p *pinotVisibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.Scope, scope metrics.ScopeIdx, err error) { switch err.(type) { case *types.BadRequestError: @@ -375,7 +375,7 @@ func (p *pinotVisibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metr scopeWithDomainTag.IncCounter(metrics.PinotErrBusyCounterPerDomain) scopeWithDomainTag.IncCounter(metrics.PinotFailuresPerDomain) default: - p.logger.Error("Operation failed with internal error.", tag.MetricScope(scope), tag.Error(err)) + p.logger.Error("Operation failed with internal error.", tag.MetricScope(int(scope)), tag.Error(err)) scopeWithDomainTag.IncCounter(metrics.PinotFailuresPerDomain) } } diff --git a/common/persistence/wrappers/metered/base.go b/common/persistence/wrappers/metered/base.go index 89cd0158988..5d24daa4b89 100644 --- a/common/persistence/wrappers/metered/base.go +++ b/common/persistence/wrappers/metered/base.go @@ -47,7 +47,7 @@ type base struct { enableShardIDMetrics dynamicproperties.BoolPropertyFn } -func (p *base) updateErrorMetricPerDomain(scope int, err error, scopeWithDomainTag metrics.Scope, logger log.Logger) { +func (p *base) updateErrorMetricPerDomain(scope metrics.ScopeIdx, err error, scopeWithDomainTag metrics.Scope, logger log.Logger) { logger = logger.Helper() switch { @@ -78,14 +78,14 @@ func (p *base) updateErrorMetricPerDomain(scope int, err error, scopeWithDomainT case errors.As(err, new(*persistence.DBUnavailableError)): scopeWithDomainTag.IncCounter(metrics.PersistenceErrDBUnavailableCounterPerDomain) scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain) - logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(scope)) + logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(int(scope))) default: - logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(scope)) + logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(int(scope))) scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain) } } -func (p *base) updateErrorMetric(scope int, err error, metricsScope metrics.Scope, logger log.Logger) { +func (p *base) updateErrorMetric(scope metrics.ScopeIdx, err error, metricsScope metrics.Scope, logger log.Logger) { logger = logger.Helper() switch { @@ -116,14 +116,14 @@ func (p *base) updateErrorMetric(scope int, err error, metricsScope metrics.Scop case errors.As(err, new(*persistence.DBUnavailableError)): metricsScope.IncCounter(metrics.PersistenceErrDBUnavailableCounter) metricsScope.IncCounter(metrics.PersistenceFailures) - logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(scope)) + logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(int(scope))) default: - logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(scope)) + logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(int(scope))) metricsScope.IncCounter(metrics.PersistenceFailures) } } -func (p *base) call(scope int, op func() error, tags ...metrics.Tag) error { +func (p *base) call(scope metrics.ScopeIdx, op func() error, tags ...metrics.Tag) error { metricsScope := p.metricClient.Scope(scope, tags...) if len(tags) > 0 { metricsScope.IncCounter(metrics.PersistenceRequestsPerDomain) @@ -154,7 +154,7 @@ func (p *base) call(scope int, op func() error, tags ...metrics.Tag) error { return err } -func (p *base) callWithoutDomainTag(scope int, op func() error, tags ...metrics.Tag) error { +func (p *base) callWithoutDomainTag(scope metrics.ScopeIdx, op func() error, tags ...metrics.Tag) error { metricsScope := p.metricClient.Scope(scope, tags...) metricsScope.IncCounter(metrics.PersistenceRequests) before := time.Now() @@ -171,7 +171,7 @@ func (p *base) callWithoutDomainTag(scope int, op func() error, tags ...metrics. return err } -func (p *base) callWithDomainAndShardScope(scope int, op func() error, domainTag metrics.Tag, shardIDTag metrics.Tag, additionalTags ...metrics.Tag) error { +func (p *base) callWithDomainAndShardScope(scope metrics.ScopeIdx, op func() error, domainTag metrics.Tag, shardIDTag metrics.Tag, additionalTags ...metrics.Tag) error { domainMetricsScope := p.metricClient.Scope(scope, append([]metrics.Tag{domainTag}, additionalTags...)...) shardOperationsMetricsScope := p.metricClient.Scope(scope, append([]metrics.Tag{shardIDTag}, additionalTags...)...) shardOverallMetricsScope := p.metricClient.Scope(metrics.PersistenceShardRequestCountScope, shardIDTag) @@ -259,7 +259,7 @@ func (p *base) emptyMetric(methodName string, req any, res any, err error) { } var emptyCountedMethods = map[string]struct { - scope int + scope metrics.ScopeIdx }{ "ExecutionManager.ListCurrentExecutions": { scope: metrics.PersistenceListCurrentExecutionsScope, @@ -288,7 +288,7 @@ var emptyCountedMethods = map[string]struct { } var payloadSizeEmittingMethods = map[string]struct { - scope int + scope metrics.ScopeIdx }{ "ExecutionManager.ListCurrentExecutions": { scope: metrics.PersistenceListCurrentExecutionsScope, diff --git a/common/util.go b/common/util.go index 6cb50848a79..857b2c89f48 100644 --- a/common/util.go +++ b/common/util.go @@ -249,7 +249,7 @@ func IsValidIDLength( scope metrics.Scope, warnLimit int, errorLimit int, - metricsCounter int, + metricsCounter metrics.MetricIdx, domainName string, logger log.Logger, idTypeViolationTag tag.Tag, @@ -981,7 +981,7 @@ func NewPerTaskListScope( taskListName string, taskListKind types.TaskListKind, client metrics.Client, - scopeIdx int, + scopeIdx metrics.ScopeIdx, ) metrics.Scope { domainTag := metrics.DomainUnknownTag() taskListTag := metrics.TaskListUnknownTag() diff --git a/common/util_test.go b/common/util_test.go index dd21cc116fa..cb52696023f 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -552,7 +552,7 @@ func TestIsValidIDLength(t *testing.T) { scope = metrics.NoopScope // arguments - metricCounter = 0 + metricCounter = metrics.MetricIdx(0) idTypeViolationTag = tag.ClusterName("idTypeViolationTag") domainName = "domain_name" id = "12345" diff --git a/service/frontend/admin/handler.go b/service/frontend/admin/handler.go index 8c466699c19..bda692db8bd 100644 --- a/service/frontend/admin/handler.go +++ b/service/frontend/admin/handler.go @@ -1509,7 +1509,7 @@ func (adh *adminHandlerImpl) validatePaginationToken( } // startRequestProfile initiates recording of request metrics -func (adh *adminHandlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) { +func (adh *adminHandlerImpl) startRequestProfile(ctx context.Context, scope metrics.ScopeIdx) (metrics.Scope, metrics.Stopwatch) { metricsScope := adh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...) sw := metricsScope.StartTimer(metrics.CadenceLatency) metricsScope.IncCounter(metrics.CadenceRequests) diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index d908d56d606..68e1dcf249f 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -3324,7 +3324,7 @@ func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttri } func getMetricsScopeWithDomain( - scope int, + scope metrics.ScopeIdx, d domainGetter, metricsClient metrics.Client, ) metrics.Scope { diff --git a/service/frontend/api/producer_manager.go b/service/frontend/api/producer_manager.go index 297f06d43be..4f20e41884b 100644 --- a/service/frontend/api/producer_manager.go +++ b/service/frontend/api/producer_manager.go @@ -70,7 +70,7 @@ func NewProducerManager( InitialCapacity: 5, MaxCount: 100, Pin: true, - MetricsScope: metricsClient.Scope(metrics.Frontend), + MetricsScope: metricsClient.Scope(metrics.PersistenceGetShardScope), // was metrics.Frontend, using incorrect int Logger: logger, }), } diff --git a/service/frontend/wrappers/accesscontrolled/access_controlled.go b/service/frontend/wrappers/accesscontrolled/access_controlled.go index 72d00bb41c1..56d68d7734d 100644 --- a/service/frontend/wrappers/accesscontrolled/access_controlled.go +++ b/service/frontend/wrappers/accesscontrolled/access_controlled.go @@ -63,7 +63,7 @@ func (a *apiHandler) isAuthorized( // getMetricsScopeWithDomain return metrics scope with domain tag func (a *apiHandler) getMetricsScopeWithDomain( - scope int, + scope metrics.ScopeIdx, domain string, ) metrics.Scope { if domain != "" { diff --git a/service/frontend/wrappers/clusterredirection/callwrappers.go b/service/frontend/wrappers/clusterredirection/callwrappers.go index 86d5738afd1..d5dda1f8edf 100644 --- a/service/frontend/wrappers/clusterredirection/callwrappers.go +++ b/service/frontend/wrappers/clusterredirection/callwrappers.go @@ -38,7 +38,7 @@ type ( ) func (handler *clusterRedirectionHandler) beforeCall( - scope int, + scope metrics.ScopeIdx, ) (metrics.Scope, time.Time) { return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now() } diff --git a/service/history/decision/checker.go b/service/history/decision/checker.go index b1e0aa97af1..cf327e6a0e0 100644 --- a/service/history/decision/checker.go +++ b/service/history/decision/checker.go @@ -206,7 +206,7 @@ func (v *attrValidator) validateActivityScheduleAttributes( targetDomainID string, attributes *types.ScheduleActivityTaskDecisionAttributes, wfTimeout int32, - metricsScope int, + metricsScope metrics.ScopeIdx, ) error { if err := v.validateCrossDomainCall( @@ -369,7 +369,7 @@ func (v *attrValidator) validateActivityScheduleAttributes( func (v *attrValidator) validateTimerScheduleAttributes( attributes *types.StartTimerDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) error { @@ -400,7 +400,7 @@ func (v *attrValidator) validateTimerScheduleAttributes( func (v *attrValidator) validateActivityCancelAttributes( attributes *types.RequestCancelActivityTaskDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) error { @@ -427,7 +427,7 @@ func (v *attrValidator) validateActivityCancelAttributes( func (v *attrValidator) validateTimerCancelAttributes( attributes *types.CancelTimerDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) error { @@ -453,7 +453,7 @@ func (v *attrValidator) validateTimerCancelAttributes( func (v *attrValidator) validateRecordMarkerAttributes( attributes *types.RecordMarkerDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) error { @@ -515,7 +515,7 @@ func (v *attrValidator) validateCancelExternalWorkflowExecutionAttributes( domainID string, targetDomainID string, attributes *types.RequestCancelExternalWorkflowExecutionDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, ) error { if err := v.validateCrossDomainCall( @@ -568,7 +568,7 @@ func (v *attrValidator) validateSignalExternalWorkflowExecutionAttributes( domainID string, targetDomainID string, attributes *types.SignalExternalWorkflowExecutionDecisionAttributes, - metricsScope int, + metricsScope metrics.ScopeIdx, ) error { if err := v.validateCrossDomainCall( @@ -647,7 +647,7 @@ func (v *attrValidator) validateUpsertWorkflowSearchAttributes( func (v *attrValidator) validateContinueAsNewWorkflowExecutionAttributes( attributes *types.ContinueAsNewWorkflowExecutionDecisionAttributes, executionInfo *persistence.WorkflowExecutionInfo, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) error { @@ -706,7 +706,7 @@ func (v *attrValidator) validateStartChildExecutionAttributes( targetDomainID string, attributes *types.StartChildWorkflowExecutionDecisionAttributes, parentInfo *persistence.WorkflowExecutionInfo, - metricsScope int, + metricsScope metrics.ScopeIdx, ) error { if err := v.validateCrossDomainCall( @@ -797,7 +797,7 @@ func (v *attrValidator) validateStartChildExecutionAttributes( func (v *attrValidator) validatedTaskList( taskList *types.TaskList, defaultVal string, - metricsScope int, + metricsScope metrics.ScopeIdx, domain string, ) (*types.TaskList, error) { diff --git a/service/history/engine/engineimpl/start_workflow_execution.go b/service/history/engine/engineimpl/start_workflow_execution.go index 872aa9d027c..d481dabe0c2 100644 --- a/service/history/engine/engineimpl/start_workflow_execution.go +++ b/service/history/engine/engineimpl/start_workflow_execution.go @@ -67,7 +67,7 @@ func (e *historyEngineImpl) startWorkflowHelper( ctx context.Context, startRequest *types.HistoryStartWorkflowExecutionRequest, domainEntry *cache.DomainCacheEntry, - metricsScope int, + metricsScope metrics.ScopeIdx, signalWithStartArg *signalWithStartArg, ) (resp *types.StartWorkflowExecutionResponse, retError error) { @@ -515,7 +515,7 @@ func shouldTerminateAndStart(startRequest *types.HistoryStartWorkflowExecutionRe (state == persistence.WorkflowStateRunning || state == persistence.WorkflowStateCreated) } -func (e *historyEngineImpl) validateStartWorkflowExecutionRequest(request *types.StartWorkflowExecutionRequest, metricsScope int) error { +func (e *historyEngineImpl) validateStartWorkflowExecutionRequest(request *types.StartWorkflowExecutionRequest, metricsScope metrics.ScopeIdx) error { if len(request.GetRequestID()) == 0 { return &types.BadRequestError{Message: "Missing request ID."} } @@ -584,7 +584,7 @@ func (e *historyEngineImpl) validateStartWorkflowExecutionRequest(request *types func (e *historyEngineImpl) overrideTaskStartToCloseTimeoutSeconds( domainEntry *cache.DomainCacheEntry, request *types.StartWorkflowExecutionRequest, - metricsScope int, + metricsScope metrics.ScopeIdx, ) { domainName := domainEntry.GetInfo().Name maxDecisionStartToCloseTimeoutSeconds := int32(e.config.MaxDecisionStartToCloseSeconds(domainName)) diff --git a/service/history/execution/cache.go b/service/history/execution/cache.go index 7dce612a212..a24298b9df3 100644 --- a/service/history/execution/cache.go +++ b/service/history/execution/cache.go @@ -254,7 +254,7 @@ func (c *cacheImpl) getOrCreateWorkflowExecutionInternal( ctx context.Context, domainID string, execution types.WorkflowExecution, - scope int, + scope metrics.ScopeIdx, forceClearContext bool, ) (Context, ReleaseFunc, error) { diff --git a/service/history/execution/context_util_test.go b/service/history/execution/context_util_test.go index 03263d21faa..5baf3319f1e 100644 --- a/service/history/execution/context_util_test.go +++ b/service/history/execution/context_util_test.go @@ -240,7 +240,7 @@ func TestEmitWorkflowExecutionStats(t *testing.T) { if tc.expectCalls { mockMetricsClient.On("Scope", metrics.ExecutionSizeStatsScope, mock.Anything).Return(mockScope) mockMetricsClient.On("Scope", metrics.ExecutionCountStatsScope, mock.Anything).Return(mockScope) - mockScope.On("RecordTimer", mock.AnythingOfType("int"), mock.AnythingOfType("time.Duration")).Return().Times(14) + mockScope.On("RecordTimer", mock.AnythingOfType("metrics.MetricIdx"), mock.AnythingOfType("time.Duration")).Return().Times(14) } else { mockScope.AssertNotCalled(t, "RecordTimer") } diff --git a/service/history/handler/handler.go b/service/history/handler/handler.go index bb7b566c186..19888065aca 100644 --- a/service/history/handler/handler.go +++ b/service/history/handler/handler.go @@ -2247,7 +2247,7 @@ func (h *handlerImpl) emitInfoOrDebugLog( } } -func (h *handlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) { +func (h *handlerImpl) startRequestProfile(ctx context.Context, scope metrics.ScopeIdx) (metrics.Scope, metrics.Stopwatch) { metricsScope := h.GetMetricsClient().Scope(scope, metrics.GetContextTags(ctx)...) metricsScope.IncCounter(metrics.CadenceRequests) sw := metricsScope.StartTimer(metrics.CadenceLatency) diff --git a/service/history/queue/processor_base.go b/service/history/queue/processor_base.go index 85ff9ac709e..7ff5262ce2d 100644 --- a/service/history/queue/processor_base.go +++ b/service/history/queue/processor_base.go @@ -429,7 +429,7 @@ func (p *processorBase) submitTask(task task.Task) (bool, error) { return true, nil } -func getPendingTasksMetricIdx(scopeIdx int) int { +func getPendingTasksMetricIdx(scopeIdx metrics.ScopeIdx) metrics.MetricIdx { switch scopeIdx { case metrics.TimerActiveQueueProcessorScope: return metrics.ShardInfoTimerActivePendingTasksTimer diff --git a/service/history/queue/processor_options.go b/service/history/queue/processor_options.go index 9e8f3ba5a9a..bd49702267f 100644 --- a/service/history/queue/processor_options.go +++ b/service/history/queue/processor_options.go @@ -22,6 +22,7 @@ package queue import ( "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/metrics" ) type queueProcessorOptions struct { @@ -55,5 +56,5 @@ type queueProcessorOptions struct { ValidationInterval dynamicproperties.DurationPropertyFn // MaxPendingTaskSize is used in cross cluster queue to limit the pending task count MaxPendingTaskSize dynamicproperties.IntPropertyFn - MetricScope int + MetricScope metrics.ScopeIdx } diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 53099063d6d..65d8fbffdbe 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/client/admin" "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" @@ -572,13 +573,13 @@ func (s *dlqHandlerSuite) TestMergeMessages_executeFailed() { } type fakeTaskExecutor struct { - scope int + scope metrics.ScopeIdx err error executedTasks []*types.ReplicationTask } -func (e *fakeTaskExecutor) execute(replicationTask *types.ReplicationTask, _ bool) (int, error) { +func (e *fakeTaskExecutor) execute(replicationTask *types.ReplicationTask, _ bool) (metrics.ScopeIdx, error) { e.executedTasks = append(e.executedTasks, replicationTask) return e.scope, e.err } diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index a62b9d4e7d6..dec3f48bb97 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -39,7 +39,7 @@ import ( type ( // TaskExecutor is the executor for replication task TaskExecutor interface { - execute(replicationTask *types.ReplicationTask, forceApply bool) (int, error) + execute(replicationTask *types.ReplicationTask, forceApply bool) (metrics.ScopeIdx, error) } taskExecutorImpl struct { @@ -83,10 +83,10 @@ func NewTaskExecutor( func (e *taskExecutorImpl) execute( replicationTask *types.ReplicationTask, forceApply bool, -) (int, error) { +) (metrics.ScopeIdx, error) { var err error - var scope int + var scope metrics.ScopeIdx switch replicationTask.GetTaskType() { case types.ReplicationTaskTypeSyncActivity: scope = metrics.SyncActivityTaskScope diff --git a/service/history/replication/task_executor_mock.go b/service/history/replication/task_executor_mock.go index 340f7e7239e..c816b76137b 100644 --- a/service/history/replication/task_executor_mock.go +++ b/service/history/replication/task_executor_mock.go @@ -14,6 +14,7 @@ import ( gomock "go.uber.org/mock/gomock" + metrics "github.com/uber/cadence/common/metrics" types "github.com/uber/cadence/common/types" ) @@ -42,10 +43,10 @@ func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { } // execute mocks base method. -func (m *MockTaskExecutor) execute(replicationTask *types.ReplicationTask, forceApply bool) (int, error) { +func (m *MockTaskExecutor) execute(replicationTask *types.ReplicationTask, forceApply bool) (metrics.ScopeIdx, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "execute", replicationTask, forceApply) - ret0, _ := ret[0].(int) + ret0, _ := ret[0].(metrics.ScopeIdx) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 2e0c9416950..b6bbb0dd6d5 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -663,7 +663,7 @@ func (p *taskProcessorImpl) shouldRetryDLQ(err error) bool { } } -func (p *taskProcessorImpl) updateFailureMetric(scope int, err error, shardID int) { +func (p *taskProcessorImpl) updateFailureMetric(scope metrics.ScopeIdx, err error, shardID int) { // Always update failure counter for all replicator errors shardScope := p.metricsClient.Scope(scope, metrics.InstanceTag(strconv.Itoa(shardID))) shardScope.IncCounter(metrics.ReplicatorFailures) diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index d7b54163ff6..1217f4c797a 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -220,7 +220,7 @@ func (s *taskProcessorSuite) TestProcessorLoop_RespChanClosed() { func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteSuccess() { // taskExecutor will fail to execute the task // returning a non-retriable task to keep mocking simpler - s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, nil).Times(1) + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(metrics.ScopeIdx(0), nil).Times(1) // domain name will be fetched s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() @@ -255,7 +255,7 @@ func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteSuccess() { func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQSuccess() { // taskExecutor will fail to execute the task // returning a non-retriable task to keep mocking simpler - s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1) + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(metrics.ScopeIdx(0), &types.BadRequestError{}).Times(1) // domain name will be fetched s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() @@ -305,7 +305,7 @@ func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQSuccess() func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQFailed() { // taskExecutor will fail to execute the task // returning a non-retriable task to keep mocking simpler - s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1) + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(metrics.ScopeIdx(0), &types.BadRequestError{}).Times(1) // domain name will be fetched s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() diff --git a/service/history/task/task.go b/service/history/task/task.go index cb34bbe2228..49c44101a81 100644 --- a/service/history/task/task.go +++ b/service/history/task/task.go @@ -493,7 +493,7 @@ func logEvent( // otherwise, it creates a new domain-tagged scope, cache and return the scope func getOrCreateDomainTaggedScope( shard shard.Context, - scopeIdx int, + scopeIdx metrics.ScopeIdx, domainID string, logger log.Logger, ) metrics.Scope { diff --git a/service/history/task/task_util.go b/service/history/task/task_util.go index eaec9fb5921..9d1b68036ac 100644 --- a/service/history/task/task_util.go +++ b/service/history/task/task_util.go @@ -64,7 +64,7 @@ func InitializeLoggerForTask( func GetTransferTaskMetricsScope( taskType int, isActive bool, -) int { +) metrics.ScopeIdx { switch taskType { case persistence.TransferTaskTypeActivityTask: if isActive { @@ -138,7 +138,7 @@ func GetTransferTaskMetricsScope( func GetTimerTaskMetricScope( taskType int, isActive bool, -) int { +) metrics.ScopeIdx { switch taskType { case persistence.TaskTypeDecisionTimeout: if isActive { diff --git a/service/history/task/task_util_test.go b/service/history/task/task_util_test.go index 8154e72a70c..e24c4a24ee0 100644 --- a/service/history/task/task_util_test.go +++ b/service/history/task/task_util_test.go @@ -82,7 +82,7 @@ func TestGetTransferTaskMetricsScope(t *testing.T) { name string taskType int isActive bool - expectedScope int + expectedScope metrics.ScopeIdx }{ { name: "TransferTaskTypeActivityTask - active", @@ -254,7 +254,7 @@ func TestGetTimerTaskMetricScope(t *testing.T) { name string taskType int isActive bool - expectedScope int + expectedScope metrics.ScopeIdx }{ { name: "TimerTaskTypeDecisionTimeout - active", diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index b0ceeff138a..bc50346c300 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -851,7 +851,7 @@ func (t *timerActiveTaskExecutor) updateWorkflowExecution( func (t *timerActiveTaskExecutor) emitTimeoutMetricScopeWithDomainTag( domainID string, - scope int, + scope metrics.ScopeIdx, timerType execution.TimerType, tags ...metrics.Tag, ) { diff --git a/service/history/workflowcache/cache.go b/service/history/workflowcache/cache.go index 9345a626f59..120b0cfe93c 100644 --- a/service/history/workflowcache/cache.go +++ b/service/history/workflowcache/cache.go @@ -172,7 +172,7 @@ func (c *wfCache) emitRateLimitMetrics( workflowID string, domainName string, callType string, - metric int, + metric metrics.MetricIdx, ) { c.metricsClient.Scope( metrics.HistoryClientWfIDCacheScope, diff --git a/service/history/workflowcache/metrics.go b/service/history/workflowcache/metrics.go index a0556db324b..6dc6454803f 100644 --- a/service/history/workflowcache/metrics.go +++ b/service/history/workflowcache/metrics.go @@ -50,7 +50,7 @@ func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount( domainName string, timeSource clock.TimeSource, metricsClient metrics.Client, - metric int, + metric metrics.MetricIdx, ) { cm.Lock() defer cm.Unlock() diff --git a/service/matching/handler/context.go b/service/matching/handler/context.go index ed70c0a0f5f..3fad7106913 100644 --- a/service/matching/handler/context.go +++ b/service/matching/handler/context.go @@ -44,7 +44,7 @@ func newHandlerContext( domainName string, taskList *types.TaskList, metricsClient metrics.Client, - metricsScope int, + metricsScope metrics.ScopeIdx, logger log.Logger, ) *handlerContext { return &handlerContext{ diff --git a/service/matching/handler/handler.go b/service/matching/handler/handler.go index 9f6d76cb538..5bd900d2d63 100644 --- a/service/matching/handler/handler.go +++ b/service/matching/handler/handler.go @@ -110,7 +110,7 @@ func (h *handlerImpl) newHandlerContext( ctx context.Context, domainName string, taskList *types.TaskList, - scope int, + scope metrics.ScopeIdx, ) *handlerContext { return newHandlerContext( ctx, diff --git a/service/sharddistributor/store/wrappers/metered/base.go b/service/sharddistributor/store/wrappers/metered/base.go index 052008fddee..76820ffb916 100644 --- a/service/sharddistributor/store/wrappers/metered/base.go +++ b/service/sharddistributor/store/wrappers/metered/base.go @@ -38,20 +38,20 @@ type base struct { timeSource clock.TimeSource } -func (p *base) updateErrorMetricPerNamespace(scope int, err error, scopeWithNamespaceTags metrics.Scope, logger log.Logger) { +func (p *base) updateErrorMetricPerNamespace(scope metrics.ScopeIdx, err error, scopeWithNamespaceTags metrics.Scope, logger log.Logger) { logger = logger.Helper() switch { case errors.Is(err, store.ErrExecutorNotFound): scopeWithNamespaceTags.IncCounter(metrics.ShardDistributorStoreExecutorNotFound) - logger.Error("Executor not found.", tag.Error(err), tag.MetricScope(scope)) + logger.Error("Executor not found.", tag.Error(err), tag.MetricScope(int(scope))) // int??? default: - logger.Error("Store failed with internal error.", tag.Error(err), tag.MetricScope(scope)) + logger.Error("Store failed with internal error.", tag.Error(err), tag.MetricScope(int(scope))) // int??? } scopeWithNamespaceTags.IncCounter(metrics.ShardDistributorStoreFailuresPerNamespace) } -func (p *base) call(scope int, op func() error, tags ...metrics.Tag) error { +func (p *base) call(scope metrics.ScopeIdx, op func() error, tags ...metrics.Tag) error { metricsScope := p.metricClient.Scope(scope, tags...) metricsScope.IncCounter(metrics.ShardDistributorStoreRequestsPerNamespace) diff --git a/service/sharddistributor/store/wrappers/metered/metered_test.go b/service/sharddistributor/store/wrappers/metered/metered_test.go index 5b3fc3ebe40..1835ab42cc3 100644 --- a/service/sharddistributor/store/wrappers/metered/metered_test.go +++ b/service/sharddistributor/store/wrappers/metered/metered_test.go @@ -45,7 +45,7 @@ func TestMeteredStore_GetHeartbeat(t *testing.T) { setupMocks: func(logger *log.MockLogger) { logger.EXPECT().Error( "Executor not found.", - []tag.Tag{tag.Error(store.ErrExecutorNotFound), tag.MetricScope(metrics.ShardDistributorStoreGetHeartbeatScope)}, + []tag.Tag{tag.Error(store.ErrExecutorNotFound), tag.MetricScope(int(metrics.ShardDistributorStoreGetHeartbeatScope))}, ).Times(1) }, error: store.ErrExecutorNotFound, @@ -55,7 +55,7 @@ func TestMeteredStore_GetHeartbeat(t *testing.T) { setupMocks: func(logger *log.MockLogger) { logger.EXPECT().Error( "Store failed with internal error.", - []tag.Tag{tag.Error(&types.InternalServiceError{}), tag.MetricScope(metrics.ShardDistributorStoreGetHeartbeatScope)}, + []tag.Tag{tag.Error(&types.InternalServiceError{}), tag.MetricScope(int(metrics.ShardDistributorStoreGetHeartbeatScope))}, ).Times(1) }, error: &types.InternalServiceError{}, diff --git a/service/worker/archiver/replay_metrics_client.go b/service/worker/archiver/replay_metrics_client.go index 6ec7e1c94ce..3a9fa889287 100644 --- a/service/worker/archiver/replay_metrics_client.go +++ b/service/worker/archiver/replay_metrics_client.go @@ -50,7 +50,7 @@ func NewReplayMetricsClient(client metrics.Client, ctx workflow.Context) metrics } // IncCounter increments a counter metric -func (r *replayMetricsClient) IncCounter(scope int, counter int) { +func (r *replayMetricsClient) IncCounter(scope metrics.ScopeIdx, counter metrics.MetricIdx) { if workflow.IsReplaying(r.ctx) { return } @@ -58,7 +58,7 @@ func (r *replayMetricsClient) IncCounter(scope int, counter int) { } // AddCounter adds delta to the counter metric -func (r *replayMetricsClient) AddCounter(scope int, counter int, delta int64) { +func (r *replayMetricsClient) AddCounter(scope metrics.ScopeIdx, counter metrics.MetricIdx, delta int64) { if workflow.IsReplaying(r.ctx) { return } @@ -66,7 +66,7 @@ func (r *replayMetricsClient) AddCounter(scope int, counter int, delta int64) { } // StartTimer starts a timer for the given metric name. Time will be recorded when stopwatch is stopped. -func (r *replayMetricsClient) StartTimer(scope int, timer int) tally.Stopwatch { +func (r *replayMetricsClient) StartTimer(scope metrics.ScopeIdx, timer metrics.MetricIdx) tally.Stopwatch { if workflow.IsReplaying(r.ctx) { return metrics.NopStopwatch() } @@ -74,7 +74,7 @@ func (r *replayMetricsClient) StartTimer(scope int, timer int) tally.Stopwatch { } // RecordTimer starts a timer for the given metric name -func (r *replayMetricsClient) RecordTimer(scope int, timer int, d time.Duration) { +func (r *replayMetricsClient) RecordTimer(scope metrics.ScopeIdx, timer metrics.MetricIdx, d time.Duration) { if workflow.IsReplaying(r.ctx) { return } @@ -82,7 +82,7 @@ func (r *replayMetricsClient) RecordTimer(scope int, timer int, d time.Duration) } // RecordHistogramDuration record and emit a duration -func (r *replayMetricsClient) RecordHistogramDuration(scope int, timer int, d time.Duration) { +func (r *replayMetricsClient) RecordHistogramDuration(scope metrics.ScopeIdx, timer metrics.MetricIdx, d time.Duration) { if workflow.IsReplaying(r.ctx) { return } @@ -90,7 +90,7 @@ func (r *replayMetricsClient) RecordHistogramDuration(scope int, timer int, d ti } // UpdateGauge reports Gauge type absolute value metric -func (r *replayMetricsClient) UpdateGauge(scope int, gauge int, value float64) { +func (r *replayMetricsClient) UpdateGauge(scope metrics.ScopeIdx, gauge metrics.MetricIdx, value float64) { if workflow.IsReplaying(r.ctx) { return } @@ -98,7 +98,7 @@ func (r *replayMetricsClient) UpdateGauge(scope int, gauge int, value float64) { } // Scope returns a client that adds the given tags to all metrics -func (r *replayMetricsClient) Scope(scope int, tags ...metrics.Tag) metrics.Scope { +func (r *replayMetricsClient) Scope(scope metrics.ScopeIdx, tags ...metrics.Tag) metrics.Scope { return NewReplayMetricsScope(r.client.Scope(scope, tags...), r.ctx) } @@ -111,7 +111,7 @@ func NewReplayMetricsScope(scope metrics.Scope, ctx workflow.Context) metrics.Sc } // IncCounter increments a counter metric -func (r *replayMetricsScope) IncCounter(counter int) { +func (r *replayMetricsScope) IncCounter(counter metrics.MetricIdx) { if workflow.IsReplaying(r.ctx) { return } @@ -119,7 +119,7 @@ func (r *replayMetricsScope) IncCounter(counter int) { } // AddCounter adds delta to the counter metric -func (r *replayMetricsScope) AddCounter(counter int, delta int64) { +func (r *replayMetricsScope) AddCounter(counter metrics.MetricIdx, delta int64) { if workflow.IsReplaying(r.ctx) { return } @@ -127,7 +127,7 @@ func (r *replayMetricsScope) AddCounter(counter int, delta int64) { } // StartTimer starts a timer for the given metric name. Time will be recorded when stopwatch is stopped. -func (r *replayMetricsScope) StartTimer(timer int) metrics.Stopwatch { +func (r *replayMetricsScope) StartTimer(timer metrics.MetricIdx) metrics.Stopwatch { if workflow.IsReplaying(r.ctx) { return metrics.NewTestStopwatch() } @@ -135,7 +135,7 @@ func (r *replayMetricsScope) StartTimer(timer int) metrics.Stopwatch { } // RecordTimer starts a timer for the given metric name -func (r *replayMetricsScope) RecordTimer(timer int, d time.Duration) { +func (r *replayMetricsScope) RecordTimer(timer metrics.MetricIdx, d time.Duration) { if workflow.IsReplaying(r.ctx) { return } @@ -143,7 +143,7 @@ func (r *replayMetricsScope) RecordTimer(timer int, d time.Duration) { } // RecordHistogramDuration records a duration value in a histogram -func (r *replayMetricsScope) RecordHistogramDuration(timer int, d time.Duration) { +func (r *replayMetricsScope) RecordHistogramDuration(timer metrics.MetricIdx, d time.Duration) { if workflow.IsReplaying(r.ctx) { return } @@ -151,7 +151,7 @@ func (r *replayMetricsScope) RecordHistogramDuration(timer int, d time.Duration) } // RecordHistogramValue records a value in a histogram -func (r *replayMetricsScope) RecordHistogramValue(timer int, value float64) { +func (r *replayMetricsScope) RecordHistogramValue(timer metrics.MetricIdx, value float64) { if workflow.IsReplaying(r.ctx) { return } @@ -159,7 +159,7 @@ func (r *replayMetricsScope) RecordHistogramValue(timer int, value float64) { } // UpdateGauge reports Gauge type absolute value metric -func (r *replayMetricsScope) UpdateGauge(gauge int, value float64) { +func (r *replayMetricsScope) UpdateGauge(gauge metrics.MetricIdx, value float64) { if workflow.IsReplaying(r.ctx) { return } diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go index 18b6b391e08..e05db5eeb9a 100644 --- a/service/worker/indexer/esProcessor_test.go +++ b/service/worker/indexer/esProcessor_test.go @@ -529,7 +529,7 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() { // Mock Kafka message Nack and Value mockKafkaMsg.On("Nack").Return(nil).Once() mockKafkaMsg.On("Value").Return(payload).Once() - s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return() + s.mockScope.On("IncCounter", mock.AnythingOfType("metrics.MetricIdx")).Return() // Execute bulkAfterAction for primary processor with error s.esProcessor.bulkAfterAction(0, requests, response, mockErr) } diff --git a/service/worker/scanner/executor/executor.go b/service/worker/scanner/executor/executor.go index 7f5d8b41bd3..1bca7f562c2 100644 --- a/service/worker/scanner/executor/executor.go +++ b/service/worker/scanner/executor/executor.go @@ -58,7 +58,7 @@ type ( outstanding int64 status int32 metrics metrics.Client - metricScope int + metricScope metrics.ScopeIdx stopC chan struct{} stopWG sync.WaitGroup } @@ -81,7 +81,7 @@ const ( // to be deferred for fairness. To defer processing of a task, simply return TaskStatsDefer // from your task.Run method. When a task is deferred, it will be added to the tail of a // deferredTaskQ which in turn will be processed after the current runQ is drained -func NewFixedSizePoolExecutor(size int, maxDeferred int, metrics metrics.Client, scope int) Executor { +func NewFixedSizePoolExecutor(size int, maxDeferred int, metrics metrics.Client, scope metrics.ScopeIdx) Executor { stopC := make(chan struct{}) return &fixedPoolExecutor{ size: size,