Skip to content

Commit 9865e57

Browse files
authored
Merge pull request #1560 from JoshVanL/workflow
Workflow
2 parents a803c06 + 4c4ea04 commit 9865e57

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+4144
-85
lines changed

.github/workflows/self_hosted_e2e.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ jobs:
131131
shell: bash
132132
- name: Set the test timeout - MacOS
133133
if: matrix.os == 'macos-14-large'
134-
run: echo "E2E_SH_TEST_TIMEOUT=30m" >> $GITHUB_ENV
134+
run: echo "E2E_SH_TEST_TIMEOUT=40m" >> $GITHUB_ENV
135135
- name: Run E2E tests with GHCR
136136
# runs every 6hrs
137137
if: github.event.schedule == '0 */6 * * *'

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ TEST_OUTPUT_FILE ?= test_output.json
7474

7575
# Set the default timeout for tests to 10 minutes
7676
ifndef E2E_SH_TEST_TIMEOUT
77-
override E2E_SH_TEST_TIMEOUT := 30m
77+
override E2E_SH_TEST_TIMEOUT := 40m
7878
endif
7979

8080
# Use the variable H to add a header (equivalent to =>) to informational output

cmd/dapr.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/spf13/viper"
2323

2424
"github.com/dapr/cli/cmd/scheduler"
25+
"github.com/dapr/cli/cmd/workflow"
2526
"github.com/dapr/cli/pkg/api"
2627
"github.com/dapr/cli/pkg/print"
2728
"github.com/dapr/cli/pkg/standalone"
@@ -111,4 +112,5 @@ func init() {
111112
RootCmd.PersistentFlags().BoolVarP(&logAsJSON, "log-as-json", "", false, "Log output in JSON format")
112113

113114
RootCmd.AddCommand(scheduler.SchedulerCmd)
115+
RootCmd.AddCommand(workflow.WorkflowCmd)
114116
}

cmd/workflow/history.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/cli/utils"
24+
"github.com/dapr/kit/signals"
25+
)
26+
27+
var (
28+
historyOutputFormat *string
29+
)
30+
31+
var HistoryCmd = &cobra.Command{
32+
Use: "history",
33+
Short: "Get the history of a workflow instance.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
appID, err := getWorkflowAppID(cmd)
39+
if err != nil {
40+
return err
41+
}
42+
43+
opts := workflow.HistoryOptions{
44+
KubernetesMode: flagKubernetesMode,
45+
Namespace: flagDaprNamespace,
46+
AppID: appID,
47+
InstanceID: args[0],
48+
}
49+
50+
var list any
51+
if *historyOutputFormat == outputFormatShort {
52+
list, err = workflow.HistoryShort(ctx, opts)
53+
} else {
54+
list, err = workflow.HistoryWide(ctx, opts)
55+
}
56+
if err != nil {
57+
return err
58+
}
59+
60+
switch *historyOutputFormat {
61+
case outputFormatYAML:
62+
err = utils.PrintDetail(os.Stdout, "yaml", list)
63+
case outputFormatJSON:
64+
err = utils.PrintDetail(os.Stdout, "json", list)
65+
default:
66+
var table string
67+
table, err = gocsv.MarshalString(list)
68+
if err != nil {
69+
break
70+
}
71+
72+
utils.PrintTable(table)
73+
}
74+
if err != nil {
75+
return err
76+
}
77+
78+
return nil
79+
},
80+
}
81+
82+
func init() {
83+
historyOutputFormat = outputFunc(HistoryCmd)
84+
WorkflowCmd.AddCommand(HistoryCmd)
85+
}

cmd/workflow/list.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"os"
18+
19+
"github.com/gocarina/gocsv"
20+
"github.com/spf13/cobra"
21+
22+
"github.com/dapr/cli/pkg/print"
23+
"github.com/dapr/cli/pkg/workflow"
24+
"github.com/dapr/cli/utils"
25+
"github.com/dapr/kit/signals"
26+
)
27+
28+
var (
29+
listFilter *workflow.Filter
30+
listOutputFormat *string
31+
32+
listConn *connFlag
33+
)
34+
35+
var ListCmd = &cobra.Command{
36+
Use: "list",
37+
Aliases: []string{"ls"},
38+
Short: "List workflows for the given app ID.",
39+
Args: cobra.NoArgs,
40+
RunE: func(cmd *cobra.Command, args []string) error {
41+
ctx := signals.Context()
42+
43+
appID, err := getWorkflowAppID(cmd)
44+
if err != nil {
45+
return err
46+
}
47+
48+
opts := workflow.ListOptions{
49+
KubernetesMode: flagKubernetesMode,
50+
Namespace: flagDaprNamespace,
51+
AppID: appID,
52+
ConnectionString: listConn.connectionString,
53+
TableName: listConn.tableName,
54+
Filter: *listFilter,
55+
}
56+
57+
var list any
58+
var empty bool
59+
60+
switch *listOutputFormat {
61+
case outputFormatShort:
62+
var ll []*workflow.ListOutputShort
63+
ll, err = workflow.ListShort(ctx, opts)
64+
if err != nil {
65+
return err
66+
}
67+
empty = len(ll) == 0
68+
list = ll
69+
70+
default:
71+
var ll []*workflow.ListOutputWide
72+
ll, err = workflow.ListWide(ctx, opts)
73+
if err != nil {
74+
return err
75+
}
76+
empty = len(ll) == 0
77+
list = ll
78+
}
79+
80+
if empty {
81+
print.FailureStatusEvent(os.Stderr, "No workflow found in namespace %q for app ID %q", flagDaprNamespace, appID)
82+
return nil
83+
}
84+
85+
switch *listOutputFormat {
86+
case outputFormatYAML:
87+
err = utils.PrintDetail(os.Stdout, "yaml", list)
88+
case outputFormatJSON:
89+
err = utils.PrintDetail(os.Stdout, "json", list)
90+
default:
91+
var table string
92+
table, err = gocsv.MarshalString(list)
93+
if err != nil {
94+
break
95+
}
96+
97+
utils.PrintTable(table)
98+
}
99+
100+
if err != nil {
101+
return err
102+
}
103+
104+
return nil
105+
},
106+
}
107+
108+
func init() {
109+
listFilter = filterCmd(ListCmd)
110+
listOutputFormat = outputFunc(ListCmd)
111+
listConn = connectionCmd(ListCmd)
112+
WorkflowCmd.AddCommand(ListCmd)
113+
}

