Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,6 +2513,50 @@ func (s *testCommitterSuite) TestKillSignal() {
s.ErrorContains(err, "query interrupted")
}

func (s *testCommitterSuite) TestUninterruptibleAction() {
s.Run("Cleanup", func() {
var killed uint32 = 0
txn := s.begin()
txn.SetVars(kv.NewVariables(&killed))
err := txn.Set([]byte("k1"), []byte("v1"))
s.NoError(err)
committer, err := txn.NewCommitter(0)
s.NoError(err)
err = committer.PrewriteAllMutations(context.Background())
s.NoError(err)
atomic.StoreUint32(&killed, 2)
s.NoError(committer.CleanupMutations(context.Background()))
})
s.Run("PessimisticRollback", func() {
var killed uint32 = 0
txn := s.begin()
txn.SetVars(kv.NewVariables(&killed))
txn.SetPessimistic(true)
err := txn.LockKeys(context.Background(), kv.NewLockCtx(txn.StartTS(), kv.LockNoWait, time.Now()), []byte("k2"))
s.NoError(err)
atomic.StoreUint32(&killed, 2)
committer, err := txn.NewCommitter(0)
s.NoError(err)
s.NoError(committer.PessimisticRollbackMutations(context.Background(), committer.GetMutations()))
})
s.Run("Commit", func() {
var killed uint32 = 0
txn := s.begin()
txn.SetVars(kv.NewVariables(&killed))
err := txn.Set([]byte("k1"), []byte("v1"))
s.NoError(err)
committer, err := txn.NewCommitter(0)
s.NoError(err)
err = committer.PrewriteAllMutations(context.Background())
s.NoError(err)
atomic.StoreUint32(&killed, 2)
commitTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
s.NoError(err)
committer.SetCommitTS(commitTS)
s.NoError(committer.CommitMutations(context.Background()))
})
}

func (s *testCommitterSuite) Test2PCLifecycleHooks() {
reachedPre := atomic.Bool{}
reachedPost := atomic.Bool{}
Expand Down
6 changes: 4 additions & 2 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,10 @@ func (s *RegionRequestSender) SendReqCtx(
}

// recheck whether the session/query is killed during the Next()
if err2 := bo.CheckKilled(); err2 != nil {
return nil, nil, retryTimes, err2
if req.IsInterruptible() {
if err2 := bo.CheckKilled(); err2 != nil {
return nil, nil, retryTimes, err2
}
}
if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil {
if val.(bool) {
Expand Down
10 changes: 10 additions & 0 deletions tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ func (req *Request) IsDebugReq() bool {
return false
}

// IsInterruptible checks if the request can be interrupted when the query is killed.
func (req *Request) IsInterruptible() bool {
switch req.Type {
case CmdPessimisticRollback, CmdBatchRollback, CmdCommit:
return false
default:
return true
}
}

// Get returns GetRequest in request.
func (req *Request) Get() *kvrpcpb.GetRequest {
return req.Req.(*kvrpcpb.GetRequest)
Expand Down
6 changes: 4 additions & 2 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const slowRequestThreshold = time.Minute
type twoPhaseCommitAction interface {
handleSingleBatch(*twoPhaseCommitter, *retry.Backoffer, batchMutations) error
tiKVTxnRegionsNumHistogram() prometheus.Observer
isInterruptible() bool
String() string
}

Expand Down Expand Up @@ -1064,9 +1065,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(
// killSignal should never be nil for TiDB
if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
// Do not reset the killed flag here. Let the upper layer reset the flag.
// Before it resets, any request is considered valid to be killed.
// Before it resets, any request is considered valid to be killed if the
// corresponding action is interruptible.
status := atomic.LoadUint32(c.txn.vars.Killed)
if status != 0 {
if status != 0 && action.isInterruptible() {
logutil.BgLogger().Info(
"query is killed", zap.Uint32(
"signal",
Expand Down
4 changes: 4 additions & 0 deletions txnkv/transaction/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Ba
return nil
}

func (actionCleanup) isInterruptible() bool {
return false
}

func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
}
4 changes: 4 additions & 0 deletions txnkv/transaction/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac
return nil
}

func (actionCommit) isInterruptible() bool {
return false
}

func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitMutations", opentracing.ChildOf(span.Context()))
Expand Down
8 changes: 8 additions & 0 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
return true, nil
}

func (actionPessimisticLock) isInterruptible() bool {
return true
}

func (actionPessimisticRollback) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
Expand Down Expand Up @@ -572,6 +576,10 @@ func (actionPessimisticRollback) handleSingleBatch(
return nil
}

func (actionPessimisticRollback) isInterruptible() bool {
return false
}

func (c *twoPhaseCommitter) pessimisticLockMutations(
bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode,
mutations CommitterMutations,
Expand Down
4 changes: 4 additions & 0 deletions txnkv/transaction/pipelined_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ func (action actionPipelinedFlush) handleSingleBatch(
}
}

func (actionPipelinedFlush) isInterruptible() bool {
return true
}

func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutations CommitterMutations, generation uint64) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.pipelinedFlushMutations", opentracing.ChildOf(span.Context()))
Expand Down
4 changes: 4 additions & 0 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ func (action actionPrewrite) handleSingleBatch(
}
}

func (actionPrewrite) isInterruptible() bool {
return true
}

func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))
Expand Down
Loading