Skip to content

Commit 389e76e

Browse files
committed
Adds dapr workflow
``` Workflow management commands. Use -k to target a Kubernetes Dapr cluster. Usage: dapr workflow [command] Aliases: workflow, work Available Commands: history Get the history of a workflow instance. list List workflows for the given app ID. purge Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. raise-event Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'. rerun ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided. resume Resume a workflow that is suspended. run Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name. suspend Suspend a workflow in progress. terminate Terminate a workflow in progress. Flags: -a, --app-id string The app ID owner of the workflow instance -h, --help help for workflow -k, --kubernetes Target a Kubernetes dapr installation -n, --namespace string Namespace to perform workflow operation on (default "default") Global Flags: --log-as-json Log output in JSON format --runtime-path string The path to the dapr runtime installation directory Get the history of a workflow instance. Usage: dapr workflow history [flags] Flags: -h, --help help for history -o, --output string Output format. One of short, wide, yaml, json (default "short") Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory List workflows for the given app ID. Usage: dapr workflow list [flags] Aliases: list, ls Flags: -c, --connection-string string The connection string used to connect and authenticate to the actor state store -m, --filter-max-age string Filter only the workflows started within the given duration or timestamp. Examples: 300ms, 1.5h or 2h45m, 2023-01-02T15:04:05 or 2023-01-02 -w, --filter-name string Filter only the workflows with the given name -s, --filter-status string Filter only the workflows with the given runtime status. One of RUNNING, COMPLETED, CONTINUED_AS_NEW, FAILED, CANCELED, TERMINATED, PENDING, SUSPENDED -h, --help help for list -o, --output string Output format. One of short, wide, yaml, json (default "short") -t, --table-name string The name of the table or collection which is used as the actor state store Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Usage: dapr workflow purge [flags] Flags: --all Purge all workflow instances in a terminal state. Use with caution. --all-older-than string Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'. -c, --connection-string string The connection string used to connect and authenticate to the actor state store -h, --help help for purge -t, --table-name string The name of the table or collection which is used as the actor state store Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'. Usage: dapr workflow raise-event [flags] Flags: -h, --help help for raise-event -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided. Usage: dapr workflow rerun [instance ID] [flags] Flags: -e, --event-id uint32 The event ID from which to re-run the workflow. If not provided, the workflow will re-run from the beginning. -h, --help help for rerun -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. --new-instance-id string Optional new ID for the re-run workflow instance. If not provided, a new ID will be generated. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Resume a workflow that is suspended. Usage: dapr workflow resume [flags] Flags: -h, --help help for resume -r, --reason string Reason for resuming the workflow Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name. Usage: dapr workflow run [flags] Flags: -h, --help help for run -x, --input string Optional input data for the new workflow instance. Accepts a JSON string. -i, --instance-id string The target workflow instance ID. -s, --start-time string Optional start time for the workflow in RFC3339 or Go duration string format. If not provided, the workflow starts immediately. A duration of '0s', or any start time, will cause the command to not wait for the command to start Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Suspend a workflow in progress. Usage: dapr workflow suspend [flags] Flags: -h, --help help for suspend -r, --reason string Reason for resuming the workflow Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory Terminate a workflow in progress. Usage: dapr workflow terminate [flags] Flags: -h, --help help for terminate -o, --output string Optional output data for the workflow in JSON string format. Global Flags: -a, --app-id string The app ID owner of the workflow instance -k, --kubernetes Target a Kubernetes dapr installation --log-as-json Log output in JSON format -n, --namespace string Namespace to perform workflow operation on (default "default") --runtime-path string The path to the dapr runtime installation directory ``` Signed-off-by: joshvanl <[email protected]> Fix scheduler & placement host routing in CI Signed-off-by: joshvanl <[email protected]> Fix unit tests Signed-off-by: joshvanl <[email protected]> Skip workflow tests in slim mode Signed-off-by: joshvanl <[email protected]> fix error string Signed-off-by: joshvanl <[email protected]> lint Signed-off-by: joshvanl <[email protected]> Also delete scheduler jobs during workflow purge Signed-off-by: joshvanl <[email protected]> Fix query string Signed-off-by: joshvanl <[email protected]> lint Signed-off-by: joshvanl <[email protected]> Fix tests Signed-off-by: joshvanl <[email protected]> Increase timeout for CI run Signed-off-by: joshvanl <[email protected]> Increase workflow timelimit Signed-off-by: joshvanl <[email protected]> not empty Signed-off-by: joshvanl <[email protected]> Adds sleep Signed-off-by: joshvanl <[email protected]> Expand list test to include not included Signed-off-by: joshvanl <[email protected]> Remove unrelated test Signed-off-by: joshvanl <[email protected]> Review comments Signed-off-by: joshvanl <[email protected]>
1 parent d6e4dbc commit 389e76e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+4165
-106
lines changed

