Skip to content
38 changes: 38 additions & 0 deletions go/vt/vtgate/executorcontext/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,44 @@ func (session *SafeSession) InTransaction() bool {
return session.Session.InTransaction
}

// We take and return a snapshot of PreSessions, ShardSessions, and PostSessions
// because multiple goroutines might access or mutate these slices at the same time,
// which can lead to a race condition. This snapshot is used by Rollback/Release
// so they can safely work on a stable copy without holding the lock.

func (session *SafeSession) GetShardSessionsForCleanup() []*vtgatepb.Session_ShardSession {
session.mu.Lock()
defer session.mu.Unlock()
totalSize := len(session.PreSessions) + len(session.ShardSessions) + len(session.PostSessions)
shardSessions := make([]*vtgatepb.Session_ShardSession, 0, totalSize)
shardSessions = append(shardSessions, session.PreSessions...)
shardSessions = append(shardSessions, session.ShardSessions...)
shardSessions = append(shardSessions, session.PostSessions...)
return shardSessions
}

// Returns a snapshot of all shard sessions (including LockSession)
// for use in ReleaseAll, so it can safely operate without worrying
// about other goroutines mutating the session at the same time.

func (session *SafeSession) GetShardSessionsForReleaseAll() []*vtgatepb.Session_ShardSession {
session.mu.Lock()
defer session.mu.Unlock()
baseSize := len(session.PreSessions) + len(session.ShardSessions) + len(session.PostSessions)
totalSize := baseSize
if session.LockSession != nil {
totalSize = baseSize + 1
}
allShardSessions := make([]*vtgatepb.Session_ShardSession, 0, totalSize)
allShardSessions = append(allShardSessions, session.PreSessions...)
allShardSessions = append(allShardSessions, session.ShardSessions...)
allShardSessions = append(allShardSessions, session.PostSessions...)
if session.LockSession != nil {
allShardSessions = append(allShardSessions, session.LockSession)
}
return allShardSessions
}

// FindAndChangeSessionIfInSingleTxMode retrieves the ShardSession matching the given keyspace, shard, and tablet type.
// It performs additional checks and may modify the ShardSession in specific cases for single-mode transactions.
//
Expand Down
12 changes: 3 additions & 9 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,7 @@ func (txc *TxConn) Rollback(ctx context.Context, session *econtext.SafeSession)
}
defer session.ResetTx()

allsessions := append(session.PreSessions, session.ShardSessions...)
allsessions = append(allsessions, session.PostSessions...)
allsessions := session.GetShardSessionsForCleanup()

err := txc.runSessions(ctx, allsessions, session.GetLogger(), func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *econtext.ExecuteLogger) error {
if s.TransactionId == 0 {
Expand Down Expand Up @@ -451,8 +450,7 @@ func (txc *TxConn) Release(ctx context.Context, session *econtext.SafeSession) e
}
defer session.Reset()

allsessions := append(session.PreSessions, session.ShardSessions...)
allsessions = append(allsessions, session.PostSessions...)
allsessions := session.GetShardSessionsForCleanup()

return txc.runSessions(ctx, allsessions, session.GetLogger(), func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *econtext.ExecuteLogger) error {
if s.ReservedId == 0 && s.TransactionId == 0 {
Expand Down Expand Up @@ -498,11 +496,7 @@ func (txc *TxConn) ReleaseAll(ctx context.Context, session *econtext.SafeSession
}
defer session.ResetAll()

allsessions := append(session.PreSessions, session.ShardSessions...)
allsessions = append(allsessions, session.PostSessions...)
if session.LockSession != nil {
allsessions = append(allsessions, session.LockSession)
}
allsessions := session.GetShardSessionsForReleaseAll()

return txc.runSessions(ctx, allsessions, session.GetLogger(), func(ctx context.Context, s *vtgatepb.Session_ShardSession, loggging *econtext.ExecuteLogger) error {
if s.ReservedId == 0 && s.TransactionId == 0 {
Expand Down