Skip to content

Commit 61a4eb1

Browse files
author
James Cor
committed
starting to remove exchange
1 parent baa759c commit 61a4eb1

File tree

4 files changed

+50
-60
lines changed

4 files changed

+50
-60
lines changed

engine.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (e *Engine) QueryWithBindings(ctx *sql.Context, query string, parsed sqlpar
447447
return nil, nil, nil, err
448448
}
449449

450-
iter = finalizeIters(analyzed, qFlags, iter)
450+
iter = finalizeIters(e.Analyzer, analyzed, qFlags, iter)
451451

452452
return analyzed.Schema(), iter, qFlags, nil
453453
}
@@ -482,7 +482,7 @@ func (e *Engine) PrepQueryPlanForExecution(ctx *sql.Context, _ string, plan sql.
482482
return nil, nil, nil, err
483483
}
484484

485-
iter = finalizeIters(plan, nil, iter)
485+
iter = finalizeIters(e.Analyzer, plan, nil, iter)
486486

487487
return plan.Schema(), iter, nil, nil
488488
}
@@ -705,7 +705,7 @@ func clearAutocommitTransaction(ctx *sql.Context) error {
705705
return nil
706706
}
707707

708-
autocommit, err := plan.IsSessionAutocommit(ctx)
708+
autocommit, err := rowexec.IsSessionAutocommit(ctx)
709709
if err != nil {
710710
return err
711711
}
@@ -725,7 +725,7 @@ func (e *Engine) CloseSession(connID uint32) {
725725
}
726726

727727
func (e *Engine) beginTransaction(ctx *sql.Context) error {
728-
beginNewTransaction := ctx.GetTransaction() == nil || plan.ReadCommitted(ctx)
728+
beginNewTransaction := ctx.GetTransaction() == nil || rowexec.ReadCommitted(ctx)
729729
if beginNewTransaction {
730730
ctx.GetLogger().Tracef("beginning new transaction")
731731
ts, ok := ctx.Session.(sql.TransactionSession)
@@ -830,7 +830,7 @@ func (e *Engine) executeEvent(ctx *sql.Context, dbName, createEventStatement, us
830830
return err
831831
}
832832

833-
iter = finalizeIters(definitionNode, nil, iter)
833+
iter = finalizeIters(e.Analyzer, definitionNode, nil, iter)
834834

835835
// Drain the iterate to execute the event body/definition
836836
// NOTE: No row data is returned for an event; we just need to execute the statements
@@ -865,8 +865,9 @@ func findCreateEventNode(planTree sql.Node) (*plan.CreateEvent, error) {
865865
}
866866

867867
// finalizeIters applies the final transformations on sql.RowIter before execution.
868-
func finalizeIters(analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
869-
iter = rowexec.AddTransactionCommittingIter(iter, qFlags)
868+
func finalizeIters(analyzer *analyzer.Analyzer, analyzed sql.Node, qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
869+
iter = rowexec.AddTransactionCommittingIter(qFlags, iter)
870+
iter = rowexec.AddExchangeRowIter(analyzer.Parallelism, analyzed, iter)
870871
iter = rowexec.AddExpressionCloser(analyzed, iter)
871872
return iter
872873
}

sql/plan/transaction_committing_iter.go

Lines changed: 0 additions & 47 deletions
This file was deleted.

sql/rowexec/other_iters.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,14 @@ type exchangeRowIter struct {
398398

399399
var _ sql.RowIter = (*exchangeRowIter)(nil)
400400

401+
func AddExchangeRowIter(parallelism int, node sql.Node, iter sql.RowIter) sql.RowIter {
402+
if parallelism <= 1 {
403+
return iter
404+
}
405+
406+
return iter
407+
}
408+
401409
func (i *exchangeRowIter) Next(ctx *sql.Context) (sql.Row, error) {
402410
if i.rows == nil {
403411
panic("Next called for a Next2 iterator")

sql/rowexec/transaction_iters.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,17 @@ type TransactionCommittingIter struct {
7474
transactionDatabase string
7575
}
7676

77-
func AddTransactionCommittingIter(child sql.RowIter, qFlags *sql.QueryFlags) sql.RowIter {
77+
func AddTransactionCommittingIter(qFlags *sql.QueryFlags, iter sql.RowIter) sql.RowIter {
7878
// TODO: This is a bit of a hack. Need to figure out better relationship between new transaction node and warnings.
7979
if qFlags != nil && qFlags.IsSet(sql.QFlagShowWarnings) {
80-
return child
80+
return iter
8181
}
8282
// TODO: remove this once trackedRowIter is moved out of planbuilder
8383
// Insert TransactionCommittingIter as child of TrackedRowIter
84-
if trackedRowIter, ok := child.(*plan.TrackedRowIter); ok {
84+
if trackedRowIter, ok := iter.(*plan.TrackedRowIter); ok {
8585
return trackedRowIter.WithChildIter(&TransactionCommittingIter{childIter: trackedRowIter.GetIter()})
8686
}
87-
return &TransactionCommittingIter{childIter: child}
87+
return &TransactionCommittingIter{childIter: iter}
8888
}
8989

9090
func (t *TransactionCommittingIter) Next(ctx *sql.Context) (sql.Row, error) {
@@ -102,9 +102,9 @@ func (t *TransactionCommittingIter) Close(ctx *sql.Context) error {
102102

103103
tx := ctx.GetTransaction()
104104
// TODO: In the future we should ensure that analyzer supports implicit commits instead of directly
105-
// accessing autocommit here.
105+
// accessing autocommit here.
106106
// cc. https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
107-
autocommit, err := plan.IsSessionAutocommit(ctx)
107+
autocommit, err := IsSessionAutocommit(ctx)
108108
if err != nil {
109109
return err
110110
}
@@ -137,3 +137,31 @@ func (t *TransactionCommittingIter) WithChildIter(childIter sql.RowIter) sql.Row
137137
nt.childIter = childIter
138138
return &nt
139139
}
140+
141+
// IsSessionAutocommit returns true if the current session is using implicit transaction management
142+
// through autocommit.
143+
func IsSessionAutocommit(ctx *sql.Context) (bool, error) {
144+
if ReadCommitted(ctx) {
145+
return true, nil
146+
}
147+
148+
autoCommitSessionVar, err := ctx.GetSessionVariable(ctx, sql.AutoCommitSessionVar)
149+
if err != nil {
150+
return false, err
151+
}
152+
return sql.ConvertToBool(ctx, autoCommitSessionVar)
153+
}
154+
155+
func ReadCommitted(ctx *sql.Context) bool {
156+
val, err := ctx.GetSessionVariable(ctx, "transaction_isolation")
157+
if err != nil {
158+
return false
159+
}
160+
161+
valStr, ok := val.(string)
162+
if !ok {
163+
return false
164+
}
165+
166+
return valStr == "READ-COMMITTED"
167+
}

0 commit comments

Comments
 (0)