Skip to content

Commit 84014ed

Browse files
authored
Merge pull request #132 from LalitDeore/schedules_issue
Fix- Scheduling issue
2 parents 7ea141c + c191cb2 commit 84014ed

File tree

1 file changed

+139
-23
lines changed

1 file changed

+139
-23
lines changed

shared.go

Lines changed: 139 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"sort"
1919
"sync"
2020

21+
scheduler "cloud.google.com/go/scheduler/apiv1"
22+
"cloud.google.com/go/scheduler/apiv1/schedulerpb"
2123
"gopkg.in/yaml.v3"
2224

2325
//"os/exec"
@@ -4949,6 +4951,33 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
49494951
}
49504952
}
49514953

4954+
if project.Environment == "cloud" {
4955+
var wg sync.WaitGroup
4956+
scheduleMutex := sync.Mutex{}
4957+
4958+
for index, schedule := range allSchedules {
4959+
wg.Add(1)
4960+
go func(index int, schedule ScheduleOld) {
4961+
defer wg.Done()
4962+
4963+
// Check if the schedule exist in the gcp
4964+
GcpSchedule, err := GetGcpSchedule(ctx, schedule.Id)
4965+
4966+
// Use mutex to safely update the schedule status
4967+
scheduleMutex.Lock()
4968+
if err != nil {
4969+
allSchedules[index].Status = "stopped"
4970+
} else {
4971+
allSchedules[index].Status = GcpSchedule.Status
4972+
}
4973+
4974+
scheduleMutex.Unlock()
4975+
}(index, schedule)
4976+
}
4977+
4978+
wg.Wait()
4979+
}
4980+
49524981
sort.SliceStable(allHooks, func(i, j int) bool {
49534982
return allHooks[i].Info.Name < allHooks[j].Info.Name
49544983
})
@@ -4977,6 +5006,65 @@ func HandleGetTriggers(resp http.ResponseWriter, request *http.Request) {
49775006
resp.Write(newjson)
49785007
}
49795008

