Skip to content

Commit 31738c2

Browse files
committed
Workflow: Rerun workflow instance form Event ID
Change extends the functionally with an rpc to enable rerunning a workflow from an existing _terminal_ workflow instance. See; dapr/proposals#80 Rerunning workflows has the given restrictions; - The source workflow instance _must_ be in a terminal state- i.e. is not running or suspended. It has to have been run before. - The target event ID must be an activity. The event ID must exist. - The target event ID input data can be overwritten. - The new instance ID can be supplied, or will be randomly generated as a UUID. - The source and target instance ID cannot be the same. The orginal history will not be overwritten/deleted/truncated. The new workflow cannot have any timers or raise events which are active at the time of starting. There is no technical resource why we cannot achieve this, but is a limitation of the existing implementation. A future change should see a refactor to enable this; likely moving timers and raise events to a new actor type and instantiation. To rerun a workflow, the source instance ID workflow actor will fork its history state to the point of the target event ID, optionally update its input, then send the new state to the target instance actor ID who will then write that state and run the target activity, as well as any activity which are in-progress at that point in time. Care is taken to ensure all currently active activities are executed if the target event ID is in a fanout scenario. ``` rerunWorkfolow -> source Actor ID [Fork State] -{state}-> target Actor ID ->>> call activities ``` The workflow and activities actor code has been refactored to multiple files for readability and maintainability. Signed-off-by: joshvanl <[email protected]>
1 parent 137da85 commit 31738c2

40 files changed

+3279
-1292
lines changed

