Skip to content
This repository was archived by the owner on Feb 28, 2025. It is now read-only.

Commit 8b0f09c

Browse files
ketsiambakumrombout
authored andcommitted
Add ActivityTracker to worker stats option (cadence-workflow#1362)
* Add activity tracker option and emit activity counter * update workflow test suite * Revert "update workflow test suite" This reverts commit e6798e0. * Revert "Add activity tracker option and emit activity counter" This reverts commit 0755ceb. * add activity info to activity tracker * remove idls change * add example * run linter and add type comments * add more type comments * address comments
1 parent adf6bef commit 8b0f09c

11 files changed

+205
-16
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,16 @@ type (
3939
// stats on the Worker for debugging purposes.
4040
// Deprecated: in development and very likely to change
4141
WorkerStats = internal.WorkerStats
42+
43+
// ActivityTracker is a worker option to track executing activities on a worker
44+
// Deprecated: in development and very likely to change
45+
ActivityTracker = internal.ActivityTracker
46+
47+
// ActivityInfo contains details on the executing activity
48+
// Deprecated: in development and very likely to change
49+
ActivityInfo = internal.ActivityInfo
50+
51+
// Activities is a list of executing activities on the worker
52+
// Deprecated: in development and very likely to change
53+
Activities = internal.Activities
4254
)

internal/common/debug/example_test.go

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
package debug
2222

2323
import (
24+
"encoding/json"
2425
"fmt"
26+
"sync"
2527

2628
"go.uber.org/atomic"
2729
)
@@ -36,8 +38,57 @@ type (
3638
stopperImpl struct {
3739
pollerTracker *pollerTrackerImpl
3840
}
41+
42+
// activityTrackerImpl implements the ActivityTracker interface
43+
activityTrackerImpl struct {
44+
sync.RWMutex
45+
activityCount map[ActivityInfo]int64
46+
}
47+
48+
// activityStopperImpl implements the Stopper interface
49+
activityStopperImpl struct {
50+
sync.Once
51+
info ActivityInfo
52+
tracker *activityTrackerImpl
53+
}
3954
)
4055

56+
var _ ActivityTracker = &activityTrackerImpl{}
57+
var _ Stopper = &activityStopperImpl{}
58+
59+
func (ati *activityTrackerImpl) Start(info ActivityInfo) Stopper {
60+
ati.Lock()
61+
defer ati.Unlock()
62+
ati.activityCount[info]++
63+
return &activityStopperImpl{info: info, tracker: ati}
64+
}
65+
66+
func (ati *activityTrackerImpl) Stats() Activities {
67+
var activities Activities
68+
ati.RLock()
69+
defer ati.RUnlock()
70+
for a, count := range ati.activityCount {
71+
if count > 0 {
72+
activities = append(activities, struct {
73+
Info ActivityInfo
74+
Count int64
75+
}{Info: a, Count: count})
76+
}
77+
}
78+
return activities
79+
}
80+
81+
func (asi *activityStopperImpl) Stop() {
82+
asi.Do(func() {
83+
asi.tracker.Lock()
84+
defer asi.tracker.Unlock()
85+
asi.tracker.activityCount[asi.info]--
86+
if asi.tracker.activityCount[asi.info] == 0 {
87+
delete(asi.tracker.activityCount, asi.info)
88+
}
89+
})
90+
}
91+
4192
func (p *pollerTrackerImpl) Start() Stopper {
4293
p.pollerCount.Inc()
4394
return &stopperImpl{
@@ -58,24 +109,78 @@ func Example() {
58109
pollerTracker = &pollerTrackerImpl{}
59110

60111
// Initially, poller count should be 0
61-
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
112+
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))
62113

63114
// Start a poller and verify that the count increments
64115
stopper1 := pollerTracker.Start()
65-
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
116+
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))
66117

67118
// Start another poller and verify that the count increments again
68119
stopper2 := pollerTracker.Start()
69-
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
120+
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))
70121

71122
// Stop the pollers and verify the counter
72123
stopper1.Stop()
73124
stopper2.Stop()
74-
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
125+
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))
126+
127+
var activityTracker ActivityTracker
128+
activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)}
129+
130+
info1 := ActivityInfo{
131+
TaskList: "task-list",
132+
ActivityType: "activity1",
133+
}
134+
135+
info2 := ActivityInfo{
136+
TaskList: "task-list",
137+
ActivityType: "activity2",
138+
}
139+
140+
stopper1 = activityTracker.Start(info1)
141+
stopper2 = activityTracker.Start(info2)
142+
jsonActivities, _ := json.MarshalIndent(activityTracker.Stats(), "", " ")
143+
fmt.Println(string(jsonActivities))
144+
145+
stopper1.Stop()
146+
stopper1.Stop()
147+
jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")
148+
149+
fmt.Println(string(jsonActivities))
150+
stopper2.Stop()
151+
152+
jsonActivities, _ = json.MarshalIndent(activityTracker.Stats(), "", " ")
153+
fmt.Println(string(jsonActivities))
75154

