-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler.go
More file actions
135 lines (106 loc) · 2.91 KB
/
scheduler.go
File metadata and controls
135 lines (106 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/robfig/cron/v3"
)
// Package-level store for the latest outcome of each job.
var (
	// jobResultsLock is held by JobComplete while it updates jobResults.
	jobResultsLock sync.Mutex
	// jobResults maps a job name to the most recent JobResult recorded for it.
	jobResults = make(map[string]JobResult)
)
// JobResult describes the outcome of a single job run.
type JobResult struct {
	// JobName identifies the job; JobComplete uses it as the key in jobResults.
	JobName string
	// JobType is a free-form label printed by Format.
	JobType string
	// Success reports whether the run succeeded; writeJobResult answers with
	// HTTP 503 when it is false.
	Success bool
	// LastError is the error from the run, or nil.
	LastError error
	// Message is overwritten by writeJobResult with LastError's text when
	// LastError is non-nil.
	Message string
}

// Format renders the result as "<name> <type> ok? <success>", a blank line,
// and then LastError (printed as "<nil>" when there is no error). Message is
// not included.
func (r JobResult) Format() string {
	const layout = "%s %s ok? %v\n\n%+v"

	return fmt.Sprintf(layout, r.JobName, r.JobType, r.Success, r.LastError)
}
// JobComplete prints result to standard output and then stores it in
// jobResults under result.JobName, replacing any earlier entry for that job.
// The map update is performed while holding jobResultsLock.
func JobComplete(result JobResult) {
	fmt.Printf("Completed job %+v\n", result)

	jobResultsLock.Lock()
	defer jobResultsLock.Unlock()

	jobResults[result.JobName] = result
}
func writeJobResult(writer http.ResponseWriter, jobName string) {
writer.Header().Set("Content-Type", "application/json")
if jobResult, ok := jobResults[jobName]; ok {
if !jobResult.Success {
// Set a 503 status code if the last job run was not successful
writer.WriteHeader(http.StatusServiceUnavailable)
}
// Set message from LastError if available
if jobResult.LastError != nil {
jobResult.Message = jobResult.LastError.Error()
}
// Write the job result as JSON
if err := json.NewEncoder(writer).Encode(jobResult); err != nil {
// If encoding fails, write an error message
_, _ = writer.Write(fmt.Appendf(nil, "failed writing json for %s", jobResult.Format()))
}
} else {
// Job not found
writer.WriteHeader(http.StatusNotFound)
_, _ = writer.Write([]byte("{\"Message\": \"Unknown job\"}"))
}
}
// HealthHandleFunc handles health check requests.
//
// Without a "job" query parameter it replies with the plain body "ok". When
// the parameter is present, the first of its values is treated as a job name
// and the response is produced by writeJobResult.
func HealthHandleFunc(writer http.ResponseWriter, request *http.Request) {
	jobNames, hasJob := request.URL.Query()["job"]
	if !hasJob {
		_, _ = writer.Write([]byte("ok"))

		return
	}

	writeJobResult(writer, jobNames[0])
}
// RunHTTPHandlers registers the /health and /metrics handlers on the default
// serve mux and then serves HTTP on addr.
//
// /health is served by HealthHandleFunc and /metrics by a promhttp handler
// built from Metrics.Registry. The call blocks inside http.ListenAndServe; the
// returned error wraps whatever ListenAndServe returned.
func RunHTTPHandlers(addr string) error {
	http.HandleFunc("/health", HealthHandleFunc)

	metricsHandler := promhttp.HandlerFor(
		Metrics.Registry,
		promhttp.HandlerOpts{Registry: Metrics.Registry}, //nolint:exhaustruct
	)
	http.Handle("/metrics", metricsHandler)

	serveErr := http.ListenAndServe(addr, nil) //#nosec: g114

	return fmt.Errorf("error on http server: %w", serveErr)
}
// ScheduleAndRunJobs adds every job to a new cron scheduler, starts it, and
// blocks until the process receives SIGINT, SIGTERM or SIGQUIT.
//
// Each job is registered with scheduler.AddJob using job.Schedule as the cron
// spec. If any job cannot be scheduled, an error naming that job is returned
// and the scheduler is never started.
//
// Shutdown depends on the signal received:
//   - SIGINT: the scheduler is stopped and the function returns without
//     waiting on the context returned by Stop.
//   - SIGTERM or SIGQUIT: the scheduler is stopped and the function waits for
//     the context returned by Stop to be done before returning.
//
// In both shutdown cases the returned error is nil.
func ScheduleAndRunJobs(jobs []Job) error {
	signalChan := make(chan os.Signal, 1)
	signal.Notify(
		signalChan,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT,
	)
	// Undo the registration on every return path, including the scheduling
	// error below. Otherwise these signals keep being redirected to a channel
	// nobody reads and their default handling stays disabled. As the first
	// defer, it runs last, so signals are still captured during shutdown.
	defer signal.Stop(signalChan)

	scheduler := cron.New()

	for _, job := range jobs {
		fmt.Println("Scheduling", job.Name)

		if _, err := scheduler.AddJob(job.Schedule, job); err != nil {
			return fmt.Errorf("error scheduling job %s: %w", job.Name, err)
		}
	}

	scheduler.Start()

	switch <-signalChan {
	case syscall.SIGINT:
		fmt.Println("Stopping now...")
		scheduler.Stop()
	case syscall.SIGTERM, syscall.SIGQUIT:
		// Wait for all jobs to complete
		fmt.Println("Stopping after running jobs complete...")
		<-scheduler.Stop().Done()
		fmt.Println("All jobs successfully stopped")
	}

	return nil
}