Skip to content
This repository was archived by the owner on Feb 28, 2025. It is now read-only.

Commit adf6bef

Browse files
ketsiambakumrombout
authored andcommitted
Add workerStats as worker options to collect stats on poller start/stop events (cadence-workflow#1356)
* Add stats collector in worker options * add comment for type definition * update comments * add tests assertions * add pollerlifecycle interface * lint * fix typo * add implementation and tests * run lint * update debug log msg & internal pkg * address comments * address more comments * address more comments * lint * rewrite poller tracker * fix typo in test * add tests for noop impl
1 parent 161d137 commit adf6bef

File tree

11 files changed

+291
-18
lines changed

11 files changed

+291
-18
lines changed

debug/interfaces.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package debug
22+
23+
import (
24+
internal "go.uber.org/cadence/internal/common/debug"
25+
)
26+
27+
type (
28+
// PollerTracker is an interface to track running pollers on a worker
29+
// Deprecated: in development and very likely to change
30+
PollerTracker = internal.PollerTracker
31+
32+
// Stopper is an interface for tracking stop events in an ongoing process or goroutine.
33+
// Implementations should ensure that Stop() is used to signal and track the stop event
34+
// and not to clean up any resources opened by worker
35+
// Deprecated: in development and very likely to change
36+
Stopper = internal.Stopper
37+
38+
// WorkerStats provides a set of methods that can be used to collect
39+
// stats on the Worker for debugging purposes.
40+
// Deprecated: in development and very likely to change
41+
WorkerStats = internal.WorkerStats
42+
)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/uber/cadence-idl v0.0.0-20240723221048-0482c040f91d
1919
github.com/uber/jaeger-client-go v2.22.1+incompatible
2020
github.com/uber/tchannel-go v1.32.1
21-
go.uber.org/atomic v1.9.0
21+
go.uber.org/atomic v1.11.0
2222
go.uber.org/goleak v1.1.12
2323
go.uber.org/multierr v1.6.0
2424
go.uber.org/thriftrw v1.25.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
223223
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
224224
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
225225
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
226-
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
227-
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
226+
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
227+
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
228228
go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
229229
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
230230
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package debug
22+
23+
import (
24+
"fmt"
25+
26+
"go.uber.org/atomic"
27+
)
28+
29+
type (
30+
// pollerTrackerImpl implements the PollerTracker interface
31+
pollerTrackerImpl struct {
32+
pollerCount atomic.Int32
33+
}
34+
35+
// stopperImpl implements the Stopper interface
36+
stopperImpl struct {
37+
pollerTracker *pollerTrackerImpl
38+
}
39+
)
40+
41+
func (p *pollerTrackerImpl) Start() Stopper {
42+
p.pollerCount.Inc()
43+
return &stopperImpl{
44+
pollerTracker: p,
45+
}
46+
}
47+
48+
func (p *pollerTrackerImpl) Stats() int32 {
49+
return p.pollerCount.Load()
50+
}
51+
52+
func (s *stopperImpl) Stop() {
53+
s.pollerTracker.pollerCount.Dec()
54+
}
55+
56+
func Example() {
57+
var pollerTracker PollerTracker
58+
pollerTracker = &pollerTrackerImpl{}
59+
60+
// Initially, poller count should be 0
61+
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
62+
63+
// Start a poller and verify that the count increments
64+
stopper1 := pollerTracker.Start()
65+
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
66+
67+
// Start another poller and verify that the count increments again
68+
stopper2 := pollerTracker.Start()
69+
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
70+
71+
// Stop the pollers and verify the counter
72+
stopper1.Stop()
73+
stopper2.Stop()
74+
fmt.Println(fmt.Sprintf("stats: %d", pollerTracker.Stats()))
75+
76+
// Output:
77+
// stats: 0
78+
// stats: 1
79+
// stats: 2
80+
// stats: 0
81+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package debug
22+
23+
type (
24+
// Stopper is an interface for tracking stop events in an ongoing process or goroutine.
25+
// Implementations should ensure not to clean up any resources opened by worker
26+
// Deprecated: in development and very likely to change
27+
Stopper interface {
28+
// Stop signals and track a stop event
29+
Stop()
30+
}
31+
32+
// PollerTracker is an interface to track running pollers on a worker
33+
// Deprecated: in development and very likely to change
34+
PollerTracker interface {
35+
// Start collects information on poller start up.
36+
// consumers should provide a concurrency-safe implementation.
37+
Start() Stopper
38+
// Stats return the number or running pollers
39+
Stats() int32
40+
}
41+
42+
// WorkerStats provides a set of methods that can be used to collect
43+
// stats on the Worker for debugging purposes.
44+
// Deprecated: in development and very likely to change
45+
WorkerStats struct {
46+
PollerTracker PollerTracker
47+
}
48+
)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package debug
22+
23+
type (
24+
// pollerTrackerNoopImpl implements the PollerTracker interface
25+
pollerTrackerNoopImpl struct{}
26+
// stopperNoopImpl implements the Stopper interface
27+
stopperNoopImpl struct{}
28+
)
29+
30+
func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
31+
func (lc *pollerTrackerNoopImpl) Stats() int32 { return 0 }
32+
func (r *stopperNoopImpl) Stop() {}
33+
34+
// NewNoopPollerTracker creates a new PollerTracker instance
35+
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package debug
22+
23+
import (
24+
"testing"
25+
26+
"github.com/stretchr/testify/assert"
27+
)
28+
29+
func TestWorkerStats(t *testing.T) {
30+
pollerTracker := NewNoopPollerTracker()
31+
assert.NotNil(t, pollerTracker)
32+
assert.NotNil(t, pollerTracker.Start())
33+
assert.Equal(t, int32(0), pollerTracker.Stats())
34+
assert.NotPanics(t, pollerTracker.Start().Stop)
35+
}

internal/internal_worker.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
"sync/atomic"
4040
"time"
4141

42+
"go.uber.org/cadence/internal/common/debug"
43+
4244
"go.uber.org/cadence/internal/common/isolationgroup"
4345

4446
"github.com/opentracing/opentracing-go"
@@ -187,6 +189,10 @@ func ensureRequiredParams(params *workerExecutionParameters) {
187189
if params.UserContext == nil {
188190
params.UserContext = context.Background()
189191
}
192+
if params.WorkerStats.PollerTracker == nil {
193+
params.WorkerStats.PollerTracker = debug.NewNoopPollerTracker()
194+
params.Logger.Debug("No PollerTracker configured for WorkerStats option. Will use the default.")
195+
}
190196
}
191197

192198
// verifyDomainExist does a DescribeDomain operation on the specified domain with backoff/retry
@@ -281,7 +287,9 @@ func newWorkflowTaskWorkerInternal(
281287
taskWorker: poller,
282288
identity: params.Identity,
283289
workerType: "DecisionWorker",
284-
shutdownTimeout: params.WorkerStopTimeout},
290+
shutdownTimeout: params.WorkerStopTimeout,
291+
pollerTracker: params.WorkerStats.PollerTracker,
292+
},
285293
params.Logger,
286294
params.MetricsScope,
287295
nil,
@@ -304,7 +312,9 @@ func newWorkflowTaskWorkerInternal(
304312
taskWorker: localActivityTaskPoller,
305313
identity: params.Identity,
306314
workerType: "LocalActivityWorker",
307-
shutdownTimeout: params.WorkerStopTimeout},
315+
shutdownTimeout: params.WorkerStopTimeout,
316+
pollerTracker: params.WorkerStats.PollerTracker,
317+
},
308318
params.Logger,
309319
params.MetricsScope,
310320
nil,
@@ -482,7 +492,10 @@ func newActivityTaskWorker(
482492
identity: workerParams.Identity,
483493
workerType: workerType,
484494
shutdownTimeout: workerParams.WorkerStopTimeout,
485-
userContextCancel: workerParams.UserContextCancel},
495+
userContextCancel: workerParams.UserContextCancel,
496+
pollerTracker: workerParams.WorkerStats.PollerTracker,
497+
},
498+
486499
workerParams.Logger,
487500
workerParams.MetricsScope,
488501
sessionTokenBucket,

