Skip to content

Commit 98088e7

Browse files
committed
[ES-1122621] use idle timeout to reconnect
Signed-off-by: Yi Jin <[email protected]>
1 parent 6b27470 commit 98088e7

File tree

3 files changed: 19 additions and 44 deletions.

cmd/thanos/receive.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func runReceive(
158158
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression)))
159159
}
160160
if receiveMode == receive.RouterOnly {
161+
dialOpts = append(dialOpts, grpc.WithIdleTimeout(time.Duration(*conf.maxBackoff)))
161162
dialOpts = append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)
162163
}
163164

pkg/receive/handler.go

Lines changed: 15 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"net/http"
1616
"sort"
1717
"strconv"
18+
"strings"
1819
"sync"
1920
"time"
2021

@@ -150,7 +151,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
150151
router: route.New(),
151152
options: o,
152153
peers: newPeerGroup(
153-
logger,
154154
backoff.Backoff{
155155
Factor: 2,
156156
Min: 100 * time.Millisecond,
@@ -1222,30 +1222,8 @@ type peerWorker struct {
12221222
forwardDelay prometheus.Histogram
12231223
}
12241224

1225-
func (pw *peerWorker) isReady() bool {
1226-
return pw.cc != nil
1227-
}
1228-
1229-
func (pw *peerWorker) markNotReady() error {
1230-
if pw.cc != nil {
1231-
err := pw.cc.Close()
1232-
pw.cc = nil
1233-
return err
1234-
}
1235-
return nil
1236-
}
1237-
1238-
func (pw *peerWorker) close() error {
1239-
pw.wp.Close()
1240-
if pw.cc != nil {
1241-
return pw.cc.Close()
1242-
}
1243-
return nil
1244-
}
1245-
1246-
func newPeerGroup(logger log.Logger, backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
1225+
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
12471226
return &peerGroup{
1248-
logger: logger,
12491227
dialOpts: dialOpts,
12501228
connections: map[string]*peerWorker{},
12511229
m: sync.RWMutex{},
@@ -1293,7 +1271,6 @@ func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteReq
12931271
}
12941272

12951273
type peerGroup struct {
1296-
logger log.Logger
12971274
dialOpts []grpc.DialOption
12981275
connections map[string]*peerWorker
12991276
peerStates map[string]*retryState
@@ -1310,11 +1287,17 @@ type peerGroup struct {
13101287
func (p *peerGroup) closeAll() error {
13111288
p.m.Lock()
13121289
defer p.m.Unlock()
1290+
var closeErrors []string // Slice to store error messages
13131291
for addr := range p.connections {
13141292
if err := p.closeUnlocked(addr); err != nil {
1315-
return err
1293+
// Collect error messages
1294+
closeErrors = append(closeErrors, fmt.Sprintf("failed to close %s: %v", addr, err))
13161295
}
13171296
}
1297+
if len(closeErrors) > 0 {
1298+
// Return aggregated error messages
1299+
return fmt.Errorf("multiple errors closing connections: %s", strings.Join(closeErrors, ", "))
1300+
}
13181301
return nil
13191302
}
13201303

@@ -1331,8 +1314,9 @@ func (p *peerGroup) closeUnlocked(addr string) error {
13311314
// was never established.
13321315
return nil
13331316
}
1317+
c.wp.Close()
13341318
delete(p.connections, addr)
1335-
if err := c.close(); err != nil {
1319+
if err := c.cc.Close(); err != nil {
13361320
return fmt.Errorf("closing connection for %s", addr)
13371321
}
13381322

@@ -1347,7 +1331,6 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
13471331
// use a RLock first to prevent blocking if we don't need to.
13481332
p.m.RLock()
13491333
c, ok := p.connections[addr]
1350-
ok = ok && c.isReady()
13511334
p.m.RUnlock()
13521335
if ok {
13531336
return c, nil
@@ -1357,21 +1340,17 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
13571340
defer p.m.Unlock()
13581341
// Make sure that another caller hasn't created the connection since obtaining the write lock.
13591342
c, ok = p.connections[addr]
1360-
if ok && c.isReady() {
1343+
if ok {
13611344
return c, nil
13621345
}
1363-
level.Debug(p.logger).Log("msg", "dialing peer", "addr", addr)
13641346
conn, err := p.dialer(ctx, addr, p.dialOpts...)
13651347
if err != nil {
13661348
p.markPeerUnavailableUnlocked(addr)
13671349
dialError := errors.Wrap(err, "failed to dial peer")
13681350
return nil, errors.Wrap(dialError, errUnavailable.Error())
13691351
}
1370-
if !ok {
1371-
p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
1372-
} else {
1373-
p.connections[addr].cc = conn
1374-
}
1352+
1353+
p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
13751354
return p.connections[addr], nil
13761355
}
13771356

@@ -1388,15 +1367,8 @@ func (p *peerGroup) markPeerUnavailableUnlocked(addr string) {
13881367
state = &retryState{attempt: -1}
13891368
}
13901369
state.attempt++
1391-
delay := p.expBackoff.ForAttempt(state.attempt)
1392-
state.nextAllowed = time.Now().Add(delay)
1370+
state.nextAllowed = time.Now().Add(p.expBackoff.ForAttempt(state.attempt))
13931371
p.peerStates[addr] = state
1394-
if delay >= p.expBackoff.Max {
1395-
level.Warn(p.logger).Log("msg", "peer is not ready", "addr", addr, "next_allowed", state.nextAllowed)
1396-
if err := p.connections[addr].markNotReady(); err != nil {
1397-
level.Error(p.logger).Log("msg", "failed to mark peer as not ready", "err", err)
1398-
}
1399-
}
14001372
}
14011373

14021374
func (p *peerGroup) markPeerAvailable(addr string) {

pkg/receive/handler_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"gopkg.in/yaml.v3"
2727

2828
"github.com/alecthomas/units"
29-
"github.com/efficientgo/core/testutil"
3029
"github.com/go-kit/log"
3130
"github.com/go-kit/log/level"
3231
"github.com/gogo/protobuf/proto"
@@ -45,6 +44,8 @@ import (
4544
"google.golang.org/grpc/credentials/insecure"
4645
"google.golang.org/grpc/resolver"
4746

47+
"github.com/efficientgo/core/testutil"
48+
4849
"github.com/thanos-io/thanos/pkg/block/metadata"
4950
"github.com/thanos-io/thanos/pkg/component"
5051
"github.com/thanos-io/thanos/pkg/extkingpin"
@@ -1800,6 +1801,7 @@ func TestIngestorRestart(t *testing.T) {
18001801
}
18011802
resolver.Register(dnsBuilder)
18021803
dialOpts := []grpc.DialOption{
1804+
grpc.WithIdleTimeout(1 * time.Second), // a 1s idle timeout makes the client re-establish the connection quickly
18031805
grpc.WithTransportCredentials(insecure.NewCredentials()),
18041806
grpc.WithResolvers(resolver.Get(dnsScheme)),
18051807
}

0 commit comments

Comments
 (0)