Skip to content

Commit c0d960b

Browse files
committed
Unit tests for activity task handler
1 parent a71b70f commit c0d960b

11 files changed

+1011
-321
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/gogo/protobuf v1.3.2
99
github.com/golang-jwt/jwt/v5 v5.2.0
1010
github.com/golang/mock v1.5.0
11+
github.com/jonboulle/clockwork v0.4.0
1112
github.com/marusama/semaphore/v2 v2.5.0
1213
github.com/opentracing/opentracing-go v1.1.0
1314
github.com/pborman/uuid v0.0.0-20160209185913-a97ce2ca70fa

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
9696
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
9797
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
9898
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
99+
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
100+
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
99101
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
100102
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
101103
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=

internal/activity.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"go.uber.org/cadence/.gen/go/shared"
3535
)
3636

37+
//go:generate mockery --name ServiceInvoker --inpackage --with-expecter --case snake --filename service_invoker_mock.go --boilerplate-file ../LICENSE
38+
3739
type (
3840
// RegistryActivityInfo
3941
RegistryActivityInfo interface {

internal/activity_task_handler.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package internal
22+
23+
import (
24+
"context"
25+
"fmt"
26+
"strings"
27+
"time"
28+
29+
"github.com/jonboulle/clockwork"
30+
"github.com/opentracing/opentracing-go"
31+
"go.uber.org/zap"
32+
33+
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
34+
s "go.uber.org/cadence/.gen/go/shared"
35+
"go.uber.org/cadence/internal/common/debug"
36+
"go.uber.org/cadence/internal/common/metrics"
37+
)
38+
39+
type (
40+
// activityTaskHandlerImpl is the implementation of ActivityTaskHandler
41+
activityTaskHandlerImpl struct {
42+
clock clockwork.Clock
43+
taskListName string
44+
identity string
45+
service workflowserviceclient.Interface
46+
metricsScope *metrics.TaggedScope
47+
logger *zap.Logger
48+
userContext context.Context
49+
registry *registry
50+
activityProvider activityProvider
51+
dataConverter DataConverter
52+
workerStopCh <-chan struct{}
53+
contextPropagators []ContextPropagator
54+
tracer opentracing.Tracer
55+
featureFlags FeatureFlags
56+
activityTracker debug.ActivityTracker
57+
}
58+
)
59+
60+
func newActivityTaskHandler(
61+
service workflowserviceclient.Interface,
62+
params workerExecutionParameters,
63+
registry *registry,
64+
) ActivityTaskHandler {
65+
return newActivityTaskHandlerWithCustomProvider(service, params, registry, nil, clockwork.NewRealClock())
66+
}
67+
68+
func newActivityTaskHandlerWithCustomProvider(
69+
service workflowserviceclient.Interface,
70+
params workerExecutionParameters,
71+
registry *registry,
72+
activityProvider activityProvider,
73+
clock clockwork.Clock,
74+
) ActivityTaskHandler {
75+
if params.Tracer == nil {
76+
params.Tracer = opentracing.NoopTracer{}
77+
}
78+
if params.WorkerStats.ActivityTracker == nil {
79+
params.WorkerStats.ActivityTracker = debug.NewNoopActivityTracker()
80+
}
81+
return &activityTaskHandlerImpl{
82+
clock: clock,
83+
taskListName: params.TaskList,
84+
identity: params.Identity,
85+
service: service,
86+
logger: params.Logger,
87+
metricsScope: metrics.NewTaggedScope(params.MetricsScope),
88+
userContext: params.UserContext,
89+
registry: registry,
90+
activityProvider: activityProvider,
91+
dataConverter: params.DataConverter,
92+
workerStopCh: params.WorkerStopChannel,
93+
contextPropagators: params.ContextPropagators,
94+
tracer: params.Tracer,
95+
featureFlags: params.FeatureFlags,
96+
activityTracker: params.WorkerStats.ActivityTracker,
97+
}
98+
}
99+
100+
// Execute executes an implementation of the activity.
101+
func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivityTaskResponse) (result interface{}, err error) {
102+
traceLog(func() {
103+
ath.logger.Debug("Processing new activity task",
104+
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
105+
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
106+
zap.String(tagActivityType, t.ActivityType.GetName()))
107+
})
108+
109+
rootCtx := ath.userContext
110+
if rootCtx == nil {
111+
rootCtx = context.Background()
112+
}
113+
canCtx, cancel := context.WithCancel(rootCtx)
114+
defer cancel()
115+
116+
workflowType := t.WorkflowType.GetName()
117+
activityType := t.ActivityType.GetName()
118+
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh, ath.featureFlags, ath.logger, workflowType, activityType)
119+
defer func() {
120+
_, activityCompleted := result.(*s.RespondActivityTaskCompletedRequest)
121+
invoker.Close(!activityCompleted) // flush buffered heartbeat if activity was not successfully completed.
122+
}()
123+
124+
metricsScope := getMetricsScopeForActivity(ath.metricsScope, workflowType, activityType)
125+
ctx := WithActivityTask(canCtx, t, taskList, invoker, ath.logger, metricsScope, ath.dataConverter, ath.workerStopCh, ath.contextPropagators, ath.tracer)
126+
127+
activityImplementation := ath.getActivity(activityType)
128+
if activityImplementation == nil {
129+
// Couldn't find the activity implementation.
130+
supported := strings.Join(ath.getRegisteredActivityNames(), ", ")
131+
return nil, fmt.Errorf("unable to find activityType=%v. Supported types: [%v]", activityType, supported)
132+
}
133+
134+
// panic handler
135+
defer func() {
136+
if p := recover(); p != nil {
137+
topLine := fmt.Sprintf("activity for %s [panic]:", ath.taskListName)
138+
st := getStackTraceRaw(topLine, 7, 0)
139+
ath.logger.Error("Activity panic.",
140+
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
141+
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
142+
zap.String(tagActivityType, activityType),
143+
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
144+
zap.String(tagPanicStack, st))
145+
metricsScope.Counter(metrics.ActivityTaskPanicCounter).Inc(1)
146+
panicErr := newPanicError(p, st)
147+
result, err = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr, ath.dataConverter), nil
148+
}
149+
}()
150+
151+
// propagate context information into the activity context from the headers
152+
for _, ctxProp := range ath.contextPropagators {
153+
var err error
154+
if ctx, err = ctxProp.Extract(ctx, NewHeaderReader(t.Header)); err != nil {
155+
return nil, fmt.Errorf("unable to propagate context %w", err)
156+
}
157+
}
158+
159+
info := ctx.Value(activityEnvContextKey).(*activityEnvironment)
160+
ctx, dlCancelFunc := context.WithDeadline(ctx, info.deadline)
161+
defer dlCancelFunc()
162+
163+
ctx, span := createOpenTracingActivitySpan(ctx, ath.tracer, time.Now(), activityType, t.WorkflowExecution.GetWorkflowId(), t.WorkflowExecution.GetRunId())
164+
defer span.Finish()
165+
166+
if activityImplementation.GetOptions().EnableAutoHeartbeat && t.HeartbeatTimeoutSeconds != nil && *t.HeartbeatTimeoutSeconds > 0 {
167+
heartBeater := newHeartbeater(ath.workerStopCh, invoker, ath.logger, ath.clock, activityType, t.WorkflowExecution)
168+
go heartBeater.Run(ctx, time.Duration(*t.HeartbeatTimeoutSeconds)*time.Second)
169+
}
170+
activityInfo := debug.ActivityInfo{
171+
TaskList: ath.taskListName,
172+
ActivityType: activityType,
173+
}
174+
defer ath.activityTracker.Start(activityInfo).Stop()
175+
output, err := activityImplementation.Execute(ctx, t.Input)
176+
177+
dlCancelFunc()
178+
if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {
179+
ath.logger.Warn("Activity timeout.",
180+
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
181+
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
182+
zap.String(tagActivityType, activityType),
183+
)
184+
return nil, ctx.Err()
185+
}
186+
if err != nil && err != ErrActivityResultPending {
187+
ath.logger.Error("Activity error.",
188+
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
189+
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
190+
zap.String(tagActivityType, activityType),
191+
zap.Error(err),
192+
)
193+
}
194+
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err, ath.dataConverter), nil
195+
}
196+
197+
func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
198+
if ath.activityProvider != nil {
199+
return ath.activityProvider(name)
200+
}
201+
202+
if a, ok := ath.registry.GetActivity(name); ok {
203+
return a
204+
}
205+
206+
return nil
207+
}
208+
209+
func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames []string) {
210+
for _, a := range ath.registry.getRegisteredActivities() {
211+
activityNames = append(activityNames, a.ActivityType().Name)
212+
}
213+
return
214+
}

0 commit comments

Comments
 (0)