Skip to content

Commit 30119fc

Browse files
authored
txnkv: Add a named error ErrCommitTSLag when waiting for commitWaitUntilTSO timeout (tikv#1842)
close tikv#1843 Signed-off-by: Chao Wang <cclcwangchao@hotmail.com>
1 parent 847f93b commit 30119fc

File tree

10 files changed

+340
-51
lines changed

10 files changed

+340
-51
lines changed

config/retry/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ var (
133133
BoTxnNotFound = NewConfig("txnNotFound", &metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrResolveLockTimeout)
134134
BoStaleCmd = NewConfig("staleCommand", &metrics.BackoffHistogramStaleCmd, NewBackoffFnCfg(2, 1000, NoJitter), tikverr.ErrTiKVStaleCommand)
135135
BoMaxTsNotSynced = NewConfig("maxTsNotSynced", &metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrTiKVMaxTimestampNotSynced)
136+
BoCommitTSLag = NewConfig("commitTSLag", &metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrCommitTSLag)
136137
BoMaxRegionNotInitialized = NewConfig("regionNotInitialized", &metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 1000, NoJitter), tikverr.ErrRegionNotInitialized)
137138
BoIsWitness = NewConfig("isWitness", &metrics.BackoffHistogramIsWitness, NewBackoffFnCfg(1000, 10000, EqualJitter), tikverr.ErrIsWitness)
138139
// TxnLockFast's `base` load from vars.BackoffLockFast when create BackoffFn.

error/error.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ var (
9191
ErrRegionNotInitialized = errors.New("region not Initialized")
9292
// ErrTiKVDiskFull is the error when tikv server disk usage is full.
9393
ErrTiKVDiskFull = errors.New("tikv disk full")
94+
// ErrCommitTSLag is the error when the commit TS which fetched from PD drifts and falls behind the expected value.
95+
ErrCommitTSLag = errors.New("commit timestamp lags behind expected")
9496
// ErrRegionRecoveryInProgress is the error when region is recovering.
9597
ErrRegionRecoveryInProgress = errors.New("region is being online unsafe recovered")
9698
// ErrRegionFlashbackInProgress is the error when a region in the flashback progress receive any other request.
@@ -420,3 +422,8 @@ func ExtractDebugInfoStrFromKeyErr(keyErr *kvrpcpb.KeyError) string {
420422
}
421423
return string(debugStr)
422424
}
425+
426+
// IsErrorCommitTSLag checks if the error is commit ts lag error.
427+
func IsErrorCommitTSLag(err error) bool {
428+
return errors.Is(err, ErrCommitTSLag)
429+
}

examples/txnkv/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module txnkv
22

3-
go 1.23.0
3+
go 1.23.12
44

55
require github.com/tikv/client-go/v2 v2.0.0
66

@@ -22,7 +22,7 @@ require (
2222
github.com/opentracing/opentracing-go v1.2.0 // indirect
2323
github.com/pingcap/errors v0.11.5-0.20241219054535-6b8c588c3122 // indirect
2424
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
25-
github.com/pingcap/kvproto v0.0.0-20251109100001-1907922fbd18 // indirect
25+
github.com/pingcap/kvproto v0.0.0-20251218093338-9f0ac2fc9a1a // indirect
2626
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
2727
github.com/pkg/errors v0.9.1 // indirect
2828
github.com/prometheus/client_golang v1.20.5 // indirect
@@ -31,7 +31,7 @@ require (
3131
github.com/prometheus/procfs v0.15.1 // indirect
3232
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
3333
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
34-
github.com/tikv/pd/client v0.0.0-20250625073039-fb496b371ff3 // indirect
34+
github.com/tikv/pd/client v0.0.0-20251211035544-6cebb3314abe // indirect
3535
github.com/twmb/murmur3 v1.1.3 // indirect
3636
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
3737
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect

integration_tests/option_test.go

Lines changed: 165 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,68 @@ package tikv_test
3636

3737
import (
3838
"context"
39+
"sync"
3940
"testing"
4041
"time"
4142

43+
"github.com/google/uuid"
44+
"github.com/prometheus/client_golang/prometheus"
45+
dto "github.com/prometheus/client_model/go"
4246
"github.com/stretchr/testify/suite"
47+
tikverr "github.com/tikv/client-go/v2/error"
48+
"github.com/tikv/client-go/v2/metrics"
4349
"github.com/tikv/client-go/v2/oracle"
4450
"github.com/tikv/client-go/v2/tikv"
51+
"github.com/tikv/client-go/v2/txnkv/transaction"
52+
"github.com/tikv/client-go/v2/util"
53+
"github.com/tikv/client-go/v2/util/intest"
4554
)
4655

56+
type mockCommitTSOracle struct {
57+
oracle.Oracle
58+
mu struct {
59+
sync.Mutex
60+
startTS uint64
61+
commitTSOffsets []uint64
62+
}
63+
}
64+
65+
func (o *mockCommitTSOracle) ResetMock(startTS uint64, commitTSOffsets []uint64) {
66+
o.mu.Lock()
67+
defer o.mu.Unlock()
68+
o.mu.startTS = startTS
69+
o.mu.commitTSOffsets = commitTSOffsets
70+
}
71+
72+
func (o *mockCommitTSOracle) GetTimestamp(ctx context.Context, option *oracle.Option) (uint64, error) {
73+
if ts := ctx.Value(transaction.CtxInGetTimestampForCommitKey); ts != nil {
74+
o.mu.Lock()
75+
startTS, ok := ts.(uint64)
76+
if !ok || startTS != o.mu.startTS {
77+
o.mu.Unlock()
78+
} else {
79+
defer o.mu.Unlock()
80+
if len(o.mu.commitTSOffsets) == 0 {
81+
panic("empty mock oracle ts set")
82+
}
83+
offset := o.mu.commitTSOffsets[0]
84+
o.mu.commitTSOffsets = o.mu.commitTSOffsets[1:]
85+
return o.mu.startTS + offset, nil
86+
}
87+
}
88+
return o.Oracle.GetTimestamp(ctx, option)
89+
}
90+
4791
type testOptionSuite struct {
4892
suite.Suite
49-
store tikv.StoreProbe
93+
mockOracle *mockCommitTSOracle
94+
store tikv.StoreProbe
5095
}
5196

5297
func (s *testOptionSuite) SetupSuite() {
5398
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
99+
s.mockOracle = &mockCommitTSOracle{Oracle: s.store.GetOracle()}
100+
s.store.SetOracle(s.mockOracle)
54101
}
55102

56103
func (s *testOptionSuite) TearDownSuite() {
@@ -61,36 +108,128 @@ func TestOption(t *testing.T) {
61108
suite.Run(t, new(testOptionSuite))
62109
}
63110

111+
func (s *testOptionSuite) MetricValues(m any) *dto.Metric {
112+
metric := &dto.Metric{}
113+
s.NoError(m.(prometheus.Metric).Write(metric))
114+
return metric
115+
}
116+
117+
func (s *testOptionSuite) GetHistogramMetricSampleCount(m any) uint64 {
118+
return s.MetricValues(m).GetHistogram().GetSampleCount()
119+
}
120+
64121
func (s *testOptionSuite) TestSetCommitWaitUntilTSO() {
65-
getTS1 := func(startTS uint64) uint64 { return 100 }
66-
getTS2 := func(startTS uint64) uint64 {
67-
return startTS + 200
122+
if *withTiKV {
123+
s.T().Skip("TestSetCommitWaitUntilTSO only runs on local mock storage")
68124
}
69-
getTS3 := func(startTS uint64) uint64 {
70-
now := oracle.GetTimeFromTS(startTS)
71-
return oracle.GoTimeToTS(now.Add(10 * time.Second))
72-
}
73-
type testCase struct {
74-
getTS func(startTS uint64) uint64
75-
noError bool
125+
origInTest := intest.InTest
126+
defer func() {
127+
intest.InTest = origInTest
128+
}()
129+
intest.InTest = true
130+
doWithCollectHistSamplesInc := func(do func() error, metrics []any) ([]uint64, error) {
131+
cnt := make([]uint64, len(metrics))
132+
for i, metric := range metrics {
133+
cnt[i] = s.GetHistogramMetricSampleCount(metric)
134+
}
135+
err := do()
136+
for i, metric := range metrics {
137+
cnt[i] = s.GetHistogramMetricSampleCount(metric) - cnt[i]
138+
}
139+
return cnt, err
76140
}
77141

78-
for _, tc := range []testCase{
79-
{getTS1, true},
80-
{getTS2, true},
81-
{getTS3, false},
142+
for _, tc := range []struct {
143+
name string
144+
// We will call `txn.SetCommitWaitUntilTSO(commitWaitTSO + txn.StartTS())`
145+
commitWaitTSO uint64
146+
// Mock the PD TSOs in commit phase as {
147+
// mockCommitTSO[0] + txn.StartTS(), // first attempt
148+
// mockCommitTSO[1] + txn.StartTS(), // second attempt
149+
// ...
150+
// }
151+
mockCommitTSO []uint64
152+
setWaitTimeout time.Duration
153+
err bool
154+
}{
155+
{
156+
name: "no lag commit ts",
157+
commitWaitTSO: 1,
158+
mockCommitTSO: []uint64{100},
159+
},
160+
{
161+
name: "lag, retry once and success",
162+
commitWaitTSO: 200,
163+
mockCommitTSO: []uint64{100, 201},
164+
},
165+
{
166+
name: "lag, retry twice and success",
167+
commitWaitTSO: 300,
168+
mockCommitTSO: []uint64{100, 200, 301},
169+
},
170+
{
171+
name: "lag too much, fail directly",
172+
commitWaitTSO: oracle.ComposeTS(10000, 0),
173+
mockCommitTSO: []uint64{100},
174+
err: true,
175+
},
176+
{
177+
name: "lag, retry but timeout",
178+
commitWaitTSO: 100,
179+
mockCommitTSO: []uint64{10, 20},
180+
setWaitTimeout: time.Millisecond,
181+
err: true,
182+
},
82183
} {
83-
txn, err := s.store.Begin()
84-
s.NoError(err)
85-
commitUntil := tc.getTS(txn.StartTS())
86-
txn.KVTxn.SetCommitWaitUntilTSO(commitUntil)
87-
s.NoError(txn.Set([]byte("somekey"), []byte("somevalue")))
88-
err = txn.Commit(context.Background())
89-
s.Equal(tc.noError, err == nil)
90-
91-
if tc.noError {
92-
s.Greater(txn.CommitTS(), uint64(100))
93-
}
184+
s.Run(tc.name, func() {
185+
txn, err := s.store.Begin()
186+
s.NoError(err)
187+
s.NoError(txn.Set([]byte("somekey:"+uuid.NewString()), []byte("somevalue")))
188+
189+
txn.SetCommitWaitUntilTSO(tc.commitWaitTSO + txn.StartTS())
190+
if tc.setWaitTimeout > 0 {
191+
txn.SetCommitWaitUntilTSOTimeout(tc.setWaitTimeout)
192+
}
193+
var commitDetail *util.CommitDetails
194+
inc, err := doWithCollectHistSamplesInc(func() error {
195+
s.mockOracle.ResetMock(txn.StartTS(), tc.mockCommitTSO)
196+
defer s.mockOracle.ResetMock(0, nil)
197+
return txn.Commit(context.WithValue(context.Background(), util.CommitDetailCtxKey, &commitDetail))
198+
}, []any{
199+
// ok metrics
200+
metrics.LagCommitTSWaitHistogramWithOK,
201+
metrics.LagCommitTSAttemptHistogramWithOK,
202+
// err metrics
203+
metrics.LagCommitTSWaitHistogramWithError,
204+
metrics.LagCommitTSAttemptHistogramWithError,
205+
})
206+
207+
s.Equal(txn.StartTS()+tc.commitWaitTSO, txn.GetCommitWaitUntilTSO())
208+
if !tc.err {
209+
s.NoError(err)
210+
s.Equal(txn.CommitTS(), txn.StartTS()+tc.mockCommitTSO[len(tc.mockCommitTSO)-1])
211+
if len(tc.mockCommitTSO) == 1 {
212+
// if fetch TSO once and then success, there is no lag metrics
213+
s.Equal([]uint64{0, 0, 0, 0}, inc)
214+
s.Equal(util.CommitTSLagDetails{}, commitDetail.LagDetails)
215+
} else {
216+
s.Equal([]uint64{1, 1, 0, 0}, inc)
217+
s.NotZero(txn.KVTxn.GetCommitWaitUntilTSO())
218+
lagWaitTime := commitDetail.LagDetails.WaitTime
219+
s.Positive(lagWaitTime)
220+
s.Equal(util.CommitTSLagDetails{
221+
WaitTime: lagWaitTime,
222+
BackoffCnt: len(tc.mockCommitTSO) - 1,
223+
FirstLagTS: txn.StartTS() + tc.mockCommitTSO[0],
224+
WaitUntilTS: txn.GetCommitWaitUntilTSO(),
225+
}, commitDetail.LagDetails)
226+
}
227+
} else {
228+
s.Error(err)
229+
s.True(tikverr.IsErrorCommitTSLag(err))
230+
s.Equal([]uint64{0, 0, 1, 1}, inc)
231+
}
232+
})
94233
}
95234
}
96235

metrics/metrics.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ var (
127127
TiKVAsyncSendReqCounter *prometheus.CounterVec
128128
TiKVAsyncBatchGetCounter *prometheus.CounterVec
129129
TiKVReadRequestBytes *prometheus.SummaryVec
130+
TiKVTxnLagCommitTSWaitHistogram *prometheus.HistogramVec
131+
TiKVTxnLagCommitTSAttemptHistogram *prometheus.HistogramVec
130132
)
131133

132134
// Label constants.
@@ -947,6 +949,26 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
947949
Help: "Summary of read requests bytes",
948950
}, []string{LblType, LblResult})
949951

952+
TiKVTxnLagCommitTSWaitHistogram = prometheus.NewHistogramVec(
953+
prometheus.HistogramOpts{
954+
Namespace: namespace,
955+
Subsystem: subsystem,
956+
Name: "txn_lag_commit_ts_wait_seconds",
957+
Help: "Bucketed histogram of seconds waiting commit TSO lag.",
958+
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16), // 0.5ms ~ 16s
959+
ConstLabels: constLabels,
960+
}, []string{LblResult})
961+
962+
TiKVTxnLagCommitTSAttemptHistogram = prometheus.NewHistogramVec(
963+
prometheus.HistogramOpts{
964+
Namespace: namespace,
965+
Subsystem: subsystem,
966+
Name: "txn_lag_commit_ts_attempt_count",
967+
Help: "Bucketed histogram of attempts to get the lagging TSO in one commit",
968+
Buckets: prometheus.ExponentialBuckets(1, 2, 6), // 1 ~ 32
969+
ConstLabels: constLabels,
970+
}, []string{LblResult})
971+
950972
initShortcuts()
951973
storeMetricVecList.Store(&storeMetrics)
952974
}
@@ -1050,6 +1072,8 @@ func RegisterMetrics() {
10501072
prometheus.MustRegister(TiKVAsyncSendReqCounter)
10511073
prometheus.MustRegister(TiKVAsyncBatchGetCounter)
10521074
prometheus.MustRegister(TiKVReadRequestBytes)
1075+
prometheus.MustRegister(TiKVTxnLagCommitTSWaitHistogram)
1076+
prometheus.MustRegister(TiKVTxnLagCommitTSAttemptHistogram)
10531077
}
10541078

10551079
// readCounter reads the value of a prometheus.Counter.

metrics/shortcuts.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ var (
192192
ReadRequestLeaderRemoteBytes prometheus.Observer
193193
ReadRequestFollowerLocalBytes prometheus.Observer
194194
ReadRequestFollowerRemoteBytes prometheus.Observer
195+
196+
LagCommitTSWaitHistogramWithOK prometheus.Observer
197+
LagCommitTSWaitHistogramWithError prometheus.Observer
198+
LagCommitTSAttemptHistogramWithOK prometheus.Observer
199+
LagCommitTSAttemptHistogramWithError prometheus.Observer
195200
)
196201

197202
func initShortcuts() {
@@ -353,4 +358,9 @@ func initShortcuts() {
353358
ReadRequestLeaderRemoteBytes = TiKVReadRequestBytes.WithLabelValues("leader", "cross-zone")
354359
ReadRequestFollowerLocalBytes = TiKVReadRequestBytes.WithLabelValues("follower", "local")
355360
ReadRequestFollowerRemoteBytes = TiKVReadRequestBytes.WithLabelValues("follower", "cross-zone")
361+
362+
LagCommitTSWaitHistogramWithOK = TiKVTxnLagCommitTSWaitHistogram.WithLabelValues("ok")
363+
LagCommitTSWaitHistogramWithError = TiKVTxnLagCommitTSWaitHistogram.WithLabelValues("err")
364+
LagCommitTSAttemptHistogramWithOK = TiKVTxnLagCommitTSAttemptHistogram.WithLabelValues("ok")
365+
LagCommitTSAttemptHistogramWithError = TiKVTxnLagCommitTSAttemptHistogram.WithLabelValues("err")
356366
}

txnkv/transaction/2pc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,6 +1769,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
17691769
return err
17701770
}
17711771
commitDetail.GetLatestTsTime = time.Since(start)
1772+
c.txn.fillCommitTSLagDetails(&commitDetail.LagDetails)
17721773
// Plus 1 to avoid producing the same commit TS with previously committed transactions
17731774
c.minCommitTSMgr.tryUpdate(latestTS+1, twoPCAccess)
17741775
}
@@ -1904,6 +1905,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
19041905
return err
19051906
}
19061907
commitDetail.GetCommitTsTime = time.Since(start)
1908+
c.txn.fillCommitTSLagDetails(&commitDetail.LagDetails)
19071909
logutil.Event(ctx, "finish get commit ts")
19081910
logutil.SetTag(ctx, "commitTs", commitTS)
19091911
}

txnkv/transaction/test_probe.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,6 @@ func (txn TxnProbe) GetAggressiveLockingPreviousKeysInfo() []AggressiveLockedKey
136136
return keys
137137
}
138138

139-
func (txn TxnProbe) GetCommitWaitUntilTSOTimeout() time.Duration {
140-
return txn.KVTxn.commitWaitUntilTSOTimeout
141-
}
142-
143139
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
144140
c, err := newTwoPhaseCommitter(txn, sessionID)
145141
if err != nil {

0 commit comments

Comments
 (0)