Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (e *Engine) AnalyzeQuery(
ctx *sql.Context,
query string,
) (sql.Node, error) {
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
parsed, _, _, qFlags, err := binder.Parse(query, nil, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -237,7 +237,7 @@ func (e *Engine) PrepareParsedQuery(
statementKey, query string,
stmt sqlparser.Statement,
) (sql.Node, error) {
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
node, _, err := binder.BindOnly(stmt, query, nil)

if err != nil {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (e *Engine) BoundQueryPlan(ctx *sql.Context, query string, parsed sqlparser

query = sql.RemoveSpaceAndDelimiter(query, ';')

binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
binder.SetBindings(bindings)

// Begin a transaction if necessary (no-op if one is in flight)
Expand Down Expand Up @@ -571,7 +571,7 @@ func (e *Engine) preparedStatement(ctx *sql.Context, query string, parsed sqlpar
preparedAst, preparedDataFound = e.PreparedDataCache.GetCachedStmt(ctx.Session.ID(), query)
}

binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.Parser)
binder := planbuilder.New(ctx, e.Analyzer.Catalog, e.EventScheduler, e.Parser)
if preparedDataFound {
parsed = preparedAst
binder.SetBindings(bindings)
Expand Down Expand Up @@ -804,6 +804,10 @@ func (e *Engine) EngineAnalyzer() *analyzer.Analyzer {
return e.Analyzer
}

func (e *Engine) EngineEventScheduler() sql.EventScheduler {
return e.EventScheduler
}

// InitializeEventScheduler initializes the EventScheduler for the engine with the given sql.Context
// getter function, |ctxGetterFunc, the EventScheduler |status|, and the |period| for the event scheduler
// to check for events to execute. If |period| is less than 1, then it is ignored and the default period
Expand All @@ -814,8 +818,6 @@ func (e *Engine) InitializeEventScheduler(ctxGetterFunc func() (*sql.Context, fu
if err != nil {
return err
}

e.Analyzer.EventScheduler = e.EventScheduler
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion enginetest/engine_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestAnalyzer_Exp(t *testing.T) {
require.NoError(t, err)

ctx := enginetest.NewContext(harness)
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, sql.NewMysqlParser())
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, e.EngineEventScheduler(), nil)
parsed, _, _, _, err := b.Parse(tt.query, nil, false)
require.NoError(t, err)

Expand Down
1 change: 0 additions & 1 deletion enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ func TestQueryPlan(t *testing.T, harness Harness, e QueryEngine, tt queries.Quer
func TestQueryPlanWithName(t *testing.T, name string, harness Harness, e QueryEngine, query, expectedPlan string, options sql.DescribeOptions) {
t.Run(name, func(t *testing.T) {
ctx := NewContext(harness)

parsed, qFlags, err := planbuilder.Parse(ctx, e.EngineAnalyzer().Catalog, query)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion enginetest/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func injectBindVarsAndPrepare(
}
}

b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, sql.NewMysqlParser())
b := planbuilder.New(ctx, e.EngineAnalyzer().Catalog, e.EngineEventScheduler(), nil)
b.SetParserOptions(sql.LoadSqlMode(ctx).ParserOptions())
resPlan, _, err := b.BindOnly(parsed, q, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion enginetest/plangen/cmd/plangen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func generatePlansForSuite(spec PlanSpec, w *bytes.Buffer) error {

if !tt.Skip {
ctx := enginetest.NewContext(harness)
binder := planbuilder.New(ctx, engine.EngineAnalyzer().Catalog, sql.NewMysqlParser())
binder := planbuilder.New(ctx, engine.EngineAnalyzer().Catalog, engine.EngineEventScheduler(), nil)
parsed, _, _, qFlags, err := binder.Parse(tt.Query, nil, false)
if err != nil {
exit(fmt.Errorf("%w\nfailed to parse query: %s", err, tt.Query))
Expand Down
1 change: 1 addition & 0 deletions enginetest/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type QueryEngine interface {
Query(ctx *sql.Context, query string) (sql.Schema, sql.RowIter, *sql.QueryFlags, error)
// TODO: get rid of this, should not be exposed to engine tests
EngineAnalyzer() *analyzer.Analyzer
EngineEventScheduler() sql.EventScheduler
// TODO: get rid of this, should not be exposed to engine tests
EnginePreparedDataCache() *sqle.PreparedDataCache
QueryWithBindings(ctx *sql.Context, query string, parsed sqlparser.Statement, bindings map[string]sqlparser.Expr, qFlags *sql.QueryFlags) (sql.Schema, sql.RowIter, *sql.QueryFlags, error)
Expand Down
6 changes: 5 additions & 1 deletion enginetest/server_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (s *ServerQueryEngine) EngineAnalyzer() *analyzer.Analyzer {
return s.engine.Analyzer
}

func (s *ServerQueryEngine) EngineEventScheduler() sql.EventScheduler {
return s.engine.EventScheduler
}

func (s *ServerQueryEngine) EnginePreparedDataCache() *sqle.PreparedDataCache {
return s.engine.PreparedDataCache
}
Expand Down Expand Up @@ -632,7 +636,7 @@ func convertGoSqlType(columnType *gosql.ColumnType) (sql.Type, error) {
// It cannot sort user-defined binding variables (e.g. :var, :foo)
func prepareBindingArgs(ctx *sql.Context, bindings map[string]sqlparser.Expr) ([]any, error) {
// NOTE: using binder with nil catalog and parser since we're only using it to convert SQLVal.
binder := planbuilder.New(ctx, nil, nil)
binder := planbuilder.New(ctx, nil, nil, nil)
numBindVars := len(bindings)
args := make([]any, numBindVars)
for i := 0; i < numBindVars; i++ {
Expand Down
24 changes: 24 additions & 0 deletions eventscheduler/event_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func InitEventScheduler(

// Close closes the EventScheduler.
func (es *EventScheduler) Close() {
if es == nil {
return
}
es.status = SchedulerOff
es.executor.shutdown()
}
Expand All @@ -99,6 +102,9 @@ func (es *EventScheduler) Close() {
// This function requires valid analyzer and sql context to evaluate all events in all databases
// to load enabled events to the EventScheduler.
func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {
if es == nil {
return nil
}
if es.status == SchedulerDisabled {
return ErrEventSchedulerDisabled
} else if es.status == SchedulerOn {
Expand All @@ -120,6 +126,9 @@ func (es *EventScheduler) TurnOnEventScheduler(a *analyzer.Analyzer) error {

// TurnOffEventScheduler is called when user sets --event-scheduler system variable to OFF or 0.
func (es *EventScheduler) TurnOffEventScheduler() error {
if es == nil {
return nil
}
if es.status == SchedulerDisabled {
return ErrEventSchedulerDisabled
} else if es.status == SchedulerOff {
Expand All @@ -135,6 +144,9 @@ func (es *EventScheduler) TurnOffEventScheduler() error {
// loadEventsAndStartEventExecutor evaluates all events in all databases and evaluates the enabled events
// with valid schedule to load into the eventExecutor. Then, it starts the eventExecutor.
func (es *EventScheduler) loadEventsAndStartEventExecutor(ctx *sql.Context, a *analyzer.Analyzer) error {
if es == nil {
return nil
}
es.executor.catalog = a.Catalog
es.executor.loadAllEvents(ctx)
go es.executor.start()
Expand All @@ -144,6 +156,9 @@ func (es *EventScheduler) loadEventsAndStartEventExecutor(ctx *sql.Context, a *a
// AddEvent implements sql.EventScheduler interface.
// This function is called when there is an event created at runtime.
func (es *EventScheduler) AddEvent(ctx *sql.Context, edb sql.EventDatabase, details sql.EventDefinition) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -153,6 +168,9 @@ func (es *EventScheduler) AddEvent(ctx *sql.Context, edb sql.EventDatabase, deta
// UpdateEvent implements sql.EventScheduler interface.
// This function is called when there is an event altered at runtime.
func (es *EventScheduler) UpdateEvent(ctx *sql.Context, edb sql.EventDatabase, orgEventName string, details sql.EventDefinition) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -163,6 +181,9 @@ func (es *EventScheduler) UpdateEvent(ctx *sql.Context, edb sql.EventDatabase, o
// This function is called when there is an event dropped at runtime. This function
// removes the given event if it exists in the enabled events list of the EventScheduler.
func (es *EventScheduler) RemoveEvent(dbName, eventName string) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand All @@ -173,6 +194,9 @@ func (es *EventScheduler) RemoveEvent(dbName, eventName string) {
// This function is called when there is a database dropped at runtime. This function
// removes all events of given database that exist in the enabled events list of the EventScheduler.
func (es *EventScheduler) RemoveSchemaEvents(dbName string) {
if es == nil {
return
}
if es.status == SchedulerDisabled || es.status == SchedulerOff {
return
}
Expand Down
3 changes: 0 additions & 3 deletions sql/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,6 @@ type Analyzer struct {
Coster memo.Coster
// ExecBuilder converts a sql.Node tree into an executable iterator.
ExecBuilder sql.NodeExecBuilder
// EventScheduler is used to communiate with the event scheduler
// for any EVENT related statements. It can be nil if EventScheduler is not defined.
EventScheduler sql.EventScheduler
}

// NewDefault creates a default Analyzer instance with all default Rules and configuration.
Expand Down
34 changes: 0 additions & 34 deletions sql/analyzer/apply_event_scheduler_notifier.go

This file was deleted.

2 changes: 1 addition & 1 deletion sql/analyzer/optimization_rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestPushNotFilters(t *testing.T) {
ctx := sql.NewContext(context.Background(), sql.WithSession(sess))
ctx.SetCurrentDatabase("mydb")

b := planbuilder.New(ctx, cat, sql.NewMysqlParser())
b := planbuilder.New(ctx, cat, nil, nil)

for _, tt := range tests {
t.Run(tt.in, func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion sql/analyzer/rule_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const (
validateReadOnlyTransactionId // validateReadOnlyTransaction
validateDatabaseSetId // validateDatabaseSet
validatePrivilegesId // validatePrivileges
applyEventSchedulerId // applyEventScheduler

// default
flattenTableAliasesId // flattenTableAliases
Expand Down
105 changes: 52 additions & 53 deletions sql/analyzer/ruleid_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion sql/analyzer/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func init() {
var OnceBeforeDefault = []Rule{
{applyDefaultSelectLimitId, applyDefaultSelectLimit},
{replaceCountStarId, replaceCountStar},
{applyEventSchedulerId, applyEventScheduler},
{validateOffsetAndLimitId, validateOffsetAndLimit},
{validateCreateTableId, validateCreateTable},
{validateAlterTableId, validateAlterTable},
Expand Down
Loading
Loading