Skip to content

Commit 435477f

Browse files
committed
sql/rowexec: Add a SessionCommandSafepoint session callback for long-running write commands. Call it in some rowexec implementations.
This allows for integrators to see quiesced states on engine session utilization even on very long running write operations.
1 parent fb75d24 commit 435477f

File tree

4 files changed

+64
-11
lines changed

4 files changed

+64
-11
lines changed

sql/rowexec/ddl_iters.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,8 @@ func (i *modifyColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTabl
570570
return false, err
571571
}
572572

573-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
573+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
574+
rowIter = withSafepointPeriodicallyIter(rowIter)
574575
for {
575576
r, err := rowIter.Next(ctx)
576577
if err == io.EOF {
@@ -1117,7 +1118,8 @@ func (c *createPkIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable) e
11171118
return err
11181119
}
11191120

1120-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1121+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1122+
rowIter = withSafepointPeriodicallyIter(rowIter)
11211123

11221124
for {
11231125
r, err := rowIter.Next(ctx)
@@ -1221,7 +1223,8 @@ func (d *dropPkIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable) err
12211223
return err
12221224
}
12231225

1224-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1226+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1227+
rowIter = withSafepointPeriodicallyIter(rowIter)
12251228

12261229
for {
12271230
r, err := rowIter.Next(ctx)
@@ -1329,6 +1332,7 @@ func (i *addColumnIter) UpdateRowsWithDefaults(ctx *sql.Context, table sql.Table
13291332
if err != nil {
13301333
return err
13311334
}
1335+
tableIter = withSafepointPeriodicallyIter(tableIter)
13321336

13331337
schema := updatable.Schema()
13341338
idx := -1
@@ -1430,7 +1434,8 @@ func (i *addColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable)
14301434
return false, err
14311435
}
14321436

1433-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1437+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1438+
rowIter = withSafepointPeriodicallyIter(rowIter)
14341439

14351440
var val uint64
14361441
var autoTbl sql.AutoIncrementTable
@@ -1740,7 +1745,8 @@ func (i *dropColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable)
17401745
return false, err
17411746
}
17421747

1743-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1748+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1749+
rowIter = withSafepointPeriodicallyIter(rowIter)
17441750

17451751
for {
17461752
r, err := rowIter.Next(ctx)
@@ -1849,6 +1855,7 @@ func (b *BaseBuilder) executeCreateCheck(ctx *sql.Context, c *plan.CreateCheck)
18491855
if err != nil {
18501856
return err
18511857
}
1858+
rowIter = withSafepointPeriodicallyIter(rowIter)
18521859

18531860
for {
18541861
row, err := rowIter.Next(ctx)
@@ -2252,7 +2259,8 @@ func buildIndex(ctx *sql.Context, n *plan.AlterIndex, ibt sql.IndexBuildingTable
22522259
return err
22532260
}
22542261

2255-
rowIter := sql.NewTableRowIter(ctx, ibt, partitions)
2262+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, ibt, partitions)
2263+
rowIter = withSafepointPeriodicallyIter(rowIter)
22562264

22572265
// Our table scan needs to include projections for virtual columns if there are any
22582266
isVirtual := ibt.Schema().HasVirtualColumns()
@@ -2339,7 +2347,8 @@ func rewriteTableForIndexCreate(ctx *sql.Context, n *plan.AlterIndex, table sql.
23392347
return err
23402348
}
23412349

2342-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
2350+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
2351+
rowIter = withSafepointPeriodicallyIter(rowIter)
23432352

23442353
isVirtual := table.Schema().HasVirtualColumns()
23452354
var projections []sql.Expression

sql/rowexec/dml_iters.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ func (i *triggerBlockIter) Next(ctx *sql.Context) (sql.Row, error) {
121121
if err != nil {
122122
return nil, err
123123
}
124+
subIter = withSafepointPeriodicallyIter(subIter)
124125

125126
for {
126127
newRow, err := subIter.Next(ctx)
@@ -143,6 +144,7 @@ func (i *triggerBlockIter) Next(ctx *sql.Context) (sql.Row, error) {
143144
}
144145
}
145146
}
147+
sql.SessionCommandSafepoint(ctx.Session)
146148

147149
return row, nil
148150
}
@@ -264,6 +266,7 @@ func (t *triggerIter) Next(ctx *sql.Context) (row sql.Row, returnErr error) {
264266
if err != nil {
265267
return nil, err
266268
}
269+
logicIter = withSafepointPeriodicallyIter(logicIter)
267270

268271
defer func() {
269272
err := logicIter.Close(t.ctx)
@@ -613,15 +616,15 @@ func AddAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Sc
613616
switch innerIter := i.InnerIter().(type) {
614617
case *insertIter:
615618
if len(innerIter.returnExprs) > 0 {
616-
return innerIter, innerIter.returnSchema
619+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
617620
}
618621
case *updateIter:
619622
if len(innerIter.returnExprs) > 0 {
620-
return innerIter, innerIter.returnSchema
623+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
621624
}
622625
case *deleteIter:
623626
if len(innerIter.returnExprs) > 0 {
624-
return innerIter, innerIter.returnSchema
627+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
625628
}
626629
}
627630

@@ -631,6 +634,30 @@ func AddAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Sc
631634
}
632635
}
633636

637+
func withSafepointPeriodicallyIter(child sql.RowIter) *safepointPeriodicallyIter {
638+
return &safepointPeriodicallyIter{child: child}
639+
}
640+
641+
type safepointPeriodicallyIter struct {
642+
child sql.RowIter
643+
n int
644+
}
645+
646+
const safepointEveryNRows = 1024
647+
648+
func (i *safepointPeriodicallyIter) Next(ctx *sql.Context) (r sql.Row, err error) {
649+
i.n++
650+
if i.n >= safepointEveryNRows {
651+
i.n = 0
652+
sql.SessionCommandSafepoint(ctx.Session)
653+
}
654+
return i.child.Next(ctx)
655+
}
656+
657+
func (i *safepointPeriodicallyIter) Close(ctx *sql.Context) error {
658+
return i.child.Close(ctx)
659+
}
660+
634661
// defaultAccumulatorIter returns the default accumulator iter for a DML node
635662
func defaultAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Schema) {
636663
clientFoundRowsToggled := (ctx.Client().Capabilities & mysql.CapabilityClientFoundRows) > 0
@@ -639,7 +666,7 @@ func defaultAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sq
639666
return iter, nil
640667
}
641668
return &accumulatorIter{
642-
iter: iter,
669+
iter: withSafepointPeriodicallyIter(iter),
643670
updateRowHandler: rowHandler,
644671
}, types.OkResultSchema
645672
}

