Skip to content

Commit 70e32de

Browse files
authored
Merge pull request #3246 from dolthub/aaron/sql-session-command-safepoint
sql/rowexec: Add a SessionCommandSafepoint session callback for long-running write commands. Call it in some rowexec implementations.
2 parents 928f5c4 + e5f856a commit 70e32de

File tree

4 files changed

+76
-11
lines changed

4 files changed

+76
-11
lines changed

sql/rowexec/ddl_iters.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,8 @@ func (i *modifyColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTabl
570570
return false, err
571571
}
572572

573-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
573+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
574+
rowIter = withSafepointPeriodicallyIter(rowIter)
574575
for {
575576
r, err := rowIter.Next(ctx)
576577
if err == io.EOF {
@@ -1117,7 +1118,8 @@ func (c *createPkIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable) e
11171118
return err
11181119
}
11191120

1120-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1121+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1122+
rowIter = withSafepointPeriodicallyIter(rowIter)
11211123

11221124
for {
11231125
r, err := rowIter.Next(ctx)
@@ -1221,7 +1223,8 @@ func (d *dropPkIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable) err
12211223
return err
12221224
}
12231225

1224-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1226+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1227+
rowIter = withSafepointPeriodicallyIter(rowIter)
12251228

12261229
for {
12271230
r, err := rowIter.Next(ctx)
@@ -1329,6 +1332,7 @@ func (i *addColumnIter) UpdateRowsWithDefaults(ctx *sql.Context, table sql.Table
13291332
if err != nil {
13301333
return err
13311334
}
1335+
tableIter = withSafepointPeriodicallyIter(tableIter)
13321336

13331337
schema := updatable.Schema()
13341338
idx := -1
@@ -1430,7 +1434,8 @@ func (i *addColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable)
14301434
return false, err
14311435
}
14321436

1433-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1437+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1438+
rowIter = withSafepointPeriodicallyIter(rowIter)
14341439

14351440
var val uint64
14361441
var autoTbl sql.AutoIncrementTable
@@ -1740,7 +1745,8 @@ func (i *dropColumnIter) rewriteTable(ctx *sql.Context, rwt sql.RewritableTable)
17401745
return false, err
17411746
}
17421747

1743-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
1748+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
1749+
rowIter = withSafepointPeriodicallyIter(rowIter)
17441750

17451751
for {
17461752
r, err := rowIter.Next(ctx)
@@ -2252,7 +2258,8 @@ func buildIndex(ctx *sql.Context, n *plan.AlterIndex, ibt sql.IndexBuildingTable
22522258
return err
22532259
}
22542260

2255-
rowIter := sql.NewTableRowIter(ctx, ibt, partitions)
2261+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, ibt, partitions)
2262+
rowIter = withSafepointPeriodicallyIter(rowIter)
22562263

22572264
// Our table scan needs to include projections for virtual columns if there are any
22582265
isVirtual := ibt.Schema().HasVirtualColumns()
@@ -2339,7 +2346,8 @@ func rewriteTableForIndexCreate(ctx *sql.Context, n *plan.AlterIndex, table sql.
23392346
return err
23402347
}
23412348

2342-
rowIter := sql.NewTableRowIter(ctx, rwt, partitions)
2349+
var rowIter sql.RowIter = sql.NewTableRowIter(ctx, rwt, partitions)
2350+
rowIter = withSafepointPeriodicallyIter(rowIter)
23432351

23442352
isVirtual := table.Schema().HasVirtualColumns()
23452353
var projections []sql.Expression

sql/rowexec/dml_iters.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ func (i *triggerBlockIter) Next(ctx *sql.Context) (sql.Row, error) {
121121
if err != nil {
122122
return nil, err
123123
}
124+
subIter = withSafepointPeriodicallyIter(subIter)
124125

125126
for {
126127
newRow, err := subIter.Next(ctx)
@@ -143,6 +144,7 @@ func (i *triggerBlockIter) Next(ctx *sql.Context) (sql.Row, error) {
143144
}
144145
}
145146
}
147+
sql.SessionCommandSafepoint(ctx.Session)
146148

147149
return row, nil
148150
}
@@ -264,6 +266,7 @@ func (t *triggerIter) Next(ctx *sql.Context) (row sql.Row, returnErr error) {
264266
if err != nil {
265267
return nil, err
266268
}
269+
logicIter = withSafepointPeriodicallyIter(logicIter)
267270

268271
defer func() {
269272
err := logicIter.Close(t.ctx)
@@ -631,15 +634,15 @@ func AddAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Sc
631634
switch innerIter := i.InnerIter().(type) {
632635
case *insertIter:
633636
if len(innerIter.returnExprs) > 0 {
634-
return innerIter, innerIter.returnSchema
637+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
635638
}
636639
case *updateIter:
637640
if len(innerIter.returnExprs) > 0 {
638-
return innerIter, innerIter.returnSchema
641+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
639642
}
640643
case *deleteIter:
641644
if len(innerIter.returnExprs) > 0 {
642-
return innerIter, innerIter.returnSchema
645+
return withSafepointPeriodicallyIter(innerIter), innerIter.returnSchema
643646
}
644647
}
645648
case *triggerIter:
@@ -663,6 +666,43 @@ func AddAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Sc
663666
return defaultAccumulatorIter(ctx, iter)
664667
}
665668

669+
func withSafepointPeriodicallyIter(child sql.RowIter) *safepointPeriodicallyIter {
670+
return &safepointPeriodicallyIter{child: child}
671+
}
672+
673+
// A wrapper iterator which calls sql.SessionCommandSafepoint on the
674+
// ctx.Session periodically while returning rows through calls to
675+
// |Next|.
676+
//
677+
// Should be used to wrap any iterators which are involved in
678+
// long-running write operations and which are exhausted or iterated
679+
// by other iterators in the iterator tree, such as accumulatorIter.
680+
//
681+
// This iterator makes the assumption that a safepoint, from the
682+
// Engine's perspective, can be established at any moment we are
683+
// within a Next() call. This is generally true given the Engine's
684+
// lack of concurrency on a given Session, but if something like
685+
// Exchange node came back, this would not necessarily be true.
686+
type safepointPeriodicallyIter struct {
687+
child sql.RowIter
688+
n int
689+
}
690+
691+
const safepointEveryNRows = 1024
692+
693+
func (i *safepointPeriodicallyIter) Next(ctx *sql.Context) (r sql.Row, err error) {
694+
i.n++
695+
if i.n >= safepointEveryNRows {
696+
i.n = 0
697+
sql.SessionCommandSafepoint(ctx.Session)
698+
}
699+
return i.child.Next(ctx)
700+
}
701+
702+
func (i *safepointPeriodicallyIter) Close(ctx *sql.Context) error {
703+
return i.child.Close(ctx)
704+
}
705+
666706
// defaultAccumulatorIter returns the default accumulator iter for a DML node
667707
func defaultAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sql.Schema) {
668708
clientFoundRowsToggled := (ctx.Client().Capabilities & mysql.CapabilityClientFoundRows) > 0
@@ -671,7 +711,7 @@ func defaultAccumulatorIter(ctx *sql.Context, iter sql.RowIter) (sql.RowIter, sq
671711
return iter, nil
672712
}
673713
return &accumulatorIter{
674-
iter: iter,
714+
iter: withSafepointPeriodicallyIter(iter),
675715
updateRowHandler: rowHandler,
676716
}, types.OkResultSchema
677717
}

