Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 64 additions & 33 deletions eventscheduler/event_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,34 +75,57 @@ func (ee *eventExecutor) start() {
for {
time.Sleep(pollingDuration)

ctx, _, err := ee.ctxGetterFunc()
if err != nil {
logrus.Errorf("unable to create context for event executor: %s", err)
continue
}
type behavior int
const (
cont behavior = iota
ret
run
)

var timeNow, nextAt time.Time

behave := func() behavior {
ctx, _, err := ee.ctxGetterFunc()
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
defer sql.SessionEnd(ctx.Session)
if err != nil {
logrus.Errorf("unable to create context for event executor: %s", err)
return cont
}

needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
if err != nil {
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
}
if needsToReloadEvents {
err := ee.loadAllEvents(ctx)
needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
if err != nil {
ctx.GetLogger().Errorf("unable to reload events: %s", err)
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
}
if needsToReloadEvents {
err := ee.loadAllEvents(ctx)
if err != nil {
ctx.GetLogger().Errorf("unable to reload events: %s", err)
}
}
}

timeNow := time.Now()
if ee.stop.Load() {
logrus.Trace("Stopping eventExecutor")
return
} else if ee.list.len() == 0 {
continue
}
timeNow = time.Now()
if ee.stop.Load() {
logrus.Trace("Stopping eventExecutor")
return ret
} else if ee.list.len() == 0 {
return cont
}

// safeguard list entry getting removed while in check
nextAt, ok := ee.list.getNextExecutionTime()
if !ok {
// safeguard list entry getting removed while in check
var ok bool
nextAt, ok = ee.list.getNextExecutionTime()
if !ok {
return cont
}

return run
}()

if behave == ret {
return
} else if behave == cont {
continue
}

Expand All @@ -117,22 +140,27 @@ func (ee *eventExecutor) start() {
} else if secondsUntilExecution <= 0.0000001 {
curEvent := ee.list.pop()
if curEvent != nil {
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
ctx, commit, err := ee.ctxGetterFunc()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
}
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
err = commit()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
func() {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
defer sql.SessionEnd(ctx.Session)
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
err := ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
err = commit()
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
}
}()
}
} else {
ctx.GetLogger().Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
logrus.Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
}
}
}
Expand Down Expand Up @@ -294,6 +322,9 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
if err != nil {
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
}
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
defer sql.SessionEnd(ctx.Session)

newEvent, created, err := newEnabledEvent(ctx, edb, event, time.Now())
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions eventscheduler/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func InitEventScheduler(
if err != nil {
return nil, err
}
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
defer sql.SessionEnd(ctx.Session)
err = es.loadEventsAndStartEventExecutor(ctx, a)
if err != nil {
return nil, err
Expand Down