Skip to content

Commit b7e019d

Browse files
authored
txnkv: prevent some actions from being interrupted by kill (tikv#1665)
fix pingcap/tidb#61454 Signed-off-by: zyguan <zhongyangguan@gmail.com>
1 parent 1430158 commit b7e019d

File tree

9 files changed

+87
-5
lines changed

9 files changed

+87
-5
lines changed

integration_tests/2pc_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2707,6 +2707,50 @@ func (s *testCommitterSuite) TestKillSignal() {
27072707
s.ErrorContains(err, "query interrupted")
27082708
}
27092709

2710+
func (s *testCommitterSuite) TestUninterruptibleAction() {
2711+
s.Run("Cleanup", func() {
2712+
var killed uint32 = 0
2713+
txn := s.begin()
2714+
txn.SetVars(kv.NewVariables(&killed))
2715+
err := txn.Set([]byte("k1"), []byte("v1"))
2716+
s.NoError(err)
2717+
committer, err := txn.NewCommitter(0)
2718+
s.NoError(err)
2719+
err = committer.PrewriteAllMutations(context.Background())
2720+
s.NoError(err)
2721+
atomic.StoreUint32(&killed, 2)
2722+
s.NoError(committer.CleanupMutations(context.Background()))
2723+
})
2724+
s.Run("PessimisticRollback", func() {
2725+
var killed uint32 = 0
2726+
txn := s.begin()
2727+
txn.SetVars(kv.NewVariables(&killed))
2728+
txn.SetPessimistic(true)
2729+
err := txn.LockKeys(context.Background(), kv.NewLockCtx(txn.StartTS(), kv.LockNoWait, time.Now()), []byte("k2"))
2730+
s.NoError(err)
2731+
atomic.StoreUint32(&killed, 2)
2732+
committer, err := txn.NewCommitter(0)
2733+
s.NoError(err)
2734+
s.NoError(committer.PessimisticRollbackMutations(context.Background(), committer.GetMutations()))
2735+
})
2736+
s.Run("Commit", func() {
2737+
var killed uint32 = 0
2738+
txn := s.begin()
2739+
txn.SetVars(kv.NewVariables(&killed))
2740+
err := txn.Set([]byte("k1"), []byte("v1"))
2741+
s.NoError(err)
2742+
committer, err := txn.NewCommitter(0)
2743+
s.NoError(err)
2744+
err = committer.PrewriteAllMutations(context.Background())
2745+
s.NoError(err)
2746+
atomic.StoreUint32(&killed, 2)
2747+
commitTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
2748+
s.NoError(err)
2749+
committer.SetCommitTS(commitTS)
2750+
s.NoError(committer.CommitMutations(context.Background()))
2751+
})
2752+
}
2753+
27102754
func (s *testCommitterSuite) Test2PCLifecycleHooks() {
27112755
reachedPre := atomic.Bool{}
27122756
reachedPost := atomic.Bool{}

internal/locate/region_request.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -912,9 +912,11 @@ func (s *sendReqState) next() (done bool) {
912912
bo, req := s.args.bo, s.args.req
913913

914914
// check whether the session/query is killed during the Next()
915-
if err := bo.CheckKilled(); err != nil {
916-
s.vars.resp, s.vars.err = nil, err
917-
return true
915+
if req.IsInterruptible() {
916+
if err := bo.CheckKilled(); err != nil {
917+
s.vars.resp, s.vars.err = nil, err
918+
return true
919+
}
918920
}
919921

920922
// handle send error

tikvrpc/tikvrpc.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,16 @@ func (req *Request) IsDebugReq() bool {
340340
return false
341341
}
342342

343+
// IsInterruptible checks if the request can be interrupted when the query is killed.
344+
func (req *Request) IsInterruptible() bool {
345+
switch req.Type {
346+
case CmdPessimisticRollback, CmdBatchRollback, CmdCommit:
347+
return false
348+
default:
349+
return true
350+
}
351+
}
352+
343353
// Get returns GetRequest in request.
344354
func (req *Request) Get() *kvrpcpb.GetRequest {
345355
return req.Req.(*kvrpcpb.GetRequest)

txnkv/transaction/2pc.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ const slowRequestThreshold = time.Minute
7575
type twoPhaseCommitAction interface {
7676
handleSingleBatch(*twoPhaseCommitter, *retry.Backoffer, batchMutations) error
7777
tiKVTxnRegionsNumHistogram() prometheus.Observer
78+
isInterruptible() bool
7879
String() string
7980
}
8081

@@ -1069,9 +1070,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(
10691070
// killSignal should never be nil for TiDB
10701071
if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
10711072
// Do not reset the killed flag here. Let the upper layer reset the flag.
1072-
// Before it resets, any request is considered valid to be killed.
1073+
// Before it resets, any request is considered valid to be killed if the
1074+
// corresponding action is interruptible.
10731075
status := atomic.LoadUint32(c.txn.vars.Killed)
1074-
if status != 0 {
1076+
if status != 0 && action.isInterruptible() {
10751077
logutil.BgLogger().Info(
10761078
"query is killed", zap.Uint32(
10771079
"signal",

txnkv/transaction/cleanup.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Ba
103103
return nil
104104
}
105105

106+
func (actionCleanup) isInterruptible() bool {
107+
return false
108+
}
109+
106110
func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
107111
return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
108112
}

txnkv/transaction/commit.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac
247247
return nil
248248
}
249249

250+
func (actionCommit) isInterruptible() bool {
251+
return false
252+
}
253+
250254
func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
251255
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
252256
span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitMutations", opentracing.ChildOf(span.Context()))

txnkv/transaction/pessimistic.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
525525
return true, nil
526526
}
527527

528+
func (actionPessimisticLock) isInterruptible() bool {
529+
return true
530+
}
531+
528532
func (actionPessimisticRollback) handleSingleBatch(
529533
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
530534
) error {
@@ -560,6 +564,10 @@ func (actionPessimisticRollback) handleSingleBatch(
560564
return nil
561565
}
562566

567+
func (actionPessimisticRollback) isInterruptible() bool {
568+
return false
569+
}
570+
563571
func (c *twoPhaseCommitter) pessimisticLockMutations(
564572
bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode,
565573
mutations CommitterMutations,

txnkv/transaction/pipelined_flush.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,10 @@ func (action actionPipelinedFlush) handleSingleBatch(
284284
}
285285
}
286286

287+
func (actionPipelinedFlush) isInterruptible() bool {
288+
return true
289+
}
290+
287291
func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutations CommitterMutations, generation uint64) error {
288292
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
289293
span1 := span.Tracer().StartSpan("twoPhaseCommitter.pipelinedFlushMutations", opentracing.ChildOf(span.Context()))

txnkv/transaction/prewrite.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ func (action actionPrewrite) handleSingleBatchFailpoint(c *twoPhaseCommitter, bo
282282
return nil
283283
}
284284

285+
func (actionPrewrite) isInterruptible() bool {
286+
return true
287+
}
288+
285289
func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
286290
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
287291
span1 := span.Tracer().StartSpan("twoPhaseCommitter.prewriteMutations", opentracing.ChildOf(span.Context()))

0 commit comments

Comments
 (0)