Skip to content

Commit f5eb256

Browse files
authored
Emit cadence worker's hardware utilization inside worker once per host (#1260)
* Add hardware monitoring of CPU and RAM metrics on workers * Use cpu.counts over runtime.numcpu() * Add host machine tag injection for metrics * add Sync.Once to ensure metrics is emitted once per worker host
1 parent a072981 commit f5eb256

File tree

7 files changed

+114
-3
lines changed

7 files changed

+114
-3
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@ require (
1212
github.com/opentracing/opentracing-go v1.1.0
1313
github.com/pborman/uuid v0.0.0-20160209185913-a97ce2ca70fa
1414
github.com/robfig/cron v1.2.0
15+
github.com/shirou/gopsutil v3.21.11+incompatible
1516
github.com/stretchr/testify v1.8.1
17+
github.com/tklauser/go-sysconf v0.3.11 // indirect
1618
github.com/uber-go/tally v3.3.15+incompatible
1719
github.com/uber/cadence-idl v0.0.0-20230131090243-b690237fffaa
1820
github.com/uber/jaeger-client-go v2.22.1+incompatible
1921
github.com/uber/tchannel-go v1.32.1
22+
github.com/yusufpapurcu/wmi v1.2.3 // indirect
2023
go.uber.org/atomic v1.9.0
2124
go.uber.org/fx v1.13.1 // indirect
2225
go.uber.org/goleak v1.1.12
@@ -25,7 +28,6 @@ require (
2528
go.uber.org/yarpc v1.55.0
2629
go.uber.org/zap v1.13.0
2730
golang.org/x/net v0.0.0-20220121210141-e204ce36a2ba
28-
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb // indirect
2931
golang.org/x/time v0.0.0-20170927054726-6dc17368e09b
3032
honnef.co/go/tools v0.3.2 // indirect
3133
)

go.sum

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
5151
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
5252
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
5353
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
54+
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
55+
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
5456
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
5557
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
5658
github.com/gogo/googleapis v1.3.2 h1:kX1es4djPJrsDhY7aZKJy7aZasdcB5oSOEphMjSB53c=
@@ -153,6 +155,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
153155
github.com/samuel/go-thrift v0.0.0-20190219015601-e8b6b52668fe/go.mod h1:Vrkh1pnjV9Bl8c3P9zH0/D4NlOHWP5d4/hF4YTULaec=
154156
github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af h1:EiWVfh8mr40yFZEui2oF0d45KgH48PkB2H0Z0GANvSI=
155157
github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af/go.mod h1:Vrkh1pnjV9Bl8c3P9zH0/D4NlOHWP5d4/hF4YTULaec=
158+
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
159+
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
156160
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
157161
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
158162
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU=
@@ -174,6 +178,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
174178
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
175179
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
176180
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
181+
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
182+
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
183+
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
184+
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
177185
github.com/uber-common/bark v1.2.1/go.mod h1:g0ZuPcD7XiExKHynr93Q742G/sbrdVQkghrqLGOoFuY=
178186
github.com/uber-go/mapdecode v1.0.0 h1:euUEFM9KnuCa1OBixz1xM+FIXmpixyay5DLymceOVrU=
179187
github.com/uber-go/mapdecode v1.0.0/go.mod h1:b5nP15FwXTgpjTjeA9A2uTHXV5UJCl4arwKpP0FP1Hw=
@@ -195,6 +203,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
195203
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
196204
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
197205
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
206+
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
207+
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
198208
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
199209
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
200210
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -284,6 +294,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
284294
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
285295
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
286296
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
297+
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
287298
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
288299
golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
289300
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -296,8 +307,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
296307
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
297308
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
298309
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
299-
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb h1:PVGECzEo9Y3uOidtkHGdd347NjLtITfJFO9BxFpmRoo=
300-
golang.org/x/sys v0.0.0-20220403205710-6acee93ad0eb/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
310+
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
311+
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
301312
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
302313
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
303314
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

internal/common/metrics/constants.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,11 @@ const (
107107
ReplayFailedCounter = CadenceMetricsPrefix + "replay-failed"
108108
ReplaySkippedCounter = CadenceMetricsPrefix + "replay-skipped"
109109
ReplayLatency = CadenceMetricsPrefix + "replay-latency"
110+
111+
NumCPUCores = CadenceMetricsPrefix + "num-cpu-cores"
112+
CPUPercentage = CadenceMetricsPrefix + "cpu-percentage"
113+
TotalMemory = CadenceMetricsPrefix + "total-memory"
114+
MemoryUsedHeap = CadenceMetricsPrefix + "memory-used-heap"
115+
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
116+
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"
110117
)

internal/internal_worker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ const (
8080
testTagsContextKey = "cadence-testTags"
8181
clientVersionTag = "cadence_client_version"
8282
clientGauge = "client_version_metric"
83+
clientHostTag = "cadence_client_host"
8384
)
8485

8586
type (
@@ -311,6 +312,9 @@ func newWorkflowTaskWorkerInternal(
311312
// 3) the result pushed to laTunnel will be send as task to workflow worker to process.
312313
worker.taskQueueCh = laTunnel.resultCh
313314

315+
worker.options.host = params.Host
316+
localActivityWorker.options.host = params.Host
317+
314318
return &workflowWorker{
315319
executionParameters: params,
316320
workflowService: service,
@@ -482,6 +486,7 @@ func newActivityTaskWorker(
482486
workerParams.MetricsScope,
483487
sessionTokenBucket,
484488
)
489+
base.options.host = workerParams.Host
485490

486491
return &activityWorker{
487492
executionParameters: workerParams,

internal/internal_worker_base.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ import (
2828
"errors"
2929
"fmt"
3030
"os"
31+
"runtime"
3132
"sync"
3233
"syscall"
3334
"time"
3435

36+
"github.com/shirou/gopsutil/cpu"
3537
"github.com/uber-go/tally"
3638
"go.uber.org/zap"
3739
"go.uber.org/zap/zapcore"
@@ -46,6 +48,7 @@ import (
4648
const (
4749
retryPollOperationInitialInterval = 20 * time.Millisecond
4850
retryPollOperationMaxInterval = 10 * time.Second
51+
hardwareMetricsCollectInterval = 30 * time.Second
4952
)
5053

5154
var (
@@ -54,6 +57,8 @@ var (
5457

5558
var errShutdown = errors.New("worker shutting down")
5659

60+
var collectHardwareUsageOnce sync.Once
61+
5762
type (
5863
// resultHandler that returns result
5964
resultHandler func(result []byte, err error)
@@ -118,6 +123,7 @@ type (
118123
workerType string
119124
shutdownTimeout time.Duration
120125
userContextCancel context.CancelFunc
126+
host string
121127
}
122128

123129
// baseWorker that wraps worker activities.
@@ -209,6 +215,15 @@ func (bw *baseWorker) Start() {
209215
bw.shutdownWG.Add(1)
210216
go bw.runTaskDispatcher()
211217

218+
// We want the emit function run once per host instead of run once per worker
219+
//collectHardwareUsageOnce.Do(func() {
220+
// bw.shutdownWG.Add(1)
221+
// go bw.emitHardwareUsage()
222+
//})
223+
224+
bw.shutdownWG.Add(1)
225+
go bw.emitHardwareUsage()
226+
212227
bw.isWorkerStarted = true
213228
traceLog(func() {
214229
bw.logger.Info("Started Worker",
@@ -401,3 +416,53 @@ func (bw *baseWorker) Stop() {
401416
}
402417
return
403418
}
419+
420+
func (bw *baseWorker) emitHardwareUsage() {
421+
defer func() {
422+
if p := recover(); p != nil {
423+
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
424+
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
425+
st := getStackTraceRaw(topLine, 7, 0)
426+
bw.logger.Error("Unhandled panic in hardware emitting.",
427+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
428+
zap.String(tagPanicStack, st))
429+
}
430+
}()
431+
defer bw.shutdownWG.Done()
432+
collectHardwareUsageOnce.Do(
433+
func() {
434+
ticker := time.NewTicker(hardwareMetricsCollectInterval)
435+
for {
436+
select {
437+
case <-bw.shutdownCh:
438+
ticker.Stop()
439+
return
440+
case <-ticker.C:
441+
host := bw.options.host
442+
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
443+
444+
cpuPercent, err := cpu.Percent(0, false)
445+
if err != nil {
446+
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
447+
return
448+
}
449+
cpuCores, err := cpu.Counts(false)
450+
if err != nil {
451+
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
452+
return
453+
}
454+
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
455+
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
456+
457+
var memStats runtime.MemStats
458+
runtime.ReadMemStats(&memStats)
459+
460+
scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
461+
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
462+
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
463+
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
464+
}
465+
}
466+
})
467+
468+
}

internal/internal_worker_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,16 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() {
227227
worker.Stop()
228228
}
229229

230+
func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
231+
worker := createWorkerWithHost(s.T(), s.service)
232+
err := worker.Start()
233+
require.NoError(s.T(), err)
234+
time.Sleep(time.Millisecond * 200)
235+
assert.Equal(s.T(), "test_host", worker.activityWorker.worker.options.host)
236+
assert.Equal(s.T(), "test_host", worker.workflowWorker.worker.options.host)
237+
worker.Stop()
238+
}
239+
230240
func (s *internalWorkerTestSuite) TestCreateWorkerRun() {
231241
// Create service endpoint
232242
mockCtrl := gomock.NewController(s.T())
@@ -415,6 +425,13 @@ func createWorkerWithAutoscaler(
415425
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
416426
}
417427

428+
func createWorkerWithHost(
429+
t *testing.T,
430+
service *workflowservicetest.MockClient,
431+
) *aggregatedWorker {
432+
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{Host: "test_host"})
433+
}
434+
418435
func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
419436
t := s.T()
420437
mockService := s.service

internal/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ type (
262262
// Optional: Authorization interface to get the Auth Token
263263
// default: No provider
264264
Authorization auth.AuthorizationProvider
265+
266+
// Optional: Host is just string on the machine running the client
267+
// default: empty string
268+
Host string
265269
}
266270
)
267271

0 commit comments

Comments
 (0)