From 09ae10333f9bfa5dab66881c3433e1f813de5d35 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 28 Jan 2025 15:03:43 -0800 Subject: [PATCH] eventscheduler: Add SessionCommand{Begin,End} and SessionEnd lifecycle callbacks for SQL sessions used to call event queries. --- eventscheduler/event_executor.go | 97 ++++++++++++++++++++----------- eventscheduler/event_scheduler.go | 3 + 2 files changed, 67 insertions(+), 33 deletions(-) diff --git a/eventscheduler/event_executor.go b/eventscheduler/event_executor.go index c21445d954..ed3e8adf81 100644 --- a/eventscheduler/event_executor.go +++ b/eventscheduler/event_executor.go @@ -75,34 +75,57 @@ func (ee *eventExecutor) start() { for { time.Sleep(pollingDuration) - ctx, _, err := ee.ctxGetterFunc() - if err != nil { - logrus.Errorf("unable to create context for event executor: %s", err) - continue - } + type behavior int + const ( + cont behavior = iota + ret + run + ) + + var timeNow, nextAt time.Time + + behave := func() behavior { + ctx, _, err := ee.ctxGetterFunc() + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + defer sql.SessionEnd(ctx.Session) + if err != nil { + logrus.Errorf("unable to create context for event executor: %s", err) + return cont + } - needsToReloadEvents, err := ee.needsToReloadEvents(ctx) - if err != nil { - ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err) - } - if needsToReloadEvents { - err := ee.loadAllEvents(ctx) + needsToReloadEvents, err := ee.needsToReloadEvents(ctx) if err != nil { - ctx.GetLogger().Errorf("unable to reload events: %s", err) + ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err) + } + if needsToReloadEvents { + err := ee.loadAllEvents(ctx) + if err != nil { + ctx.GetLogger().Errorf("unable to reload events: %s", err) + } } - } - timeNow := time.Now() - if ee.stop.Load() { - logrus.Trace("Stopping eventExecutor") - return - } else if ee.list.len() == 0 { - continue - } + timeNow = time.Now() + if ee.stop.Load() { + logrus.Trace("Stopping eventExecutor") + return ret + } else if ee.list.len() == 0 { + return cont + } - // safeguard list entry getting removed while in check - nextAt, ok := ee.list.getNextExecutionTime() - if !ok { + // safeguard list entry getting removed while in check + var ok bool + nextAt, ok = ee.list.getNextExecutionTime() + if !ok { + return cont + } + + return run + }() + + if behave == ret { + return + } else if behave == cont { continue } @@ -117,22 +140,27 @@ func (ee *eventExecutor) start() { } else if secondsUntilExecution <= 0.0000001 { curEvent := ee.list.pop() if curEvent != nil { - ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution) ctx, commit, err := ee.ctxGetterFunc() if err != nil { ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err) } - err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow) - if err != nil { - ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name) - } - err = commit() - if err != nil { - ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name) - } + func() { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + defer sql.SessionEnd(ctx.Session) + ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution) + err := ee.executeEventAndUpdateList(ctx, curEvent, timeNow) + if err != nil { + ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name) + } + err = commit() + if err != nil { + ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name) + } + }() } } else { - ctx.GetLogger().Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution) + logrus.Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution) } } } @@ -294,6 +322,9 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD if err != nil { ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err) } + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + defer sql.SessionEnd(ctx.Session) newEvent, created, err := newEnabledEvent(ctx, edb, event, time.Now()) if err != nil { diff --git a/eventscheduler/event_scheduler.go b/eventscheduler/event_scheduler.go index 3920e15a8f..6fe90d8a27 100644 --- a/eventscheduler/event_scheduler.go +++ b/eventscheduler/event_scheduler.go @@ -91,6 +91,9 @@ func InitEventScheduler( if err != nil { return nil, err } + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + defer sql.SessionEnd(ctx.Session) err = es.loadEventsAndStartEventExecutor(ctx, a) if err != nil { return nil, err