5009+
func GetGcpSchedule(ctx context.Context, id string) (*ScheduleOld, error) {
5010+
5011+
// Check if we have the schedule in cache
5012+
cacheData, err := GetCache(ctx, fmt.Sprintf("schedule-%s", id))
5013+
if err == nil {
5014+
data, ok := cacheData.([]byte)
5015+
if !ok {
5016+
log.Printf("[ERROR] Cache data for %s is not of type []byte", id)
5017+
} else {
5018+
schedule := &ScheduleOld{}
5019+
err = json.Unmarshal(data, schedule)
5020+
if err != nil {
5021+
log.Printf("[ERROR] Failed to unmarshal schedule cache for %s: %s", id, err)
5022+
} else {
5023+
return schedule, nil
5024+
}
5025+
}
5026+
}
5027+
5028+
schedule := &ScheduleOld{}
5029+
c, err := scheduler.NewCloudSchedulerClient(ctx)
5030+
if err != nil {
5031+
log.Printf("[ERROR] Client error: %s", err)
5032+
return schedule, err
5033+
}
5034+
location := "europe-west2"
5035+
if len(os.Getenv("SHUFFLE_GCEPROJECT")) > 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 {
5036+
location = os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")
5037+
}
5038+
req := &schedulerpb.GetJobRequest{
5039+
Name: fmt.Sprintf("projects/%s/locations/%s/jobs/schedule_%s", gceProject, location, id),
5040+
}
5041+
resp, err := c.GetJob(ctx, req)
5042+
if err != nil {
5043+
log.Printf("[ERROR] Failed getting schedule %s: %s", id, err)
5044+
return schedule, err
5045+
}
5046+
schedule.Id = id
5047+
schedule.Name = resp.Name
5048+
if resp.State == schedulerpb.Job_ENABLED {
5049+
schedule.Status = "running"
5050+
} else {
5051+
schedule.Status = "stopped"
5052+
}
5053+
5054+
// Set cache for 5 minutes just to make it fast
5055+
scheduleJSON, err := json.Marshal(schedule)
5056+
if err != nil {
5057+
log.Printf("[ERROR] Failed to marshal schedule for cache: %s", err)
5058+
return schedule, err
5059+
}
5060+
err = SetCache(ctx, fmt.Sprintf("schedule-%s", id), scheduleJSON, 300)
5061+
if err != nil {
5062+
log.Printf("[ERROR] Failed setting cache for schedule %s: %s", id, err)
5063+
}
5064+
5065+
return schedule, nil
5066+
}
5067+
49805068
func HandleGetSchedules(resp http.ResponseWriter, request *http.Request) {
49815069
cors := HandleCors(resp, request)
49825070
if cors {
@@ -6137,7 +6225,7 @@ func diffWorkflowWrapper(parentWorkflow Workflow) Workflow {
61376225
}
61386226

61396227
func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow, parentTrigger Trigger) Trigger {
6140-
// remember: when this function is used, the parent trigger is passed to
6228+
// remember: when this function is used, the parent trigger is passed to
61416229
// create the new child trigger.
61426230
trigger := parentTrigger
61436231

@@ -6179,13 +6267,13 @@ func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow,
61796267
alreadyPropagatedSubflow := ""
61806268

61816269
for _, workflow := range childOrgWorkflows {
6182-
// this means that the subflow has been propagated to
6270+
// this means that the subflow has been propagated to
61836271
// child workflow already. no need to complicate things further.
61846272
if workflow.ParentWorkflowId == parentSubflowPointedId {
61856273
propagatedEarlier = true
61866274
alreadyPropagatedSubflow = workflow.ID
61876275
break
6188-
}
6276+
}
61896277
}
61906278

61916279
if propagatedEarlier {
@@ -6280,7 +6368,6 @@ func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow,
62806368
return trigger
62816369
}
62826370

6283-
62846371
func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
62856372
schedule, err := GetSchedule(ctx, scheduleId)
62866373
if err != nil {
@@ -6322,7 +6409,7 @@ func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
63226409
} else if project.Environment == "cloud" && schedule.Environment == "onprem" {
63236410
// hybrid case
63246411
// TODO: to be handled
6325-
} else if project.Environment == "onprem" && (schedule.Environment == "cloud" ) {
6412+
} else if project.Environment == "onprem" && (schedule.Environment == "cloud") {
63266413
scheduleWorkflow, err := GetWorkflow(ctx, schedule.WorkflowId)
63276414
if err != nil {
63286415
log.Printf("[WARNING] Failed getting schedule workflow %s: %s", schedule.WorkflowId, err)
@@ -6363,13 +6450,12 @@ func deleteScheduleGeneral(ctx context.Context, scheduleId string) error {
63636450
return nil
63646451
}
63656452

6366-
63676453
func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
63686454
// Check if there is a difference in actions, and what they are
63696455
// Check if there is a difference in triggers, and what they are
63706456
// Check if there is a difference in branches, and what they are
63716457

6372-
// We create a new ID for each trigger.
6458+
// We create a new ID for each trigger.
63736459
// Older ID is stored in trigger.ReplacementForTrigger
63746460
nameChanged := false
63756461
descriptionChanged := false
@@ -6563,7 +6649,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
65636649
}
65646650
}
65656651

6566-
// checks if parentWorkflow removed a trigger
6652+
// checks if parentWorkflow removed a trigger
65676653
// that was distributed to child workflow.
65686654
for _, oldAction := range oldWorkflow.Triggers {
65696655
if !oldAction.ParentControlled {
@@ -6607,11 +6693,11 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
66076693
// continue
66086694
// }
66096695

6610-
if oldAction.ReplacementForTrigger != newAction.ID {
6696+
if oldAction.ReplacementForTrigger != newAction.ID {
66116697
continue
66126698
}
66136699

6614-
changeType, changed := hasTriggerChanged(newAction, oldAction)
6700+
changeType, changed := hasTriggerChanged(newAction, oldAction)
66156701
if changed {
66166702
log.Printf("[DEBUG] Trigger %s (%s) has changed in '%s'", newAction.Label, newAction.ID, changeType)
66176703
// updatedTriggers always has parent workflow's new trigger.
@@ -6923,7 +7009,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
69237009
newChildTriggers := childTriggers
69247010
for _, trigger := range childWorkflow.Triggers {
69257011
if ArrayContains(removedTriggers, trigger.ID) {
6926-
// while removing triggers,
7012+
// while removing triggers,
69277013
// make sure to stop them as well
69287014

69297015
// need to handle this better
@@ -6935,7 +7021,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
69357021
hook, err := GetHook(ctx, trigger.ID)
69367022
if err == nil {
69377023
// this anyhow, means it is a webhook
6938-
err = DeleteKey(ctx, "hooks", hook.Id)
7024+
err = DeleteKey(ctx, "hooks", hook.Id)
69397025
if err != nil {
69407026
log.Printf("[WARNING] Failed deleting hook: %s", err)
69417027
}
@@ -6986,7 +7072,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
69867072
// FIXME:
69877073
// Make sure it changes things such as URL & references properly
69887074
if action.TriggerType == "WEBHOOK" {
6989-
// make sure to only override: name, label, position,
7075+
// make sure to only override: name, label, position,
69907076
// app_version, startnode and nothing else
69917077

69927078
childWorkflow.Triggers[index].Name = action.Name
@@ -7023,12 +7109,12 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
70237109
childWorkflow.Triggers[index].AppVersion = action.AppVersion
70247110

70257111
// essentially, now we try to verify:
7026-
// okay, new workflow? we see it's a subflow that's
7112+
// okay, new workflow? we see it's a subflow that's
70277113
// what changed? is it the workflow?
70287114

70297115
action = subflowPropagationWrapper(parentWorkflow, childWorkflow, action)
70307116
childWorkflow.Triggers[index].Parameters = action.Parameters
7031-
break
7117+
break
70327118
}
70337119

70347120
childWorkflow.Triggers[index] = action
@@ -10477,6 +10563,38 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
1047710563
}
1047810564
}
1047910565

10566+
//Check if workflow trigger schedule is in sync with the gcp cron job
10567+
if workflow.Triggers != nil {
10568+
var wg sync.WaitGroup
10569+
triggerMutex := sync.Mutex{}
10570+
10571+
for index, trigger := range workflow.Triggers {
10572+
if trigger.TriggerType == "SCHEDULE" {
10573+
wg.Add(1)
10574+
go func(index int, trigger Trigger) {
10575+
defer wg.Done()
10576+
10577+
// Check if the schedule is in sync with the gcp cron job
10578+
GcpSchedule, err := GetGcpSchedule(ctx, trigger.ID)
10579+
if err != nil {
10580+
log.Printf("[ERROR] Failed getting gcp schedule for trigger %s: %s", trigger.ID, err)
10581+
10582+
triggerMutex.Lock()
10583+
workflow.Triggers[index].Status = "stopped"
10584+
triggerMutex.Unlock()
10585+
} else {
10586+
triggerMutex.Lock()
10587+
workflow.Triggers[index].Status = GcpSchedule.Status
10588+
triggerMutex.Unlock()
10589+
}
10590+
}(index, trigger)
10591+
}
10592+
}
10593+
10594+
wg.Wait()
10595+
SetWorkflow(ctx, *workflow, workflow.ID)
10596+
}
10597+
1048010598
log.Printf("[INFO] Got new version of workflow %s (%s) for org %s and user %s (%s). Actions: %d, Triggers: %d", workflow.Name, workflow.ID, user.ActiveOrg.Id, user.Username, user.Id, len(workflow.Actions), len(workflow.Triggers))
1048110599

1048210600
body, err := json.Marshal(workflow)
@@ -25726,7 +25844,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2572625844

2572725845
//RunAiQuery(systemMessage, userMessage)
2572825846

25729-
partialMatch := true
25847+
partialMatch := true
2573025848
availableLabels := []string{}
2573125849

2573225850
matchName := strings.ReplaceAll(strings.ToLower(strings.TrimSpace(value.AppName)), " ", "_")
@@ -25789,11 +25907,11 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2578925907
log.Printf("[DEBUG] Found app - checking label: %s vs %s (%s)", app.Name, value.AppName, app.ID)
2579025908
//selectedAction, selectedCategory, availableLabels = GetActionFromLabel(ctx, selectedApp, value.Label, true)
2579125909
selectedAction, selectedCategory, availableLabels = GetActionFromLabel(ctx, app, value.Label, true)
25792-
partialMatch = false
25910+
partialMatch = false
2579325911

2579425912
break
2579525913

25796-
// Finds a random match, but doesn't break in case it finds exact
25914+
// Finds a random match, but doesn't break in case it finds exact
2579725915
} else if selectedApp.ID == "" && len(matchName) > 0 && (strings.Contains(appName, matchName) || strings.Contains(matchName, appName)) {
2579825916
selectedApp = app
2579925917

@@ -26450,7 +26568,6 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2645026568

2645126569
client := GetExternalClient(baseUrl)
2645226570

26453-
2645426571
selectedAction.AppName = selectedApp.Name
2645526572
selectedAction.AppID = selectedApp.ID
2645626573
selectedAction.AppVersion = selectedApp.AppVersion
@@ -26829,7 +26946,6 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2682926946
return
2683026947
}
2683126948

26832-
2683326949
// Ensures frontend has something to debug if things go wrong
2683426950
for key, value := range newresp.Header {
2683526951
if strings.HasSuffix(strings.ToLower(key), "-url") {
@@ -26858,7 +26974,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
2685826974

2685926975
httpOutput, marshalledBody, httpParseErr := FindHttpBody(apprunBody)
2686026976
//log.Printf("\n\nGOT RESPONSE (%d): %s. STATUS: %d\n\n", newresp.StatusCode, string(apprunBody), httpOutput.Status)
26861-
if successStruct.Success == false && len(successStruct.Reason) > 0 && httpOutput.Status == 0 && strings.Contains(strings.ReplaceAll(string(apprunBody), " ", ""), `"success":false`){
26977+
if successStruct.Success == false && len(successStruct.Reason) > 0 && httpOutput.Status == 0 && strings.Contains(strings.ReplaceAll(string(apprunBody), " ", ""), `"success":false`) {
2686226978
log.Printf("[WARNING][AI] Failed running app %s (%s). Contact support. Reason: %s", selectedAction.Name, selectedAction.AppID, successStruct.Reason)
2686326979

2686426980
resp.WriteHeader(400)
@@ -27481,7 +27597,7 @@ func GetActionFromLabel(ctx context.Context, app WorkflowApp, label string, fixL
2748127597
}
2748227598

2748327599
if newLabel == lowercaseLabel {
27484-
exactMatch = true
27600+
exactMatch = true
2748527601
break
2748627602
}
2748727603
}
@@ -27492,7 +27608,7 @@ func GetActionFromLabel(ctx context.Context, app WorkflowApp, label string, fixL
2749227608
}
2749327609
}
2749427610

27495-
// Decides if we are to autocomplete the app if labels are not found
27611+
// Decides if we are to autocomplete the app if labels are not found
2749627612
if len(selectedAction.ID) == 0 {
2749727613
if fixLabels == true {
2749827614
//log.Printf("\n\n[DEBUG] Action not found in app %s (%s) for label '%s'. Autodiscovering and updating the app!!!\n\n", app.Name, app.ID, label)

0 commit comments

Comments
 (0)