Skip to content

Commit 1a633af

Browse files
authored
Session: inline short timeout completion activity (#770)
1 parent 0061628 commit 1a633af

File tree

2 files changed

+54
-23
lines changed

2 files changed

+54
-23
lines changed

internal/session.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -216,35 +216,25 @@ func CompleteSession(ctx Context) {
216216
return
217217
}
218218

219-
retryPolicy := &RetryPolicy{
220-
InitialInterval: time.Second,
221-
BackoffCoefficient: 1.1,
222-
MaximumInterval: time.Second * 10,
223-
MaximumAttempts: 5,
224-
}
225-
ao := ActivityOptions{
226-
ScheduleToStartTimeout: time.Second * 10,
227-
StartToCloseTimeout: time.Second * 10,
228-
RetryPolicy: retryPolicy,
229-
}
230-
231219
// first cancel both the creation activity and all user activities
232220
// this will cancel the ctx passed into this function
233221
sessionInfo.sessionCancelFunc()
234222

235223
// then execute then completion activity using the completionCtx, which is not cancelled.
236-
completionCtx := WithActivityOptions(sessionInfo.completionCtx, ao)
237-
Go(completionCtx, func(completionCtx Context) {
238-
// even though the creation activity has been cancelled, the session worker doesn't know. The worker will wait until
239-
// next heartbeat to figure out that the workflow is completed and then release the resource. We need to make sure the
240-
// completion activity is executed before the workflow exits.
241-
// run the activity in another coroutine so it won't block user workflow.
242-
// the tasklist will be overrided to use the one stored in sessionInfo.
243-
err := ExecuteActivity(completionCtx, sessionCompletionActivityName, sessionInfo.SessionID).Get(completionCtx, nil)
244-
if err != nil {
245-
GetLogger(completionCtx).Error("Complete session activity failed", zap.Error(err))
246-
}
224+
completionCtx := WithActivityOptions(sessionInfo.completionCtx, ActivityOptions{
225+
ScheduleToStartTimeout: time.Second * 3,
226+
StartToCloseTimeout: time.Second * 3,
247227
})
228+
229+
// even though the creation activity has been cancelled, the session worker doesn't know. The worker will wait until
230+
// next heartbeat to figure out that the workflow is completed and then release the resource. We need to make sure the
231+
// completion activity is executed before the workflow exits.
232+
// the tasklist will be overrided to use the one stored in sessionInfo.
233+
err := ExecuteActivity(completionCtx, sessionCompletionActivityName, sessionInfo.SessionID).Get(completionCtx, nil)
234+
if err != nil {
235+
GetLogger(completionCtx).Warn("Complete session activity failed", zap.Error(err))
236+
}
237+
248238
sessionInfo.sessionState = sessionStateClosed
249239
getWorkflowEnvironment(ctx).RemoveSession(sessionInfo.SessionID)
250240
GetLogger(ctx).Debug("Completed session", zap.String("sessionID", sessionInfo.SessionID))

internal/session_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"testing"
2727
"time"
2828

29+
"github.com/stretchr/testify/mock"
30+
2931
"github.com/stretchr/testify/require"
3032
"github.com/stretchr/testify/suite"
3133
"go.uber.org/cadence/encoded"
@@ -506,6 +508,45 @@ func (s *SessionTestSuite) TestSessionRecreateToken() {
506508
s.Equal(testTasklist, params.Tasklist)
507509
}
508510

511+
func (s *SessionTestSuite) TestInvalidRecreateToken() {
512+
token := []byte("some invalid token")
513+
sessionCtx, err := RecreateSession(Background(), token, s.sessionOptions)
514+
s.Error(err)
515+
s.Nil(sessionCtx)
516+
}
517+
518+
func (s *SessionTestSuite) TestCompletionFailed() {
519+
workflowFn := func(ctx Context) error {
520+
ao := ActivityOptions{
521+
ScheduleToStartTimeout: time.Minute,
522+
StartToCloseTimeout: time.Minute,
523+
HeartbeatTimeout: time.Second * 20,
524+
}
525+
ctx = WithActivityOptions(ctx, ao)
526+
sessionCtx, err := CreateSession(ctx, s.sessionOptions)
527+
if err != nil {
528+
return err
529+
}
530+
531+
CompleteSession(sessionCtx)
532+
533+
info := GetSessionInfo(sessionCtx)
534+
if info == nil || info.sessionState != sessionStateClosed {
535+
return errors.New("session state should be closed after completion even when completion activity failed")
536+
}
537+
return nil
538+
}
539+
540+
RegisterWorkflow(workflowFn)
541+
env := s.NewTestWorkflowEnvironment()
542+
env.OnActivity(sessionCompletionActivityName, mock.Anything, mock.Anything).Return(errors.New("some random error")).Once()
543+
env.ExecuteWorkflow(workflowFn)
544+
545+
env.AssertExpectations(s.T())
546+
s.True(env.IsWorkflowCompleted())
547+
s.NoError(env.GetWorkflowError())
548+
}
549+
509550
func (s *SessionTestSuite) createSessionWithoutRetry(ctx Context) (Context, error) {
510551
options := getActivityOptions(ctx)
511552
baseTasklist := options.TaskListName

0 commit comments

Comments
 (0)