Skip to content

Commit 09ae103

Browse files
committed
eventscheduler: Add SessionCommand{Begin,End} and SessionEnd lifecycle callbacks for SQL sessions used to call event queries.
1 parent 3f5bb8c commit 09ae103

File tree

2 files changed

+67
-33
lines changed

2 files changed

+67
-33
lines changed

eventscheduler/event_executor.go

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -75,34 +75,57 @@ func (ee *eventExecutor) start() {
7575
for {
7676
time.Sleep(pollingDuration)
7777

78-
ctx, _, err := ee.ctxGetterFunc()
79-
if err != nil {
80-
logrus.Errorf("unable to create context for event executor: %s", err)
81-
continue
82-
}
78+
type behavior int
79+
const (
80+
cont behavior = iota
81+
ret
82+
run
83+
)
84+
85+
var timeNow, nextAt time.Time
86+
87+
behave := func() behavior {
88+
ctx, _, err := ee.ctxGetterFunc()
89+
sql.SessionCommandBegin(ctx.Session)
90+
defer sql.SessionCommandEnd(ctx.Session)
91+
defer sql.SessionEnd(ctx.Session)
92+
if err != nil {
93+
logrus.Errorf("unable to create context for event executor: %s", err)
94+
return cont
95+
}
8396

84-
needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
85-
if err != nil {
86-
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
87-
}
88-
if needsToReloadEvents {
89-
err := ee.loadAllEvents(ctx)
97+
needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
9098
if err != nil {
91-
ctx.GetLogger().Errorf("unable to reload events: %s", err)
99+
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
100+
}
101+
if needsToReloadEvents {
102+
err := ee.loadAllEvents(ctx)
103+
if err != nil {
104+
ctx.GetLogger().Errorf("unable to reload events: %s", err)
105+
}
92106
}
93-
}
94107

95-
timeNow := time.Now()
96-
if ee.stop.Load() {
97-
logrus.Trace("Stopping eventExecutor")
98-
return
99-
} else if ee.list.len() == 0 {
100-
continue
101-
}
108+
timeNow = time.Now()
109+
if ee.stop.Load() {
110+
logrus.Trace("Stopping eventExecutor")
111+
return ret
112+
} else if ee.list.len() == 0 {
113+
return cont
114+
}
102115

103-
// safeguard list entry getting removed while in check
104-
nextAt, ok := ee.list.getNextExecutionTime()
105-
if !ok {
116+
// safeguard list entry getting removed while in check
117+
var ok bool
118+
nextAt, ok = ee.list.getNextExecutionTime()
119+
if !ok {
120+
return cont
121+
}
122+
123+
return run
124+
}()
125+
126+
if behave == ret {
127+
return
128+
} else if behave == cont {
106129
continue
107130
}
108131

@@ -117,22 +140,27 @@ func (ee *eventExecutor) start() {
117140
} else if secondsUntilExecution <= 0.0000001 {
118141
curEvent := ee.list.pop()
119142
if curEvent != nil {
120-
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
121143
ctx, commit, err := ee.ctxGetterFunc()
122144
if err != nil {
123145
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
124146
}
125-
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
126-
if err != nil {
127-
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
128-
}
129-
err = commit()
130-
if err != nil {
131-
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
132-
}
147+
func() {
148+
sql.SessionCommandBegin(ctx.Session)
149+
defer sql.SessionCommandEnd(ctx.Session)
150+
defer sql.SessionEnd(ctx.Session)
151+
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
152+
err := ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
153+
if err != nil {
154+
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
155+
}
156+
err = commit()
157+
if err != nil {
158+
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
159+
}
160+
}()
133161
}
134162
} else {
135-
ctx.GetLogger().Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
163+
logrus.Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
136164
}
137165
}
138166
}
@@ -294,6 +322,9 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
294322
if err != nil {
295323
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
296324
}
325+
sql.SessionCommandBegin(ctx.Session)
326+
defer sql.SessionCommandEnd(ctx.Session)
327+
defer sql.SessionEnd(ctx.Session)
297328

298329
newEvent, created, err := newEnabledEvent(ctx, edb, event, time.Now())
299330
if err != nil {

eventscheduler/event_scheduler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ func InitEventScheduler(
9191
if err != nil {
9292
return nil, err
9393
}
94+
sql.SessionCommandBegin(ctx.Session)
95+
defer sql.SessionCommandEnd(ctx.Session)
96+
defer sql.SessionEnd(ctx.Session)
9497
err = es.loadEventsAndStartEventExecutor(ctx, a)
9598
if err != nil {
9699
return nil, err

0 commit comments

Comments
 (0)