4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -50,7 +50,7 @@ jobs:
git diff --exit-code

- name: Lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v8
with:
version: v1.61.0
version: v2.2.1
skip-pkg-cache: true
32 changes: 23 additions & 9 deletions .golangci.yml
@@ -1,21 +1,35 @@
run:
timeout: 5m
version: "2"
linters:
disable-all: true
default: none
enable:
- bodyclose
#- depguard
- exportloopref
- gofmt
- goimports
- goprintffuncname
- gosimple
- govet
- ineffassign
- misspell
- noctx
- nolintlint
- staticcheck
- typecheck
- unconvert
- unused
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gofmt
- goimports
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
8 changes: 4 additions & 4 deletions error/error.go
@@ -129,7 +129,7 @@ type ErrDeadlock struct {
}

func (d *ErrDeadlock) Error() string {
return d.Deadlock.String()
return d.String()
}

// PDError wraps *pdpb.Error to implement the error interface.
@@ -152,7 +152,7 @@ type ErrKeyExist struct {
}

func (k *ErrKeyExist) Error() string {
return k.AlreadyExist.String()
return k.String()
}

// IsErrKeyExist returns true if it is ErrKeyExist.
@@ -167,7 +167,7 @@ type ErrWriteConflict struct {
}

func (k *ErrWriteConflict) Error() string {
return fmt.Sprintf("write conflict { %s }", k.WriteConflict.String())
return fmt.Sprintf("write conflict { %s }", k.String())
}

// IsErrWriteConflict returns true if it is ErrWriteConflict.
@@ -310,7 +310,7 @@ type ErrLockOnlyIfExistsNoPrimaryKey struct {
}

func (e *ErrAssertionFailed) Error() string {
return fmt.Sprintf("assertion failed { %s }", e.AssertionFailed.String())
return fmt.Sprintf("assertion failed { %s }", e.String())
}

func (e *ErrLockOnlyIfExistsNoReturnValue) Error() string {
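The error.go hunks above drop the explicit embedded-field qualifier (`d.Deadlock.String()` becomes `d.String()`, `k.AlreadyExist.String()` becomes `k.String()`, and so on), relying on Go's promotion of methods and fields from embedded structs; the same rule is what lets later hunks write `r.ApiVersion`, `copStream.Cancel`, or `conn.batchCommandsClients` without naming the embedded member. A minimal, self-contained sketch of that promotion rule, using hypothetical `Inner`/`Wrapper` types rather than the client-go ones:

```go
package main

import "fmt"

// Inner stands in for an embedded message such as kvrpcpb.Deadlock;
// the type and field names here are hypothetical.
type Inner struct{ Msg string }

func (i *Inner) String() string { return i.Msg }

// Wrapper embeds *Inner, so Inner's methods (and fields) are promoted
// onto Wrapper and can be reached without the extra qualifier.
type Wrapper struct {
	*Inner
}

// Error calls the promoted String directly; w.String() and
// w.Inner.String() resolve to exactly the same method.
func (w *Wrapper) Error() string {
	return w.String()
}

func main() {
	w := &Wrapper{Inner: &Inner{Msg: "deadlock detected"}}
	fmt.Println(w.Error()) // deadlock detected
}
```

Both spellings compile to the same call, so the shorter form is purely a readability change, which is why the linter-driven cleanup can apply it mechanically.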
14 changes: 7 additions & 7 deletions internal/apicodec/codec.go
@@ -92,27 +92,27 @@ func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, erro
}

func setAPICtx(c Codec, r *tikvrpc.Request) {
r.Context.ApiVersion = c.GetAPIVersion()
r.Context.KeyspaceId = uint32(c.GetKeyspaceID())
r.ApiVersion = c.GetAPIVersion()
r.KeyspaceId = uint32(c.GetKeyspaceID())
keyspaceMeta := c.GetKeyspaceMeta()
if keyspaceMeta != nil {
r.Context.KeyspaceName = keyspaceMeta.Name
r.KeyspaceName = keyspaceMeta.Name
}

switch r.Type {
case tikvrpc.CmdMPPTask:
mpp := *r.DispatchMPPTask()
// Shallow copy the meta to avoid concurrent modification.
meta := *mpp.Meta
meta.KeyspaceId = r.Context.KeyspaceId
meta.ApiVersion = r.Context.ApiVersion
meta.KeyspaceId = r.KeyspaceId
meta.ApiVersion = r.ApiVersion
mpp.Meta = &meta
r.Req = &mpp

case tikvrpc.CmdCompact:
compact := *r.Compact()
compact.KeyspaceId = r.Context.KeyspaceId
compact.ApiVersion = r.Context.ApiVersion
compact.KeyspaceId = r.KeyspaceId
compact.ApiVersion = r.ApiVersion
r.Req = &compact
}
}
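In the `setAPICtx` hunk, the MPP task and its meta are dereferenced into local copies (`mpp := *r.DispatchMPPTask()`, `meta := *mpp.Meta`) before the keyspace and API-version fields are written, so a message that may be shared with other goroutines is never mutated in place. A rough sketch of that copy-before-mutate pattern, with a hypothetical `Meta` type standing in for the protobuf message:

```go
package main

import "fmt"

// Meta is a hypothetical stand-in for a shared message such as an MPP
// task meta; only the shape of the pattern matters here.
type Meta struct {
	KeyspaceID uint32
	APIVersion int32
}

// withKeyspace returns a patched copy of shared and leaves the original
// untouched, so concurrent readers of shared are unaffected.
func withKeyspace(shared *Meta, keyspaceID uint32, apiVersion int32) *Meta {
	m := *shared // shallow copy: top-level fields are duplicated
	m.KeyspaceID = keyspaceID
	m.APIVersion = apiVersion
	return &m
}

func main() {
	shared := &Meta{}
	patched := withKeyspace(shared, 42, 2)
	fmt.Println(shared.KeyspaceID, patched.KeyspaceID) // 0 42
}
```

The copy is shallow, so any nested pointer fields would still be shared; that caveat is what the "Shallow copy the meta to avoid concurrent modification" comment in the hunk is pointing at.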
8 changes: 4 additions & 4 deletions internal/client/client.go
@@ -310,7 +310,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
allowBatch := (cfg.TiKVClient.MaxBatchSize > 0) && enableBatch
if allowBatch {
a.batchConn = newBatchConn(uint(len(a.v)), cfg.TiKVClient.MaxBatchSize, idleNotify)
a.batchConn.initMetrics(a.target)
a.initMetrics(a.target)
}
keepAlive := cfg.TiKVClient.GrpcKeepAliveTime
for i := range a.v {
@@ -723,7 +723,7 @@ func (c *RPCClient) getCopStreamResponse(ctx context.Context, client tikvpb.Tikv
// Put the lease object to the timeout channel, so it would be checked periodically.
copStream := resp.Resp.(*tikvrpc.CopStreamResponse)
copStream.Timeout = timeout
copStream.Lease.Cancel = cancel
copStream.Cancel = cancel
connArray.streamTimeout <- &copStream.Lease

// Read the first streaming response to get CopStreamResponse.
@@ -758,7 +758,7 @@ func (c *RPCClient) getBatchCopStreamResponse(ctx context.Context, client tikvpb
// Put the lease object to the timeout channel, so it would be checked periodically.
copStream := resp.Resp.(*tikvrpc.BatchCopStreamResponse)
copStream.Timeout = timeout
copStream.Lease.Cancel = cancel
copStream.Cancel = cancel
connArray.streamTimeout <- &copStream.Lease

// Read the first streaming response to get CopStreamResponse.
@@ -792,7 +792,7 @@ func (c *RPCClient) getMPPStreamResponse(ctx context.Context, client tikvpb.Tikv
// Put the lease object to the timeout channel, so it would be checked periodically.
copStream := resp.Resp.(*tikvrpc.MPPStreamResponse)
copStream.Timeout = timeout
copStream.Lease.Cancel = cancel
copStream.Cancel = cancel
connArray.streamTimeout <- &copStream.Lease

// Read the first streaming response to get CopStreamResponse.
7 changes: 4 additions & 3 deletions internal/client/client_batch.go
@@ -496,7 +496,8 @@ func (t *turboBatchTrigger) turboWaitTime() time.Duration {
}

func (t *turboBatchTrigger) needFetchMore(reqArrivalInterval time.Duration) bool {
if t.opts.V == turboBatchTimeBased {
switch t.opts.V {
case turboBatchTimeBased:
thisArrivalInterval := reqArrivalInterval.Seconds()
if t.maxArrivalInterval == 0 {
t.maxArrivalInterval = t.turboWaitSeconds() * float64(t.opts.N)
@@ -510,14 +511,14 @@ func (t *turboBatchTrigger) needFetchMore(reqArrivalInterval time.Duration) bool
t.estArrivalInterval = t.opts.W*thisArrivalInterval + (1-t.opts.W)*t.estArrivalInterval
}
return t.estArrivalInterval < t.turboWaitSeconds()*t.opts.P
} else if t.opts.V == turboBatchProbBased {
case turboBatchProbBased:
thisProb := .0
if reqArrivalInterval.Seconds() < t.turboWaitSeconds() {
thisProb = 1
}
t.estFetchMoreProb = t.opts.W*thisProb + (1-t.opts.W)*t.estFetchMoreProb
return t.estFetchMoreProb > t.opts.P
} else {
default:
return true
}
}
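The `needFetchMore` hunk (now a `switch` on the trigger variant) keeps its estimates with updates of the form `est = W*sample + (1-W)*est`: an exponentially weighted moving average over request arrival intervals in the time-based mode, and over a 0/1 "arrived soon enough" indicator in the probability-based mode. A tiny illustrative sketch of that update rule; the `ewma` type below is not part of client-go:

```go
package main

import "fmt"

// ewma tracks an exponentially weighted moving average using the same
// update form as the turbo-batch trigger: est = w*sample + (1-w)*est.
type ewma struct {
	w   float64 // weight of the newest sample, 0 < w <= 1
	est float64
}

func (e *ewma) update(sample float64) float64 {
	e.est = e.w*sample + (1-e.w)*e.est
	return e.est
}

func main() {
	e := ewma{w: 0.2}
	// Feed a few arrival intervals (seconds) and watch the estimate drift
	// toward the recent, shorter intervals.
	for _, interval := range []float64{1.0, 0.5, 0.25, 0.25} {
		fmt.Printf("est = %.3f\n", e.update(interval))
	}
}
```

With `w = 0.2`, each sample pulls the estimate 20% of the way toward the observed value, so a sustained burst of closely spaced requests gradually drives the estimate under the turbo-wait threshold and keeps the batch loop fetching more.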
4 changes: 2 additions & 2 deletions internal/client/client_collapse.go
@@ -87,7 +87,7 @@ func (r reqCollapse) SendRequestAsync(ctx context.Context, addr string, req *tik
}
if req.Type == tikvrpc.CmdResolveLock && len(req.ResolveLock().Keys) == 0 && len(req.ResolveLock().TxnInfos) == 0 {
// try collapse resolve lock request.
key := strconv.FormatUint(req.Context.RegionId, 10) + "-" + strconv.FormatUint(req.ResolveLock().StartVersion, 10)
key := strconv.FormatUint(req.RegionId, 10) + "-" + strconv.FormatUint(req.ResolveLock().StartVersion, 10)
copyReq := *req
rsC := resolveRegionSf.DoChan(key, func() (interface{}, error) {
// resolveRegionSf will call this function in a goroutine, thus use SendRequest directly.
@@ -124,7 +124,7 @@ func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *t
return
}
canCollapse = true
key := strconv.FormatUint(req.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
key := strconv.FormatUint(req.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
resp, err = r.collapse(ctx, key, &resolveRegionSf, addr, req, timeout)
return
default:
10 changes: 5 additions & 5 deletions internal/client/client_test.go
@@ -142,7 +142,7 @@ func TestSendWhenReconnect(t *testing.T) {
assert.Nil(t, err)

// Suppose all connections are re-establishing.
for _, client := range conn.batchConn.batchCommandsClients {
for _, client := range conn.batchCommandsClients {
client.lockForRecreate()
}

@@ -707,9 +707,9 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
grpcConn := conn.Get()
require.NotNil(t, grpcConn)
var cli *batchCommandsClient
for i := range conn.batchConn.batchCommandsClients {
for i := range conn.batchCommandsClients {
if conn.batchConn.batchCommandsClients[i].tryLockForSend() {
cli = conn.batchConn.batchCommandsClients[i]
cli = conn.batchCommandsClients[i]
break
}
}
@@ -966,7 +966,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
}
wg.Wait()

for _, cli := range conn.batchConn.batchCommandsClients {
for _, cli := range conn.batchCommandsClients {
require.Equal(t, int64(9223372036854775807), cli.maxConcurrencyRequestLimit.Load())
require.True(t, cli.available() > 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
require.True(t, cli.sent.Load() >= 0, fmt.Sprintf("sent: %d", cli.sent.Load()))
@@ -1036,7 +1036,7 @@ func TestFastFailWhenNoAvailableConn(t *testing.T) {
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second, 0)
require.NoError(t, err)

for _, c := range conn.batchConn.batchCommandsClients {
for _, c := range conn.batchCommandsClients {
// mock all clients as being in recreate.
c.lockForRecreate()
}
2 changes: 1 addition & 1 deletion internal/latch/scheduler.go
@@ -103,7 +103,7 @@ func (scheduler *LatchesScheduler) wakeup(wakeupList []*Lock) {
// Close closes LatchesScheduler.
func (scheduler *LatchesScheduler) Close() {
scheduler.RWMutex.Lock()
defer scheduler.RWMutex.Unlock()
defer scheduler.Unlock()
if !scheduler.closed {
close(scheduler.unlockCh)
scheduler.closed = true
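The scheduler change shortens `scheduler.RWMutex.Unlock()` to `scheduler.Unlock()`: `Unlock` is promoted from the embedded `sync.RWMutex`, so both spellings are the same call. The surrounding code is the usual close-once pattern, where a boolean guarded by the lock keeps the channel from being closed twice. A small sketch under hypothetical names:

```go
package main

import "sync"

// closer is a hypothetical stand-in for LatchesScheduler: it embeds
// sync.RWMutex (so Lock/Unlock are promoted methods) and guards the
// channel close with a flag so repeated Close calls are safe.
type closer struct {
	sync.RWMutex
	closed bool
	ch     chan struct{}
}

func (c *closer) Close() {
	c.Lock()         // promoted from the embedded RWMutex
	defer c.Unlock() // identical to c.RWMutex.Unlock()
	if !c.closed {
		close(c.ch)
		c.closed = true
	}
}

func main() {
	c := &closer{ch: make(chan struct{})}
	c.Close()
	c.Close() // no panic: the flag prevents a second close
}
```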
17 changes: 8 additions & 9 deletions internal/locate/region_request.go
@@ -66,7 +66,6 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
"github.com/tikv/pd/client/errs"
pderr "github.com/tikv/pd/client/errs"
)

@@ -482,8 +481,8 @@ func (s *RegionRequestSender) SendReqAsync(
return
}

if req.Context.MaxExecutionDurationMs == 0 {
req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
if req.MaxExecutionDurationMs == 0 {
req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
}

s.reset()
@@ -989,7 +988,7 @@ func (s *sendReqState) next() (done bool) {
logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, s.args.regionID.id, s.vars.rpcCtx.Addr)
s.storeAddr = s.vars.rpcCtx.Addr

req.Context.ClusterId = s.vars.rpcCtx.ClusterID
req.ClusterId = s.vars.rpcCtx.ClusterID
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
@@ -1240,7 +1239,7 @@ func (s *sendReqState) initForAsyncRequest() (ok bool) {

// set access location based on source and target "zone" label.
s.setReqAccessLocation(req)
req.Context.ClusterId = s.vars.rpcCtx.ClusterID
req.ClusterId = s.vars.rpcCtx.ClusterID
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
@@ -1425,8 +1424,8 @@ func (s *RegionRequestSender) SendReqCtx(

// If the MaxExecutionDurationMs is not set yet, we set it to be the RPC timeout duration
// so TiKV can give up the requests whose response TiDB cannot receive due to timeout.
if req.Context.MaxExecutionDurationMs == 0 {
req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
if req.MaxExecutionDurationMs == 0 {
req.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
}

state := &sendReqState{
@@ -1513,7 +1512,7 @@ func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, r
builder.WriteString(", timeout: ")
builder.WriteString(util.FormatDuration(timeout))
builder.WriteString(", req-max-exec-timeout: ")
builder.WriteString(util.FormatDuration(time.Duration(int64(req.Context.MaxExecutionDurationMs) * int64(time.Millisecond))))
builder.WriteString(util.FormatDuration(time.Duration(int64(req.MaxExecutionDurationMs) * int64(time.Millisecond))))
builder.WriteString(", retry-times: ")
builder.WriteString(strconv.Itoa(retryTimes))
if s.AccessStats != nil {
@@ -1600,7 +1599,7 @@ func fetchRespInfo(resp *tikvrpc.Response) string {

func isRPCError(err error) bool {
// exclude ErrClientResourceGroupThrottled
return err != nil && errs.ErrClientResourceGroupThrottled.NotEqual(err)
return err != nil && pderr.ErrClientResourceGroupThrottled.NotEqual(err)
}

func storeIDLabel(rpcCtx *RPCContext) string {
12 changes: 5 additions & 7 deletions internal/locate/region_request3_test.go
@@ -264,10 +264,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
sender.client = innerClient
atomic.StoreUint32(&storeState, uint32(reachable))
start := time.Now()
for {
if leaderStore.getLivenessState() == reachable {
break
}
for leaderStore.getLivenessState() != reachable {

if time.Since(start) > 3*time.Second {
s.FailNow("store didn't recover to normal in time")
}
@@ -1131,10 +1129,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
bo := retry.NewBackoffer(context.Background(), 10000)
mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
reqTargetAddrs[addr] = struct{}{}
if req.Context.MaxExecutionDurationMs < 10 {
if req.MaxExecutionDurationMs < 10 {
return nil, context.DeadlineExceeded
}
if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
if addr != leaderAddr && !req.ReplicaRead && !req.StaleRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
@@ -1186,7 +1184,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
req.ReplicaRead = tp.IsFollowerRead()
req.ReplicaReadType = tp
}
req.Context.MaxExecutionDurationMs = 0
req.MaxExecutionDurationMs = 0
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
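The `TestForwarding` hunk folds `for { if cond { break } ... }` into the idiomatic `for leaderStore.getLivenessState() != reachable { ... }`, a plain poll-until-condition loop with a deadline check inside the body. A generic sketch of that shape; the `waitUntil` helper is illustrative, not part of the test suite:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntil polls cond until it reports true, failing once limit has
// elapsed, and sleeps between probes to avoid spinning.
func waitUntil(cond func() bool, limit, interval time.Duration) error {
	start := time.Now()
	for !cond() {
		if time.Since(start) > limit {
			return errors.New("condition not met in time")
		}
		time.Sleep(interval)
	}
	return nil
}

func main() {
	ready := time.Now().Add(50 * time.Millisecond)
	err := waitUntil(func() bool { return time.Now().After(ready) }, 3*time.Second, 10*time.Millisecond)
	fmt.Println(err) // <nil>
}
```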
2 changes: 1 addition & 1 deletion internal/locate/region_request_test.go
@@ -1174,7 +1174,7 @@ type noCauseError struct {
error
}

func (_ noCauseError) Cause() error {
func (noCauseError) Cause() error {
return nil
}

12 changes: 7 additions & 5 deletions internal/mockstore/mocktikv/mvcc_leveldb.go
@@ -927,7 +927,8 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
}
}

if dec.value.valueType == typePut || dec.value.valueType == typeLock {
switch dec.value.valueType {
case typePut, typeLock:
if needCheckShouldNotExistForPessimisticLock {
if writeConflictErr != nil {
return nil, writeConflictErr
@@ -946,7 +947,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
ExistingCommitTS: dec.value.commitTS,
}
}
} else if dec.value.valueType == typeDelete {
case typeDelete:
if lockOnlyIfExists && writeConflictErr != nil {
// If lockOnlyIfExists is enabled and the key doesn't exist, force locking shouldn't take effect.
return nil, writeConflictErr
@@ -1135,11 +1136,12 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit

func commitLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS, commitTS uint64) error {
var valueType mvccValueType
if lock.op == kvrpcpb.Op_Put {
switch lock.op {
case kvrpcpb.Op_Put:
valueType = typePut
} else if lock.op == kvrpcpb.Op_Lock {
case kvrpcpb.Op_Lock:
valueType = typeLock
} else {
default:
valueType = typeDelete
}
value := mvccValue{