Skip to content

Commit 2f2ca26

Browse files
authored
Merge pull request #143 from yashsinghcodes/quick-fixes
feat: Improvement over health API
2 parents d125545 + 78ae550 commit 2f2ca26

File tree

2 files changed

+79
-32
lines changed

2 files changed

+79
-32
lines changed

health.go

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -485,13 +485,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
485485
apiKey = org.Users[validIndex].ApiKey
486486
}
487487

488-
if len(apiKey) == 0 || len(orgId) == 0 {
489-
log.Printf("[WARNING] Ops dashboard api key or org not set. Not setting up ops workflow")
490-
resp.WriteHeader(500)
491-
resp.Write([]byte(`{"success": false, "reason": "SHUFFLE_OPS_DASHBOARD_APIKEY or SHUFFLE_OPS_DASHBOARD_ORG not set. Please set these to use this feature!"}`))
492-
return
493-
}
494-
495488
platformHealth := HealthCheck{}
496489
force := request.URL.Query().Get("force")
497490
cacheKey := fmt.Sprintf("ops-health-check")
@@ -535,7 +528,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
535528
health := healths[0]
536529

537530
if err == nil {
538-
log.Printf("[DEBUG] Last health check was: %#v", health)
539531
platformData, err := json.Marshal(health)
540532
if err != nil {
541533
log.Printf("[ERROR] Failed marshalling platform health data: %s", err)
@@ -949,18 +941,52 @@ func fixOpensearch() error {
949941
return nil
950942
}
951943

944+
func fixHealthSubflowParameters(ctx context.Context, workflow *Workflow) (Workflow, error) {
945+
subflowActionId := ""
946+
for _, action := range workflow.Actions {
947+
if action.Label == "call_subflow" {
948+
subflowActionId = action.ID
949+
break
950+
}
951+
}
952+
953+
954+
for i := range workflow.Triggers {
955+
if workflow.Triggers[i].AppName != "Shuffle Workflow" {
956+
continue
957+
}
958+
959+
for j := range workflow.Triggers[i].Parameters {
960+
if workflow.Triggers[i].Parameters[j].Name == "workflow" {
961+
workflow.Triggers[i].Parameters[j].Value = workflow.ID
962+
}
963+
964+
if workflow.Triggers[i].Parameters[j].Name == "startnode" {
965+
workflow.Triggers[i].Parameters[j].Value = subflowActionId
966+
break
967+
}
968+
}
969+
break
970+
}
971+
972+
return *workflow, nil
973+
}
974+
952975
func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHealth, error) {
953976
// run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6
954977
ctx := context.Background()
955978

956979
workflowHealth := WorkflowHealth{
957980
Create: false,
981+
BackendVersion: os.Getenv("SHUFFLE_BACKEND_VERSION"),
958982
Run: false,
959983
RunFinished: false,
984+
ExecutionTook: 0,
960985
Delete: false,
961986
RunStatus: "",
962987
ExecutionId: "",
963988
WorkflowId: "",
989+
WorkflowValidation: false,
964990
}
965991

966992
baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
@@ -1026,6 +1052,7 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
10261052
req.Header.Set("Org-Id", orgId)
10271053

10281054
// send the request
1055+
startTime := time.Now()
10291056
client := &http.Client{}
10301057
resp, err := client.Do(req)
10311058
if err != nil {
@@ -1070,23 +1097,24 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
10701097
if resp.StatusCode == 200 {
10711098
workflowHealth.Run = true
10721099
workflowHealth.ExecutionId = execution.ExecutionId
1073-
workflowHealth.WorkflowValidation = execution.Workflow.Validation.Valid
10741100
}
10751101

10761102
updateOpsCache(workflowHealth)
10771103
timeout := time.After(5 * time.Minute)
10781104

1079-
if workflowHealth.Create == true {
1080-
log.Printf("[DEBUG] Deleting created ops workflow")
1081-
err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
1082-
if err != nil {
1083-
log.Printf("[ERROR] Failed deleting workflow: %s", err)
1084-
} else {
1085-
log.Printf("[DEBUG] Deleted ops workflow successfully!")
1086-
workflowHealth.Delete = true
1087-
updateOpsCache(workflowHealth)
1088-
}
1089-
}
1105+
// Removed as it was deleting the workflow before execution which
1106+
// was effecting the subflow
1107+
// if workflowHealth.Create == true {
1108+
// log.Printf("[DEBUG] Deleting created ops workflow")
1109+
// err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
1110+
// if err != nil {
1111+
// log.Printf("[ERROR] Failed deleting workflow: %s", err)
1112+
// } else {
1113+
// log.Printf("[DEBUG] Deleted ops workflow successfully!")
1114+
// workflowHealth.Delete = true
1115+
// updateOpsCache(workflowHealth)
1116+
// }
1117+
// }
10901118

10911119
// 3. Check if workflow ran successfully
10921120
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
@@ -1149,14 +1177,10 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
11491177

11501178
if executionResults.Status == "FINISHED" {
11511179
log.Printf("[DEBUG] Workflow Health exeution is finished, checking it's results")
1152-
1153-
// yash asked to comment these out
1154-
// for _, r := range executionResults.Results {
1155-
// if r.Status != "SUCCESS" {
1156-
// workflowHealth.RunStatus = "FAILED"
1157-
// break
1158-
// }
1159-
// }
1180+
workflowHealth.WorkflowValidation = executionResults.Workflow.Validation.Valid
1181+
finishTime := time.Since(startTime).Seconds()
1182+
workflowHealth.ExecutionTook = finishTime
1183+
//workflowHealth = time.Since(startTime)
11601184
}
11611185

11621186

@@ -1169,6 +1193,7 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
11691193
case <-timeout:
11701194
log.Printf("[ERROR] Timeout reached for workflow health check. Returning")
11711195
workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK"
1196+
11721197
return workflowHealth, errors.New("Timeout reached for workflow health check")
11731198
default:
11741199
// do nothing
@@ -1178,10 +1203,23 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
11781203
time.Sleep(2 * time.Second)
11791204
}
11801205

1181-
// Delete junk workflows
1206+
if workflowHealth.Create == true {
1207+
log.Printf("[DEBUG] Deleting created ops workflow")
1208+
err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
1209+
if err != nil {
1210+
log.Printf("[ERROR] Failed deleting workflow: %s", err)
1211+
} else {
1212+
log.Printf("[DEBUG] Deleted ops workflow successfully!")
1213+
workflowHealth.Delete = true
1214+
updateOpsCache(workflowHealth)
1215+
}
1216+
}
1217+
1218+
1219+
// Delete junk workflows, this will remove all the healthWorkflow which failed
11821220
err = deleteJunkOpsWorkflow(ctx, workflowHealth)
11831221
if err != nil {
1184-
//log.Printf("[ERROR] Failed deleting junk workflows: %s", err)
1222+
log.Printf("[WARNING] Failed deleting junk workflows: %s", err)
11851223
}
11861224

11871225
return workflowHealth, nil
@@ -1265,7 +1303,8 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
12651303
if project.Environment == "cloud" {
12661304
// url := "https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6"
12671305
// url := "https://shuffler.io/api/v1/workflows/7b729319-b395-4ba3-b497-d8246da67b1c"
1268-
url := "https://shuffler.io/api/v1/workflows/412256ca-ce62-4d20-9e55-1491548349e1"
1306+
//url := "https://shuffler.io/api/v1/workflows/412256ca-ce62-4d20-9e55-1491548349e1"
1307+
url := "https://shuffler.io/api/v1/workflows/ae89a788-a26b-4866-8a0b-ce0b31d354ea"
12691308
req, err := http.NewRequest("GET", url, nil)
12701309
if err != nil {
12711310
log.Println("[ERROR] creating HTTP request:", err)
@@ -1443,6 +1482,12 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
14431482
workflowData.Hidden = true
14441483
workflowData.Public = false
14451484

1485+
workflowData, err = fixHealthSubflowParameters(ctx, &workflowData)
1486+
if err != nil {
1487+
log.Printf("[ERROR] Subflow parameter changing failed might create an issue.")
1488+
}
1489+
1490+
14461491
// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true
14471492
req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil)
14481493
if err != nil {
@@ -2376,7 +2421,6 @@ func GetStaticWorkflowHealth(ctx context.Context, workflow Workflow) (Workflow,
23762421
if len(param.Value) > 0 {
23772422
fieldsFilled += 1
23782423
}
2379-
23802424
authRequired = true
23812425
break
23822426
}

structs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3971,7 +3971,10 @@ type AppHealth struct {
39713971
type WorkflowHealth struct {
39723972
Create bool `json:"create"`
39733973
Run bool `json:"run"`
3974+
BackendVersion string `json:"backend_version"`
39743975
RunFinished bool `json:"run_finished"`
3976+
// NOTE: This does not represent the actual time execution took, it includes the time took to send an API request for the exeution + get back the results for every action.
3977+
ExecutionTook float64 `json:"execution_took"`
39753978
RunStatus string `json:"run_status"`
39763979
Delete bool `json:"delete"`
39773980
ExecutionId string `json:"execution_id"`

0 commit comments

Comments
 (0)