Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 70 additions & 32 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,13 +485,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
apiKey = org.Users[validIndex].ApiKey
}

if len(apiKey) == 0 || len(orgId) == 0 {
log.Printf("[WARNING] Ops dashboard api key or org not set. Not setting up ops workflow")
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false, "reason": "SHUFFLE_OPS_DASHBOARD_APIKEY or SHUFFLE_OPS_DASHBOARD_ORG not set. Please set these to use this feature!"}`))
return
}

platformHealth := HealthCheck{}
force := request.URL.Query().Get("force")
cacheKey := fmt.Sprintf("ops-health-check")
Expand Down Expand Up @@ -535,7 +528,6 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
health := healths[0]

if err == nil {
log.Printf("[DEBUG] Last health check was: %#v", health)
platformData, err := json.Marshal(health)
if err != nil {
log.Printf("[ERROR] Failed marshalling platform health data: %s", err)
Expand Down Expand Up @@ -949,18 +941,46 @@ func fixOpensearch() error {
return nil
}

// fixHealthSubflowParameters rewrites the parameters of every trigger whose
// AppName is "Shuffle Workflow" so that they reference the given workflow:
//   - the "workflow" parameter is set to workflow.ID
//   - the "startnode" parameter is set to the ID of the action labelled
//     "call_subflow" (the last such action wins if there are several)
//
// The workflow is modified in place and a copy of it is returned.
//
// An error is returned when workflow is nil, or when a "startnode" parameter
// needed a value but no "call_subflow" action exists. In the latter case the
// existing startnode value is left untouched instead of being blanked out, and
// the (otherwise updated) workflow is still returned so callers that only log
// the error keep a usable value.
//
// ctx is currently unused.
func fixHealthSubflowParameters(ctx context.Context, workflow *Workflow) (Workflow, error) {
	if workflow == nil {
		return Workflow{}, errors.New("workflow is nil")
	}

	// No break on purpose: the last matching action is the one used.
	subflowActionId := ""
	for _, action := range workflow.Actions {
		if action.Label == "call_subflow" {
			subflowActionId = action.ID
		}
	}

	startnodeMissing := false
	for i := range workflow.Triggers {
		if workflow.Triggers[i].AppName != "Shuffle Workflow" {
			continue
		}

		for j := range workflow.Triggers[i].Parameters {
			switch workflow.Triggers[i].Parameters[j].Name {
			case "workflow":
				workflow.Triggers[i].Parameters[j].Value = workflow.ID
			case "startnode":
				// Writing an empty ID would clobber whatever startnode the
				// trigger already had, so keep it and report the problem.
				if len(subflowActionId) == 0 {
					startnodeMissing = true
					continue
				}

				workflow.Triggers[i].Parameters[j].Value = subflowActionId
			}
		}
	}

	if startnodeMissing {
		return *workflow, errors.New("no call_subflow action found to use as subflow startnode")
	}

	return *workflow, nil
}

func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHealth, error) {
// run workflow with id 602c7cf5-500e-4bd1-8a97-aa5bc8a554e6
ctx := context.Background()

workflowHealth := WorkflowHealth{
Create: false,
BackendVersion: os.Getenv("SHUFFLE_BACKEND_VERSION"),
Run: false,
RunFinished: false,
ExecutionTook: 0,
Delete: false,
RunStatus: "",
ExecutionId: "",
WorkflowId: "",
WorkflowValidation: false,
}

baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
Expand Down Expand Up @@ -1026,6 +1046,7 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
req.Header.Set("Org-Id", orgId)

// send the request
startTime := time.Now()
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
Expand Down Expand Up @@ -1070,23 +1091,24 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
if resp.StatusCode == 200 {
workflowHealth.Run = true
workflowHealth.ExecutionId = execution.ExecutionId
workflowHealth.WorkflowValidation = execution.Workflow.Validation.Valid
}

updateOpsCache(workflowHealth)
timeout := time.After(5 * time.Minute)

if workflowHealth.Create == true {
log.Printf("[DEBUG] Deleting created ops workflow")
err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed deleting workflow: %s", err)
} else {
log.Printf("[DEBUG] Deleted ops workflow successfully!")
workflowHealth.Delete = true
updateOpsCache(workflowHealth)
}
}
// Removed as it was deleting the workflow before execution, which
// was affecting the subflow
// if workflowHealth.Create == true {
// log.Printf("[DEBUG] Deleting created ops workflow")
// err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
// if err != nil {
// log.Printf("[ERROR] Failed deleting workflow: %s", err)
// } else {
// log.Printf("[DEBUG] Deleted ops workflow successfully!")
// workflowHealth.Delete = true
// updateOpsCache(workflowHealth)
// }
// }

// 3. Check if workflow ran successfully
// ping /api/v1/streams/results/<execution_id> while workflowHealth.RunFinished is false
Expand Down Expand Up @@ -1149,14 +1171,10 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe

if executionResults.Status == "FINISHED" {
log.Printf("[DEBUG] Workflow Health exeution is finished, checking it's results")

// yash asked to comment these out
// for _, r := range executionResults.Results {
// if r.Status != "SUCCESS" {
// workflowHealth.RunStatus = "FAILED"
// break
// }
// }
workflowHealth.WorkflowValidation = executionResults.Workflow.Validation.Valid
finishTime := time.Since(startTime).Seconds()
workflowHealth.ExecutionTook = finishTime
//workflowHealth = time.Since(startTime)
}


Expand All @@ -1169,6 +1187,7 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
case <-timeout:
log.Printf("[ERROR] Timeout reached for workflow health check. Returning")
workflowHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK"

return workflowHealth, errors.New("Timeout reached for workflow health check")
default:
// do nothing
Expand All @@ -1178,10 +1197,23 @@ func RunOpsWorkflow(apiKey string, orgId string, cloudRunUrl string) (WorkflowHe
time.Sleep(2 * time.Second)
}

// Delete the ops workflow created for this health check
if workflowHealth.Create == true {
log.Printf("[DEBUG] Deleting created ops workflow")
err = deleteOpsWorkflow(workflowHealth, apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Failed deleting workflow: %s", err)
} else {
log.Printf("[DEBUG] Deleted ops workflow successfully!")
workflowHealth.Delete = true
updateOpsCache(workflowHealth)
}
}


// Delete junk workflows; this removes all the health workflows that failed
err = deleteJunkOpsWorkflow(ctx, workflowHealth)
if err != nil {
//log.Printf("[ERROR] Failed deleting junk workflows: %s", err)
log.Printf("[WARNING] Failed deleting junk workflows: %s", err)
}

return workflowHealth, nil
Expand Down Expand Up @@ -1265,7 +1297,8 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
if project.Environment == "cloud" {
// url := "https://shuffler.io/api/v1/workflows/602c7cf5-500e-4bd1-8a97-aa5bc8a554e6"
// url := "https://shuffler.io/api/v1/workflows/7b729319-b395-4ba3-b497-d8246da67b1c"
url := "https://shuffler.io/api/v1/workflows/412256ca-ce62-4d20-9e55-1491548349e1"
//url := "https://shuffler.io/api/v1/workflows/412256ca-ce62-4d20-9e55-1491548349e1"
url := "https://shuffler.io/api/v1/workflows/ae89a788-a26b-4866-8a0b-ce0b31d354ea"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Println("[ERROR] creating HTTP request:", err)
Expand Down Expand Up @@ -1443,6 +1476,12 @@ func InitOpsWorkflow(apiKey string, OrgId string) (string, error) {
workflowData.Hidden = true
workflowData.Public = false

workflowData, err = fixHealthSubflowParameters(ctx, &workflowData)
if err != nil {
log.Printf("[ERROR] Subflow parameter changing failed might create an issue.")
}


// Save the workflow: PUT http://localhost:5002/api/v1/workflows/{id}?skip_save=true
req, err = http.NewRequest("PUT", baseUrl+"/api/v1/workflows/"+workflowData.ID+"?skip_save=true", nil)
if err != nil {
Expand Down Expand Up @@ -2369,7 +2408,6 @@ func GetStaticWorkflowHealth(ctx context.Context, workflow Workflow) (Workflow,
if len(param.Value) > 0 {
fieldsFilled += 1
}

authRequired = true
break
}
Expand Down
3 changes: 3 additions & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3970,7 +3970,10 @@ type AppHealth struct {
type WorkflowHealth struct {
Create bool `json:"create"`
Run bool `json:"run"`
BackendVersion string `json:"backend_version"`
RunFinished bool `json:"run_finished"`
// NOTE: This does not represent the actual time the execution took; it includes the time taken to send the API request for the execution and to get back the results for every action.
ExecutionTook float64 `json:"execution_took"`
RunStatus string `json:"run_status"`
Delete bool `json:"delete"`
ExecutionId string `json:"execution_id"`
Expand Down
Loading