Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,15 @@ spanner> SELECT * FROM Singers;

Note: Any stats are not available in partitioned query.

##### Isolation Level

You can set session-level default isolation level and transaction-level isolation level.

```
spanner> SET DEFAULT_ISOLATION_LEVEL = "REPEATABLE_READ";
spanner> BEGIN ISOLATION LEVEL REPEATABLE READ;
```

##### Enable Data Boost

You can enable Data Boost using `DATA_BOOST_ENABLED`.
Expand Down
39 changes: 32 additions & 7 deletions client_side_statement_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/apstndb/gsqlutils/stmtkind"
"github.com/cloudspannerecosystem/memefish"
"github.com/cloudspannerecosystem/memefish/ast"
Expand Down Expand Up @@ -330,18 +331,23 @@ var clientSideStatementDefs = []*clientSideStatementDef{
Descriptions: []clientSideStatementDescription{
{
Usage: `Start R/W transaction`,
Syntax: `BEGIN RW [TRANSACTION] [PRIORITY {HIGH|MEDIUM|LOW}]`,
Syntax: `BEGIN RW [TRANSACTION] [ISOLATION LEVEL {SERIALIZABLE|REPEATABLE READ}] [PRIORITY {HIGH|MEDIUM|LOW}]`,
Note: `(spanner-cli style); See [Request Priority](#request-priority) for details on the priority.`,
},
},
Pattern: regexp.MustCompile(`(?is)^BEGIN\s+RW(?:\s+TRANSACTION)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`),
Pattern: regexp.MustCompile(`(?is)^BEGIN\s+RW(?:\s+TRANSACTION)?(?:\s+ISOLATION\s+LEVEL\s+(SERIALIZABLE|REPEATABLE\s+READ))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`),
Comment on lines 337 to +338
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The regex (?is)^BEGIN\s+RW(?:\s+TRANSACTION)?(?:\s+ISOLATION\s+LEVEL\s+(SERIALIZABLE|REPEATABLE\s+READ))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$ uses numeric indexing for submatches. If the order of the optional clauses changes, the indexes in HandleSubmatch will need to be manually updated. Consider using named capture groups to improve maintainability. For example, (?is)^BEGIN\s+RW(?:\s+TRANSACTION)?(?:\s+ISOLATION\s+LEVEL\s+(?P<isolation>SERIALIZABLE|REPEATABLE\s+READ))?(?:\s+PRIORITY\s+(?P<priority>HIGH|MEDIUM|LOW))?$

HandleSubmatch: func(matched []string) (Statement, error) {
priority, err := parsePriority(matched[1])
isolationLevel, err := parseIsolationLevel(matched[1])
if err != nil {
return nil, err
}

return &BeginRwStatement{Priority: priority}, nil
priority, err := parsePriority(matched[2])
Comment on lines 339 to +345
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since you're using numeric indexing for the regex submatches, it's important to ensure that the indexes are correct. It looks like parsePriority is using matched[2] but it's not clear that the second submatch will always be the priority. Consider using named capture groups in the regex to avoid this issue.

if err != nil {
return nil, err
}

return &BeginRwStatement{IsolationLevel: isolationLevel, Priority: priority}, nil
},
},
{
Expand Down Expand Up @@ -390,14 +396,19 @@ var clientSideStatementDefs = []*clientSideStatementDef{
Note: "(Spanner JDBC driver style); It respects `READONLY` system variable. See [Request Priority](#request-priority) for details on the priority.",
},
},
Pattern: regexp.MustCompile(`(?is)^BEGIN(?:\s+TRANSACTION)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`),
Pattern: regexp.MustCompile(`(?is)^BEGIN(?:\s+TRANSACTION)?(?:\s+ISOLATION\s+LEVEL\s+(SERIALIZABLE|REPEATABLE\s+READ))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`),
HandleSubmatch: func(matched []string) (Statement, error) {
priority, err := parsePriority(matched[1])
isolationLevel, err := parseIsolationLevel(matched[1])
if err != nil {
return nil, err
}

priority, err := parsePriority(matched[2])
Comment on lines +399 to +406
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the BEGIN RW statement, the regex for the BEGIN statement also uses numeric indexing for submatches. Consider using named capture groups to improve maintainability.

if err != nil {
return nil, err
}

return &BeginStatement{Priority: priority}, nil
return &BeginStatement{IsolationLevel: isolationLevel, Priority: priority}, nil
},
},
{
Expand Down Expand Up @@ -755,3 +766,17 @@ func exprToFullName(expr ast.Expr) (string, error) {
return "", fmt.Errorf("must be ident or path, but: %T", expr)
}
}