76155
// Output:
77-
// stats: 0
78-
// stats: 1
79-
// stats: 2
80-
// stats: 0
156+
// poller stats: 0
157+
// poller stats: 1
158+
// poller stats: 2
159+
// poller stats: 0
160+
// [
161+
// {
162+
// "Info": {
163+
// "TaskList": "task-list",
164+
// "ActivityType": "activity1"
165+
// },
166+
// "Count": 1
167+
// },
168+
// {
169+
// "Info": {
170+
// "TaskList": "task-list",
171+
// "ActivityType": "activity2"
172+
// },
173+
// "Count": 1
174+
// }
175+
// ]
176+
// [
177+
// {
178+
// "Info": {
179+
// "TaskList": "task-list",
180+
// "ActivityType": "activity2"
181+
// },
182+
// "Count": 1
183+
// }
184+
// ]
185+
// null
81186
}
Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,30 @@ type (
4343
// stats on the Worker for debugging purposes.
4444
// Deprecated: in development and very likely to change
4545
WorkerStats struct {
46-
PollerTracker PollerTracker
46+
PollerTracker PollerTracker
47+
ActivityTracker ActivityTracker
48+
}
49+
50+
// ActivityInfo contains details on the executing activity
51+
// Deprecated: in development and very likely to change
52+
ActivityInfo struct {
53+
TaskList string
54+
ActivityType string
55+
}
56+
57+
// ActivityTracker is a worker option to track executing activities on a worker
58+
// Deprecated: in development and very likely to change
59+
ActivityTracker interface {
60+
// Start records activity execution
61+
Start(info ActivityInfo) Stopper
62+
// Stats returns a list of executing activity info
63+
Stats() Activities
64+
}
65+
66+
// Activities is a list of executing activities on the worker
67+
// Deprecated: in development and very likely to change
68+
Activities []struct {
69+
Info ActivityInfo
70+
Count int64
4771
}
4872
)

internal/common/debug/workerstats_noop.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type (
2525
pollerTrackerNoopImpl struct{}
2626
// stopperNoopImpl implements the Stopper interface
2727
stopperNoopImpl struct{}
28+
// activityTrackerNoopImpl implements the ActivityTracker interface
29+
activityTrackerNoopImpl struct{}
2830
)
2931

3032
func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
@@ -33,3 +35,13 @@ func (r *stopperNoopImpl) Stop() {}
3335

3436
// NewNoopPollerTracker creates a new PollerTracker instance
3537
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }
38+
39+
func (at *activityTrackerNoopImpl) Start(info ActivityInfo) Stopper { return &stopperNoopImpl{} }
40+
func (at *activityTrackerNoopImpl) Stats() Activities { return nil }
41+
42+
// NewNoopActivityTracker creates a new PollerTracker instance
43+
func NewNoopActivityTracker() ActivityTracker { return &activityTrackerNoopImpl{} }
44+
45+
var _ PollerTracker = &pollerTrackerNoopImpl{}
46+
var _ Stopper = &stopperNoopImpl{}
47+
var _ ActivityTracker = &activityTrackerNoopImpl{}

internal/common/debug/workerstats_noop_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ import (
2828

2929
func TestWorkerStats(t *testing.T) {
3030
pollerTracker := NewNoopPollerTracker()
31+
activityTracker := NewNoopActivityTracker()
3132
assert.NotNil(t, pollerTracker)
3233
assert.NotNil(t, pollerTracker.Start())
3334
assert.Equal(t, int32(0), pollerTracker.Stats())
3435
assert.NotPanics(t, pollerTracker.Start().Stop)
36+
assert.NotNil(t, activityTracker.Start(ActivityInfo{}))
37+
assert.Nil(t, activityTracker.Stats())
3538
}

internal/internal_task_handlers.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"sync/atomic"
3535
"time"
3636

37+
"go.uber.org/cadence/internal/common/debug"
38+
3739
"github.com/opentracing/opentracing-go"
3840
"github.com/uber-go/tally"
3941
"go.uber.org/zap"
@@ -152,6 +154,7 @@ type (
152154
contextPropagators []ContextPropagator
153155
tracer opentracing.Tracer
154156
featureFlags FeatureFlags
157+
activityTracker debug.ActivityTracker
155158
}
156159

157160
// history wrapper method to help information about events.
@@ -1417,6 +1420,7 @@ func newActivityTaskHandlerWithCustomProvider(
14171420
contextPropagators: params.ContextPropagators,
14181421
tracer: params.Tracer,
14191422
featureFlags: params.FeatureFlags,
1423+
activityTracker: params.WorkerStats.ActivityTracker,
14201424
}
14211425
}
14221426