.github/workflows/self_hosted_e2e.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ jobs:
131131
shell: bash
132132
- name: Set the test timeout - MacOS
133133
if: matrix.os == 'macos-14-large'
134-
run: echo "E2E_SH_TEST_TIMEOUT=30m" >> $GITHUB_ENV
134+
run: echo "E2E_SH_TEST_TIMEOUT=40m" >> $GITHUB_ENV
135135
- name: Run E2E tests with GHCR
136136
# runs every 6hrs
137137
if: github.event.schedule == '0 */6 * * *'

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ TEST_OUTPUT_FILE ?= test_output.json
7474

7575
# Set the default timeout for tests to 10 minutes
7676
ifndef E2E_SH_TEST_TIMEOUT
77-
override E2E_SH_TEST_TIMEOUT := 30m
77+
override E2E_SH_TEST_TIMEOUT := 40m
7878
endif
7979

8080
# Use the variable H to add a header (equivalent to =>) to informational output

cmd/dapr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/spf13/viper"
2323

2424
"github.com/dapr/cli/cmd/scheduler"
25+
"github.com/dapr/cli/cmd/workflow"
2526
"github.com/dapr/cli/pkg/api"
2627
"github.com/dapr/cli/pkg/print"
2728
"github.com/dapr/cli/pkg/standalone"
@@ -111,4 +112,5 @@ func init() {
111112
RootCmd.PersistentFlags().BoolVarP(&logAsJSON, "log-as-json", "", false, "Log output in JSON format")
112113

113114
RootCmd.AddCommand(scheduler.SchedulerCmd)
115+
RootCmd.AddCommand(workflow.WorkflowCmd)
114116
}

cmd/workflow/history.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/cli/utils"
24+
"github.com/dapr/kit/signals"
25+
)
26+
27+
var (
28+
historyOutputFormat *string
29+
)
30+
31+
var HistoryCmd = &cobra.Command{
32+
Use: "history",
33+
Short: "Get the history of a workflow instance.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
appID, err := getWorkflowAppID(cmd)
39+
if err != nil {
40+
return err
41+
}
42+
43+
opts := workflow.HistoryOptions{
44+
KubernetesMode: flagKubernetesMode,
45+
Namespace: flagDaprNamespace,
46+
AppID: appID,
47+
InstanceID: args[0],
48+
}
49+
50+
var list any
51+
if *historyOutputFormat == outputFormatShort {
52+
list, err = workflow.HistoryShort(ctx, opts)
53+
} else {
54+
list, err = workflow.HistoryWide(ctx, opts)
55+
}
56+
if err != nil {
57+
return err
58+
}
59+
60+
switch *historyOutputFormat {
61+
case outputFormatYAML:
62+
err = utils.PrintDetail(os.Stdout, "yaml", list)
63+
case outputFormatJSON:
64+
err = utils.PrintDetail(os.Stdout, "json", list)
65+
default:
66+
var table string
67+
table, err = gocsv.MarshalString(list)
68+
if err != nil {
69+
break
70+
}
71+
72+
utils.PrintTable(table)
73+
}
74+
if err != nil {
75+
return err
76+
}
77+
78+
return nil
79+
},
80+
}
81+
82+
func init() {
83+
historyOutputFormat = outputFunc(HistoryCmd)
84+
WorkflowCmd.AddCommand(HistoryCmd)
85+
}

cmd/workflow/list.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/print"
23+
"github.com/dapr/cli/pkg/workflow"
24+
"github.com/dapr/cli/utils"
25+
"github.com/dapr/kit/signals"
26+
)
27+
28+
var (
29+
listFilter *workflow.Filter
30+
listOutputFormat *string
31+
32+
listConn *connFlag
33+
)
34+
35+
var ListCmd = &cobra.Command{
36+
Use: "list",
37+
Aliases: []string{"ls"},
38+
Short: "List workflows for the given app ID.",
39+
Args: cobra.NoArgs,
40+
RunE: func(cmd *cobra.Command, args []string) error {
41+
ctx := signals.Context()
42+
43+
appID, err := getWorkflowAppID(cmd)
44+
if err != nil {
45+
return err
46+
}
47+
48+
opts := workflow.ListOptions{
49+
KubernetesMode: flagKubernetesMode,
50+
Namespace: flagDaprNamespace,
51+
AppID: appID,
52+
ConnectionString: listConn.connectionString,
53+
TableName: listConn.tableName,
54+
Filter: *listFilter,
55+
}
56+
57+
var list any
58+
var empty bool
59+
60+
switch *listOutputFormat {
61+
case outputFormatShort:
62+
var ll []*workflow.ListOutputShort
63+
ll, err = workflow.ListShort(ctx, opts)
64+
if err != nil {
65+
return err
66+
}
67+
empty = len(ll) == 0
68+
list = ll
69+
70+
default:
71+
var ll []*workflow.ListOutputWide
72+
ll, err = workflow.ListWide(ctx, opts)
73+
if err != nil {
74+
return err
75+
}
76+
empty = len(ll) == 0
77+
list = ll
78+
}
79+
80+
if empty {
81+
print.FailureStatusEvent(os.Stderr, "No workflow found in namespace %q for app ID %q", flagDaprNamespace, appID)
82+
return nil
83+
}
84+
85+
switch *listOutputFormat {
86+
case outputFormatYAML:
87+
err = utils.PrintDetail(os.Stdout, "yaml", list)
88+
case outputFormatJSON:
89+
err = utils.PrintDetail(os.Stdout, "json", list)
90+
default:
91+
var table string
92+
table, err = gocsv.MarshalString(list)
93+
if err != nil {
94+
break
95+
}
96+
97+
utils.PrintTable(table)
98+
}
99+
100+
if err != nil {
101+
return err
102+
}
103+
104+
return nil
105+
},
106+
}
107+
108+
func init() {
109+
listFilter = filterCmd(ListCmd)
110+
listOutputFormat = outputFunc(ListCmd)
111+
listConn = connectionCmd(ListCmd)
112+
WorkflowCmd.AddCommand(ListCmd)
113+
}

