Skip to content

Commit 781a033

Browse files
authored
Add missing info population in the activity info. (#936) (#937)
1 parent 9ebe8ea commit 781a033

File tree

5 files changed

+21
-6
lines changed

5 files changed

+21
-6
lines changed

internal/internal_task_pollers.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,12 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
464464
rootCtx = context.Background()
465465
}
466466

467+
workflowTypeLocal := task.params.WorkflowInfo.WorkflowType
468+
467469
ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{
470+
workflowType: &workflowTypeLocal,
471+
workflowDomain: task.params.WorkflowInfo.Domain,
472+
taskList: task.params.WorkflowInfo.TaskListName,
468473
activityType: ActivityType{Name: activityType},
469474
activityID: fmt.Sprintf("%v", task.activityID),
470475
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
@@ -505,6 +510,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
505510
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
506511
deadline = task.expireTime
507512
}
513+
508514
ctx, cancel := context.WithDeadline(ctx, deadline)
509515
task.Lock()
510516
if task.canceled {

internal/internal_workflow_testsuite.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity(
537537
func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
538538
activityFn interface{},
539539
args ...interface{},
540-
) (Value, error) {
540+
) (val Value, result *localActivityResult, err error) {
541541
params := executeLocalActivityParams{
542542
localActivityOptions: localActivityOptions{
543543
ScheduleToCloseTimeoutSeconds: common.Int32Ceil(env.testTimeout.Seconds()),
@@ -559,11 +559,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
559559
tracer: opentracing.NoopTracer{},
560560
}
561561

562-
result := taskHandler.executeLocalActivityTask(task)
562+
result = taskHandler.executeLocalActivityTask(task)
563563
if result.err != nil {
564-
return nil, result.err
564+
return nil, nil, result.err
565565
}
566-
return newEncodedValue(result.result, env.GetDataConverter()), nil
566+
return newEncodedValue(result.result, env.GetDataConverter()), result, nil
567567
}
568568

569569
func (env *testWorkflowEnvironmentImpl) startDecisionTask() {

internal/internal_workflow_testsuite_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1616,8 +1616,11 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() {
16161616
}
16171617

16181618
env := s.NewTestActivityEnvironment()
1619-
result, err := env.ExecuteLocalActivity(localActivityFn, "local_activity")
1619+
result, localActivity, err := env.ExecuteLocalActivity(localActivityFn, "local_activity")
16201620
s.NoError(err)
1621+
s.Equal(WorkflowType{Name: workflowTypeNotSpecified}, localActivity.task.params.WorkflowInfo.WorkflowType)
1622+
s.Equal(defaultTestDomain, localActivity.task.params.WorkflowInfo.Domain)
1623+
s.Equal(defaultTestTaskList, localActivity.task.params.WorkflowInfo.TaskListName)
16211624
var laResult string
16221625
err = result.Get(&laResult)
16231626
s.NoError(err)

internal/workflow_testsuite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args .
159159

160160
// ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutinue.
161161
// Caller should use Value.Get() to extract strong typed result value.
162-
func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (Value, error) {
162+
func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, result *localActivityResult, err error) {
163163
return t.impl.executeLocalActivity(activityFn, args...)
164164
}
165165

test/workflow_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,12 @@ func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Dur
427427
laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
428428
ScheduleToCloseTimeout: 5 * time.Second,
429429
})
430+
431+
workflowInfo := internal.GetWorkflowInfo(laCtx)
432+
if &workflowInfo.WorkflowType == nil {
433+
return errors.New("failed to get work flow type")
434+
}
435+
430436
workflow.ExecuteLocalActivity(laCtx, LocalSleep, delay).Get(laCtx, nil)
431437
queryResult = signalData
432438
return nil

0 commit comments

Comments
 (0)