Skip to content

Commit 4a13d9b

Browse files
committed
isession: add savepoint support
This adds Savepoint support to the internal session. LDR needs save points for performance reasons. It allows it to handle individual LWW losers without aborting the entire replication batch. Release note: none Epic: CRDB-48647
1 parent f2d1d9b commit 4a13d9b

File tree

3 files changed

+144
-1
lines changed

3 files changed

+144
-1
lines changed

pkg/sql/isession/internal_session.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,25 @@ var (
5252
AST: &tree.RollbackTransaction{},
5353
SQL: "ROLLBACK",
5454
}
55+
56+
savepointBegin = statements.Statement[tree.Statement]{
57+
AST: &tree.Savepoint{
58+
Name: tree.Name("internal_session"),
59+
},
60+
SQL: "SAVEPOINT internal_session",
61+
}
62+
savepointRelease = statements.Statement[tree.Statement]{
63+
AST: &tree.ReleaseSavepoint{
64+
Savepoint: tree.Name("internal_session"),
65+
},
66+
SQL: "RELEASE SAVEPOINT internal_session",
67+
}
68+
savepointRollback = statements.Statement[tree.Statement]{
69+
AST: &tree.RollbackToSavepoint{
70+
Savepoint: tree.Name("internal_session"),
71+
},
72+
SQL: "ROLLBACK TO SAVEPOINT internal_session",
73+
}
5574
)
5675

5776
var _ isql.Session = &InternalSession{}
@@ -115,6 +134,40 @@ func (i *InternalSession) Txn(ctx context.Context, do func(ctx context.Context)
115134
return err
116135
}
117136

137+
func (i *InternalSession) Savepoint(ctx context.Context, do func(ctx context.Context) error) error {
138+
if i.poison != nil {
139+
return i.poison
140+
}
141+
142+
if err := i.executeStatement(ctx, savepointBegin); err != nil {
143+
return errors.Wrap(err, "failed to create savepoint")
144+
}
145+
146+
innerErr := do(ctx)
147+
if innerErr != nil {
148+
// Return the rollback error as primary since it indicates a more
149+
// serious problem than the original error.
150+
savePointErr := i.executeStatement(ctx, savepointRollback)
151+
if savePointErr != nil {
152+
return errors.CombineErrors(savePointErr, innerErr)
153+
}
154+
// NOTE: Rollback does not release the savepoint. We need to release the
155+
// savepoint to avoid leaking it and causing weird behavior if the user is
156+
// nesting savepoint calls.
157+
releaseErr := i.executeStatement(ctx, savepointRelease)
158+
if releaseErr != nil {
159+
return errors.CombineErrors(releaseErr, innerErr)
160+
}
161+
return innerErr
162+
}
163+
164+
if err := i.executeStatement(ctx, savepointRelease); err != nil {
165+
return errors.Wrap(err, "failed to release the savepoint")
166+
}
167+
168+
return nil
169+
}
170+
118171
func (i *InternalSession) Prepare(
119172
ctx context.Context, name string, stmt statements.Statement[tree.Statement], types []*types.T,
120173
) (isql.PreparedStatement, error) {
@@ -233,7 +286,7 @@ func (i *InternalSession) executeStatement(
233286
return errors.Wrap(err, "unable to push sync statement")
234287
}
235288
_, _, err = i.readResults(ctx)
236-
return errors.Wrap(err, "unable to execute raw statement")
289+
return errors.Wrapf(err, "unable to execute raw statement %s", stmt.SQL)
237290
}
238291

