Skip to content

Commit ce7299f

Browse files
authored
chore(spanner): preserving lock order - R/W mux session (googleapis#11585)
* chore(spanner): preserving lock order - R/W mux session * fix build and tests * remove not used options
1 parent f321d36 commit ce7299f

File tree

4 files changed

+166
-10
lines changed

4 files changed

+166
-10
lines changed

spanner/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,8 +1065,13 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
10651065
return ToSpannerError(err)
10661066
}
10671067
} else {
1068+
var previousTx transactionID
1069+
if t != nil {
1070+
previousTx = t.previousTx
1071+
}
10681072
t = &ReadWriteTransaction{
10691073
txReadyOrClosed: make(chan struct{}),
1074+
previousTx: previousTx,
10701075
}
10711076
t.txReadOnly.sh = sh
10721077
}

spanner/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func setupMockedTestServerWithConfigAndGCPMultiendpointPool(t *testing.T, config
129129
if err != nil {
130130
t.Fatal(err)
131131
}
132-
if isMultiplexEnabled {
132+
if isMultiplexEnabled || config.enableMultiplexSession {
133133
waitFor(t, func() error {
134134
client.idleSessions.mu.Lock()
135135
defer client.idleSessions.mu.Unlock()

spanner/transaction.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,7 @@ type ReadWriteTransaction struct {
11651165
// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
11661166
// during the initialization of ReadWriteTransaction.
11671167
tx transactionID
1168+
previousTx transactionID
11681169
precommitToken *sppb.MultiplexedSessionPrecommitToken
11691170

11701171
// txReadyOrClosed is for broadcasting that transaction ID has been returned
@@ -1473,7 +1474,8 @@ func (t *ReadWriteTransaction) getTransactionSelector() *sppb.TransactionSelecto
14731474
Begin: &sppb.TransactionOptions{
14741475
Mode: &sppb.TransactionOptions_ReadWrite_{
14751476
ReadWrite: &sppb.TransactionOptions_ReadWrite{
1476-
ReadLockMode: t.txOpts.ReadLockMode,
1477+
ReadLockMode: t.txOpts.ReadLockMode,
1478+
MultiplexedSessionPreviousTransactionId: t.previousTx,
14771479
},
14781480
},
14791481
ExcludeTxnFromChangeStreams: t.txOpts.ExcludeTxnFromChangeStreams,
@@ -1535,18 +1537,19 @@ func (t *ReadWriteTransaction) setSessionEligibilityForLongRunning(sh *sessionHa
15351537
}
15361538
}
15371539

1538-
func beginTransaction(ctx context.Context, sid string, client spannerClient, opts TransactionOptions, mutationKey *sppb.Mutation) (transactionID, *sppb.MultiplexedSessionPrecommitToken, error) {
1539-
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
1540-
Session: sid,
1540+
func beginTransaction(ctx context.Context, opts transactionBeginOptions) (transactionID, *sppb.MultiplexedSessionPrecommitToken, error) {
1541+
res, err := opts.client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
1542+
Session: opts.sessionID,
15411543
Options: &sppb.TransactionOptions{
15421544
Mode: &sppb.TransactionOptions_ReadWrite_{
15431545
ReadWrite: &sppb.TransactionOptions_ReadWrite{
1544-
ReadLockMode: opts.ReadLockMode,
1546+
ReadLockMode: opts.txOptions.ReadLockMode,
1547+
MultiplexedSessionPreviousTransactionId: opts.previousTx,
15451548
},
15461549
},
1547-
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
1550+
ExcludeTxnFromChangeStreams: opts.txOptions.ExcludeTxnFromChangeStreams,
15481551
},
1549-
MutationKey: mutationKey,
1552+
MutationKey: opts.mutation,
15501553
})
15511554
if err != nil {
15521555
return nil, nil, err
@@ -1581,6 +1584,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context, mutation *sppb.Mutatio
15811584
return nil
15821585
}
15831586
sh := t.sh
1587+
previousTx := t.previousTx
15841588
t.mu.Unlock()
15851589

15861590
var (
@@ -1604,7 +1608,13 @@ func (t *ReadWriteTransaction) begin(ctx context.Context, mutation *sppb.Mutatio
16041608
if sh != nil {
16051609
sh.updateLastUseTime()
16061610
}
1607-
tx, precommitToken, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.getID(), sh.getClient(), t.txOpts, mutation)
1611+
tx, precommitToken, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), transactionBeginOptions{
1612+
sessionID: sh.getID(),
1613+
client: sh.getClient(),
1614+
txOptions: t.txOpts,
1615+
mutation: mutation,
1616+
previousTx: previousTx,
1617+
})
16081618
if isSessionNotFoundError(err) {
16091619
sh.destroy()
16101620
// this should not happen with multiplexed session, but if it does, we should not retry with multiplexed session
@@ -1791,6 +1801,9 @@ func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(cont
17911801
errDuringCommit = err != nil
17921802
}
17931803
if err != nil {
1804+
if t.tx != nil {
1805+
t.previousTx = t.tx
1806+
}
17941807
if isAbortedErr(err) {
17951808
// Retry the transaction using the same session on ABORT error.
17961809
// Cloud Spanner will create the new transaction with the previous
@@ -2073,3 +2086,12 @@ func isAbortedErr(err error) bool {
20732086
}
20742087
return false
20752088
}
2089+
2090+
// transactionBeginOptions holds the parameters for beginning a transaction.
2091+
type transactionBeginOptions struct {
2092+
sessionID string
2093+
client spannerClient
2094+
txOptions TransactionOptions
2095+
previousTx transactionID
2096+
mutation *sppb.Mutation
2097+
}

spanner/transaction_test.go

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,131 @@ func TestCommitWithMultiplexedSessionRetry(t *testing.T) {
596596
}
597597
}
598598

599+
func TestClient_ReadWriteTransaction_PreviousTransactionID(t *testing.T) {
600+
t.Parallel()
601+
ctx := context.Background()
602+
cfg := SessionPoolConfig{
603+
MinOpened: 1,
604+
MaxOpened: 1,
605+
enableMultiplexSession: true,
606+
enableMultiplexedSessionForRW: true,
607+
}
608+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
609+
DisableNativeMetrics: true,
610+
SessionPoolConfig: cfg,
611+
})
612+
defer teardown()
613+
614+
// First ExecuteSql will fail with InvalidArgument to force explicit BeginTransaction
615+
invalidSQL := "Update FOO Set BAR=1"
616+
server.TestSpanner.PutStatementResult(
617+
invalidSQL,
618+
&StatementResult{
619+
Type: StatementResultError,
620+
Err: status.Error(codes.InvalidArgument, "Invalid update"),
621+
},
622+
)
623+
624+
// First Commit will fail with Aborted to force transaction retry
625+
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
626+
SimulatedExecutionTime{
627+
Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
628+
})
629+
630+
var attempts int
631+
expectedAttempts := 4
632+
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
633+
if attempts == 1 || attempts == 3 {
634+
// Replace the aborted result with a real result to prevent the
635+
// transaction from aborting indefinitely.
636+
server.TestSpanner.PutStatementResult(
637+
invalidSQL,
638+
&StatementResult{
639+
Type: StatementResultUpdateCount,
640+
UpdateCount: 3,
641+
},
642+
)
643+
}
644+
if attempts == 2 {
645+
server.TestSpanner.PutStatementResult(
646+
invalidSQL,
647+
&StatementResult{
648+
Type: StatementResultError,
649+
Err: status.Error(codes.InvalidArgument, "Invalid update"),
650+
},
651+
)
652+
}
653+
attempts++
654+
// First attempt: Inline begin fails due to invalid update
655+
// Second attempt: Explicit begin succeeds but commit aborts
656+
// Third attempt: Inline begin fails, previousTx set to txn2
657+
// Fourth attempt: Explicit begin succeeds with previousTx=txn2
658+
_, err := tx.Update(ctx, NewStatement(invalidSQL))
659+
if err != nil {
660+
return err
661+
}
662+
return nil
663+
})
664+
if err != nil {
665+
t.Fatal(err)
666+
}
667+
if attempts != expectedAttempts {
668+
t.Fatalf("got %d attempts, want %d", attempts, expectedAttempts)
669+
}
670+
671+
requests := drainRequestsFromServer(server.TestSpanner)
672+
var txID2 []byte
673+
var foundPrevTxID bool
674+
675+
// Verify the sequence of requests and transaction IDs
676+
for i, req := range requests {
677+
switch r := req.(type) {
678+
case *sppb.ExecuteSqlRequest:
679+
// First and third attempts use inline begin
680+
if i == 1 || i == 5 {
681+
if _, ok := r.Transaction.GetSelector().(*sppb.TransactionSelector_Begin); !ok {
682+
t.Errorf("Request %d: got %T, want Begin", i, r.Transaction.GetSelector())
683+
}
684+
}
685+
if txID2 == nil && r.Transaction.GetId() != nil {
686+
txID2 = r.Transaction.GetId()
687+
}
688+
case *sppb.BeginTransactionRequest:
689+
if i == 7 {
690+
opts := r.Options.GetReadWrite()
691+
if opts == nil {
692+
t.Fatal("Request 7: missing ReadWrite options")
693+
}
694+
if !testEqual(opts.MultiplexedSessionPreviousTransactionId, txID2) {
695+
t.Errorf("Request 7: got prev txID %v, want %v",
696+
opts.MultiplexedSessionPreviousTransactionId, txID2)
697+
}
698+
foundPrevTxID = true
699+
}
700+
}
701+
}
702+
703+
if !foundPrevTxID {
704+
t.Error("Did not find BeginTransaction request with previous transaction ID")
705+
}
706+
707+
// Verify the complete sequence of requests
708+
wantRequests := []interface{}{
709+
&sppb.BatchCreateSessionsRequest{},
710+
&sppb.ExecuteSqlRequest{}, // Attempt 1: Inline begin fails
711+
&sppb.BeginTransactionRequest{}, // Attempt 2: Explicit begin
712+
&sppb.ExecuteSqlRequest{}, // Attempt 2: Update succeeds
713+
&sppb.CommitRequest{}, // Attempt 2: Commit aborts
714+
&sppb.ExecuteSqlRequest{}, // Attempt 3: Inline begin fails
715+
&sppb.BeginTransactionRequest{}, // Attempt 4: Explicit begin with prev txID
716+
&sppb.ExecuteSqlRequest{}, // Attempt 4: Update succeeds
717+
&sppb.CommitRequest{}, // Attempt 4: Commit succeeds
718+
}
719+
if err := compareRequestsWithConfig(wantRequests, requests, &cfg); err != nil {
720+
t.Fatal(err)
721+
}
722+
}
723+
599724
func TestMutationOnlyCaseAborted(t *testing.T) {
600725
t.Parallel()
601726
ctx := context.Background()
@@ -1229,6 +1354,10 @@ func shouldHaveReceived(server InMemSpannerServer, want []interface{}) ([]interf
12291354

12301355
// Compares expected requests (want) with actual requests (got).
12311356
func compareRequests(want []interface{}, got []interface{}) error {
1357+
return compareRequestsWithConfig(want, got, nil)
1358+
}
1359+
1360+
func compareRequestsWithConfig(want []interface{}, got []interface{}, config *SessionPoolConfig) error {
12321361
if reflect.TypeOf(want[0]) != reflect.TypeOf(&sppb.BatchCreateSessionsRequest{}) {
12331362
sessReq := 0
12341363
for i := 0; i < len(want); i++ {
@@ -1239,7 +1368,7 @@ func compareRequests(want []interface{}, got []interface{}) error {
12391368
}
12401369
want[0], want[sessReq] = want[sessReq], want[0]
12411370
}
1242-
if isMultiplexEnabled {
1371+
if isMultiplexEnabled || (config != nil && config.enableMultiplexSession) {
12431372
if reflect.TypeOf(want[0]) != reflect.TypeOf(&sppb.CreateSessionRequest{}) {
12441373
want = append([]interface{}{&sppb.CreateSessionRequest{}}, want...)
12451374
}

0 commit comments

Comments
 (0)