cmd/workflow/purge.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
19+
"github.com/dapr/cli/pkg/workflow"
20+
"github.com/dapr/kit/signals"
21+
"github.com/spf13/cobra"
22+
)
23+
24+
var (
25+
flagPurgeOlderThan string
26+
flagPurgeAll bool
27+
flagPurgeConn *connFlag
28+
schedulerNamespace string
29+
)
30+
31+
var PurgeCmd = &cobra.Command{
32+
Use: "purge",
33+
Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.",
34+
Args: func(cmd *cobra.Command, args []string) error {
35+
switch {
36+
case cmd.Flags().Changed("all-older-than"),
37+
cmd.Flags().Changed("all"):
38+
if len(args) > 0 {
39+
return errors.New("no arguments are accepted when using purge all flags")
40+
}
41+
default:
42+
if len(args) == 0 {
43+
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
44+
}
45+
}
46+
47+
return nil
48+
},
49+
RunE: func(cmd *cobra.Command, args []string) error {
50+
ctx := signals.Context()
51+
52+
appID, err := getWorkflowAppID(cmd)
53+
if err != nil {
54+
return err
55+
}
56+
57+
opts := workflow.PurgeOptions{
58+
KubernetesMode: flagKubernetesMode,
59+
Namespace: flagDaprNamespace,
60+
SchedulerNamespace: schedulerNamespace,
61+
AppID: appID,
62+
InstanceIDs: args,
63+
All: flagPurgeAll,
64+
ConnectionString: flagPurgeConn.connectionString,
65+
TableName: flagPurgeConn.tableName,
66+
}
67+
68+
if cmd.Flags().Changed("all-older-than") {
69+
opts.AllOlderThan, err = parseWorkflowDurationTimestamp(flagPurgeOlderThan, true)
70+
if err != nil {
71+
return err
72+
}
73+
}
74+
75+
return workflow.Purge(ctx, opts)
76+
},
77+
}
78+
79+
func init() {
80+
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
81+
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
82+
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
83+
84+
PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")
85+
86+
flagPurgeConn = connectionCmd(PurgeCmd)
87+
88+
WorkflowCmd.AddCommand(PurgeCmd)
89+
}

cmd/workflow/raiseevent.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package workflow
15+
16+
import (
17+
"errors"
18+
"os"
19+
"strings"
20+
21+
"github.com/dapr/cli/pkg/print"
22+
"github.com/dapr/cli/pkg/workflow"
23+
"github.com/dapr/kit/signals"
24+
"github.com/spf13/cobra"
25+
)
26+
27+
var (
28+
flagRaiseEventInput *inputFlag
29+
)
30+
31+
var RaiseEventCmd = &cobra.Command{
32+
Use: "raise-event",
33+
Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '<instance-id>/<event-name>'.",
34+
Args: cobra.ExactArgs(1),
35+
RunE: func(cmd *cobra.Command, args []string) error {
36+
ctx := signals.Context()
37+
38+
split := strings.Split(args[0], "/")
39+
if len(split) != 2 {
40+
return errors.New("the argument must be in the format '<instance-id>/<event-name>'")
41+
}
42+
instanceID := split[0]
43+
eventName := split[1]
44+
45+
appID, err := getWorkflowAppID(cmd)
46+
if err != nil {
47+
return err
48+
}
49+
50+
opts := workflow.RaiseEventOptions{
51+
KubernetesMode: flagKubernetesMode,
52+
Namespace: flagDaprNamespace,
53+
AppID: appID,
54+
InstanceID: instanceID,
55+
Name: eventName,
56+
Input: flagRaiseEventInput.input,
57+
}
58+
59+
if err = workflow.RaiseEvent(ctx, opts); err != nil {
60+
print.FailureStatusEvent(os.Stdout, err.Error())
61+
os.Exit(1)
62+
}
63+
64+
print.InfoStatusEvent(os.Stdout, "Workflow '%s' raised event '%s' successfully", instanceID, eventName)
65+
66+
return nil
67+
},
68+
}
69+
70+
func init() {
71+
flagRaiseEventInput = inputCmd(RaiseEventCmd)
72+
73+
WorkflowCmd.AddCommand(RaiseEventCmd)
74+
}

0 commit comments

Comments
 (0)