Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (e *Engine) QueryWithBindings(ctx *sql.Context, query string, parsed sqlpar
return nil, nil, nil, err
}

iter = finalizeIters(analyzed, qFlags, iter)
iter = finalizeIters(e.Analyzer, analyzed, qFlags, iter)

return analyzed.Schema(), iter, qFlags, nil
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func (e *Engine) PrepQueryPlanForExecution(ctx *sql.Context, _ string, plan sql.
return nil, nil, nil, err
}

iter = finalizeIters(plan, nil, iter)
iter = finalizeIters(e.Analyzer, plan, nil, iter)

return plan.Schema(), iter, nil, nil
}
Expand Down Expand Up @@ -705,7 +705,7 @@ func clearAutocommitTransaction(ctx *sql.Context) error {
return nil
}

autocommit, err := plan.IsSessionAutocommit(ctx)
autocommit, err := rowexec.IsSessionAutocommit(ctx)
if err != nil {
return err
}
Expand All @@ -725,7 +725,7 @@ func (e *Engine) CloseSession(connID uint32) {
}

func (e *Engine) beginTransaction(ctx *sql.Context) error {
beginNewTransaction := ctx.GetTransaction() == nil || plan.ReadCommitted(ctx)
beginNewTransaction := ctx.GetTransaction() == nil || rowexec.ReadCommitted(ctx)
if beginNewTransaction {
ctx.GetLogger().Tracef("beginning new transaction")
ts, ok := ctx.Session.(sql.TransactionSession)
Expand Down Expand Up @@ -830,7 +830,7 @@ func (e *Engine) executeEvent(ctx *sql.Context, dbName, createEventStatement, us
return err
}

iter = finalizeIters(definitionNode, nil, iter)
iter = finalizeIters(e.Analyzer, definitionNode, nil, iter)

// Drain the iterate to execute the event body/definition
// NOTE: No row data is returned for an event; we just need to execute the statements
Expand Down Expand Up @@ -865,8 +865,9 @@ func findCreateEventNode(planTree sql.Node) (*plan.CreateEvent, error) {
}

// finalizeIters applies the final transformations on sql.RowIter before execution.
func finalizeIters(analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
iter = rowexec.AddTransactionCommittingIter(iter, qFlags)
func finalizeIters(analyzer *analyzer.Analyzer, analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
iter = rowexec.AddTransactionCommittingIter(qFlags, iter)
iter = rowexec.AddExchangeRowIter(analyzer.Parallelism, analyzed, iter)
iter = rowexec.AddExpressionCloser(analyzed, iter)
return iter
}
47 changes: 0 additions & 47 deletions sql/plan/transaction_committing_iter.go

This file was deleted.

8 changes: 8 additions & 0 deletions sql/rowexec/other_iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,14 @@ type exchangeRowIter struct {

var _ sql.RowIter = (*exchangeRowIter)(nil)

func AddExchangeRowIter(parallelism int, node sql.Node, iter sql.RowIter) sql.RowIter {
if parallelism <= 1 {
return iter
}

return iter
}

func (i *exchangeRowIter) Next(ctx *sql.Context) (sql.Row, error) {
if i.rows == nil {
panic("Next called for a Next2 iterator")
Expand Down
40 changes: 34 additions & 6 deletions sql/rowexec/transaction_iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ type TransactionCommittingIter struct {
transactionDatabase string
}

func AddTransactionCommittingIter(child sql.RowIter, qFlags *sql.QueryFlags) sql.RowIter {
func AddTransactionCommittingIter(qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
// TODO: This is a bit of a hack. Need to figure out better relationship between new transaction node and warnings.
if qFlags != nil && qFlags.IsSet(sql.QFlagShowWarnings) {
return child
return iter
}
// TODO: remove this once trackedRowIter is moved out of planbuilder
// Insert TransactionCommittingIter as child of TrackedRowIter
if trackedRowIter, ok := child.(*plan.TrackedRowIter); ok {
if trackedRowIter, ok := iter.(*plan.TrackedRowIter); ok {
return trackedRowIter.WithChildIter(&TransactionCommittingIter{childIter: trackedRowIter.GetIter()})
}
return &TransactionCommittingIter{childIter: child}
return &TransactionCommittingIter{childIter: iter}
}

func (t *TransactionCommittingIter) Next(ctx *sql.Context) (sql.Row, error) {
Expand All @@ -102,9 +102,9 @@ func (t *TransactionCommittingIter) Close(ctx *sql.Context) error {

tx := ctx.GetTransaction()
// TODO: In the future we should ensure that analyzer supports implicit commits instead of directly
// accessing autocommit here.
// accessing autocommit here.
// cc. https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
autocommit, err := plan.IsSessionAutocommit(ctx)
autocommit, err := IsSessionAutocommit(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,3 +137,31 @@ func (t *TransactionCommittingIter) WithChildIter(childIter sql.RowIter) sql.Row
nt.childIter = childIter
return &nt
}

// IsSessionAutocommit returns true if the current session is using implicit transaction management
// through autocommit.
func IsSessionAutocommit(ctx *sql.Context) (bool, error) {
if ReadCommitted(ctx) {
return true, nil
}

autoCommitSessionVar, err := ctx.GetSessionVariable(ctx, sql.AutoCommitSessionVar)
if err != nil {
return false, err
}
return sql.ConvertToBool(ctx, autoCommitSessionVar)
}

func ReadCommitted(ctx *sql.Context) bool {
val, err := ctx.GetSessionVariable(ctx, "transaction_isolation")
if err != nil {
return false
}

valStr, ok := val.(string)
if !ok {
return false
}

return valStr == "READ-COMMITTED"
}
Loading