Skip to content

Commit 3f7a56c

Browse files
committed
2 parents fff8131 + 4e7d84d commit 3f7a56c

File tree

4 files changed

+164
-92
lines changed

4 files changed

+164
-92
lines changed

blobs.go

Lines changed: 37 additions & 11 deletions
Large diffs are not rendered by default.

db-connector.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,18 @@ func GetCache(ctx context.Context, name string) (interface{}, error) {
225225
//log.Printf("[WARNING] CACHE: TOTAL SIZE FOR %s: %d", name, len(totalData))
226226
}
227227

228+
if len(totalData) == 0 {
229+
log.Printf("[ERROR] Cache payload invalid for key %s", name)
230+
return "", fmt.Errorf("Cache payload invalid for %s", name)
231+
}
232+
228233
return totalData, nil
229234
} else {
235+
if len(item.Value) == 0 {
236+
log.Printf("[ERROR] Cache payload invalid for %s", name)
237+
return "", fmt.Errorf("Cache payload invalid for %s", name)
238+
}
239+
230240
return item.Value, nil
231241
}
232242
}
@@ -301,6 +311,10 @@ func SetCache(ctx context.Context, name string, data []byte, expiration int32) e
301311
return nil
302312
}
303313

314+
if len(data) == 0 {
315+
log.Printf("[WARNING] Data is empty with key %s and expiration %d. Skipping cache", name, expiration)
316+
}
317+
304318
// Maxsize ish~
305319
name = strings.Replace(name, " ", "_", -1)
306320

@@ -804,7 +818,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
804818
cacheData := []byte(cache.([]uint8))
805819
err = json.Unmarshal(cacheData, &workflowExecution)
806820

