Skip to content

Commit 32f02a1

Browse files
authored
We now log when the automatic heartbeating fails (#1263)
1 parent 225289a commit 32f02a1

File tree

3 files changed

+45
-13
lines changed

3 files changed

+45
-13
lines changed

internal/activity_test.go

Lines changed: 18 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -28,16 +28,24 @@ import (
2828
"github.com/stretchr/testify/require"
2929
"github.com/stretchr/testify/suite"
3030
"go.uber.org/yarpc"
31+
"go.uber.org/zap"
32+
"go.uber.org/zap/zaptest"
3133

3234
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
3335
"go.uber.org/cadence/.gen/go/shared"
3436
"go.uber.org/cadence/internal/common"
3537
)
3638

39+
const (
40+
testWorkflowType = "test-workflow-type"
41+
testActivityType = "test-activity-type"
42+
)
43+
3744
type activityTestSuite struct {
3845
suite.Suite
3946
mockCtrl *gomock.Controller
4047
service *workflowservicetest.MockClient
48+
logger *zap.Logger
4149
}
4250

4351
func TestActivityTestSuite(t *testing.T) {
@@ -48,6 +56,7 @@ func TestActivityTestSuite(t *testing.T) {
4856
func (s *activityTestSuite) SetupTest() {
4957
s.mockCtrl = gomock.NewController(s.T())
5058
s.service = workflowservicetest.NewMockClient(s.mockCtrl)
59+
s.logger = zaptest.NewLogger(s.T())
5160
}
5261

5362
func (s *activityTestSuite) TearDownTest() {
@@ -56,7 +65,7 @@ func (s *activityTestSuite) TearDownTest() {
5665

5766
func (s *activityTestSuite) TestActivityHeartbeat() {
5867
ctx, cancel := context.WithCancel(context.Background())
59-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{})
68+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
6069
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{serviceInvoker: invoker})
6170

6271
s.service.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), callOptions()...).
@@ -67,7 +76,7 @@ func (s *activityTestSuite) TestActivityHeartbeat() {
6776

6877
func (s *activityTestSuite) TestActivityHeartbeat_InternalError() {
6978
ctx, cancel := context.WithCancel(context.Background())
70-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{})
79+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
7180
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
7281
serviceInvoker: invoker,
7382
logger: getTestLogger(s.T())})
@@ -83,7 +92,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_InternalError() {
8392

8493
func (s *activityTestSuite) TestActivityHeartbeat_CancelRequested() {
8594
ctx, cancel := context.WithCancel(context.Background())
86-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{})
95+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
8796
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
8897
serviceInvoker: invoker,
8998
logger: getTestLogger(s.T())})
@@ -98,7 +107,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_CancelRequested() {
98107

99108
func (s *activityTestSuite) TestActivityHeartbeat_EntityNotExist() {
100109
ctx, cancel := context.WithCancel(context.Background())
101-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{})
110+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
102111
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
103112
serviceInvoker: invoker,
104113
logger: getTestLogger(s.T())})
@@ -113,7 +122,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_EntityNotExist() {
113122

114123
func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
115124
ctx, cancel := context.WithCancel(context.Background())
116-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 2, make(chan struct{}), FeatureFlags{})
125+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 2, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
117126
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
118127
serviceInvoker: invoker,
119128
logger: getTestLogger(s.T())})
@@ -128,7 +137,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
128137

129138
// No HB timeout configured.
130139
service2 := workflowservicetest.NewMockClient(s.mockCtrl)
131-
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, cancel, 0, make(chan struct{}), FeatureFlags{})
140+
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, cancel, 0, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
132141
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
133142
serviceInvoker: invoker2,
134143
logger: getTestLogger(s.T())})
@@ -141,7 +150,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
141150
// simulate batch picks before expiry.
142151
waitCh := make(chan struct{})
143152
service3 := workflowservicetest.NewMockClient(s.mockCtrl)
144-
invoker3 := newServiceInvoker([]byte("task-token"), "identity", service3, cancel, 2, make(chan struct{}), FeatureFlags{})
153+
invoker3 := newServiceInvoker([]byte("task-token"), "identity", service3, cancel, 2, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
145154
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
146155
serviceInvoker: invoker3,
147156
logger: getTestLogger(s.T())})
@@ -171,7 +180,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
171180
// simulate batch picks before expiry, without any progress specified.
172181
waitCh2 := make(chan struct{})
173182
service4 := workflowservicetest.NewMockClient(s.mockCtrl)
174-
invoker4 := newServiceInvoker([]byte("task-token"), "identity", service4, cancel, 2, make(chan struct{}), FeatureFlags{})
183+
invoker4 := newServiceInvoker([]byte("task-token"), "identity", service4, cancel, 2, make(chan struct{}), FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
175184
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
176185
serviceInvoker: invoker4,
177186
logger: getTestLogger(s.T())})
@@ -195,7 +204,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
195204
func (s *activityTestSuite) TestActivityHeartbeat_WorkerStop() {
196205
ctx, cancel := context.WithCancel(context.Background())
197206
workerStopChannel := make(chan struct{})
198-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 5, workerStopChannel, FeatureFlags{})
207+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 5, workerStopChannel, FeatureFlags{}, s.logger, testWorkflowType, testActivityType)
199208
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{serviceInvoker: invoker})
200209