sql/rowexec/proc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ func (b *BaseBuilder) buildLoop(ctx *sql.Context, n *plan.Loop, row sql.Row) (sq
340340
return nil, err
341341
}
342342
}
343+
loopBodyIter = withSafepointPeriodicallyIter(loopBodyIter)
343344

344345
includeResultSet := false
345346

sql/session.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ type LifecycleAwareSession interface {
229229
SessionEnd()
230230
}
231231

232+
// An optional Lifecycle callback which a session can receive. This can be
233+
// delivered periodically during a long running operation, between the
234+
// CommandBegin and CommandEnd calls. Across the call to this method, the
235+
// gms.Egnine is not accessing the session or any of its state, such as
236+
// table editors, database providers, etc.
237+
type SafepointAwareSession interface {
238+
CommandSafepoint()
239+
}
240+
232241
type (
233242
// TypedValue is a value along with its type.
234243
TypedValue struct {
@@ -763,6 +772,13 @@ func SessionCommandEnd(s Session) {
763772
}
764773
}
765774

775+
// Helper function to call CommandSafepoint on a SafepointAwareSession, or do nothing.
776+
func SessionCommandSafepoint(s Session) {
777+
if cur, ok := s.(SafepointAwareSession); ok {
778+
cur.CommandSafepoint()
779+
}
780+
}
781+
766782
// Helper function to call SessionEnd on a LifecycleAwareSession, or do nothing.
767783
func SessionEnd(s Session) {
768784
if cur, ok := s.(LifecycleAwareSession); ok {

0 commit comments

Comments
 (0)