Skip to content

Commit a972901

Browse files
committed
Added distributed possibility for memcached. Not optimal, but required for HA.
1 parent 026f124 commit a972901

File tree

2 files changed

+19
-31
lines changed

2 files changed

+19
-31
lines changed

db-connector.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8330,7 +8330,6 @@ func SetWorkflow(ctx context.Context, workflow Workflow, id string, optionalEdit
83308330
}
83318331

83328332
// Find the key for "workflows_<workflow.org_id>" and update the cache for this one. If it doesn't exist, add it
8333-
83348333
// Get the cache for the workflows
83358334
cacheKey = fmt.Sprintf("%s_workflows", workflow.OrgId)
83368335
cache, err := GetCache(ctx, cacheKey)
@@ -11999,6 +11998,21 @@ func RunInit(dbclient datastore.Client, storageClient storage.Client, gceProject
1199911998
// In case of downtime / large requests
1200011999
if len(memcached) > 0 {
1200112000
mc.Timeout = 10 * time.Second
12001+
12002+
12003+
if strings.Contains(memcached, ",") {
12004+
12005+
newMemcached := []string{}
12006+
for _, memcached := range strings.Split(memcached, ",") {
12007+
memcached = strings.TrimSpace(memcached)
12008+
if len(memcached) > 0 {
12009+
newMemcached = append(newMemcached, memcached)
12010+
}
12011+
}
12012+
12013+
log.Printf("[DEBUG] Multiple memcached servers detected. Split into %#v", newMemcached)
12014+
mc = gomemcache.New(newMemcached...)
12015+
}
1200212016
}
1200312017

1200412018
requestCache = cache.New(35*time.Minute, 35*time.Minute)
@@ -13008,7 +13022,7 @@ func ValidateFinished(ctx context.Context, extra int, workflowExecution Workflow
1300813022

1300913023
workflowExecution, _ = Fixexecution(ctx, workflowExecution)
1301013024
//if rand.Intn(5) == 1 || len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions) {
13011-
log.Printf("[INFO][%s] Workflow Validation. Status: %s, Actions: %d, Extra: %d, Results: %d\n", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results))
13025+
log.Printf("[INFO][%s] Workflow Finished Check. Status: %s, Actions: %d, Extra: %d, Results: %d\n", workflowExecution.ExecutionId, workflowExecution.Status, len(workflowExecution.Workflow.Actions), extra, len(workflowExecution.Results))
1301213026
if len(workflowExecution.Results) >= len(workflowExecution.Workflow.Actions)+extra && len(workflowExecution.Workflow.Actions) > 0 {
1301313027
validResults := 0
1301413028
invalidResults := 0

shared.go

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22474,35 +22474,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
2247422474
})
2247522475
}
2247622476

22477-
// Commented out as we don't want to stop the app, but just continue with the old tokens
22478-
/*
22479-
actionRes := ActionResult{
22480-
Action: action,
22481-
ExecutionId: workflowExecution.ExecutionId,
22482-
Authorization: workflowExecution.Authorization,
22483-
Result: fmt.Sprintf(`{"success": false, "reason": "Failed running oauth2 request to refresh oauth2 tokens for this app."}`),
22484-
StartedAt: workflowExecution.StartedAt,
22485-
CompletedAt: workflowExecution.StartedAt,
22486-
Status: "FAILURE",
22487-
}
22488-
22489-
workflowExecution.Results = append(workflowExecution.Results, actionRes)
22490-
cacheData := []byte("1")
22491-
22492-
newExecId := fmt.Sprintf("%s_%s", workflowExecution.ExecutionId, action.ID)
22493-
err = SetCache(ctx, newExecId, cacheData, 2)
22494-
if err != nil {
22495-
log.Printf("[WARNING] Failed setting base cache for failed Oauth2 action %s: %s", newExecId, err)
22496-
}
22497-
22498-
b, err := json.Marshal(actionRes)
22499-
if err == nil {
22500-
err = SetCache(ctx, fmt.Sprintf("%s_result", newExecId), b, 2)
22501-
if err != nil {
22502-
log.Printf("[WARNING] Failed setting result cache for failed Oauth2 action %s: %s", newExecId, err)
22503-
}
22504-
}
22505-
*/
22477+
// FIXME: There used to be code here to stop the app, but for now we just continue with the old tokens
2250622478
}
2250722479

2250822480
allAuths[authIndex] = newAuth
@@ -30437,6 +30409,8 @@ func checkExecutionStatus(ctx context.Context, exec *WorkflowExecution) *Workflo
3043730409

3043830410
workflow.Validation.TotalProblems = len(workflow.Validation.Errors) + len(workflow.Validation.SubflowApps)
3043930411

30412+
log.Printf("\n\n\nVALIDATION RUNNING\n\n\n")
30413+
3044030414
// Updating the workflow to show the right status every time for now
3044130415
workflowChanged = true
3044230416
workflow.Validation.ValidationRan = true

0 commit comments

Comments
 (0)