Skip to content

Commit df6788e

Browse files
committed
Remove worker hardware utilization code
1 parent 7a3beaa commit df6788e

File tree

5 files changed

+1
-90
lines changed

5 files changed

+1
-90
lines changed

internal/common/metrics/constants.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,6 @@ const (
110110
ReplaySkippedCounter = CadenceMetricsPrefix + "replay-skipped"
111111
ReplayLatency = CadenceMetricsPrefix + "replay-latency"
112112

113-
NumCPUCores = CadenceMetricsPrefix + "num-cpu-cores"
114-
CPUPercentage = CadenceMetricsPrefix + "cpu-percentage"
115-
TotalMemory = CadenceMetricsPrefix + "total-memory"
116-
MemoryUsedHeap = CadenceMetricsPrefix + "memory-used-heap"
117-
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
118-
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"
119-
120113
EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
121114
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
122115
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"

internal/internal_worker.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ const (
8383
testTagsContextKey = "cadence-testTags"
8484
clientVersionTag = "cadence_client_version"
8585
clientGauge = "client_version_metric"
86-
clientHostTag = "cadence_client_host"
8786
)
8887

8988
type (
@@ -330,9 +329,6 @@ func newWorkflowTaskWorkerInternal(
330329
// 3) the result pushed to laTunnel will be send as task to workflow worker to process.
331330
worker.taskQueueCh = laTunnel.resultCh
332331

333-
worker.options.host = params.Host
334-
localActivityWorker.options.host = params.Host
335-
336332
return &workflowWorker{
337333
executionParameters: params,
338334
workflowService: service,
@@ -507,7 +503,6 @@ func newActivityTaskWorker(
507503
workerParams.MetricsScope,
508504
sessionTokenBucket,
509505
)
510-
base.options.host = workerParams.Host
511506

512507
return &activityWorker{
513508
executionParameters: workerParams,

internal/internal_worker_base.go

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,12 @@ import (
2828
"errors"
2929
"fmt"
3030
"os"
31-
"runtime"
3231
"sync"
3332
"syscall"
3433
"time"
3534

3635
"go.uber.org/cadence/internal/common/debug"
3736

38-
"github.com/shirou/gopsutil/cpu"
3937
"github.com/uber-go/tally"
4038
"go.uber.org/zap"
4139
"go.uber.org/zap/zapcore"
@@ -50,7 +48,6 @@ import (
5048
const (
5149
retryPollOperationInitialInterval = 20 * time.Millisecond
5250
retryPollOperationMaxInterval = 10 * time.Second
53-
hardwareMetricsCollectInterval = 30 * time.Second
5451
)
5552

5653
var (
@@ -59,8 +56,6 @@ var (
5956

6057
var errShutdown = errors.New("worker shutting down")
6158

62-
var collectHardwareUsageOnce sync.Once
63-
6459
type (
6560
// resultHandler that returns result
6661
resultHandler func(result []byte, err error)
@@ -223,7 +218,6 @@ func (bw *baseWorker) Start() {
223218
// We want the emit function run once per host instead of run once per worker
224219
// since the emit function is host level metric.
225220
bw.shutdownWG.Add(1)
226-
go bw.emitHardwareUsage()
227221

228222
bw.isWorkerStarted = true
229223
traceLog(func() {
@@ -400,7 +394,7 @@ func (bw *baseWorker) Run() {
400394
bw.Stop()
401395
}
402396

403-
// Shutdown is a blocking call and cleans up all the resources associated with worker.
397+
// Stop is a blocking call and cleans up all the resources associated with worker.
404398
func (bw *baseWorker) Stop() {
405399
if !bw.isWorkerStarted {
406400
return
@@ -423,53 +417,3 @@ func (bw *baseWorker) Stop() {
423417
}
424418
return
425419
}
426-
427-
func (bw *baseWorker) emitHardwareUsage() {
428-
defer func() {
429-
if p := recover(); p != nil {
430-
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
431-
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
432-
st := getStackTraceRaw(topLine, 7, 0)
433-
bw.logger.Error("Unhandled panic in hardware emitting.",
434-
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
435-
zap.String(tagPanicStack, st))
436-
}
437-
}()
438-
defer bw.shutdownWG.Done()
439-
collectHardwareUsageOnce.Do(
440-
func() {
441-
ticker := time.NewTicker(hardwareMetricsCollectInterval)
442-
for {
443-
select {
444-
case <-bw.shutdownCh:
445-
ticker.Stop()
446-
return
447-
case <-ticker.C:
448-
host := bw.options.host
449-
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
450-
451-
cpuPercent, err := cpu.Percent(0, false)
452-
if err != nil {
453-
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
454-
return
455-
}
456-
cpuCores, err := cpu.Counts(false)
457-
if err != nil {
458-
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
459-
return
460-
}
461-
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
462-
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
463-
464-
var memStats runtime.MemStats
465-
runtime.ReadMemStats(&memStats)
466-
467-
scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
468-
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
469-
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
470-
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
471-
}
472-
}
473-
})
474-
475-
}

internal/internal_worker_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -238,16 +238,6 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() {
238238
worker.Stop()
239239
}
240240

241-
func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
242-
worker := createWorkerWithHost(s.T(), s.service)
243-
err := worker.Start()
244-
require.NoError(s.T(), err)
245-
time.Sleep(time.Millisecond * 200)
246-
assert.Equal(s.T(), "test_host", worker.activityWorker.worker.options.host)
247-
assert.Equal(s.T(), "test_host", worker.workflowWorker.worker.options.host)
248-
worker.Stop()
249-
}
250-
251241
func (s *internalWorkerTestSuite) TestCreateWorkerRun() {
252242
// Create service endpoint
253243
mockCtrl := gomock.NewController(s.T())
@@ -445,13 +435,6 @@ func createWorkerWithStrictNonDeterminismDisabled(
445435
return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}})
446436
}
447437

448-
func createWorkerWithHost(
449-
t *testing.T,
450-
service *workflowservicetest.MockClient,
451-
) *aggregatedWorker {
452-
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
453-
}
454-
455438
func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
456439
t := s.T()
457440
mockService := s.service

internal/worker.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,6 @@ type (
266266
// default: No provider
267267
Authorization auth.AuthorizationProvider
268268

269-
// Optional: Host is just string on the machine running the client
270-
// default: empty string
271-
Host string
272-
273269
// Optional: See WorkerBugPorts for more details
274270
//
275271
// Deprecated: All bugports are always deprecated and may be removed at any time.

0 commit comments

Comments
 (0)