internal/internal_worker_base.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"syscall"
3434
"time"
3535

36+
"go.uber.org/cadence/internal/common/debug"
37+
3638
"github.com/shirou/gopsutil/cpu"
3739
"github.com/uber-go/tally"
3840
"go.uber.org/zap"
@@ -127,6 +129,7 @@ type (
127129
shutdownTimeout time.Duration
128130
userContextCancel context.CancelFunc
129131
host string
132+
pollerTracker debug.PollerTracker
130133
}
131134

132135
// baseWorker that wraps worker activities.
@@ -178,16 +181,15 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
178181
}
179182

180183
bw := &baseWorker{
181-
options: options,
182-
shutdownCh: make(chan struct{}),
183-
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
184-
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
185-
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
186-
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
187-
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
188-
pollerAutoScaler: pollerAS,
189-
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
190-
184+
options: options,
185+
shutdownCh: make(chan struct{}),
186+
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
187+
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
188+
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
189+
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
190+
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
191+
pollerAutoScaler: pollerAS,
192+
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
191193
limiterContext: ctx,
192194
limiterContextCancel: cancel,
193195
sessionTokenBucket: sessionTokenBucket,
@@ -244,6 +246,8 @@ func (bw *baseWorker) isShutdown() bool {
244246

245247
func (bw *baseWorker) runPoller() {
246248
defer bw.shutdownWG.Done()
249+
defer bw.options.pollerTracker.Start().Stop()
250+
247251
bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)
248252

249253
for {

0 commit comments

Comments
 (0)