Skip to content

Commit d3abea2

Browse files
authored
Merge pull request #2860 from dolthub/aaron/eventscheduler-txns
[no-release-notes] eventscheduler: Improve lifecycle of session and transaction in eventscheduler.
2 parents e410216 + 4fe29a3 commit d3abea2

File tree

3 files changed

+161
-51
lines changed

3 files changed

+161
-51
lines changed

engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ func (e *Engine) EngineEventScheduler() sql.EventScheduler {
816816
// getter function, |ctxGetterFunc, the EventScheduler |status|, and the |period| for the event scheduler
817817
// to check for events to execute. If |period| is less than 1, then it is ignored and the default period
818818
// (30s currently) is used. This function also initializes the EventScheduler of the analyzer of this engine.
819-
func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, func() error, error), status eventscheduler.SchedulerStatus, period int) error {
819+
func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, error), status eventscheduler.SchedulerStatus, period int) error {
820820
var err error
821821
e.EventScheduler, err = eventscheduler.InitEventScheduler(e.Analyzer, e.BackgroundThreads, ctxGetterFunc, status, e.executeEvent, period)
822822
if err != nil {

eventscheduler/event_executor.go

Lines changed: 111 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type eventExecutor struct {
3333
bThreads *sql.BackgroundThreads
3434
list *enabledEventsList
3535
runningEventsStatus *runningEventsStatus
36-
ctxGetterFunc func() (*sql.Context, func() error, error)
36+
ctxGetterFunc func() (*sql.Context, error)
3737
queryRunFunc func(ctx *sql.Context, dbName, query, username, address string) error
3838
stop atomic.Bool
3939
catalog sql.Catalog
@@ -43,7 +43,7 @@ type eventExecutor struct {
4343

4444
// newEventExecutor returns a new eventExecutor instance with an empty enabled events list.
4545
// The enabled events list is loaded only when the EventScheduler status is ENABLED.
46-
func newEventExecutor(bgt *sql.BackgroundThreads, ctxFunc func() (*sql.Context, func() error, error), runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error, period int) *eventExecutor {
46+
func newEventExecutor(bgt *sql.BackgroundThreads, ctxFunc func() (*sql.Context, error), runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error, period int) *eventExecutor {
4747
return &eventExecutor{
4848
bThreads: bgt,
4949
list: newEnabledEventsList([]*enabledEvent{}),
@@ -75,35 +75,73 @@ func (ee *eventExecutor) start() {
7575
for {
7676
time.Sleep(pollingDuration)
7777

78-
ctx, _, err := ee.ctxGetterFunc()
79-
if err != nil {
80-
logrus.Errorf("unable to create context for event executor: %s", err)
81-
continue
82-
}
78+
type res int
79+
const (
80+
res_fallthrough res = iota
81+
res_continue
82+
res_return
83+
)
8384

84-
needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
85-
if err != nil {
86-
ctx.GetLogger().Errorf("unable to determine if events need to be reloaded: %s", err)
87-
}
88-
if needsToReloadEvents {
89-
err := ee.loadAllEvents(ctx)
85+
var timeNow, nextAt time.Time
86+
var lgr *logrus.Entry
87+
88+
result := func() res {
89+
ctx, err := ee.ctxGetterFunc()
9090
if err != nil {
91-
ctx.GetLogger().Errorf("unable to reload events: %s", err)
91+
logrus.Errorf("unable to create context for event executor: %s", err)
92+
return res_continue
9293
}
93-
}
94+
lgr = ctx.GetLogger()
9495

95-
timeNow := time.Now()
96-
if ee.stop.Load() {
97-
logrus.Trace("Stopping eventExecutor")
98-
return
99-
} else if ee.list.len() == 0 {
100-
continue
101-
}
96+
defer sql.SessionEnd(ctx.Session)
97+
sql.SessionCommandBegin(ctx.Session)
98+
defer sql.SessionCommandEnd(ctx.Session)
10299

103-
// safeguard list entry getting removed while in check
104-
nextAt, ok := ee.list.getNextExecutionTime()
105-
if !ok {
100+
err = beginTx(ctx)
101+
if err != nil {
102+
lgr.Errorf("unable to begin transaction for event executor: %s", err)
103+
return res_continue
104+
}
105+
106+
needsToReloadEvents, err := ee.needsToReloadEvents(ctx)
107+
if err != nil {
108+
lgr.Errorf("unable to determine if events need to be reloaded: %s", err)
109+
}
110+
if needsToReloadEvents {
111+
err := ee.loadAllEvents(ctx)
112+
if err != nil {
113+
lgr.Errorf("unable to reload events: %s", err)
114+
}
115+
}
116+
117+
if ee.stop.Load() {
118+
logrus.Trace("Stopping eventExecutor")
119+
return res_return
120+
} else if ee.list.len() == 0 {
121+
rollbackTx(ctx)
122+
return res_continue
123+
}
124+
125+
// safeguard list entry getting removed while in check
126+
timeNow = time.Now()
127+
var ok bool
128+
nextAt, ok = ee.list.getNextExecutionTime()
129+
if !ok {
130+
rollbackTx(ctx)
131+
return res_continue
132+
}
133+
134+
err = commitTx(ctx)
135+
if err != nil {
136+
lgr.Errorf("unable to commit transaction for reloading events: %s", err)
137+
}
138+
return res_fallthrough
139+
}()
140+
141+
if result == res_continue {
106142
continue
143+
} else if result == res_return {
144+
return
107145
}
108146

109147
secondsUntilExecution := nextAt.Sub(timeNow).Seconds()
@@ -117,22 +155,33 @@ func (ee *eventExecutor) start() {
117155
} else if secondsUntilExecution <= 0.0000001 {
118156
curEvent := ee.list.pop()
119157
if curEvent != nil {
120-
ctx.GetLogger().Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
121-
ctx, commit, err := ee.ctxGetterFunc()
122-
if err != nil {
123-
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
124-
}
125-
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
126-
if err != nil {
127-
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
128-
}
129-
err = commit()
130-
if err != nil {
131-
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
132-
}
158+
func() {
159+
lgr.Debugf("Executing event %s, seconds until execution: %f", curEvent.name(), secondsUntilExecution)
160+
ctx, err := ee.ctxGetterFunc()
161+
if err != nil {
162+
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
163+
return
164+
}
165+
defer sql.SessionEnd(ctx.Session)
166+
sql.SessionCommandBegin(ctx.Session)
167+
defer sql.SessionCommandEnd(ctx.Session)
168+
err = beginTx(ctx)
169+
if err != nil {
170+
ctx.GetLogger().Errorf("Received error '%s' beginning transaction in event scheduler", err)
171+
return
172+
}
173+
err = ee.executeEventAndUpdateList(ctx, curEvent, timeNow)
174+
if err != nil {
175+
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
176+
}
177+
err = commitTx(ctx)
178+
if err != nil {
179+
ctx.GetLogger().Errorf("Received error '%s' executing event: %s", err, curEvent.event.Name)
180+
}
181+
}()
133182
}
134183
} else {
135-
ctx.GetLogger().Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
184+
lgr.Tracef("Not executing event %s yet, seconds until execution: %f", ee.list.peek().name(), secondsUntilExecution)
136185
}
137186
}
138187
}
@@ -253,11 +302,19 @@ func (ee *eventExecutor) executeEvent(event *enabledEvent) (bool, error) {
253302
return
254303
default:
255304
// get a new session sql.Context for each event definition execution
256-
sqlCtx, commit, err := ee.ctxGetterFunc()
305+
sqlCtx, err := ee.ctxGetterFunc()
257306
if err != nil {
258307
logrus.WithField("query", event.event.EventBody).Errorf("unable to get context for executed query: %v", err)
259308
return
260309
}
310+
defer sql.SessionEnd(sqlCtx.Session)
311+
sql.SessionCommandBegin(sqlCtx.Session)
312+
defer sql.SessionCommandEnd(sqlCtx.Session)
313+
err = beginTx(sqlCtx)
314+
if err != nil {
315+
logrus.WithField("query", event.event.EventBody).Errorf("unable to begin transaction on context for executed query: %v", err)
316+
return
317+
}
261318

262319
// Note that we pass in the full CREATE EVENT statement so that the engine can parse it
263320
// and pull out the plan nodes for the event body, since the event body doesn't always
@@ -266,11 +323,12 @@ func (ee *eventExecutor) executeEvent(event *enabledEvent) (bool, error) {
266323
err = ee.queryRunFunc(sqlCtx, event.edb.Name(), event.event.CreateEventStatement(), event.username, event.address)
267324
if err != nil {
268325
logrus.WithField("query", event.event.EventBody).Errorf("unable to execute query: %v", err)
326+
rollbackTx(sqlCtx)
269327
return
270328
}
271329

272330
// must commit after done using the sql.Context
273-
err = commit()
331+
err = commitTx(sqlCtx)
274332
if err != nil {
275333
logrus.WithField("query", event.event.EventBody).Errorf("unable to commit transaction: %v", err)
276334
return
@@ -290,9 +348,19 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
290348
return
291349
}
292350

293-
ctx, commit, err := ee.ctxGetterFunc()
351+
ctx, err := ee.ctxGetterFunc()
294352
if err != nil {
295353
ctx.GetLogger().Errorf("Received error '%s' getting ctx in event scheduler", err)
354+
return
355+
}
356+
defer sql.SessionEnd(ctx.Session)
357+
sql.SessionCommandBegin(ctx.Session)
358+
defer sql.SessionCommandEnd(ctx.Session)
359+
360+
err = beginTx(ctx)
361+
if err != nil {
362+
ctx.GetLogger().Errorf("Received error '%s' beginning transaction on ctx in event scheduler", err)
363+
return
296364
}
297365

298366
newEvent, created, err := newEnabledEvent(ctx, edb, event, time.Now())
@@ -302,7 +370,7 @@ func (ee *eventExecutor) reevaluateEvent(edb sql.EventDatabase, event sql.EventD
302370
ee.list.add(newEvent)
303371
}
304372

305-
err = commit()
373+
err = commitTx(ctx)
306374
if err != nil {
307375
ctx.GetLogger().Errorf("Received error '%s' re-evaluating event to scheduler: %s", err, event.Name)
308376
}

eventscheduler/event_scheduler.go

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ var _ sql.EventScheduler = (*EventScheduler)(nil)
5252
type EventScheduler struct {
5353
status SchedulerStatus
5454
executor *eventExecutor
55-
ctxGetterFunc func() (*sql.Context, func() error, error)
55+
ctxGetterFunc func() (*sql.Context, error)
5656
}
5757

5858
// InitEventScheduler is called at the start of the server. This function returns EventScheduler object
@@ -63,7 +63,7 @@ type EventScheduler struct {
6363
func InitEventScheduler(
6464
a *analyzer.Analyzer,
6565
bgt *sql.BackgroundThreads,
66-
getSqlCtxFunc func() (*sql.Context, func() error, error),
66+
getSqlCtxFunc func() (*sql.Context, error),
6767
status SchedulerStatus,
6868
runQueryFunc func(ctx *sql.Context, dbName, query, username, address string) error,
6969
period int,
@@ -81,21 +81,28 @@ func InitEventScheduler(
8181
// If the EventSchedulerStatus is ON, then load enabled
8282
// events and start executing events on schedule.
8383
if es.status == SchedulerOn {
84-
ctx, commit, err := getSqlCtxFunc()
84+
ctx, err := getSqlCtxFunc()
85+
if err != nil {
86+
return nil, err
87+
}
8588
ctx.Session.SetClient(sql.Client{
8689
User: eventSchedulerSuperUserName,
8790
Address: "localhost",
8891
Capabilities: 0,
8992
})
90-
93+
defer sql.SessionEnd(ctx.Session)
94+
sql.SessionCommandBegin(ctx.Session)
95+
defer sql.SessionCommandEnd(ctx.Session)
96+
err = beginTx(ctx)
9197
if err != nil {
9298
return nil, err
9399
}
94100
err = es.loadEventsAndStartEventExecutor(ctx, a)
95101
if err != nil {
102+
rollbackTx(ctx)
96103
return nil, err
97104
}
98-
err = commit()
105+
err = commitTx(ctx)
99106
if err != nil {
100107
return nil, err
101108
}
@@ -104,6 +111,33 @@ func InitEventScheduler(
104111
return es, nil
105112
}
106113

114+
func beginTx(ctx *sql.Context) error {
115+
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
116+
tr, err := ts.StartTransaction(ctx, sql.ReadWrite)
117+
if err != nil {
118+
return err
119+
}
120+
ts.SetTransaction(tr)
121+
}
122+
return nil
123+
}
124+
125+
func commitTx(ctx *sql.Context) error {
126+
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
127+
defer ts.SetTransaction(nil)
128+
return ts.CommitTransaction(ctx, ts.GetTransaction())
129+
}
130+
return nil
131+
}
132+
133+
func rollbackTx(ctx *sql.Context) error {
134+
if ts, ok := ctx.Session.(sql.TransactionSession); ok {
135+
defer ts.SetTransaction(nil)
136+
return ts.Rollback(ctx, ts.GetTransaction())
137+
}
138+
return nil
139+
}
140+
107141
// initializeEventSchedulerSuperUser ensures the event_scheduler superuser exists (as a locked
108142
// account that cannot be directly used to log in) so that the event scheduler can read events
109143
// from all databases.
@@ -147,15 +181,23 @@ func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {
147181

148182
es.status = SchedulerOn
149183

150-
ctx, commit, err := es.ctxGetterFunc()
184+
ctx, err := es.ctxGetterFunc()
185+
if err != nil {
186+
return err
187+
}
188+
defer sql.SessionEnd(ctx.Session)
189+
sql.SessionCommandBegin(ctx.Session)
190+
defer sql.SessionCommandEnd(ctx.Session)
191+
err = beginTx(ctx)
151192
if err != nil {
152193
return err
153194
}
154195
err = es.loadEventsAndStartEventExecutor(ctx, a)
155196
if err != nil {
197+
rollbackTx(ctx)
156198
return err
157199
}
158-
return commit()
200+
return commitTx(ctx)
159201
}
160202

161203
// TurnOffEventScheduler is called when user sets --event-scheduler system variable to OFF or 0.

0 commit comments

Comments
 (0)