Skip to content
This repository was archived by the owner on Feb 28, 2025. It is now read-only.

Commit cb30dc3

Browse files
ketsiambakumrombout
authored andcommitted
Add debugger interface to allow extracting the workerStats from Worker (cadence-workflow#1363)
1 parent 8b0f09c commit cb30dc3

File tree

4 files changed

+20
-0
lines changed

4 files changed

+20
-0
lines changed

debug/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,8 @@ type (
5151
// Activities is a list of executing activities on the worker
5252
// Deprecated: in development and very likely to change
5353
Activities = internal.Activities
54+
55+
// Debugger exposes stats collected on a running Worker
56+
// Deprecated: in development and very likely to change
57+
Debugger = internal.Debugger
5458
)

internal/common/debug/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,10 @@ type (
6969
Info ActivityInfo
7070
Count int64
7171
}
72+
73+
// Debugger exposes stats collected on a running Worker
74+
// Deprecated: in development and very likely to change
75+
Debugger interface {
76+
GetWorkerStats() WorkerStats
77+
}
7278
)

internal/internal_worker.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,8 +804,11 @@ type aggregatedWorker struct {
804804
shadowWorker *shadowWorker
805805
logger *zap.Logger
806806
registry *registry
807+
workerstats debug.WorkerStats
807808
}
808809

810+
var _ debug.Debugger = &aggregatedWorker{}
811+
809812
func (aw *aggregatedWorker) GetRegisteredWorkflows() []RegistryWorkflowInfo {
810813
workflows := aw.registry.GetRegisteredWorkflows()
811814
var result []RegistryWorkflowInfo
@@ -1010,6 +1013,10 @@ func (aw *aggregatedWorker) Stop() {
10101013
aw.logger.Info("Stopped Worker")
10111014
}
10121015

1016+
func (aw *aggregatedWorker) GetWorkerStats() debug.WorkerStats {
1017+
return aw.workerstats
1018+
}
1019+
10131020
// AggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as
10141021
// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms.
10151022
// With 2 poller, we could achieve around 300~400 RPS.
@@ -1148,6 +1155,7 @@ func newAggregatedWorker(
11481155
shadowWorker: shadowWorker,
11491156
logger: logger,
11501157
registry: registry,
1158+
workerstats: workerParams.WorkerStats,
11511159
}
11521160
}
11531161

internal/internal_worker_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,6 +1116,7 @@ func TestWorkerOptionDefaults(t *testing.T) {
11161116
require.NotNil(t, activityWorker.executionParameters.MetricsScope)
11171117
require.Nil(t, activityWorker.executionParameters.ContextPropagators)
11181118
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
1119+
assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats())
11191120
}
11201121

11211122
func TestWorkerOptionNonDefaults(t *testing.T) {
@@ -1176,6 +1177,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
11761177
activityWorker := aggWorker.activityWorker
11771178
require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0)
11781179
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
1180+
assert.Equal(t, expected.WorkerStats, aggWorker.GetWorkerStats())
11791181
}
11801182

11811183
func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {

0 commit comments

Comments
 (0)