Skip to content

Commit 0356cc6

Browse files
Add context propagation on child workflow and continue as new (#769)
* Add header propagation to continueasnew * Add context propagation on ExecuteChildWorkflow * Add some tests
1 parent 1a633af commit 0356cc6

File tree

4 files changed

+27
-1
lines changed

4 files changed

+27
-1
lines changed

internal/error.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *C
232232
workflowOptions: *options,
233233
workflowType: workflowType,
234234
input: input,
235+
header: getWorkflowHeader(ctx, options.contextPropagators),
235236
}
236237
return &ContinueAsNewError{wfn: wfn, args: args, params: params}
237238
}

internal/error_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,14 @@ func Test_ContinueAsNewError(t *testing.T) {
405405
Name: continueAsNewWfName,
406406
})
407407

408-
s := &WorkflowTestSuite{}
408+
header := &shared.Header{
409+
Fields: map[string][]byte{"test": []byte("test-data")},
410+
}
411+
412+
s := &WorkflowTestSuite{
413+
header: header,
414+
ctxProps: []ContextPropagator{NewStringMapPropagator([]string{"test"})},
415+
}
409416
wfEnv := s.NewTestWorkflowEnvironment()
410417
wfEnv.ExecuteWorkflow(continueAsNewWorkflowFn, 101, "another random string")
411418
err := wfEnv.GetWorkflowError()
@@ -422,4 +429,5 @@ func Test_ContinueAsNewError(t *testing.T) {
422429
stringArg, ok := args[1].(string)
423430
require.True(t, ok)
424431
require.Equal(t, a2, stringArg)
432+
require.Equal(t, header, continueAsNewErr.params.header)
425433
}

internal/internal_workflow_testsuite_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() {
5555
s.localActivityOptions = LocalActivityOptions{
5656
ScheduleToCloseTimeout: time.Second * 3,
5757
}
58+
s.header = &shared.Header{
59+
Fields: map[string][]byte{"test": []byte("test-data")},
60+
}
61+
s.ctxProps = []ContextPropagator{NewStringMapPropagator([]string{"test"})}
5862
RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"})
5963
RegisterWorkflow(testWorkflowContext)
6064
RegisterWorkflow(testWorkflowHeartbeat)

internal/workflow.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"time"
2828

2929
"github.com/uber-go/tally"
30+
s "go.uber.org/cadence/.gen/go/shared"
3031
"go.uber.org/cadence/encoded"
3132
"go.uber.org/cadence/internal/common"
3233
"go.uber.org/zap"
@@ -572,6 +573,7 @@ func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interf
572573
workflowOptions: *options,
573574
input: input,
574575
workflowType: wfType,
576+
header: getWorkflowHeader(ctx, options.contextPropagators),
575577
scheduledTime: Now(ctx), /* this is needed for test framework, and is not send to server */
576578
}
577579

@@ -615,6 +617,17 @@ func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interf
615617
return result
616618
}
617619

620+
func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *s.Header {
621+
header := &s.Header{
622+
Fields: make(map[string][]byte),
623+
}
624+
writer := NewHeaderWriter(header)
625+
for _, ctxProp := range ctxProps {
626+
ctxProp.InjectFromWorkflow(ctx, writer)
627+
}
628+
return header
629+
}
630+
618631
// WorkflowInfo information about currently executing workflow
619632
type WorkflowInfo struct {
620633
WorkflowExecution WorkflowExecution

0 commit comments

Comments
 (0)