Skip to content

Commit 2c8d1c8

Browse files
committed
kvdb: make etcd calls timeout to ensure liveness
Previously our RPC calls to etcd would hang even in the case of properly set dial timeouts and even if there was a network partition. To ensure liveness we need to make sure that calls fail correctly in case of system failure. To fix this we add a default timeout of 30 seconds to each etcd RPC call.
1 parent 0fd4c7d commit 2c8d1c8

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

kvdb/etcd/stm.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,25 @@ import (
88
"fmt"
99
"math"
1010
"strings"
11+
"time"
1112

1213
"github.com/google/btree"
1314
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
1415
v3 "go.etcd.io/etcd/client/v3"
1516
)
1617

18+
const (
19+
// rpcTimeout is the timeout for all RPC calls to etcd. It is set to 30
20+
// seconds to avoid blocking the server for too long but give reasonable
21+
// time for etcd to respond. If any operations would take longer than 30
22+
// seconds that generally means there's a problem with the etcd server
23+
// or the network resulting in degraded performance in which case we
24+
// want LND to fail fast. Due to the underlying gRPC implementation in
25+
// etcd calls without a timeout can hang indefinitely even in the case
26+
// of network partitions or other critical failures.
27+
rpcTimeout = time.Second * 30
28+
)
29+
1730
type CommitStats struct {
1831
Rset int
1932
Wset int
@@ -609,8 +622,13 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
609622

610623
key := prefix
611624
for {
625+
timeoutCtx, cancel := context.WithTimeout(
626+
s.options.ctx, rpcTimeout,
627+
)
628+
defer cancel()
629+
612630
resp, err := s.client.Get(
613-
s.options.ctx, key, append(opts, s.getOpts...)...,
631+
timeoutCtx, key, append(opts, s.getOpts...)...,
614632
)
615633
if err != nil {
616634
return DatabaseError{
@@ -645,8 +663,12 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
645663
// We'll also cache the returned key/value in the read set.
646664
func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
647665
s.callCount++
666+
667+
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
668+
defer cancel()
669+
648670
resp, err := s.client.Get(
649-
s.options.ctx, key, append(opts, s.getOpts...)...,
671+
timeoutCtx, key, append(opts, s.getOpts...)...,
650672
)
651673
if err != nil {
652674
return nil, DatabaseError{
@@ -1049,7 +1071,10 @@ func (s *stm) Prefetch(keys []string, prefixes []string) {
10491071
[]v3.OpOption{v3.WithPrefix()}, s.getOpts...,
10501072
)
10511073

1052-
txn := s.client.Txn(s.options.ctx)
1074+
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
1075+
defer cancel()
1076+
1077+
txn := s.client.Txn(timeoutCtx)
10531078
ops := make([]v3.Op, 0, len(fetchKeys)+len(fetchPrefixes))
10541079

10551080
for _, key := range fetchKeys {
@@ -1103,8 +1128,11 @@ func (s *stm) commit() (CommitStats, error) {
11031128

11041129
// Create the compare set.
11051130
cmps := append(rset, wset...)
1131+
11061132
// Create a transaction with the optional abort context.
1107-
txn := s.client.Txn(s.options.ctx)
1133+
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
1134+
defer cancel()
1135+
txn := s.client.Txn(timeoutCtx)
11081136

11091137
// If the compare set holds, try executing the puts.
11101138
txn = txn.If(cmps...)

0 commit comments

Comments
 (0)