cmd/workflow/purge.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
19+
"github.com/dapr/cli/pkg/workflow"
20+
"github.com/dapr/kit/signals"
21+
"github.com/spf13/cobra"
22+
)
23+
24+
var (
25+
flagPurgeOlderThan string
26+
flagPurgeAll bool
27+
flagPurgeConn *connFlag
28+
schedulerNamespace string
29+
)
30+
31+
var PurgeCmd = &cobra.Command{
32+
Use: "purge",
33+
Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.",
34+
Args: func(cmd *cobra.Command, args []string) error {
35+
switch {
36+
case cmd.Flags().Changed("all-older-than"),
37+
cmd.Flags().Changed("all"):
38+
if len(args) > 0 {
39+
return errors.New("no arguments are accepted when using purge all flags")
40+
}
41+
default:
42+
if len(args) == 0 {
43+
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
44+
}
45+
}
46+
47+
return nil
48+
},
49+
RunE: func(cmd *cobra.Command, args []string) error {
50+
ctx := signals.Context()
51+
52+
appID, err := getWorkflowAppID(cmd)
53+
if err != nil {
54+
return err
55+
}
56+
57+
opts := workflow.PurgeOptions{
58+
KubernetesMode: flagKubernetesMode,
59+
Namespace: flagDaprNamespace,
60+
SchedulerNamespace: schedulerNamespace,
61+
AppID: appID,
62+
InstanceIDs: args,
63+
All: flagPurgeAll,
64+
ConnectionString: flagPurgeConn.connectionString,
65+
TableName: flagPurgeConn.tableName,
66+
}
67+
68+
if cmd.Flags().Changed("all-older-than") {
69+
opts.AllOlderThan, err = parseWorkflowDurationTimestamp(flagPurgeOlderThan, true)
70+
if err != nil {
71+
return err
72+
}
73+
}
74+
75+
return workflow.Purge(ctx, opts)
76+
},
77+
}
78+
79+
func init() {
80+
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
81+
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
82+
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
83+
84+
PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")
85+
86+
flagPurgeConn = connectionCmd(PurgeCmd)
87+
88+
WorkflowCmd.AddCommand(PurgeCmd)
89+
}

cmd/workflow/raiseevent.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
"os"
19+
"strings"
20+
21+
"github.com/dapr/cli/pkg/print"
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/kit/signals"
24+
"github.com/spf13/cobra"
25+
)
26+
27+
var (
28+
flagRaiseEventInput *inputFlag
29+
)
30+
31+
var RaiseEventCmd = &cobra.Command{
32+
Use: "raise-event",
33+
Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
split := strings.Split(args[0], "/")
39+
if len(split) != 2 {
40+
return errors.New("the argument must be in the format '<instance-id>/<event-name>'")
41+
}
42+
instanceID := split[0]
43+
eventName := split[1]
44+
45+
appID, err := getWorkflowAppID(cmd)
46+
if err != nil {
47+
return err
48+
}
49+
50+
opts := workflow.RaiseEventOptions{
51+
KubernetesMode: flagKubernetesMode,
52+
Namespace: flagDaprNamespace,
53+
AppID: appID,
54+
InstanceID: instanceID,
55+
Name: eventName,
56+
Input: flagRaiseEventInput.input,
57+
}
58+
59+
if err = workflow.RaiseEvent(ctx, opts); err != nil {
60+
print.FailureStatusEvent(os.Stdout, err.Error())
61+
os.Exit(1)
62+
}
63+
64+
print.InfoStatusEvent(os.Stdout, "Workflow '%s' raised event '%s' successfully", instanceID, eventName)
65+
66+
return nil
67+
},
68+
}
69+
70+
func init() {
71+
flagRaiseEventInput = inputCmd(RaiseEventCmd)
72+
73+
WorkflowCmd.AddCommand(RaiseEventCmd)
74+
}

0 commit comments

Comments
 (0)