Skip to content

Commit ca54143

Browse files
authored
Merge pull request #33 from acroca/multiapp-suborquestration-wip
Support multi-app suborquestration
2 parents 20fd758 + 7476f21 commit ca54143

File tree

3 files changed

+65
-6
lines changed

3 files changed

+65
-6
lines changed

backend/runtimestate/runtimestate.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/dapr/durabletask-go/api"
1313
"github.com/dapr/durabletask-go/api/helpers"
1414
"github.com/dapr/durabletask-go/api/protos"
15+
"github.com/dapr/kit/ptr"
1516
)
1617

1718
var ErrDuplicateEvent = errors.New("duplicate event")
@@ -140,8 +141,35 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
140141
Router: action.Router,
141142
})
142143
if s.StartEvent.GetParentInstance() != nil {
144+
// Create a router for the completion event that routes back to the parent
145+
var completionRouter *protos.TaskRouter
146+
if action.Router != nil {
147+
var parentAppID *string
148+
149+
allEvents := append(s.OldEvents, s.NewEvents...)
150+
for _, event := range allEvents {
151+
if es := event.GetExecutionStarted(); es != nil && event.GetRouter() != nil {
152+
parentAppID = ptr.Of(event.GetRouter().GetSource())
153+
break
154+
}
155+
}
156+
157+
if parentAppID != nil {
158+
completionRouter = &protos.TaskRouter{
159+
Source: action.Router.Source,
160+
Target: parentAppID,
161+
}
162+
} else {
163+
completionRouter = action.Router
164+
}
165+
}
166+
143167
msg := &protos.OrchestrationRuntimeStateMessage{
144-
HistoryEvent: &protos.HistoryEvent{EventId: -1, Timestamp: timestamppb.Now()},
168+
HistoryEvent: &protos.HistoryEvent{
169+
EventId: -1,
170+
Timestamp: timestamppb.Now(),
171+
Router: completionRouter,
172+
},
145173
TargetInstanceID: s.StartEvent.GetParentInstance().OrchestrationInstance.InstanceId,
146174
}
147175
if completedAction.OrchestrationStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED {

task/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (policy *RetryPolicy) Validate() error {
5959
return nil
6060
}
6161

62-
func WithAppID(targetAppID string) callActivityOption {
62+
func WithActivityAppID(targetAppID string) callActivityOption {
6363
return func(opt *callActivityOptions) error {
6464
opt.targetAppID = &targetAppID
6565
return nil

task/orchestrator.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ type OrchestrationContext struct {
5252

5353
// callSubOrchestratorOptions is a struct that holds the options for the CallSubOrchestrator orchestrator method.
5454
type callSubOrchestratorOptions struct {
55-
instanceID string
56-
rawInput *wrapperspb.StringValue
57-
55+
instanceID string
56+
rawInput *wrapperspb.StringValue
57+
targetAppID *string
5858
retryPolicy *RetryPolicy
5959
}
6060

@@ -64,6 +64,14 @@ type subOrchestratorOption func(*callSubOrchestratorOptions) error
6464
// ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.
6565
type ContinueAsNewOption func(*OrchestrationContext)
6666

67+
// WithSubOrchestratorAppID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the app ID of the target activity.
68+
func WithSubOrchestratorAppID(appID string) subOrchestratorOption {
69+
return func(opts *callSubOrchestratorOptions) error {
70+
opts.targetAppID = &appID
71+
return nil
72+
}
73+
}
74+
6775
// WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the
6876
// runtime to carry forward any unprocessed external events to the new instance.
6977
func WithKeepUnprocessedEvents() ContinueAsNewOption {
@@ -205,7 +213,14 @@ func (ctx *OrchestrationContext) processEvent(e *backend.HistoryEvent) error {
205213
} else if es := e.GetExecutionStarted(); es != nil {
206214
// Extract source AppID from HistoryEvent Router if this is ExecutionStartedEvent
207215
if e.GetRouter() != nil {
208-
ctx.appID = ptr.Of(e.GetRouter().GetSource())
216+
router := e.GetRouter()
217+
// For cross-app suborchestrations, if we have a target, use that as our appID
218+
// since that's where we're actually executing
219+
if router.Target != nil {
220+
ctx.appID = ptr.Of(router.GetTarget())
221+
} else {
222+
ctx.appID = ptr.Of(router.GetSource())
223+
}
209224
}
210225
err = ctx.onExecutionStarted(es)
211226
} else if ts := e.GetTaskScheduled(); ts != nil {
@@ -338,6 +353,15 @@ func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestratorName st
338353
},
339354
},
340355
}
356+
if ctx.appID != nil {
357+
createSubOrchestrationAction.Router = &protos.TaskRouter{
358+
Source: *ctx.appID,
359+
}
360+
361+
if options.targetAppID != nil {
362+
createSubOrchestrationAction.Router.Target = options.targetAppID
363+
}
364+
}
341365
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction
342366

343367
task := newTask(ctx)
@@ -758,6 +782,13 @@ func (ctx *OrchestrationContext) setCompleteInternal(
758782
},
759783
},
760784
}
785+
786+
if ctx.appID != nil {
787+
completedAction.Router = &protos.TaskRouter{
788+
Source: *ctx.appID,
789+
}
790+
}
791+
761792
ctx.pendingActions[sequenceNumber] = completedAction
762793
return nil
763794
}

0 commit comments

Comments
 (0)