@@ -11,7 +11,6 @@ import (
11
11
12
12
type Engine struct {
13
13
execQueue chan * Job
14
- resultQueue chan * EvalContext
15
14
clock clock.Clock
16
15
ticker * Ticker
17
16
scheduler Scheduler
@@ -25,7 +24,6 @@ func NewEngine() *Engine {
25
24
e := & Engine {
26
25
ticker : NewTicker (time .Now (), time .Second * 0 , clock .New ()),
27
26
execQueue : make (chan * Job , 1000 ),
28
- resultQueue : make (chan * EvalContext , 1000 ),
29
27
scheduler : NewScheduler (),
30
28
evalHandler : NewEvalHandler (),
31
29
ruleReader : NewRuleReader (),
@@ -39,23 +37,17 @@ func NewEngine() *Engine {
39
37
func (e * Engine ) Run (ctx context.Context ) error {
40
38
e .log .Info ("Initializing Alerting" )
41
39
42
- g , ctx := errgroup .WithContext (ctx )
40
+ alertGroup , ctx := errgroup .WithContext (ctx )
43
41
44
- g .Go (func () error { return e .alertingTicker (ctx ) })
45
- g .Go (func () error { return e .execDispatcher (ctx ) })
46
- g .Go (func () error { return e .resultDispatcher (ctx ) })
42
+ alertGroup .Go (func () error { return e .alertingTicker (ctx ) })
43
+ alertGroup .Go (func () error { return e .runJobDispatcher (ctx ) })
47
44
48
- err := g .Wait ()
45
+ err := alertGroup .Wait ()
49
46
50
47
e .log .Info ("Stopped Alerting" , "reason" , err )
51
48
return err
52
49
}
53
50
54
- func (e * Engine ) Stop () {
55
- close (e .execQueue )
56
- close (e .resultQueue )
57
- }
58
-
59
51
func (e * Engine ) alertingTicker (grafanaCtx context.Context ) error {
60
52
defer func () {
61
53
if err := recover (); err != nil {
@@ -81,69 +73,58 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error {
81
73
}
82
74
}
83
75
84
- func (e * Engine ) execDispatcher (grafanaCtx context.Context ) error {
76
+ func (e * Engine ) runJobDispatcher (grafanaCtx context.Context ) error {
77
+ dispatcherGroup , alertCtx := errgroup .WithContext (grafanaCtx )
78
+
85
79
for {
86
80
select {
87
81
case <- grafanaCtx .Done ():
88
- close (e .resultQueue )
89
- return grafanaCtx .Err ()
82
+ return dispatcherGroup .Wait ()
90
83
case job := <- e .execQueue :
91
- go e . executeJob ( grafanaCtx , job )
84
+ dispatcherGroup . Go ( func () error { return e . processJob ( alertCtx , job ) } )
92
85
}
93
86
}
94
87
}
95
88
96
- func (e * Engine ) executeJob (grafanaCtx context.Context , job * Job ) error {
89
+ var (
90
+ unfinishedWorkTimeout time.Duration = time .Second * 5
91
+ alertTimeout time.Duration = time .Second * 30
92
+ )
93
+
94
+ func (e * Engine ) processJob (grafanaCtx context.Context , job * Job ) error {
97
95
defer func () {
98
96
if err := recover (); err != nil {
99
- e .log .Error ("Execute Alert Panic" , "error" , err , "stack" , log .Stack (1 ))
97
+ e .log .Error ("Alert Panic" , "error" , err , "stack" , log .Stack (1 ))
100
98
}
101
99
}()
102
100
103
- done := make (chan * EvalContext , 1 )
101
+ alertCtx , cancelFn := context .WithTimeout (context .TODO (), alertTimeout )
102
+
103
+ job .Running = true
104
+ evalContext := NewEvalContext (alertCtx , job .Rule )
105
+
106
+ done := make (chan struct {})
107
+
104
108
go func () {
105
- job .Running = true
106
- context := NewEvalContext (job .Rule )
107
- e .evalHandler .Eval (context )
108
- job .Running = false
109
- done <- context
109
+ e .evalHandler .Eval (evalContext )
110
+ e .resultHandler .Handle (evalContext )
110
111
close (done )
111
112
}()
112
113
114
+ var err error = nil
113
115
select {
114
-
115
116
case <- grafanaCtx .Done ():
116
- return grafanaCtx .Err ()
117
- case evalContext := <- done :
118
- e .resultQueue <- evalContext
119
- }
120
-
121
- return nil
122
- }
123
-
124
- func (e * Engine ) resultDispatcher (grafanaCtx context.Context ) error {
125
- for {
126
117
select {
127
- case <- grafanaCtx .Done ():
128
- //handle all responses before shutting down.
129
- for result := range e .resultQueue {
130
- e .handleResponse (result )
131
- }
132
-
133
- return grafanaCtx .Err ()
134
- case result := <- e .resultQueue :
135
- e .handleResponse (result )
118
+ case <- time .After (unfinishedWorkTimeout ):
119
+ cancelFn ()
120
+ err = grafanaCtx .Err ()
121
+ case <- done :
136
122
}
123
+ case <- done :
137
124
}
138
- }
139
125
140
- func (e * Engine ) handleResponse (result * EvalContext ) {
141
- defer func () {
142
- if err := recover (); err != nil {
143
- e .log .Error ("Panic in resultDispatcher" , "error" , err , "stack" , log .Stack (1 ))
144
- }
145
- }()
146
-
147
- e .log .Debug ("Alert Rule Result" , "ruleId" , result .Rule .Id , "firing" , result .Firing )
148
- e .resultHandler .Handle (result )
126
+ e .log .Debug ("Job Execution completed" , "timeMs" , evalContext .GetDurationMs (), "alertId" , evalContext .Rule .Id , "name" , evalContext .Rule .Name , "firing" , evalContext .Firing )
127
+ job .Running = false
128
+ cancelFn ()
129
+ return err
149
130
}
0 commit comments