201210
heartBeatDetail := "testDetails"

internal/internal_task_handlers.go

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1400,6 +1400,9 @@ type cadenceInvoker struct {
14001400
closeCh chan struct{}
14011401
workerStopChannel <-chan struct{}
14021402
featureFlags FeatureFlags
1403+
logger *zap.Logger
1404+
workflowType string
1405+
activityType string
14031406
}
14041407

14051408
func (i *cadenceInvoker) Heartbeat(details []byte) error {
@@ -1485,10 +1488,16 @@ func (i *cadenceInvoker) heartbeatAndScheduleNextRun(details []byte) error {
14851488
i.hbBatchEndTimer.Stop()
14861489
i.hbBatchEndTimer = nil
14871490

1491+
var err error
14881492
if detailsToReport != nil {
1489-
i.heartbeatAndScheduleNextRun(*detailsToReport)
1493+
err = i.heartbeatAndScheduleNextRun(*detailsToReport)
14901494
}
14911495
i.Unlock()
1496+
1497+
// Log the error outside the lock.
1498+
if err != nil {
1499+
i.logger.Error("Failed to send heartbeat", zap.Error(err), zap.String(tagWorkflowType, i.workflowType), zap.String(tagActivityType, i.activityType))
1500+
}
14921501
}()
14931502
}
14941503

@@ -1550,6 +1559,9 @@ func newServiceInvoker(
15501559
heartBeatTimeoutInSec int32,
15511560
workerStopChannel <-chan struct{},
15521561
featureFlags FeatureFlags,
1562+
logger *zap.Logger,
1563+
workflowType string,
1564+
activityType string,
15531565
) ServiceInvoker {
15541566
return &cadenceInvoker{
15551567
taskToken: taskToken,
@@ -1560,6 +1572,9 @@ func newServiceInvoker(
15601572
closeCh: make(chan struct{}),
15611573
workerStopChannel: workerStopChannel,
15621574
featureFlags: featureFlags,
1575+
logger: logger,
1576+
workflowType: workflowType,
1577+
activityType: activityType,
15631578
}
15641579
}
15651580

@@ -1579,14 +1594,14 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
15791594
canCtx, cancel := context.WithCancel(rootCtx)
15801595
defer cancel()
15811596

1582-
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh, ath.featureFlags)
1597+
workflowType := t.WorkflowType.GetName()
1598+
activityType := t.ActivityType.GetName()
1599+
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh, ath.featureFlags, ath.logger, workflowType, activityType)
15831600
defer func() {
15841601
_, activityCompleted := result.(*s.RespondActivityTaskCompletedRequest)
15851602
invoker.Close(!activityCompleted) // flush buffered heartbeat if activity was not successfully completed.
15861603
}()
15871604

1588-
workflowType := t.WorkflowType.GetName()
1589-
activityType := t.ActivityType.GetName()
15901605
metricsScope := getMetricsScopeForActivity(ath.metricsScope, workflowType, activityType)
15911606
ctx := WithActivityTask(canCtx, t, taskList, invoker, ath.logger, metricsScope, ath.dataConverter, ath.workerStopCh, ath.contextPropagators, ath.tracer)
15921607

internal/internal_task_handlers_test.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1384,6 +1384,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_Interleaved() {
13841384
func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
13851385
mockCtrl := gomock.NewController(t.T())
13861386
mockService := workflowservicetest.NewMockClient(mockCtrl)
1387+
logger := zaptest.NewLogger(t.T())
13871388

13881389
entityNotExistsError := &s.EntityNotExistsError{}
13891390
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, entityNotExistsError)
@@ -1396,6 +1397,9 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
13961397
0,
13971398
make(chan struct{}),
13981399
FeatureFlags{},
1400+
logger,
1401+
testWorkflowType,
1402+
testActivityType,
13991403
)
14001404

14011405
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
@@ -1407,6 +1411,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
14071411
func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveError() {
14081412
mockCtrl := gomock.NewController(t.T())
14091413
mockService := workflowservicetest.NewMockClient(mockCtrl)
1414+
logger := zaptest.NewLogger(t.T())
14101415

14111416
domainNotActiveError := &s.DomainNotActiveError{}
14121417
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, domainNotActiveError)
@@ -1422,6 +1427,9 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveErro
14221427
0,
14231428
make(chan struct{}),
14241429
FeatureFlags{},
1430+
logger,
1431+
testWorkflowType,
1432+
testActivityType,
14251433
)
14261434

14271435
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)

0 commit comments

Comments
 (0)