Skip to content

Commit 27dd019

Browse files
authored
Improve session framework heartbeat mechanism (#965)
1 parent 08d2a5e commit 27dd019

File tree

4 files changed

+47
-17
lines changed

4 files changed

+47
-17
lines changed

internal/activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
260260
panic(err)
261261
}
262262
}
263-
err = env.serviceInvoker.Heartbeat(data)
263+
err = env.serviceInvoker.Heartbeat(data, false)
264264
if err != nil {
265265
log := GetActivityLogger(ctx)
266266
log.Debug("RecordActivityHeartbeat With Error:", zap.Error(err))
@@ -271,7 +271,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
271271
// Implement to unit test activities.
272272
type ServiceInvoker interface {
273273
// Returns ActivityTaskCanceledError if activity is cancelled
274-
Heartbeat(details []byte) error
274+
Heartbeat(details []byte, skipBatching bool) error
275275
Close(flushBufferedHeartbeat bool)
276276
GetClient(domain string, options *ClientOptions) Client
277277
}

internal/internal_task_handlers.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1621,11 +1621,11 @@ type cadenceInvoker struct {
16211621
workerStopChannel <-chan struct{}
16221622
}
16231623

1624-
func (i *cadenceInvoker) Heartbeat(details []byte) error {
1624+
func (i *cadenceInvoker) Heartbeat(details []byte, skipBatching bool) error {
16251625
i.Lock()
16261626
defer i.Unlock()
16271627

1628-
if i.hbBatchEndTimer != nil {
1628+
if i.hbBatchEndTimer != nil && !skipBatching {
16291629
// If we have started batching window, keep track of last reported progress.
16301630
i.lastDetailsToReport = &details
16311631
return nil
@@ -1635,7 +1635,7 @@ func (i *cadenceInvoker) Heartbeat(details []byte) error {
16351635

16361636
// If the activity is cancelled, the activity can ignore the cancellation and do its work
16371637
// and complete. Our cancellation is co-operative, so we will try to heartbeat.
1638-
if err == nil || isActivityCancelled {
1638+
if (err == nil || isActivityCancelled) && !skipBatching {
16391639
// We have successfully sent heartbeat, start next batching window.
16401640
i.lastDetailsToReport = nil
16411641

@@ -1671,7 +1671,11 @@ func (i *cadenceInvoker) Heartbeat(details []byte) error {
16711671
i.Unlock()
16721672

16731673
if detailsToReport != nil {
1674-
i.Heartbeat(*detailsToReport)
1674+
// TODO: there is a potential race condition here as the lock is released here and
1675+
// locked again in the Heartbeat() method. It is possible that a heartbeat call from
1676+
// user activity grabs the lock first and calls internalHeartBeat before this
1677+
// batching goroutine, which means some activity progress will be lost.
1678+
i.Heartbeat(*detailsToReport, false)
16751679
}
16761680
}()
16771681
}

internal/internal_task_handlers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() {
11711171
taskToken: nil,
11721172
}
11731173

1174-
heartbeatErr := cadenceInvoker.Heartbeat(nil)
1174+
heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
11751175

11761176
t.Nil(heartbeatErr)
11771177
}
@@ -1191,7 +1191,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
11911191
0,
11921192
make(chan struct{}))
11931193

1194-
heartbeatErr := cadenceInvoker.Heartbeat(nil)
1194+
heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
11951195
t.NotNil(heartbeatErr)
11961196
_, ok := (heartbeatErr).(*s.EntityNotExistsError)
11971197
t.True(ok, "heartbeatErr must be EntityNotExistsError.")
@@ -1215,7 +1215,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveErro
12151215
0,
12161216
make(chan struct{}))
12171217

1218-
heartbeatErr := cadenceInvoker.Heartbeat(nil)
1218+
heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
12191219
t.NotNil(heartbeatErr)
12201220
_, ok := (heartbeatErr).(*s.DomainNotActiveError)
12211221
t.True(ok, "heartbeatErr must be DomainNotActiveError.")

internal/session.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
"github.com/pborman/uuid"
31+
"go.uber.org/cadence/internal/common/backoff"
3132
"go.uber.org/zap"
3233
)
3334

@@ -121,8 +122,8 @@ const (
121122

122123
errTooManySessionsMsg string = "too many outstanding sessions"
123124

124-
defaultSessionHeartBeatTimeout time.Duration = time.Second * 20
125-
maxSessionHeartBeatInterval time.Duration = time.Second * 10
125+
defaultSessionHeartbeatTimeout time.Duration = time.Second * 20
126+
maxSessionHeartbeatInterval time.Duration = time.Second * 10
126127
)
127128

128129
var (
@@ -305,7 +306,7 @@ func createSession(ctx Context, creationTasklist string, options *SessionOptions
305306
},
306307
}
307308

308-
heartbeatTimeout := defaultSessionHeartBeatTimeout
309+
heartbeatTimeout := defaultSessionHeartbeatTimeout
309310
if options.HeartbeatTimeout != time.Duration(0) {
310311
heartbeatTimeout = options.HeartbeatTimeout
311312
}
@@ -410,22 +411,47 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error {
410411

411412
activityEnv := getActivityEnv(ctx)
412413
heartbeatInterval := activityEnv.heartbeatTimeout / 3
413-
if heartbeatInterval > maxSessionHeartBeatInterval {
414-
heartbeatInterval = maxSessionHeartBeatInterval
414+
if heartbeatInterval > maxSessionHeartbeatInterval {
415+
heartbeatInterval = maxSessionHeartbeatInterval
415416
}
416417
ticker := time.NewTicker(heartbeatInterval)
417418
defer ticker.Stop()
418419

420+
heartbeatRetryPolicy := backoff.NewExponentialRetryPolicy(time.Second)
421+
heartbeatRetryPolicy.SetMaximumInterval(time.Second * 2)
422+
heartbeatRetryPolicy.SetExpirationInterval(heartbeatInterval)
423+
419424
for {
420425
select {
421426
case <-ctx.Done():
422427
sessionEnv.CompleteSession(sessionID)
423428
return ctx.Err()
424429
case <-ticker.C:
425-
err := activityEnv.serviceInvoker.Heartbeat([]byte{})
430+
heartbeatOp := func() error {
431+
// here we skip the internal heartbeat batching, as otherwise the activity has only one chance
432+
// for heartbeating and if that failed, the entire session will fail due to heartbeat timeout.
433+
// since the heartbeat interval is controlled by the session framework, we don't need to worry about
434+
// calling heartbeat too frequently and causing trouble for the server. (note the min heartbeat timeout
435+
// is 1 sec.)
436+
return activityEnv.serviceInvoker.Heartbeat([]byte{}, true)
437+
}
438+
isRetryable := func(_ error) bool {
439+
// there will be two types of error here:
440+
// 1. transient errors like timeout, in which case we should not fail the session
441+
// 2. non-retryable errors like activity cancelled, activity not found or domain
442+
// not active. In those cases, the internal implementation will cancel the context,
443+
// so in the next iteration, ctx.Done() will be selected. Here we rely on the heartbeat
444+
// internal implementation to tell which error is non-retryable.
445+
select {
446+
case <-ctx.Done():
447+
return false
448+
default:
449+
return true
450+
}
451+
}
452+
err := backoff.Retry(ctx, heartbeatOp, heartbeatRetryPolicy, isRetryable)
426453
if err != nil {
427-
sessionEnv.CompleteSession(sessionID)
428-
return err
454+
GetActivityLogger(ctx).Info("session heartbeat failed", zap.Error(err), zap.String("sessionID", sessionID))
429455
}
430456
case <-doneCh:
431457
return nil

0 commit comments

Comments
 (0)