From d3de151f218c7319ac6665b1097f57e2a195cec0 Mon Sep 17 00:00:00 2001 From: Ketsia Date: Mon, 2 Sep 2024 21:58:48 +0200 Subject: [PATCH] Add debugger interface to allow extracting the workerStats from Worker --- debug/types.go | 4 ++++ internal/common/debug/types.go | 6 ++++++ internal/internal_worker.go | 8 ++++++++ internal/internal_worker_test.go | 2 ++ 4 files changed, 20 insertions(+) diff --git a/debug/types.go b/debug/types.go index 87b2e3c1b..b28edd4cf 100644 --- a/debug/types.go +++ b/debug/types.go @@ -51,4 +51,8 @@ type ( // Activities is a list of executing activities on the worker // Deprecated: in development and very likely to change Activities = internal.Activities + + // Debugger exposes stats collected on a running Worker + // Deprecated: in development and very likely to change + Debugger = internal.Debugger ) diff --git a/internal/common/debug/types.go b/internal/common/debug/types.go index 028b2d083..7438a5202 100644 --- a/internal/common/debug/types.go +++ b/internal/common/debug/types.go @@ -69,4 +69,10 @@ type ( Info ActivityInfo Count int64 } + + // Debugger exposes stats collected on a running Worker + // Deprecated: in development and very likely to change + Debugger interface { + GetWorkerStats() WorkerStats + } ) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 30cdca5f5..78911d0a4 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -804,8 +804,11 @@ type aggregatedWorker struct { shadowWorker *shadowWorker logger *zap.Logger registry *registry + workerstats debug.WorkerStats } +var _ debug.Debugger = &aggregatedWorker{} + func (aw *aggregatedWorker) GetRegisteredWorkflows() []RegistryWorkflowInfo { workflows := aw.registry.GetRegisteredWorkflows() var result []RegistryWorkflowInfo @@ -1010,6 +1013,10 @@ func (aw *aggregatedWorker) Stop() { aw.logger.Info("Stopped Worker") } +func (aw *aggregatedWorker) GetWorkerStats() debug.WorkerStats { + return aw.workerstats +} + // AggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as // poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms. // With 2 poller, we could achieve around 300~400 RPS. @@ -1148,6 +1155,7 @@ func newAggregatedWorker( shadowWorker: shadowWorker, logger: logger, registry: registry, + workerstats: workerParams.WorkerStats, } } diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index fd474aa1b..8b02cf329 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1116,6 +1116,7 @@ func TestWorkerOptionDefaults(t *testing.T) { require.NotNil(t, activityWorker.executionParameters.MetricsScope) require.Nil(t, activityWorker.executionParameters.ContextPropagators) assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) + assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats()) } func TestWorkerOptionNonDefaults(t *testing.T) { @@ -1176,6 +1177,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) { activityWorker := aggWorker.activityWorker require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0) assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) + assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats()) } func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {