Skip to content

Commit 6b27470

Browse files
committed
[ES-1122621] Fix receive connections going idle when an ingestor restarts
Signed-off-by: Yi Jin <[email protected]>
1 parent d8a548d commit 6b27470

File tree

3 files changed

+204
-51
lines changed

3 files changed

+204
-51
lines changed

pkg/receive/handler.go

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
149149
writer: o.Writer,
150150
router: route.New(),
151151
options: o,
152-
peers: newPeerGroup(logger,
152+
peers: newPeerGroup(
153+
logger,
153154
backoff.Backoff{
154155
Factor: 2,
155156
Min: 100 * time.Millisecond,
@@ -396,6 +397,9 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap
396397
// Close stops the Handler.
397398
func (h *Handler) Close() {
398399
runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server")
400+
if err := h.peers.closeAll(); err != nil {
401+
level.Error(h.logger).Log("msg", "closing peer connections failed, we might have leaked file descriptors", "err", err)
402+
}
399403
}
400404

401405
// Run serves the HTTP endpoints.
@@ -1218,6 +1222,27 @@ type peerWorker struct {
12181222
forwardDelay prometheus.Histogram
12191223
}
12201224

1225+
func (pw *peerWorker) isReady() bool {
1226+
return pw.cc != nil
1227+
}
1228+
1229+
func (pw *peerWorker) markNotReady() error {
1230+
if pw.cc != nil {
1231+
err := pw.cc.Close()
1232+
pw.cc = nil
1233+
return err
1234+
}
1235+
return nil
1236+
}
1237+
1238+
func (pw *peerWorker) close() error {
1239+
pw.wp.Close()
1240+
if pw.cc != nil {
1241+
return pw.cc.Close()
1242+
}
1243+
return nil
1244+
}
1245+
12211246
func newPeerGroup(logger log.Logger, backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
12221247
return &peerGroup{
12231248
logger: logger,
@@ -1233,6 +1258,7 @@ func newPeerGroup(logger log.Logger, backoff backoff.Backoff, forwardDelay prome
12331258
}
12341259

12351260
type peersContainer interface {
1261+
closeAll() error
12361262
close(string) error
12371263
getConnection(context.Context, string) (WriteableStoreAsyncClient, error)
12381264
markPeerUnavailable(string)
@@ -1281,6 +1307,17 @@ type peerGroup struct {
12811307
dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
12821308
}
12831309

1310+
func (p *peerGroup) closeAll() error {
1311+
p.m.Lock()
1312+
defer p.m.Unlock()
1313+
for addr := range p.connections {
1314+
if err := p.closeUnlocked(addr); err != nil {
1315+
return err
1316+
}
1317+
}
1318+
return nil
1319+
}
1320+
12841321
func (p *peerGroup) close(addr string) error {
12851322
p.m.Lock()
12861323
defer p.m.Unlock()
@@ -1294,10 +1331,8 @@ func (p *peerGroup) closeUnlocked(addr string) error {
12941331
// was never established.
12951332
return nil
12961333
}
1297-
level.Error(p.logger).Log("msg", "closing connection", "addr", addr)
1298-
p.connections[addr].wp.Close()
12991334
delete(p.connections, addr)
1300-
if err := c.cc.Close(); err != nil {
1335+
if err := c.close(); err != nil {
13011336
return fmt.Errorf("closing connection for %s", addr)
13021337
}
13031338

@@ -1312,6 +1347,7 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
13121347
// use a RLock first to prevent blocking if we don't need to.
13131348
p.m.RLock()
13141349
c, ok := p.connections[addr]
1350+
ok = ok && c.isReady()
13151351
p.m.RUnlock()
13161352
if ok {
13171353
return c, nil
@@ -1321,36 +1357,32 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
13211357
defer p.m.Unlock()
13221358
// Make sure that another caller hasn't created the connection since obtaining the write lock.
13231359
c, ok = p.connections[addr]
1324-
if ok {
1360+
if ok && c.isReady() {
13251361
return c, nil
13261362
}
1327-
level.Info(p.logger).Log("msg", "dialing peer", "peer", addr)
1363+
level.Debug(p.logger).Log("msg", "dialing peer", "addr", addr)
13281364
conn, err := p.dialer(ctx, addr, p.dialOpts...)
13291365
if err != nil {
1330-
// clear retry state if dial failed, this is necessary to avoid dialing peer too often.
1331-
delete(p.peerStates, addr)
13321366
p.markPeerUnavailableUnlocked(addr)
13331367
dialError := errors.Wrap(err, "failed to dial peer")
13341368
return nil, errors.Wrap(dialError, errUnavailable.Error())
13351369
}
1336-
1337-
p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
1370+
if !ok {
1371+
p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
1372+
} else {
1373+
p.connections[addr].cc = conn
1374+
}
13381375
return p.connections[addr], nil
13391376
}
13401377

13411378
func (p *peerGroup) markPeerUnavailable(addr string) {
13421379
p.m.Lock()
13431380
defer p.m.Unlock()
13441381

1345-
if p.markPeerUnavailableUnlocked(addr) {
1346-
// close the connection if the backoff tries too many times, likely server is down.
1347-
if err := p.closeUnlocked(addr); err != nil {
1348-
level.Error(p.logger).Log("msg", "failed to close connection", "addr", addr, "err", err)
1349-
}
1350-
}
1382+
p.markPeerUnavailableUnlocked(addr)
13511383
}
13521384

1353-
func (p *peerGroup) markPeerUnavailableUnlocked(addr string) bool {
1385+
func (p *peerGroup) markPeerUnavailableUnlocked(addr string) {
13541386
state, ok := p.peerStates[addr]
13551387
if !ok {
13561388
state = &retryState{attempt: -1}
@@ -1359,7 +1391,12 @@ func (p *peerGroup) markPeerUnavailableUnlocked(addr string) bool {
13591391
delay := p.expBackoff.ForAttempt(state.attempt)
13601392
state.nextAllowed = time.Now().Add(delay)
13611393
p.peerStates[addr] = state
1362-
return delay >= p.expBackoff.Max
1394+
if delay >= p.expBackoff.Max {
1395+
level.Warn(p.logger).Log("msg", "peer is not ready", "addr", addr, "next_allowed", state.nextAllowed)
1396+
if err := p.connections[addr].markNotReady(); err != nil {
1397+
level.Error(p.logger).Log("msg", "failed to mark peer as not ready", "err", err)
1398+
}
1399+
}
13631400
}
13641401

13651402
func (p *peerGroup) markPeerAvailable(addr string) {

pkg/receive/handler_test.go

Lines changed: 123 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
"gopkg.in/yaml.v3"
2727

2828
"github.com/alecthomas/units"
29+
"github.com/efficientgo/core/testutil"
2930
"github.com/go-kit/log"
31+
"github.com/go-kit/log/level"
3032
"github.com/gogo/protobuf/proto"
3133
"github.com/golang/snappy"
3234
"github.com/pkg/errors"
@@ -40,19 +42,63 @@ import (
4042
"github.com/prometheus/prometheus/storage"
4143
"github.com/prometheus/prometheus/tsdb"
4244
"github.com/stretchr/testify/require"
43-
44-
"github.com/efficientgo/core/testutil"
45+
"google.golang.org/grpc/credentials/insecure"
46+
"google.golang.org/grpc/resolver"
4547

4648
"github.com/thanos-io/thanos/pkg/block/metadata"
49+
"github.com/thanos-io/thanos/pkg/component"
4750
"github.com/thanos-io/thanos/pkg/extkingpin"
4851
"github.com/thanos-io/thanos/pkg/logging"
4952
"github.com/thanos-io/thanos/pkg/runutil"
53+
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
54+
"github.com/thanos-io/thanos/pkg/store"
5055
"github.com/thanos-io/thanos/pkg/store/labelpb"
5156
"github.com/thanos-io/thanos/pkg/store/storepb"
5257
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
5358
"github.com/thanos-io/thanos/pkg/tenancy"
5459
)
5560

61+
const dnsScheme = "dns"
62+
63+
type dnsResolver struct {
64+
logger log.Logger
65+
target resolver.Target
66+
cc resolver.ClientConn
67+
addrStore map[string][]string
68+
}
69+
70+
func (r *dnsResolver) start() {
71+
addrStrs := r.addrStore[r.target.Endpoint()]
72+
addrs := make([]resolver.Address, len(addrStrs))
73+
for i, s := range addrStrs {
74+
addrs[i] = resolver.Address{Addr: s}
75+
}
76+
if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
77+
level.Error(r.logger).Log("msg", "failed to update state", "err", err)
78+
}
79+
}
80+
81+
func (*dnsResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
82+
83+
func (*dnsResolver) Close() {}
84+
85+
type dnsResolverBuilder struct {
86+
logger log.Logger
87+
addrStore map[string][]string
88+
}
89+
90+
func (b *dnsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
91+
r := &dnsResolver{
92+
logger: b.logger,
93+
target: target,
94+
cc: cc,
95+
addrStore: b.addrStore,
96+
}
97+
r.start()
98+
return r, nil
99+
}
100+
func (*dnsResolverBuilder) Scheme() string { return dnsScheme }
101+
56102
type fakeTenantAppendable struct {
57103
f *fakeAppendable
58104
}
@@ -191,6 +237,10 @@ func (g *fakePeersGroup) close(addr string) error {
191237
return nil
192238
}
193239

240+
func (g *fakePeersGroup) closeAll() error {
241+
return nil
242+
}
243+
194244
func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (WriteableStoreAsyncClient, error) {
195245
c, ok := g.clients[addr]
196246
if !ok {
@@ -1736,50 +1786,90 @@ func TestHandlerFlippingHashrings(t *testing.T) {
17361786
wg.Wait()
17371787
}
17381788

1739-
func TestPeerGroup(t *testing.T) {
1789+
func TestIngestorRestart(t *testing.T) {
1790+
var err error
17401791
logger := log.NewLogfmtLogger(os.Stderr)
1741-
serverAddress := "http://localhost:19090"
1742-
dialOpts := []grpc.DialOption{grpc.WithInsecure()}
1743-
srv := startServer(logger, serverAddress)
1744-
ctx := context.TODO()
1792+
addr1, addr2, addr3 := "localhost:14090", "localhost:14091", "localhost:14092"
1793+
ing1, ing2 := startIngestor(logger, addr1, 0), startIngestor(logger, addr2, 0)
1794+
defer ing1.Shutdown(err) // srv1 is stable and will only be closed after the test ends
1795+
1796+
clientAddr := "ingestor.com"
1797+
dnsBuilder := &dnsResolverBuilder{
1798+
logger: logger,
1799+
addrStore: map[string][]string{clientAddr: {addr2}},
1800+
}
1801+
resolver.Register(dnsBuilder)
1802+
dialOpts := []grpc.DialOption{
1803+
grpc.WithTransportCredentials(insecure.NewCredentials()),
1804+
grpc.WithResolvers(resolver.Get(dnsScheme)),
1805+
}
17451806
client := NewHandler(logger, &Options{
1746-
MaxBackoff: 1 * time.Second,
1747-
DialOpts: dialOpts,
1807+
MaxBackoff: 1 * time.Second,
1808+
DialOpts: dialOpts,
1809+
ReplicationFactor: 2,
1810+
ReceiverMode: RouterOnly,
1811+
ForwardTimeout: 15 * time.Second,
17481812
})
1749-
_, err := client.peers.getConnection(ctx, serverAddress)
1750-
require.NoError(t, err)
1751-
// close the server and wait for the backoff to kick in and see how long it takes
1752-
srv.Close()
1753-
// server is closed, now we can't send requests to it
1754-
er := endpointReplica{endpoint: serverAddress, replica: 0}
1755-
data := trackedSeries{
1756-
timeSeries: []prompb.TimeSeries{
1813+
// one of the endpoints is DNS and wire up to different backend address on the fly
1814+
client.Hashring(&simpleHashring{addr1, fmt.Sprintf("%s:///%s", dnsScheme, clientAddr)})
1815+
defer client.Close()
1816+
1817+
ctx := context.TODO()
1818+
data := &prompb.WriteRequest{
1819+
Timeseries: []prompb.TimeSeries{
17571820
{
1758-
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
1821+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", addr3)),
17591822
Samples: []prompb.Sample{{Timestamp: time.Now().Unix(), Value: 123}},
17601823
},
17611824
},
17621825
}
17631826

1764-
N := 50
1765-
responses := make(chan writeResponse, N)
1766-
for i := 0; i < N; i++ {
1767-
var wg sync.WaitGroup
1768-
wg.Add(1)
1769-
client.sendRemoteWrite(ctx, "", er, data, false, responses, &wg)
1770-
wg.Wait()
1771-
_, err = client.peers.getConnection(ctx, serverAddress)
1772-
require.Error(t, errUnavailable, err)
1773-
time.Sleep(100 * time.Millisecond)
1827+
err = client.handleRequest(ctx, 0, "test", data)
1828+
require.NoError(t, err)
1829+
1830+
// close srv2 to simulate ingestor down
1831+
ing2.Shutdown(err)
1832+
ing3 := startIngestor(logger, addr3, 2*time.Second)
1833+
defer ing3.Shutdown(err)
1834+
// bind the new backend to the same DNS
1835+
dnsBuilder.addrStore[clientAddr] = []string{addr3}
1836+
1837+
iter, errs := 10, 0
1838+
for i := 0; i < iter; i++ {
1839+
err = client.handleRequest(ctx, 0, "test", data)
1840+
if err != nil {
1841+
require.Error(t, errUnavailable, err)
1842+
errs++
1843+
} else {
1844+
break
1845+
}
1846+
time.Sleep(500 * time.Millisecond)
17741847
}
1848+
require.Greater(t, errs, 0, "expected to have unavailable errors initially")
1849+
require.Less(t, errs, iter, "expected to recover quickly after server restarts")
17751850
}
17761851

1777-
func startServer(logger log.Logger, serverAddress string) *Handler {
1778-
srv := NewHandler(logger, &Options{
1779-
ListenAddress: serverAddress,
1780-
})
1852+
type fakeStoreServer struct {
1853+
logger log.Logger
1854+
}
1855+
1856+
func (f *fakeStoreServer) RemoteWrite(_ context.Context, in *storepb.WriteRequest) (*storepb.WriteResponse, error) {
1857+
level.Debug(f.logger).Log("msg", "received remote write request", "request", in.String())
1858+
return &storepb.WriteResponse{}, nil
1859+
}
1860+
1861+
func startIngestor(logger log.Logger, serverAddress string, delay time.Duration) *grpcserver.Server {
1862+
h := &fakeStoreServer{logger: logger}
1863+
srv := grpcserver.TestServer(logger, component.Receive, serverAddress,
1864+
grpcserver.WithServer(store.RegisterWritableStoreServer(h)),
1865+
)
17811866
go func() {
1782-
srv.Run()
1867+
if delay > 0 {
1868+
time.Sleep(delay)
1869+
}
1870+
if err := srv.ListenAndServe(); err != nil {
1871+
level.Error(logger).Log("msg", "server error", "addr", serverAddress, "err", err)
1872+
}
17831873
}()
17841874
return srv
17851875
}

pkg/server/grpc/grpc.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,29 @@ func (s *Server) Shutdown(err error) {
163163
}
164164
level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err)
165165
}
166+
167+
// TestServer is used only for unit tests to create a new gRPC server.
168+
func TestServer(logger log.Logger, comp component.Component, address string, opts ...Option) *Server {
169+
logger = log.With(logger, "service", "gRPC/server", "component", comp.String())
170+
options := options{
171+
network: "tcp",
172+
listen: address,
173+
grpcOpts: []grpc.ServerOption{
174+
grpc.MaxSendMsgSize(math.MaxInt32),
175+
},
176+
}
177+
for _, o := range opts {
178+
o.apply(&options)
179+
}
180+
s := grpc.NewServer(options.grpcOpts...)
181+
// Register all configured servers.
182+
for _, f := range options.registerServerFuncs {
183+
f(s)
184+
}
185+
return &Server{
186+
logger: logger,
187+
comp: comp,
188+
srv: s,
189+
opts: options,
190+
}
191+
}

0 commit comments

Comments
 (0)