Skip to content

Commit 971761a

Browse files
authored
[ES-1122621] fix receive connection re-establish taking too long (5m) (#36)
2 parents 74c2326 + 98088e7 commit 971761a

File tree

4 files changed

+199
-2
lines changed

4 files changed

+199
-2
lines changed

cmd/thanos/receive.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ func runReceive(
158158
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression)))
159159
}
160160
if receiveMode == receive.RouterOnly {
161+
dialOpts = append(dialOpts, grpc.WithIdleTimeout(time.Duration(*conf.maxBackoff)))
161162
dialOpts = append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)
162163
}
163164

pkg/receive/handler.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"net/http"
1616
"sort"
1717
"strconv"
18+
"strings"
1819
"sync"
1920
"time"
2021

@@ -396,6 +397,9 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap
396397
// Close stops the Handler.
397398
func (h *Handler) Close() {
398399
runutil.CloseWithLogOnErr(h.logger, h.httpSrv, "receive HTTP server")
400+
if err := h.peers.closeAll(); err != nil {
401+
level.Error(h.logger).Log("msg", "closing peer connections failed, we might have leaked file descriptors", "err", err)
402+
}
399403
}
400404

401405
// Run serves the HTTP endpoints.
@@ -790,6 +794,12 @@ func (h *Handler) distributeTimeseriesToReplicas(
790794
writeDestination[endpointReplica] = writeableSeries
791795
}
792796
}
797+
if h.receiverMode == RouterOnly && len(localWrites) > 0 {
798+
panic("router only mode should not have any local writes")
799+
}
800+
if h.receiverMode == IngestorOnly && len(remoteWrites) > 0 {
801+
panic("ingestor only mode should not have any remote writes")
802+
}
793803
return localWrites, remoteWrites, nil
794804
}
795805

@@ -1226,6 +1236,7 @@ func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, as
12261236
}
12271237

