Skip to content

Commit 0cbc253

Browse files
committed
Workflow for data converters
1 parent 6350c61 commit 0cbc253

File tree

13 files changed

+914
-0
lines changed

13 files changed

+914
-0
lines changed

2023-01-01

Whitespace-only changes.

=

Whitespace-only changes.

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ PROGS = helloworld \
3535
pageflow \
3636
signalcounter \
3737
sideeffect \
38+
sleepworkflow \
39+
dataconverter \
40+
3841

3942
TEST_ARG ?= -race -v -timeout 5m
4043
BUILD := ./build
@@ -68,6 +71,8 @@ TEST_DIRS=./cmd/samples/cron \
6871
./cmd/samples/recipes/searchattributes \
6972
./cmd/samples/recipes/sideeffect \
7073
./cmd/samples/recipes/signalcounter \
74+
./cmd/samples/recipes/sleepworkflow \
75+
./cmd/samples/recipes/dataconverter \
7176
./cmd/samples/recovery \
7277
./cmd/samples/pso \
7378

@@ -176,6 +181,9 @@ sideeffect:
176181
versioning:
177182
go build -o bin/versioning cmd/samples/recipes/versioning/*.go
178183

184+
dataconverter:
185+
go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go
186+
179187
bins: helloworld \
180188
versioning \
181189
delaystart \
@@ -207,6 +215,8 @@ bins: helloworld \
207215
pageflow \
208216
signalcounter \
209217
sideeffect \
218+
sleepworkflow \
219+
dataconverter \
210220

211221
test: bins
212222
@rm -f test

cmd/samples/jiraToGithub/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Expense
2+
This sample workflow process an expense request. The key part of this sample is to show how to complete an activity asynchronously.
3+
4+
# Sample Description
5+
* Create a new expense report.
6+
* Wait for the expense report to be approved. This could take an arbitrary amount of time. So the activity's Execute method has to return before it is actually approved. This is done by returning a special error so the framework knows the activity is not completed yet.
7+
* When the expense is approved (or rejected), somewhere in the world needs to be notified, and it will need to call WorkflowClient.CompleteActivity() to tell cadence service that that activity is now completed. In this sample case, the dummy server do this job. In real world, you will need to register some listener to the expense system or you will need to have your own pulling agent to check for the expense status periodic.
8+
* After the wait activity is completed, it did the payment for the expense. (dummy step in this sample case)
9+
10+
This sample rely on an a dummy expense server to work.
11+
12+
# Steps To Run Sample
13+
* You need a cadence service running. See https://github.com/uber/cadence/blob/master/README.md for more details.
14+
* Start the dummy server
15+
```
16+
./bin/expense_dummy
17+
```
18+
If dummy is not found, run make to build it.
19+
* Start workflow and activity workers
20+
```
21+
./bin/expense -m worker
22+
```
23+
* Start expanse workflow execution
24+
```
25+
./bin/expense -m trigger
26+
```
27+
* When you see the console print out the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
28+
* You should see the workflow complete after you approve the expense. You can also reject the expense.
29+
* If you see the workflow failed, try to change to a different port number in dummy.go and workflow.go. Then rebuild everything.

cmd/samples/jiraToGithub/main.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"time"
6+
7+
"github.com/pborman/uuid"
8+
9+
"go.uber.org/cadence/client"
10+
"go.uber.org/cadence/worker"
11+
12+
"github.com/uber-common/cadence-samples/cmd/samples/common"
13+
)
14+
15+
// This needs to be done as part of a bootstrap step when the process starts.
16+
// The workers are supposed to be long running.
17+
func startWorkers(h *common.SampleHelper) {
18+
// Configure worker options.
19+
workerOptions := worker.Options{
20+
MetricsScope: h.WorkerMetricScope,
21+
Logger: h.Logger,
22+
}
23+
h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
24+
}
25+
26+
func startWorkflow(h *common.SampleHelper) {
27+
workflowOptions := client.StartWorkflowOptions{
28+
ID: "jiraToGithub_" + uuid.New(),
29+
TaskList: ApplicationName,
30+
ExecutionStartToCloseTimeout: time.Minute * 12,
31+
DecisionTaskStartToCloseTimeout: time.Minute * 12,
32+
}
33+
h.StartWorkflow(workflowOptions, jiraToGithubWorkflow)
34+
}
35+
36+
func main() {
37+
var mode string
38+
flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
39+
flag.Parse()
40+
41+
var h common.SampleHelper
42+
h.SetupServiceConfig()
43+
44+
switch mode {
45+
case "worker":
46+
h.RegisterWorkflow(jiraToGithubWorkflow)
47+
h.RegisterActivity(getJiraTasksActivity)
48+
h.RegisterActivity(createGithubIssuesActivity)
49+
startWorkers(&h)
50+
51+
// The workers are supposed to be long running process that should not exit.
52+
// Use select{} to block indefinitely for samples, you can quit by CMD+C.
53+
select {}
54+
case "trigger":
55+
startWorkflow(&h)
56+
}
57+
}
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os/exec"
8+
"strings"
9+
"time"
10+
11+
// "code.uber.internal/devexp/utils/jirawithretry"
12+
// "github.com/andygrunwald/go-jira"
13+
jira "github.com/andygrunwald/go-jira"
14+
"github.com/google/go-github/github"
15+
"golang.org/x/oauth2"
16+
17+
"go.uber.org/cadence/activity"
18+
"go.uber.org/cadence/workflow"
19+
"go.uber.org/zap"
20+
)
21+
22+
const (
23+
// ApplicationName is the task list for this sample
24+
ApplicationName = "jiraToGithubGroup"
25+
jiraURL = "https://t3.uberinternal.com"
26+
jiraUsername = "[email protected]"
27+
)
28+
29+
// type Activity struct {
30+
// jiraClient jirawithretry.IssueClient
31+
// }
32+
33+
func jiraToGithubWorkflow(ctx workflow.Context) (result string, err error) {
34+
// step 1, get JIRA tasks from cadence project
35+
ao := workflow.ActivityOptions{
36+
ScheduleToStartTimeout: time.Minute,
37+
StartToCloseTimeout: time.Minute * 4,
38+
HeartbeatTimeout: time.Second * 20,
39+
}
40+
ctx1 := workflow.WithActivityOptions(ctx, ao)
41+
logger := workflow.GetLogger(ctx)
42+
logger.Info("Jira to Github workflow started")
43+
44+
var issues []jira.Issue
45+
err = workflow.ExecuteActivity(ctx1, getJiraTasksActivity).Get(ctx1, &issues)
46+
if err != nil {
47+
return "", err
48+
}
49+
50+
// step 2, create issues in github
51+
ao = workflow.ActivityOptions{
52+
ScheduleToStartTimeout: time.Minute,
53+
StartToCloseTimeout: time.Minute * 8,
54+
}
55+
ctx2 := workflow.WithActivityOptions(ctx, ao)
56+
57+
err = workflow.ExecuteActivity(ctx2, createGithubIssuesActivity, issues).Get(ctx2, nil)
58+
if err != nil {
59+
return "", err
60+
}
61+
62+
logger.Info("Workflow completed with Github issues created.")
63+
return "COMPLETED", nil
64+
}
65+
66+
// func New() (*Activity, error) {
67+
// client, err := jirawithretry.NewIssueClient(&http.Client{}, jiraURL)
68+
// if err != nil {
69+
// return nil, err
70+
// }
71+
// envVars, err := getEnvVars()
72+
// if err != nil {
73+
// return nil, err
74+
// }
75+
// client.Authentication().SetBasicAuth(jiraUsername, envVars["JIRA_API_TOKEN"])
76+
// return &Activity{jiraClient: client}, nil
77+
// }
78+
79+
func getJiraTasksActivity(ctx context.Context) ([]jira.Issue, error) {
80+
jiraClient, err := jira.NewClient(nil, jiraURL)
81+
if err != nil {
82+
return nil, err
83+
}
84+
85+
jql := "project = Cadence AND created >= -365d AND issuetype = Task AND labels = opensourceable ORDER BY created DESC" //AND labels = opensourceable AND description IS NOT EMPTY
86+
options := &jira.SearchOptions{
87+
MaxResults: 10,
88+
}
89+
issues, _, err := jiraClient.Issue.Search(jql, options)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
activity.GetLogger(ctx).Info("JIRA tasks with label opensourceable fetched", zap.Int("Count", len(issues)))
95+
return issues, err
96+
}
97+
98+
func createGithubIssuesActivity(ctx context.Context, issues []jira.Issue) error {
99+
// Replace with your actual token
100+
token := "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"
101+
102+
// GitHub org and repo
103+
// org := "cadence-workflow"
104+
// repo := "cadence-java-samples"
105+
client := github.NewClient(nil)
106+
107+
org := "vishwa-test-2"
108+
repo := "sample"
109+
110+
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
111+
tc := oauth2.NewClient(ctx, ts)
112+
client2 := github.NewClient(tc)
113+
for _, issue := range issues {
114+
title := issue.Fields.Summary
115+
// key := issue.Key
116+
body := issue.Fields.Description
117+
118+
// Check if the issue already exists in GitHub
119+
searchOpts := &github.SearchOptions{
120+
TextMatch: true,
121+
}
122+
query := fmt.Sprintf("repo:vishwa-test-2/sample in:title %s state:open", title)
123+
result, _, err := client.Search.Issues(ctx, query, searchOpts)
124+
if err != nil {
125+
return err
126+
}
127+
128+
if len(result.Issues) == 0 {
129+
// Create issue request
130+
issueRequest := &github.IssueRequest{
131+
Title: github.String(title),
132+
Body: github.String(body),
133+
}
134+
135+
// Create issue
136+
issue, _, err := client2.Issues.Create(ctx, org, repo, issueRequest)
137+
if err != nil {
138+
log.Fatalf("Error creating issue: %v", err)
139+
}
140+
141+
// req := &github.IssueRequest{
142+
// Title: github.String(fmt.Sprintf("[JIRA %s] %s", key, title)),
143+
// Body: github.String(body),
144+
// }
145+
// _, _, err := client.Issues.Create(ctx, "vishwa-test-2", "sample", req)
146+
// if err != nil {
147+
// return err
148+
// }
149+
activity.GetLogger(ctx).Info("Created an issue in GitHub", zap.String("title", issue.GetHTMLURL()))
150+
} else {
151+
activity.GetLogger(ctx).Info("GitHub issue already exists", zap.String("title", title))
152+
}
153+
}
154+
return nil
155+
}
156+
157+
// type IssueRequest struct {
158+
// Title string `json:"title"`
159+
// Body string `json:"body"`
160+
// }
161+
162+
// func createGithubIssuesActivity(ctx context.Context, issues []jira.Issue) error {
163+
// for _, issue := range issues {
164+
// title := issue.Fields.Summary
165+
// key := issue.Key
166+
// body := issue.Fields.Description
167+
168+
// // Check if the issue already exists in GitHub
169+
// searchArgs := []string{
170+
// "gh",
171+
// "issue",
172+
// "list",
173+
// "--repo",
174+
// "cadence-workflow/cadence-java-samples",
175+
// "--search",
176+
// fmt.Sprintf("[JIRA issue] %s in:title", title),
177+
// }
178+
179+
// searchOutput, err := exec.Command(searchArgs[0], searchArgs[1:]...).Output()
180+
// if err != nil {
181+
// return err
182+
// }
183+
184+
// if len(searchOutput) == 0 {
185+
// Create a new issue in GitHub
186+
187+
// issueData, err := json.Marshal(IssueRequest{
188+
// Title: title,
189+
// Body: body,
190+
// })
191+
// if err != nil {
192+
// fmt.Println("Error marshaling JSON:", err)
193+
// }
194+
195+
// url := fmt.Sprintf("https://api.github.com/repos/%s/%s/issues", "vishwa2-uber", "cadence-java-samples")
196+
// req, err := http.NewRequest("POST", url, bytes.NewBuffer(issueData))
197+
// if err != nil {
198+
// fmt.Println("Error creating request:", err)
199+
// }
200+
201+
// req.Header.Set("Content-Type", "application/json")
202+
203+
// client := &http.Client{}
204+
// resp, err := client.Do(req)
205+
// if err != nil {
206+
// fmt.Println("Error sending request:", err)
207+
// }
208+
// defer resp.Body.Close()
209+
210+
// if resp.StatusCode != http.StatusCreated {
211+
// fmt.Println("Failed to create issue. Status code:", resp.StatusCode)
212+
// buf := new(bytes.Buffer)
213+
// buf.ReadFrom(resp.Body)
214+
// newStr := buf.String()
215+
// fmt.Println(newStr)
216+
// os.Exit(1)
217+
// // return
218+
// }
219+
220+
// fmt.Println("Issue created successfully!")
221+
222+
// createArgs := []string{
223+
// "gh",
224+
// "issue",
225+
// "create",
226+
// "--repo",
227+
// "cadence-workflow/cadence-java-samples",
228+
// "--title",
229+
// fmt.Sprintf("[JIRA %s] %s", key, title),
230+
// "--body",
231+
// body,
232+
// }
233+
234+
// _, err := exec.Command(createArgs[0], createArgs[1:]...).Output()
235+
// if err != nil {
236+
// return err
237+
// }
238+
// activity.GetLogger(ctx).Info("Created an issue in GitHub", zap.String("title", title))
239+
// } else {
240+
// activity.GetLogger(ctx).Info("GitHub issue already exists", zap.String("title", title))
241+
// }
242+
// }
243+
// return nil
244+
// }
245+
246+
func getEnvVars() (map[string]string, error) {
247+
jiraArgs := []string{"usso", "-ussh", "t3", "-print"}
248+
output, err := exec.Command(jiraArgs[0], jiraArgs[1:]...).Output()
249+
if err != nil {
250+
return nil, err
251+
}
252+
253+
githubArgs := []string{"usso", "-ussh", "git", "-print"}
254+
githubTokenOutput, err := exec.Command(githubArgs[0], githubArgs[1:]...).Output()
255+
if err != nil {
256+
return nil, err
257+
}
258+
259+
envVars := map[string]string{
260+
"JIRA_API_TOKEN": string(output),
261+
"GITHUB_TOKEN": strings.TrimSuffix(string(githubTokenOutput), "\n"),
262+
}
263+
264+
return envVars, nil
265+
}

0 commit comments

Comments
 (0)