func parseIsolationLevel(isolationLevel string) (sppb.TransactionOptions_IsolationLevel, error) {
if isolationLevel == "" {
return sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, nil
}

value := strings.Join(strings.Fields(strings.ToUpper(isolationLevel)), "_")

p, ok := sppb.TransactionOptions_IsolationLevel_value[value]
if !ok {
return sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, fmt.Errorf("invalid isolation level: %q", value)
}
return sppb.TransactionOptions_IsolationLevel(p), nil
}
38 changes: 26 additions & 12 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ type transactionContext struct {
priority sppb.RequestOptions_Priority
sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction.

txn any
txn any
isolationLevel sppb.TransactionOptions_IsolationLevel

// rwTxn *spanner.ReadWriteStmtBasedTransaction
// roTxn *spanner.ReadOnlyTransaction
Expand Down Expand Up @@ -217,7 +218,7 @@ func (s *Session) InTransaction() bool {

// BeginPendingTransaction starts pending transaction.
// The actual start of the transaction is delayed until the first operation in the transaction is executed.
func (s *Session) BeginPendingTransaction(ctx context.Context, priority sppb.RequestOptions_Priority) error {
func (s *Session) BeginPendingTransaction(ctx context.Context, isolationLevel sppb.TransactionOptions_IsolationLevel, priority sppb.RequestOptions_Priority) error {
if s.InReadWriteTransaction() {
return errors.New("read-write transaction is already running")
}
Expand All @@ -226,14 +227,20 @@ func (s *Session) BeginPendingTransaction(ctx context.Context, priority sppb.Req
return errors.New("read-only transaction is already running")
}

// Use session's default isolation level if transaction priority is not set.
if isolationLevel == sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED {
isolationLevel = s.systemVariables.DefaultIsolationLevel
}

// Use session's priority if transaction priority is not set.
if priority == sppb.RequestOptions_PRIORITY_UNSPECIFIED {
priority = s.systemVariables.RPCPriority
}

s.tc = &transactionContext{
mode: transactionModePending,
priority: priority,
mode: transactionModePending,
priority: priority,
isolationLevel: isolationLevel,
}
return nil
}
Expand All @@ -248,11 +255,11 @@ func (s *Session) DetermineTransaction(ctx context.Context) (time.Time, error) {
return s.BeginReadOnlyTransaction(ctx, timestampBoundUnspecified, 0, time.Time{}, s.tc.priority)
}

return zeroTime, s.BeginReadWriteTransaction(ctx, s.tc.priority)
return zeroTime, s.BeginReadWriteTransaction(ctx, s.tc.isolationLevel, s.tc.priority)
}

// BeginReadWriteTransaction starts read-write transaction.
func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.RequestOptions_Priority) error {
func (s *Session) BeginReadWriteTransaction(ctx context.Context, isolationLevel sppb.TransactionOptions_IsolationLevel, priority sppb.RequestOptions_Priority) error {
if s.InReadWriteTransaction() {
return errors.New("read-write transaction is already running")
}
Expand All @@ -271,23 +278,29 @@ func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.R
priority = s.systemVariables.RPCPriority
}

// Use default isolation level if transaction isolation level is not set.
if isolationLevel == sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED {
isolationLevel = s.systemVariables.DefaultIsolationLevel
}

opts := spanner.TransactionOptions{
CommitOptions: spanner.CommitOptions{ReturnCommitStats: true, MaxCommitDelay: s.systemVariables.MaxCommitDelay},
CommitPriority: priority,
TransactionTag: tag,
ExcludeTxnFromChangeStreams: s.systemVariables.ExcludeTxnFromChangeStreams,
IsolationLevel: s.systemVariables.DefaultIsolationLevel,
IsolationLevel: isolationLevel,
}

txn, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(ctx, s.client, opts)
if err != nil {
return err
}
s.tc = &transactionContext{
mode: transactionModeReadWrite,
tag: tag,
priority: priority,
txn: txn,
mode: transactionModeReadWrite,
tag: tag,
priority: priority,
txn: txn,
isolationLevel: isolationLevel,
}
return nil
}
Expand Down Expand Up @@ -671,7 +684,8 @@ func (s *Session) RunInNewOrExistRwTx(ctx context.Context,
var implicitRWTx bool
if !s.InReadWriteTransaction() {
// Start implicit transaction.
if err := s.BeginReadWriteTransaction(ctx, s.currentPriority()); err != nil {
// Note: isolation level is not session level property so it is left as unspecified.
if err := s.BeginReadWriteTransaction(ctx, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, s.currentPriority()); err != nil {
return 0, spanner.CommitResponse{}, nil, nil, err
}
implicitRWTx = true
Expand Down
49 changes: 31 additions & 18 deletions session_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestRequestPriority(t *testing.T) {
}

// Read-Write Transaction.
if err := session.BeginReadWriteTransaction(ctx, test.transactionPriority); err != nil {
if err := session.BeginReadWriteTransaction(ctx, 0, test.transactionPriority); err != nil {
t.Fatalf("failed to begin read write transaction: %v", err)
}
iter, _ := session.RunQuery(ctx, spanner.NewStatement("SELECT * FROM t1"))
Expand Down Expand Up @@ -171,24 +171,40 @@ func TestIsolationLevel(t *testing.T) {
}

for _, test := range []struct {
desc string
defaultIsolationLevel sppb.TransactionOptions_IsolationLevel
want sppb.TransactionOptions_IsolationLevel
desc string
defaultIsolationLevel sppb.TransactionOptions_IsolationLevel
transactionIsolationLevel sppb.TransactionOptions_IsolationLevel
want sppb.TransactionOptions_IsolationLevel
}{
{
desc: "use default isolation level",
defaultIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
want: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
desc: "use default isolation level",
defaultIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
transactionIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
want: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
},
{
desc: "use serializable isolation level",
defaultIsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
want: sppb.TransactionOptions_SERIALIZABLE,
desc: "use default serializable isolation level",
defaultIsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
transactionIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
want: sppb.TransactionOptions_SERIALIZABLE,
},
{
desc: "use repeatable_read isolation level",
defaultIsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
want: sppb.TransactionOptions_REPEATABLE_READ,
desc: "use default repeatable read isolation level",
defaultIsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
transactionIsolationLevel: sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED,
want: sppb.TransactionOptions_REPEATABLE_READ,
},
{
desc: "use override serializable isolation level",
defaultIsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
transactionIsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
want: sppb.TransactionOptions_SERIALIZABLE,
},
{
desc: "use override repeatable read isolation level",
defaultIsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
transactionIsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
want: sppb.TransactionOptions_REPEATABLE_READ,
},
} {
Comment on lines 173 to 209
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a test case where the defaultIsolationLevel is set to REPEATABLE_READ and the transactionIsolationLevel is unspecified. This will ensure that the default isolation level is correctly applied when no explicit isolation level is specified in the transaction.

t.Run(test.desc, func(t *testing.T) {
Expand All @@ -205,18 +221,15 @@ func TestIsolationLevel(t *testing.T) {
}

// Read-Write Transaction.
if err := session.BeginReadWriteTransaction(ctx, sppb.RequestOptions_PRIORITY_UNSPECIFIED); err != nil {
if err := session.BeginReadWriteTransaction(ctx, test.transactionIsolationLevel, sppb.RequestOptions_PRIORITY_UNSPECIFIED); err != nil {
t.Fatalf("failed to begin read write transaction: %v", err)
}
iter, _ := session.RunQuery(ctx, spanner.NewStatement("SELECT * FROM t1"))
iter, _ := session.RunQuery(ctx, spanner.NewStatement("SELECT 1"))
if err := iter.Do(func(r *spanner.Row) error {
return nil
}); err != nil {
t.Fatalf("failed to run query: %v", err)
}
if _, _, _, _, err := session.RunUpdate(ctx, spanner.NewStatement("DELETE FROM t1 WHERE Id = 1"), true); err != nil {
t.Fatalf("failed to run update: %v", err)
}
if _, err := session.CommitReadWriteTransaction(ctx); err != nil {
t.Fatalf("failed to commit: %v", err)
}
Expand Down
44 changes: 44 additions & 0 deletions statement_processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,57 @@ func TestBuildStatement(t *testing.T) {
Priority: sppb.RequestOptions_PRIORITY_MEDIUM,
},
},
{
desc: "BEGIN statement with SERIALIZABLE",
input: "BEGIN ISOLATION LEVEL SERIALIZABLE",
want: &BeginStatement{
IsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
},
},
{
desc: "BEGIN statement with REPEATABLE READ",
input: "BEGIN ISOLATION LEVEL REPEATABLE READ",
want: &BeginStatement{
IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
},
},
{
desc: "BEGIN statement with REPEATABLE READ and PRIORITY",
input: "BEGIN ISOLATION LEVEL REPEATABLE READ PRIORITY MEDIUM",
want: &BeginStatement{
IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
Priority: sppb.RequestOptions_PRIORITY_MEDIUM,
},
},
{
desc: "BEGIN RW PRIORITY statement",
input: "BEGIN RW PRIORITY LOW",
want: &BeginRwStatement{
Priority: sppb.RequestOptions_PRIORITY_LOW,
},
},
{
desc: "BEGIN RW statement with SERIALIZABLE",
input: "BEGIN RW ISOLATION LEVEL SERIALIZABLE",
want: &BeginRwStatement{
IsolationLevel: sppb.TransactionOptions_SERIALIZABLE,
},
},
{
desc: "BEGIN RW statement with REPEATABLE READ",
input: "BEGIN RW ISOLATION LEVEL REPEATABLE READ",
want: &BeginRwStatement{
IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
},
},
{
desc: "BEGIN RW statement with REPEATABLE READ and PRIORITY",
input: "BEGIN RW ISOLATION LEVEL REPEATABLE READ PRIORITY MEDIUM",
want: &BeginRwStatement{
IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ,
Priority: sppb.RequestOptions_PRIORITY_MEDIUM,
},
},
{
desc: "BEGIN RO statement",
input: "BEGIN RO",
Expand Down
12 changes: 7 additions & 5 deletions statements_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type BeginRoStatement struct {
}

type BeginRwStatement struct {
Priority sppb.RequestOptions_Priority
IsolationLevel sppb.TransactionOptions_IsolationLevel
Priority sppb.RequestOptions_Priority
}

func (BeginRwStatement) isMutationStatement() {}
Expand All @@ -40,15 +41,16 @@ func (s *BeginRwStatement) Execute(ctx context.Context, session *Session) (*Resu
return nil, errors.New("you're in read-only transaction. Please finish the transaction by 'CLOSE;'")
}

if err := session.BeginReadWriteTransaction(ctx, s.Priority); err != nil {
if err := session.BeginReadWriteTransaction(ctx, s.IsolationLevel, s.Priority); err != nil {
return nil, err
}

return &Result{IsMutation: true}, nil
}

type BeginStatement struct {
Priority sppb.RequestOptions_Priority
IsolationLevel sppb.TransactionOptions_IsolationLevel
Priority sppb.RequestOptions_Priority
}

func (s *BeginStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
Expand All @@ -68,7 +70,7 @@ func (s *BeginStatement) Execute(ctx context.Context, session *Session) (*Result
}, nil
}

err := session.BeginPendingTransaction(ctx, s.Priority)
err := session.BeginPendingTransaction(ctx, s.IsolationLevel, s.Priority)
if err != nil {
return nil, err
}
Expand All @@ -95,7 +97,7 @@ func (s *SetTransactionStatement) Execute(ctx context.Context, session *Session)
result.Timestamp = ts
return result, nil
} else {
err := session.BeginReadWriteTransaction(ctx, session.tc.priority)
err := session.BeginReadWriteTransaction(ctx, session.tc.isolationLevel, session.tc.priority)
if err != nil {
return nil, err
}
Expand Down