239292
func (i *InternalSession) readResults(ctx context.Context) ([]tree.Datums, int, error) {

pkg/sql/isession/internal_session_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,3 +538,77 @@ func tcTableNotFound(
538538
_, err = session.Prepare(ctx, "table-not-found-select", stmt, nil)
539539
require.ErrorContains(t, err, "relation \"defaultdb.non_existent_table\" does not exist")
540540
}
541+
542+
func TestSavepoint(t *testing.T) {
543+
defer leaktest.AfterTest(t)()
544+
defer log.Scope(t).Close(t)
545+
546+
ctx := context.Background()
547+
s := serverutils.StartServerOnly(t, base.TestServerArgs{})
548+
defer s.Stopper().Stop(ctx)
549+
550+
db := s.SQLConn(t)
551+
_, err := db.Exec("CREATE TABLE test (id INT PRIMARY KEY, val INT)")
552+
require.NoError(t, err)
553+
554+
idb := s.InternalDB().(descs.DB)
555+
session, err := idb.Session(ctx, "test-session")
556+
require.NoError(t, err)
557+
defer session.Close(ctx)
558+
559+
insertStmt, err := parser.ParseOne("INSERT INTO defaultdb.test VALUES ($1, $2)")
560+
require.NoError(t, err)
561+
562+
insertPrepared, err := session.Prepare(ctx, "insert", insertStmt, []*types.T{types.Int, types.Int})
563+
require.NoError(t, err)
564+
565+
selectStmt, err := parser.ParseOne("SELECT id, val FROM defaultdb.test ORDER BY id")
566+
require.NoError(t, err)
567+
568+
selectPrepared, err := session.Prepare(ctx, "select", selectStmt, nil)
569+
require.NoError(t, err)
570+
571+
// Create a a three deep savepoint and rollback the inner two savepoints.
572+
err = session.Txn(ctx, func(ctx context.Context) error {
573+
return session.Savepoint(ctx, func(ctx context.Context) error {
574+
_, err := session.ExecutePrepared(ctx, insertPrepared, tree.Datums{
575+
tree.NewDInt(tree.DInt(1)),
576+
tree.NewDInt(tree.DInt(10)),
577+
})
578+
if err != nil {
579+
return err
580+
}
581+
err = session.Savepoint(ctx, func(ctx context.Context) error {
582+
_, err := session.ExecutePrepared(ctx, insertPrepared, tree.Datums{
583+
tree.NewDInt(tree.DInt(2)),
584+
tree.NewDInt(tree.DInt(20)),
585+
})
586+
if err != nil {
587+
return err
588+
}
589+
return session.Savepoint(ctx, func(ctx context.Context) error {
590+
_, err := session.ExecutePrepared(ctx, insertPrepared, tree.Datums{
591+
tree.NewDInt(tree.DInt(3)),
592+
tree.NewDInt(tree.DInt(30)),
593+
})
594+
if err != nil {
595+
return err
596+
}
597+
// Rollback this innermost savepoint
598+
return errors.New("rollback innermost savepoint")
599+
})
600+
})
601+
// The second savepoint should fail due to the nested savepoint rollback
602+
require.ErrorContains(t, err, "rollback innermost savepoint")
603+
return nil
604+
})
605+
})
606+
require.NoError(t, err)
607+
608+
// Verify final state - only first insert should be committed
609+
rows, err := session.QueryPrepared(ctx, selectPrepared, nil)
610+
require.NoError(t, err)
611+
require.Equal(t, []tree.Datums{
612+
{tree.NewDInt(tree.DInt(1)), tree.NewDInt(tree.DInt(10))},
613+
}, rows)
614+
}

pkg/sql/isql/isql_session.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,22 @@ type Session interface {
6464
// }
6565
Txn(ctx context.Context, do func(context.Context) error) error
6666

67+
// Savepoint creates a savepoint within an existing transaction and executes
68+
// the given function. If the function returns an error, the savepoint is
69+
// rolled back. If the function succeeds, the savepoint is released.
70+
// Savepoints must be used within a transaction.
71+
//
72+
// Example:
73+
// err := session.Txn(ctx, func(ctx context.Context) error {
74+
// return session.Savepoint(ctx, func(ctx context.Context) error {
75+
// return session.ExecutePrepared(ctx, stmt, []tree.Datum{tree.NewDInt(1)})
76+
// })
77+
// })
78+
// if err != nil {
79+
// return err
80+
// }
81+
Savepoint(ctx context.Context, do func(context.Context) error) error
82+
6783
// Close closes the session and cleans up internal resources.
6884
Close(ctx context.Context)
6985
}

0 commit comments

Comments
 (0)