12281238
type peersContainer interface {
1239+
closeAll() error
12291240
close(string) error
12301241
getConnection(context.Context, string) (WriteableStoreAsyncClient, error)
12311242
markPeerUnavailable(string)
@@ -1273,18 +1284,37 @@ type peerGroup struct {
12731284
dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
12741285
}
12751286

1287+
func (p *peerGroup) closeAll() error {
1288+
p.m.Lock()
1289+
defer p.m.Unlock()
1290+
var closeErrors []string // Slice to store error messages
1291+
for addr := range p.connections {
1292+
if err := p.closeUnlocked(addr); err != nil {
1293+
// Collect error messages
1294+
closeErrors = append(closeErrors, fmt.Sprintf("failed to close %s: %v", addr, err))
1295+
}
1296+
}
1297+
if len(closeErrors) > 0 {
1298+
// Return aggregated error messages
1299+
return fmt.Errorf("multiple errors closing connections: %s", strings.Join(closeErrors, ", "))
1300+
}
1301+
return nil
1302+
}
1303+
12761304
func (p *peerGroup) close(addr string) error {
12771305
p.m.Lock()
12781306
defer p.m.Unlock()
1307+
return p.closeUnlocked(addr)
1308+
}
12791309

1310+
func (p *peerGroup) closeUnlocked(addr string) error {
12801311
c, ok := p.connections[addr]
12811312
if !ok {
12821313
// NOTE(GiedriusS): this could be valid case when the connection
12831314
// was never established.
12841315
return nil
12851316
}
1286-
1287-
p.connections[addr].wp.Close()
1317+
c.wp.Close()
12881318
delete(p.connections, addr)
12891319
if err := c.cc.Close(); err != nil {
12901320
return fmt.Errorf("closing connection for %s", addr)

pkg/receive/handler_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/alecthomas/units"
2929
"github.com/go-kit/log"
30+
"github.com/go-kit/log/level"
3031
"github.com/gogo/protobuf/proto"
3132
"github.com/golang/snappy"
3233
"github.com/pkg/errors"
@@ -40,19 +41,65 @@ import (
4041
"github.com/prometheus/prometheus/storage"
4142
"github.com/prometheus/prometheus/tsdb"
4243
"github.com/stretchr/testify/require"
44+
"google.golang.org/grpc/credentials/insecure"
45+
"google.golang.org/grpc/resolver"
4346

4447
"github.com/efficientgo/core/testutil"
4548

4649
"github.com/thanos-io/thanos/pkg/block/metadata"
50+
"github.com/thanos-io/thanos/pkg/component"
4751
"github.com/thanos-io/thanos/pkg/extkingpin"
4852
"github.com/thanos-io/thanos/pkg/logging"
4953
"github.com/thanos-io/thanos/pkg/runutil"
54+
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
55+
"github.com/thanos-io/thanos/pkg/store"
5056
"github.com/thanos-io/thanos/pkg/store/labelpb"
5157
"github.com/thanos-io/thanos/pkg/store/storepb"
5258
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
5359
"github.com/thanos-io/thanos/pkg/tenancy"
5460
)
5561

62+
const dnsScheme = "dns"
63+
64+
type dnsResolver struct {
65+
logger log.Logger
66+
target resolver.Target
67+
cc resolver.ClientConn
68+
addrStore map[string][]string
69+
}
70+
71+
func (r *dnsResolver) start() {
72+
addrStrs := r.addrStore[r.target.Endpoint()]
73+
addrs := make([]resolver.Address, len(addrStrs))
74+
for i, s := range addrStrs {
75+
addrs[i] = resolver.Address{Addr: s}
76+
}
77+
if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
78+
level.Error(r.logger).Log("msg", "failed to update state", "err", err)
79+
}
80+
}
81+
82+
func (*dnsResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
83+
84+
func (*dnsResolver) Close() {}
85+
86+
type dnsResolverBuilder struct {
87+
logger log.Logger
88+
addrStore map[string][]string
89+
}
90+
91+
func (b *dnsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
92+
r := &dnsResolver{
93+
logger: b.logger,
94+
target: target,
95+
cc: cc,
96+
addrStore: b.addrStore,
97+
}
98+
r.start()
99+
return r, nil
100+
}
101+
func (*dnsResolverBuilder) Scheme() string { return dnsScheme }
102+
56103
type fakeTenantAppendable struct {
57104
f *fakeAppendable
58105
}
@@ -191,6 +238,10 @@ func (g *fakePeersGroup) close(addr string) error {
191238
return nil
192239
}
193240

241+
func (g *fakePeersGroup) closeAll() error {
242+
return nil
243+
}
244+
194245
func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (WriteableStoreAsyncClient, error) {
195246
c, ok := g.clients[addr]
196247
if !ok {
@@ -1735,3 +1786,92 @@ func TestHandlerFlippingHashrings(t *testing.T) {
17351786
cancel()
17361787
wg.Wait()
17371788
}
1789+
1790+
func TestIngestorRestart(t *testing.T) {
1791+
var err error
1792+
logger := log.NewLogfmtLogger(os.Stderr)
1793+
addr1, addr2, addr3 := "localhost:14090", "localhost:14091", "localhost:14092"
1794+
ing1, ing2 := startIngestor(logger, addr1, 0), startIngestor(logger, addr2, 0)
1795+
defer ing1.Shutdown(err) // srv1 is stable and will only be closed after the test ends
1796+
1797+
clientAddr := "ingestor.com"
1798+
dnsBuilder := &dnsResolverBuilder{
1799+
logger: logger,
1800+
addrStore: map[string][]string{clientAddr: {addr2}},
1801+
}
1802+
resolver.Register(dnsBuilder)
1803+
dialOpts := []grpc.DialOption{
1804+
grpc.WithIdleTimeout(1 * time.Second), // set idle timeout to 1s will re-establish the connection quickly
1805+
grpc.WithTransportCredentials(insecure.NewCredentials()),
1806+
grpc.WithResolvers(resolver.Get(dnsScheme)),
1807+
}
1808+
client := NewHandler(logger, &Options{
1809+
MaxBackoff: 1 * time.Second,
1810+
DialOpts: dialOpts,
1811+
ReplicationFactor: 2,
1812+
ReceiverMode: RouterOnly,
1813+
ForwardTimeout: 15 * time.Second,
1814+
})
1815+
// one of the endpoints is DNS and wire up to different backend address on the fly
1816+
client.Hashring(&simpleHashring{addr1, fmt.Sprintf("%s:///%s", dnsScheme, clientAddr)})
1817+
defer client.Close()
1818+
1819+
ctx := context.TODO()
1820+
data := &prompb.WriteRequest{
1821+
Timeseries: []prompb.TimeSeries{
1822+
{
1823+
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", addr3)),
1824+
Samples: []prompb.Sample{{Timestamp: time.Now().Unix(), Value: 123}},
1825+
},
1826+
},
1827+
}
1828+
1829+
err = client.handleRequest(ctx, 0, "test", data)
1830+
require.NoError(t, err)
1831+
1832+
// close srv2 to simulate ingestor down
1833+
ing2.Shutdown(err)
1834+
ing3 := startIngestor(logger, addr3, 2*time.Second)
1835+
defer ing3.Shutdown(err)
1836+
// bind the new backend to the same DNS
1837+
dnsBuilder.addrStore[clientAddr] = []string{addr3}
1838+
1839+
iter, errs := 10, 0
1840+
for i := 0; i < iter; i++ {
1841+
err = client.handleRequest(ctx, 0, "test", data)
1842+
if err != nil {
1843+
require.Error(t, errUnavailable, err)
1844+
errs++
1845+
} else {
1846+
break
1847+
}
1848+
time.Sleep(500 * time.Millisecond)
1849+
}
1850+
require.Greater(t, errs, 0, "expected to have unavailable errors initially")
1851+
require.Less(t, errs, iter, "expected to recover quickly after server restarts")
1852+
}
1853+
1854+
type fakeStoreServer struct {
1855+
logger log.Logger
1856+
}
1857+
1858+
func (f *fakeStoreServer) RemoteWrite(_ context.Context, in *storepb.WriteRequest) (*storepb.WriteResponse, error) {
1859+
level.Debug(f.logger).Log("msg", "received remote write request", "request", in.String())
1860+
return &storepb.WriteResponse{}, nil
1861+
}
1862+
1863+
func startIngestor(logger log.Logger, serverAddress string, delay time.Duration) *grpcserver.Server {
1864+
h := &fakeStoreServer{logger: logger}
1865+
srv := grpcserver.TestServer(logger, component.Receive, serverAddress,
1866+
grpcserver.WithServer(store.RegisterWritableStoreServer(h)),
1867+
)
1868+
go func() {
1869+
if delay > 0 {
1870+
time.Sleep(delay)
1871+
}
1872+
if err := srv.ListenAndServe(); err != nil {
1873+
level.Error(logger).Log("msg", "server error", "addr", serverAddress, "err", err)
1874+
}
1875+
}()
1876+
return srv
1877+
}

pkg/server/grpc/grpc.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,29 @@ func (s *Server) Shutdown(err error) {
163163
}
164164
level.Info(s.logger).Log("msg", "internal server is shutdown gracefully", "err", err)
165165
}
166+
167+
// TestServer is used only for unit tests to create a new gRPC server.
168+
func TestServer(logger log.Logger, comp component.Component, address string, opts ...Option) *Server {
169+
logger = log.With(logger, "service", "gRPC/server", "component", comp.String())
170+
options := options{
171+
network: "tcp",
172+
listen: address,
173+
grpcOpts: []grpc.ServerOption{
174+
grpc.MaxSendMsgSize(math.MaxInt32),
175+
},
176+
}
177+
for _, o := range opts {
178+
o.apply(&options)
179+
}
180+
s := grpc.NewServer(options.grpcOpts...)
181+
// Register all configured servers.
182+
for _, f := range options.registerServerFuncs {
183+
f(s)
184+
}
185+
return &Server{
186+
logger: logger,
187+
comp: comp,
188+
srv: s,
189+
opts: options,
190+
}
191+
}

0 commit comments

Comments
 (0)