go.mod

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ require (
3434
github.com/hashicorp/golang-lru/v2 v2.0.7
3535
github.com/hashicorp/raft v1.4.0
3636
github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702
37-
github.com/jackc/pgx/v5 v5.5.5
37+
github.com/jackc/pgx/v5 v5.7.4
3838
github.com/jhump/protoreflect v1.15.3
3939
github.com/kelseyhightower/envconfig v1.4.0
4040
github.com/lestrrat-go/jwx/v2 v2.0.21
@@ -307,8 +307,8 @@ require (
307307
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
308308
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
309309
github.com/jackc/pgpassfile v1.0.0 // indirect
310-
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
311-
github.com/jackc/puddle/v2 v2.2.1 // indirect
310+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
311+
github.com/jackc/puddle/v2 v2.2.2 // indirect
312312
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
313313
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
314314
github.com/jcmturner/gofork v1.7.6 // indirect
@@ -518,3 +518,5 @@ replace (
518518
//
519519
// Then, run `make modtidy-all` in this repository.
520520
// This ensures that go.mod and go.sum are up-to-date for each go.mod file.
521+
522+
replace github.com/dapr/durabletask-go => github.com/joshvanl/durabletask-go v0.0.0-20250506134053-1cae3eb4b56b

go.sum

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -501,8 +501,6 @@ github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuA
501501
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
502502
github.com/dapr/components-contrib v1.15.1-0.20250522140356-026f99710af9 h1:SX/SuFgvpi+CG4SqiIecBIJc06RHRYzr5ImoTzPNkgI=
503503
github.com/dapr/components-contrib v1.15.1-0.20250522140356-026f99710af9/go.mod h1:biFaHakuo9uNY79zmiAQF6Z0BZvDNFSCkbWyD3lAycY=
504-
github.com/dapr/durabletask-go v0.6.5 h1:aWcxMfYudojpgRjJRdUr7yyZ7rGcvLtWXUuA4cGHBR0=
505-
github.com/dapr/durabletask-go v0.6.5/go.mod h1:nTZ5fCbJLnZbVdi6Z2YxdDF1OgQZL3LroogGuetrwuA=
506504
github.com/dapr/kit v0.15.3-0.20250522135818-baea6263991d h1:8/Qhy5T6mb49KipoHnWQaG+uQ5Cjo9/tRaL8MFojG+g=
507505
github.com/dapr/kit v0.15.3-0.20250522135818-baea6263991d/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
508506
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
@@ -1066,12 +1064,12 @@ github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6
10661064
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
10671065
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
10681066
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
1069-
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
1070-
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
1071-
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
1072-
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
1073-
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
1074-
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
1067+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
1068+
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
1069+
github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg=
1070+
github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
1071+
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
1072+
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
10751073
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
10761074
github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
10771075
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
@@ -1109,6 +1107,8 @@ github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbd
11091107
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
11101108
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
11111109
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
1110+
github.com/joshvanl/durabletask-go v0.0.0-20250506134053-1cae3eb4b56b h1:Shdwhif7Nr+1oJtrjEQJB+Tuaq2tLjiqBMtXaJGoKQM=
1111+
github.com/joshvanl/durabletask-go v0.0.0-20250506134053-1cae3eb4b56b/go.mod h1:JhMyDybRUFmmgieGxCPeg9e2cWwtx4LwNXjD+LBtKYk=
11121112
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
11131113
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
11141114
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=

pkg/actors/router/router.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,19 @@ func (r *router) Call(ctx context.Context, req *internalv1pb.InternalInvokeReque
110110
var res *internalv1pb.InternalInvokeResponse
111111
if r.resiliency.PolicyDefined(req.GetActor().GetActorType(), resiliency.ActorPolicy{}) {
112112
res, err = r.callActor(ctx, req)
113-
// Don't bubble perminant errors up to the caller to interfere with top level
114-
// retries.
115-
if _, ok := err.(*backoff.PermanentError); ok {
116-
err = errors.Unwrap(err)
117-
}
118113
} else {
119114
policyRunner := resiliency.NewRunner[*internalv1pb.InternalInvokeResponse](ctx, r.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries))
120115
res, err = policyRunner(func(ctx context.Context) (*internalv1pb.InternalInvokeResponse, error) {
121116
return r.callActor(ctx, req)
122117
})
123118
}
124119

120+
// Don't bubble perminant errors up to the caller to interfere with top level
121+
// retries.
122+
if _, ok := err.(*backoff.PermanentError); ok {
123+
err = errors.Unwrap(err)
124+
}
125+
125126
return res, err
126127
}
127128

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package activity
15+
16+
import (
17+
"context"
18+
"errors"
19+
"time"
20+
21+
"github.com/dapr/dapr/pkg/actors"
22+
actorapi "github.com/dapr/dapr/pkg/actors/api"
23+
"github.com/dapr/dapr/pkg/actors/reminders"
24+
"github.com/dapr/dapr/pkg/actors/router"
25+
"github.com/dapr/dapr/pkg/actors/state"
26+
"github.com/dapr/dapr/pkg/actors/table"
27+
"github.com/dapr/dapr/pkg/actors/targets"
28+
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
29+
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
30+
"github.com/dapr/kit/logger"
31+
)
32+
33+
var log = logger.NewLogger("dapr.runtime.actors.targets.activity")
34+
35+
type activity struct {
36+
appID string
37+
actorID string
38+
actorType string
39+
workflowActorType string
40+
41+
table table.Interface
42+
router router.Interface
43+
state state.Interface
44+
reminders reminders.Interface
45+
46+
scheduler todo.ActivityScheduler
47+
reminderInterval time.Duration
48+
schedulerReminders bool
49+
}
50+
51+
type ActivityOptions struct {
52+
AppID string
53+
ActivityActorType string
54+
WorkflowActorType string
55+
ReminderInterval *time.Duration
56+
Scheduler todo.ActivityScheduler
57+
Actors actors.Interface
58+
SchedulerReminders bool
59+
}
60+
61+
func ActivityFactory(ctx context.Context, opts ActivityOptions) (targets.Factory, error) {
62+
table, err := opts.Actors.Table(ctx)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
router, err := opts.Actors.Router(ctx)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
state, err := opts.Actors.State(ctx)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
reminders, err := opts.Actors.Reminders(ctx)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
return func(actorID string) targets.Interface {
83+
reminderInterval := time.Minute * 1
84+
85+
if opts.ReminderInterval != nil {
86+
reminderInterval = *opts.ReminderInterval
87+
}
88+
89+
return &activity{
90+
appID: opts.AppID,
91+
actorID: actorID,
92+
actorType: opts.ActivityActorType,
93+
workflowActorType: opts.WorkflowActorType,
94+
reminderInterval: reminderInterval,
95+
table: table,
96+
router: router,
97+
state: state,
98+
reminders: reminders,
99+
scheduler: opts.Scheduler,
100+
schedulerReminders: opts.SchedulerReminders,
101+
}
102+
}, nil
103+
}
104+
105+
// InvokeMethod implements actors.InternalActor and schedules the background execution of a workflow activity.
106+
// Activities are scheduled by workflows and can execute for arbitrary lengths of time. Instead of executing
107+
// activity logic directly, InvokeMethod creates a reminder that executes the activity logic. InvokeMethod
108+
// returns immediately after creating the reminder, enabling the workflow to continue processing other events
109+
// in parallel.
110+
func (a *activity) InvokeMethod(ctx context.Context, req *internalsv1pb.InternalInvokeRequest) (*internalsv1pb.InternalInvokeResponse, error) {
111+
return a.handleInvoke(ctx, req)
112+
}
113+
114+
// InvokeReminder implements actors.InternalActor and executes the activity logic.
115+
func (a *activity) InvokeReminder(ctx context.Context, reminder *actorapi.Reminder) error {
116+
return a.handleReminder(ctx, reminder)
117+
}
118+
119+
// InvokeTimer implements actors.InternalActor
120+
func (a *activity) InvokeTimer(ctx context.Context, reminder *actorapi.Reminder) error {
121+
return errors.New("timers are not implemented")
122+
}
123+
124+
// DeactivateActor implements actors.InternalActor
125+
func (a *activity) Deactivate() error {
126+
log.Debugf("Activity actor '%s': deactivated", a.actorID)
127+
return nil
128+
}
129+
130+
func (a *activity) InvokeStream(context.Context, *internalsv1pb.InternalInvokeRequest, chan<- *internalsv1pb.InternalInvokeResponse) error {
131+
return errors.New("not implemented")
132+
}
133+
134+
// Key returns the key for this unique actor.
135+
func (a *activity) Key() string {
136+
return a.actorType + actorapi.DaprSeparator + a.actorID
137+
}
138+
139+
// Type returns the type of actor.
140+
func (a *activity) Type() string {
141+
return a.actorType
142+
}
143+
144+
// ID returns the ID of the actor.
145+
func (a *activity) ID() string {
146+
return a.actorID
147+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package activity
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"strings"
21+
"time"
22+
23+
"google.golang.org/protobuf/proto"
24+
25+
diag "github.com/dapr/dapr/pkg/diagnostics"
26+
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
27+
internalsv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
28+
wferrors "github.com/dapr/dapr/pkg/runtime/wfengine/errors"
29+
"github.com/dapr/dapr/pkg/runtime/wfengine/todo"
30+
"github.com/dapr/durabletask-go/api"
31+
"github.com/dapr/durabletask-go/backend"
32+
)
33+
34+
func (a *activity) executeActivity(ctx context.Context, name string, taskEvent *backend.HistoryEvent) (todo.RunCompleted, error) {
35+
activityName := ""
36+
if ts := taskEvent.GetTaskScheduled(); ts != nil {
37+
activityName = ts.GetName()
38+
} else {
39+
return todo.RunCompletedTrue, fmt.Errorf("invalid activity task event: '%s'", taskEvent.String())
40+
}
41+
42+
endIndex := strings.Index(a.actorID, "::")
43+
if endIndex < 0 {
44+
return todo.RunCompletedTrue, fmt.Errorf("invalid activity actor ID: '%s'", a.actorID)
45+
}
46+
workflowID := a.actorID[0:endIndex]
47+
48+
wi := &backend.ActivityWorkItem{
49+
SequenceNumber: int64(taskEvent.GetEventId()),
50+
InstanceID: api.InstanceID(workflowID),
51+
NewEvent: taskEvent,
52+
Properties: make(map[string]any),
53+
}
54+
55+
// Executing activity code is a one-way operation. We must wait for the app code to report its completion, which
56+
// will trigger this callback channel.
57+
// TODO: Need to come up with a design for timeouts. Some activities may need to run for hours but we also need
58+
// to handle the case where the app crashes and never responds to the workflow. It may be necessary to
59+
// introduce some kind of heartbeat protocol to help identify such cases.
60+
callback := make(chan bool, 1)
61+
wi.Properties[todo.CallbackChannelProperty] = callback
62+
log.Debugf("Activity actor '%s': scheduling activity '%s' for workflow with instanceId '%s'", a.actorID, name, wi.InstanceID)
63+
elapsed := float64(0)
64+
start := time.Now()
65+
err := a.scheduler(ctx, wi)
66+
elapsed = diag.ElapsedSince(start)
67+
68+
if errors.Is(err, context.DeadlineExceeded) {
69+
diag.DefaultWorkflowMonitoring.ActivityOperationEvent(ctx, activityName, diag.StatusRecoverable, elapsed)
70+
return todo.RunCompletedFalse, wferrors.NewRecoverable(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
71+
} else if err != nil {
72+
diag.DefaultWorkflowMonitoring.ActivityOperationEvent(ctx, activityName, diag.StatusRecoverable, elapsed)
73+
return todo.RunCompletedFalse, wferrors.NewRecoverable(fmt.Errorf("failed to schedule an activity execution: %w", err))
74+
}
75+
diag.DefaultWorkflowMonitoring.ActivityOperationEvent(ctx, activityName, diag.StatusSuccess, elapsed)
76+
77+
// Activity execution started
78+
start = time.Now()
79+
executionStatus := ""
80+
elapsed = float64(0)
81+
// Record metrics on exit
82+
defer func() {
83+
if executionStatus != "" {
84+
diag.DefaultWorkflowMonitoring.ActivityExecutionEvent(ctx, activityName, executionStatus, elapsed)
85+
}
86+
}()
87+
88+
select {
89+
case <-ctx.Done():
90+
// Activity execution failed with recoverable error
91+
elapsed = diag.ElapsedSince(start)
92+
executionStatus = diag.StatusRecoverable
93+
return todo.RunCompletedFalse, ctx.Err() // will be retried
94+
case completed := <-callback:
95+
elapsed = diag.ElapsedSince(start)
96+
if !completed {
97+
// Activity execution failed with recoverable error
98+
executionStatus = diag.StatusRecoverable
99+
return todo.RunCompletedFalse, wferrors.NewRecoverable(todo.ErrExecutionAborted) // AbandonActivityWorkItem was called
100+
}
101+
}
102+
log.Debugf("Activity actor '%s': activity completed for workflow with instanceId '%s' activityName '%s'", a.actorID, wi.InstanceID, name)
103+
104+
// publish the result back to the workflow actor as a new event to be processed
105+
resultData, err := proto.Marshal(wi.Result)
106+
if err != nil {
107+
// Returning non-recoverable error
108+
executionStatus = diag.StatusFailed
109+
return todo.RunCompletedTrue, err
110+
}
111+
112+
req := internalsv1pb.
113+
NewInternalInvokeRequest(todo.AddWorkflowEventMethod).
114+
WithActor(a.workflowActorType, workflowID).
115+
WithData(resultData).
116+
WithContentType(invokev1.ProtobufContentType)
117+
118+
_, err = a.router.Call(ctx, req)
119+
switch {
120+
case err != nil:
121+
// Returning recoverable error, record metrics
122+
executionStatus = diag.StatusRecoverable
123+
return todo.RunCompletedFalse, wferrors.NewRecoverable(fmt.Errorf("failed to invoke '%s' method on workflow actor: %w", todo.AddWorkflowEventMethod, err))
124+
case wi.Result.GetTaskCompleted() != nil:
125+
// Activity execution completed successfully
126+
executionStatus = diag.StatusSuccess
127+
case wi.Result.GetTaskFailed() != nil:
128+
// Activity execution failed
129+
executionStatus = diag.StatusFailed
130+
}
131+
return todo.RunCompletedTrue, nil
132+
}

0 commit comments

Comments
 (0)