807-
if err == nil || len(workflowExecution.ExecutionId) > 0 {
821+
if (err == nil && workflowExecution != nil)|| len(workflowExecution.ExecutionId) > 0 {
808822
//log.Printf("[DEBUG] Checking individual execution cache with %d results", len(workflowExecution.Results))
809823
if strings.Contains(workflowExecution.ExecutionArgument, "Result too large to handle") {
810824
baseArgument := &ActionResult{
@@ -3345,7 +3359,7 @@ func GetWorkflow(ctx context.Context, id string, skipHealth ...bool) (*Workflow,
33453359
cache, err := GetCache(ctx, cacheKey)
33463360
if err == nil {
33473361
cacheData := []byte(cache.([]uint8))
3348-
err = json.Unmarshal(cacheData, &workflow)
3362+
err = json.Unmarshal(cacheData, workflow)
33493363
if err == nil && workflow.ID != "" {
33503364
validationData, err := GetCache(ctx, fmt.Sprintf("validation_workflow_%s", workflow.ID))
33513365
if err == nil {
@@ -3395,7 +3409,7 @@ func GetWorkflow(ctx context.Context, id string, skipHealth ...bool) (*Workflow,
33953409
return workflow, nil
33963410
}
33973411
} else {
3398-
//log.Printf("[DEBUG] Failed getting cache for workflow: %s", err)
3412+
log.Printf("[DEBUG] Failed getting cache for workflow: %s", err)
33993413
}
34003414
}
34013415

@@ -3531,12 +3545,12 @@ func GetOrgStatistics(ctx context.Context, orgId string) (*ExecutionInfo, error)
35313545
cache, err := GetCache(ctx, cacheKey)
35323546
if err == nil {
35333547
cacheData := []byte(cache.([]uint8))
3534-
err = json.Unmarshal(cacheData, &stats)
3548+
err = json.Unmarshal(cacheData, stats)
35353549
if err == nil {
35363550
return stats, nil
35373551
}
35383552
} else {
3539-
//log.Printf("[DEBUG] Failed getting cache for stats: %s", err)
3553+
log.Printf("[DEBUG] Failed getting cache for stats: %s", err)
35403554
}
35413555
}
35423556

@@ -3996,12 +4010,12 @@ func GetOrgByCreatorId(ctx context.Context, id string) (*Org, error) {
39964010
cache, err := GetCache(ctx, cacheKey)
39974011
if err == nil {
39984012
cacheData := []byte(cache.([]uint8))
3999-
err = json.Unmarshal(cacheData, &curOrg)
4013+
err = json.Unmarshal(cacheData, curOrg)
40004014
if err == nil {
40014015
return curOrg, nil
40024016
}
40034017
} else {
4004-
//log.Printf("[DEBUG] Failed getting cache for org: %s", err)
4018+
log.Printf("[DEBUG] Failed getting cache for org: %s", err)
40054019
}
40064020
}
40074021

@@ -4095,7 +4109,7 @@ func GetOrg(ctx context.Context, id string) (*Org, error) {
40954109
cache, err := GetCache(ctx, cacheKey)
40964110
if err == nil {
40974111
cacheData := []byte(cache.([]uint8))
4098-
err = json.Unmarshal(cacheData, &curOrg)
4112+
err = json.Unmarshal(cacheData, curOrg)
40994113
if err == nil {
41004114
if curOrg.Id == "" {
41014115
return curOrg, errors.New("Org doesn't exist")
@@ -4104,7 +4118,7 @@ func GetOrg(ctx context.Context, id string) (*Org, error) {
41044118
}
41054119
}
41064120
} else {
4107-
//log.Printf("[DEBUG] Failed getting cache for org: %s", err)
4121+
log.Printf("[DEBUG] Failed getting cache for org: %s", err)
41084122
}
41094123
}
41104124

health.go

Lines changed: 96 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"bytes"
55
"context"
66
"encoding/base64"
7-
// "encoding/json"
8-
"github.com/goccy/go-json"
7+
// "encoding/json"
98
"errors"
109
"fmt"
10+
"github.com/goccy/go-json"
1111
"io"
1212
"io/ioutil"
1313
"log"
@@ -93,6 +93,12 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {
9393
}
9494

9595
baseURL := "https://shuffler.io"
96+
var err error
97+
var url string
98+
var req *http.Request
99+
var client *http.Client
100+
var resp *http.Response
101+
var respBody []byte
96102
if os.Getenv("SHUFFLE_CLOUDRUN_URL") != "" {
97103
//log.Printf("[DEBUG] Setting the baseUrl for health check to %s", baseURL)
98104
baseURL = os.Getenv("SHUFFLE_CLOUDRUN_URL")
@@ -106,42 +112,51 @@ func RunOpsAppHealthCheck(apiKey string, orgId string) (AppHealth, error) {
106112
}
107113
}
108114

109-
url := baseURL + "/api/v1/apps/edaa73d40238ee60874a853dc3ccaa6f/config"
110-
log.Printf("[DEBUG] Getting app with URL: %s", url)
115+
app := appConfig{}
116+
if project.Environment == "onprem" {
117+
config := GetHealthAppConfig()
118+
err = json.Unmarshal([]byte(config), &app)
119+
if err != nil {
120+
log.Printf("[ERROR] Failed unmarshalling health app config blob: %s", err)
121+
return appHealth, err
122+
}
123+
} else {
124+
url = baseURL + "/api/v1/apps/edaa73d40238ee60874a853dc3ccaa6f/config"
125+
log.Printf("[DEBUG] Getting app with URL: %s", url)
111126

112-
req, err := http.NewRequest("GET", url, nil)
113-
if err != nil {
114-
log.Printf("[ERROR] Failed creating HTTP request: %s", err)
115-
return appHealth, err
116-
}
127+
req, err = http.NewRequest("GET", url, nil)
128+
if err != nil {
129+
log.Printf("[ERROR] Failed creating HTTP request: %s", err)
130+
return appHealth, err
131+
}
117132

118-
// send the request
119-
client := &http.Client{}
120-
resp, err := client.Do(req)
121-
if err != nil {
122-
log.Printf("[ERROR] Failed sending HTTP request: %s", err)
123-
return appHealth, err
124-
}
133+
// send the request
134+
client := &http.Client{}
135+
resp, err := client.Do(req)
136+
if err != nil {
137+
log.Printf("[ERROR] Failed sending HTTP request: %s", err)
138+
return appHealth, err
139+
}
125140

126-
defer resp.Body.Close()
141+
defer resp.Body.Close()
127142

128-
if resp.StatusCode != 200 {
129-
log.Printf("[ERROR] Failed getting health check app: %s. The status code was: %d", err, resp.StatusCode)
130-
return appHealth, err
131-
}
143+
if resp.StatusCode != 200 {
144+
log.Printf("[ERROR] Failed getting health check app: %s. The status code was: %d", err, resp.StatusCode)
145+
return appHealth, err
146+
}
132147

133-
respBody, err := ioutil.ReadAll(resp.Body)
134-
if err != nil {
135-
log.Printf("[ERROR] Failed readin while getting HTTP response body: %s", err)
136-
return appHealth, err
137-
}
148+
respBody, err := ioutil.ReadAll(resp.Body)
149+
if err != nil {
150+
log.Printf("[ERROR] Failed readin while getting HTTP response body: %s", err)
151+
return appHealth, err
152+
}
138153

139-
// Unmarshal the JSON data into a Workflow instance
140-
app := appConfig{}
141-
err = json.Unmarshal([]byte(respBody), &app)
142-
if err != nil {
143-
log.Printf("[ERROR] Failed unmarshalling JSON data: %s", err)
144-
return appHealth, err
154+
// Unmarshal the JSON data into a Workflow instance
155+
err = json.Unmarshal([]byte(respBody), &app)
156+
if err != nil {
157+
log.Printf("[ERROR] Failed unmarshalling JSON data: %s", err)
158+
return appHealth, err
159+
}
145160
}
146161

147162
if app.Success == false {
@@ -709,7 +724,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
709724

710725
// Use channel for getting RunOpsWorkflow function results
711726
workflowHealthChannel := make(chan WorkflowHealth)
712-
errorChannel := make(chan error)
727+
errorChannel := make(chan error, 6)
713728
go func() {
714729
if debug {
715730
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
@@ -732,6 +747,9 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
732747
opensearchHealth, err := RunOpensearchOps(ctx)
733748
if err != nil {
734749
log.Printf("[ERROR] Failed running opensearch health check: %s", err)
750+
opensearchHealthChannel <- opensearchapi.ClusterHealthResp{}
751+
errorChannel <- err
752+
return
735753
}
736754

737755
opensearchHealthChannel <- *opensearchHealth
@@ -742,19 +760,42 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
742760
}
743761

744762
// TODO: More testing for onprem health checks
745-
if project.Environment == "cloud" {
746-
openapiAppHealthChannel := make(chan AppHealth)
747-
go func() {
748-
appHealth, err := RunOpsAppHealthCheck(apiKey, orgId)
749-
if err != nil {
750-
log.Printf("[ERROR] Failed running app health check: %s", err)
751-
}
763+
openapiAppHealthChannel := make(chan AppHealth)
764+
go func() {
765+
appHealth, err := RunOpsAppHealthCheck(apiKey, orgId)
766+
if err != nil {
767+
log.Printf("[ERROR] Failed running app health check: %s", err)
768+
}
752769

753-
appHealth.Result = ""
754-
openapiAppHealthChannel <- appHealth
755-
errorChannel <- err
756-
}()
770+
appHealth.Result = ""
771+
openapiAppHealthChannel <- appHealth
772+
errorChannel <- err
773+
}()
774+
775+
datastoreHealthChannel := make(chan DatastoreHealth)
776+
go func() {
777+
datastoreHealth, err := RunOpsDatastore(apiKey, orgId)
778+
if err != nil {
779+
log.Printf("[ERROR] Failed running datastore health check: %s", err)
780+
}
781+
782+
datastoreHealthChannel <- datastoreHealth
783+
errorChannel <- err
784+
}()
785+
786+
fileHealthChannel := make(chan FileHealth)
787+
go func() {
788+
fileHealth, err := RunOpsFile(apiKey, orgId)
789+
if err != nil {
790+
log.Printf("[ERROR] Failed running file health check: %s", err)
791+
}
757792

793+
fileHealthChannel <- fileHealth
794+
errorChannel <- err
795+
}()
796+
797+
if project.Environment == "cloud" {
798+
// App upload via zip is not supported in self-hosted machine yet
758799
pythonAppHealthChannel := make(chan AppHealth)
759800
go func() {
760801
pythonAppHealth, err := RunOpsAppUpload(apiKey, orgId)
@@ -766,38 +807,24 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
766807
errorChannel <- err
767808
}()
768809

769-
datastoreHealthChannel := make(chan DatastoreHealth)
770-
go func() {
771-
datastoreHealth, err := RunOpsDatastore(apiKey, orgId)
772-
if err != nil {
773-
log.Printf("[ERROR] Failed running datastore health check: %s", err)
774-
}
775-
776-
datastoreHealthChannel <- datastoreHealth
777-
errorChannel <- err
778-
}()
779-
780-
fileHealthChannel := make(chan FileHealth)
781-
go func() {
782-
fileHealth, err := RunOpsFile(apiKey, orgId)
783-
if err != nil {
784-
log.Printf("[ERROR] Failed running file health check: %s", err)
785-
}
786-
787-
fileHealthChannel <- fileHealth
788-
errorChannel <- err
789-
}()
790810

791811
// Use channel for getting RunOpsWorkflow function results
792-
platformHealth.Apps = <-openapiAppHealthChannel
793812
platformHealth.PythonApps = <-pythonAppHealthChannel
794-
platformHealth.Datastore = <-datastoreHealthChannel
795-
platformHealth.FileOps = <-fileHealthChannel
796813
}
797814

815+
platformHealth.Datastore = <-datastoreHealthChannel
816+
platformHealth.FileOps = <-fileHealthChannel
817+
platformHealth.Apps = <-openapiAppHealthChannel
798818
platformHealth.Workflows = <-workflowHealthChannel
799819
err = <-errorChannel
800820

821+
if project.Environment != "cloud" {
822+
select {
823+
case <-errorChannel:
824+
default:
825+
}
826+
}
827+
801828
if err != nil {
802829
if err.Error() == "High number of requests. Try again later" {
803830
log.Printf("[DEBUG] High number of requests sent to the backend. Skipping this run.")
@@ -1045,6 +1072,7 @@ func deleteOpsWorkflow(workflowHealth WorkflowHealth, apiKey string, orgId strin
10451072
// send the request
10461073
client := &http.Client{}
10471074
resp, err := client.Do(req)
1075+
10481076
if err != nil {
10491077
log.Printf("[ERROR] Failed deleting the health check workflow with HTTP request: %s", err)
10501078
return err

structs.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4374,10 +4374,14 @@ type HealthCheck struct {
43744374
}
43754375

43764376
type HealthCheckDB struct {
4377-
Success bool `json:"success"`
4378-
Updated int64 `json:"updated"`
4379-
Workflows WorkflowHealth `json:"workflows"`
4380-
ID string `json:"id"`
4377+
Success bool `json:"success"`
4378+
Updated int64 `json:"updated"`
4379+
Workflows WorkflowHealth `json:"workflows"`
4380+
Opensearch opensearchapi.ClusterHealthResp `json:"opnsearch"`
4381+
Datastore DatastoreHealth `json:"datastore"`
4382+
FileOps FileHealth `json:"fileops"`
4383+
Apps AppHealth `json:"apps"`
4384+
ID string `json:"id"`
43814385
}
43824386

43834387
type NodeData struct {

0 commit comments

Comments
 (0)