sql/rowexec/proc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ func (b *BaseBuilder) buildLoop(ctx *sql.Context, n *plan.Loop, row sql.Row) (sq
340340
return nil, err
341341
}
342342
}
343+
loopBodyIter = withSafepointPeriodicallyIter(loopBodyIter)
343344

344345
includeResultSet := false
345346

sql/session.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ type LifecycleAwareSession interface {
229229
SessionEnd()
230230
}
231231

232+
// An optional Lifecycle callback which a session can receive. This can be
233+
// delivered periodically during a long running operation, between the
234+
// CommandBegin and CommandEnd calls. Across the call to this method, the
235+
// gms.Engine is not accessing the session or any of its state, such as
236+
// table editors, database providers, etc.
237+
type SafepointAwareSession interface {
238+
CommandSafepoint()
239+
}
240+
232241
type (
233242
// TypedValue is a value along with its type.
234243
TypedValue struct {
@@ -763,6 +772,13 @@ func SessionCommandEnd(s Session) {
763772
}
764773
}
765774

775+
// Helper function to call CommandSafepoint on a SafepointAwareSession, or do nothing.
776+
func SessionCommandSafepoint(s Session) {
777+
if cur, ok := s.(SafepointAwareSession); ok {
778+
cur.CommandSafepoint()
779+
}
780+
}
781+
766782
// Helper function to call SessionEnd on a LifecycleAwareSession, or do nothing.
767783
func SessionEnd(s Session) {
768784
if cur, ok := s.(LifecycleAwareSession); ok {

0 commit comments

Comments
 (0)