@@ -1710,7 +1714,11 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
17101714
}
17111715
}()
17121716
}
1713-
1717+
activityInfo := debug.ActivityInfo{
1718+
TaskList: ath.taskListName,
1719+
ActivityType: activityType,
1720+
}
1721+
defer ath.activityTracker.Start(activityInfo).Stop()
17141722
output, err := activityImplementation.Execute(ctx, t.Input)
17151723

17161724
dlCancelFunc()

internal/internal_task_handlers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,6 +1542,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
15421542
Tracer: opentracing.NoopTracer{},
15431543
},
15441544
}
1545+
ensureRequiredParams(&wep)
15451546
activityHandler := newActivityTaskHandler(mockService, wep, registry)
15461547
pats := &s.PollForActivityTaskResponse{
15471548
TaskToken: []byte("token"),

internal/internal_task_pollers.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"sync"
3131
"time"
3232

33+
"go.uber.org/cadence/internal/common/debug"
34+
3335
"github.com/opentracing/opentracing-go"
3436
"github.com/pborman/uuid"
3537
"github.com/uber-go/tally"
@@ -136,6 +138,7 @@ type (
136138
dataConverter DataConverter
137139
contextPropagators []ContextPropagator
138140
tracer opentracing.Tracer
141+
activityTracker debug.ActivityTracker
139142
}
140143

141144
localActivityResult struct {
@@ -529,6 +532,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
529532
dataConverter: params.DataConverter,
530533
contextPropagators: params.ContextPropagators,
531534
tracer: params.Tracer,
535+
activityTracker: params.WorkerStats.ActivityTracker,
532536
}
533537
return &localActivityTaskPoller{
534538
basePoller: basePoller{shutdownC: params.WorkerStopChannel},
@@ -658,6 +662,11 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
658662

659663
laStartTime := time.Now()
660664
ctx, span := createOpenTracingActivitySpan(ctx, lath.tracer, time.Now(), task.params.ActivityType, task.params.WorkflowInfo.WorkflowExecution.ID, task.params.WorkflowInfo.WorkflowExecution.RunID)
665+
activityInfo := debug.ActivityInfo{
666+
TaskList: task.params.WorkflowInfo.TaskListName,
667+
ActivityType: activityType,
668+
}
669+
defer lath.activityTracker.Start(activityInfo).Stop()
661670
defer span.Finish()
662671
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
663672
executionLatency := time.Now().Sub(laStartTime)

internal/internal_worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
193193
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
194194
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
195195
}
196+
if params.WorkerStats.ActivityTracker == nil {
197+
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
198+
params.Logger.Debug("No ActivityTracker configured for WorkerStats option. Will use the default.")
199+
}
196200
}
197201

198202
// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry

internal/internal_worker_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,10 @@ func TestWorkerOptionDefaults(t *testing.T) {
11001100
Logger: decisionWorker.executionParameters.Logger,
11011101
MetricsScope: decisionWorker.executionParameters.MetricsScope,
11021102
Identity: decisionWorker.executionParameters.Identity,
1103-
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
1103+
WorkerStats: debug.WorkerStats{
1104+
PollerTracker: debug.NewNoopPollerTracker(),
1105+
ActivityTracker: debug.NewNoopActivityTracker(),
1106+
},
11041107
},
11051108
UserContext: decisionWorker.executionParameters.UserContext,
11061109
}
@@ -1161,7 +1164,10 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
11611164
Logger: options.Logger,
11621165
MetricsScope: options.MetricsScope,
11631166
Identity: options.Identity,
1164-
WorkerStats: debug.WorkerStats{debug.NewNoopPollerTracker()},
1167+
WorkerStats: debug.WorkerStats{
1168+
PollerTracker: debug.NewNoopPollerTracker(),
1169+
ActivityTracker: debug.NewNoopActivityTracker(),
1170+
},
11651171
},
11661172
}
11671173

@@ -1190,6 +1196,7 @@ func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParam
11901196
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
11911197
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
11921198
require.Equal(t, paramsA.WorkerStats.PollerTracker, paramsB.WorkerStats.PollerTracker)
1199+
require.Equal(t, paramsA.WorkerStats.ActivityTracker, paramsB.WorkerStats.ActivityTracker)
11931200
}
11941201

11951202
/*

0 commit comments

Comments
 (0)