Skip to content

Commit 89d3895

Browse files
authored
Handle runtime errors in ingester (#6769)
1 parent cf407e4 commit 89d3895

File tree

3 files changed

+126
-16
lines changed

3 files changed

+126
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
45
* [CHANGE] Ingester: Remove the EnableNativeHistograms config flag and instead gatekeep native histograms through a new per-tenant limit at ingestion. #6718
56
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
67
* [CHANGE] Validate the tenantID when using a single tenant resolver. #6727

pkg/ingester/ingester.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/http"
1111
"os"
1212
"path/filepath"
13+
"runtime"
1314
"slices"
1415
"strings"
1516
"sync"
@@ -1637,8 +1638,9 @@ func (u *userTSDB) releaseAppendLock() {
16371638
}
16381639

16391640
// QueryExemplars implements service.IngesterServer
1640-
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (*client.ExemplarQueryResponse, error) {
1641-
if err := i.checkRunning(); err != nil {
1641+
func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error) {
1642+
defer recoverIngester(i.logger, &err)
1643+
if err = i.checkRunning(); err != nil {
16421644
return nil, err
16431645
}
16441646

@@ -1659,7 +1661,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
16591661
return &client.ExemplarQueryResponse{}, nil
16601662
}
16611663

1662-
if err := db.acquireReadLock(); err != nil {
1664+
if err = db.acquireReadLock(); err != nil {
16631665
return &client.ExemplarQueryResponse{}, nil
16641666
}
16651667
defer db.releaseReadLock()
@@ -1701,14 +1703,16 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
17011703
}
17021704

17031705
// LabelValues returns all label values that are associated with a given label name.
1704-
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
1706+
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error) {
1707+
defer recoverIngester(i.logger, &err)
17051708
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
17061709
defer cleanup()
17071710
return resp, err
17081711
}
17091712

17101713
// LabelValuesStream returns all label values that are associated with a given label name.
1711-
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
1714+
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) (err error) {
1715+
defer recoverIngester(i.logger, &err)
17121716
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
17131717
defer cleanup()
17141718

@@ -1796,14 +1800,16 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
17961800
}
17971801

17981802
// LabelNames return all the label names.
1799-
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
1803+
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error) {
1804+
defer recoverIngester(i.logger, &err)
18001805
resp, cleanup, err := i.labelNamesCommon(ctx, req)
18011806
defer cleanup()
18021807
return resp, err
18031808
}
18041809

18051810
// LabelNamesStream return all the label names.
1806-
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
1811+
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) (err error) {
1812+
defer recoverIngester(i.logger, &err)
18071813
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
18081814
defer cleanup()
18091815

@@ -1819,7 +1825,7 @@ func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client
18191825
resp := &client.LabelNamesStreamResponse{
18201826
LabelNames: resp.LabelNames[i:j],
18211827
}
1822-
err := client.SendLabelNamesStream(stream, resp)
1828+
err = client.SendLabelNamesStream(stream, resp)
18231829
if err != nil {
18241830
return err
18251831
}
@@ -1891,8 +1897,9 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
18911897
}
18921898

18931899
// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
1894-
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
1895-
result := &client.MetricsForLabelMatchersResponse{}
1900+
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (result *client.MetricsForLabelMatchersResponse, err error) {
1901+
defer recoverIngester(i.logger, &err)
1902+
result = &client.MetricsForLabelMatchersResponse{}
18961903
cleanup, err := i.metricsForLabelMatchersCommon(ctx, req, func(l labels.Labels) error {
18971904
result.Metric = append(result.Metric, &cortexpb.Metric{
18981905
Labels: cortexpb.FromLabelsToLabelAdapters(l),
@@ -1903,7 +1910,8 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
19031910
return result, err
19041911
}
19051912

1906-
func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) error {
1913+
func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatchersRequest, stream client.Ingester_MetricsForLabelMatchersStreamServer) (err error) {
1914+
defer recoverIngester(i.logger, &err)
19071915
result := &client.MetricsForLabelMatchersStreamResponse{}
19081916

19091917
cleanup, err := i.metricsForLabelMatchersCommon(stream.Context(), req, func(l labels.Labels) error {
@@ -1927,7 +1935,7 @@ func (i *Ingester) MetricsForLabelMatchersStream(req *client.MetricsForLabelMatc
19271935

19281936
// Send last batch
19291937
if len(result.Metric) > 0 {
1930-
err := client.SendMetricsForLabelMatchersStream(stream, result)
1938+
err = client.SendMetricsForLabelMatchersStream(stream, result)
19311939
if err != nil {
19321940
return err
19331941
}
@@ -2160,8 +2168,10 @@ const queryStreamBatchMessageSize = 1 * 1024 * 1024
21602168

21612169
// QueryStream implements service.IngesterServer
21622170
// Streams metrics from a TSDB. This implements the client.IngesterServer interface
2163-
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
2164-
if err := i.checkRunning(); err != nil {
2171+
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) {
2172+
defer recoverIngester(i.logger, &err)
2173+
2174+
if err = i.checkRunning(); err != nil {
21652175
return err
21662176
}
21672177

@@ -3443,3 +3453,20 @@ func (c *labelSetReasonCounters) increment(matchedLabelSetLimits []validation.Li
34433453
}
34443454
}
34453455
}
3456+
3457+
func recoverIngester(logger log.Logger, errp *error) {
3458+
e := recover()
3459+
if e == nil {
3460+
return
3461+
}
3462+
3463+
switch err := e.(type) {
3464+
case runtime.Error:
3465+
// Print the stack trace but do not inhibit the running application.
3466+
buf := make([]byte, 64<<10)
3467+
buf = buf[:runtime.Stack(buf, false)]
3468+
3469+
level.Error(logger).Log("msg", "runtime panic in ingester", "err", err, "stacktrace", string(buf))
3470+
*errp = errors.Wrap(err, "unexpected error")
3471+
}
3472+
}

pkg/ingester/ingester_test.go

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"math"
@@ -13,6 +14,7 @@ import (
1314
"net/url"
1415
"os"
1516
"path/filepath"
17+
"runtime"
1618
"sort"
1719
"strconv"
1820
"strings"
@@ -22,7 +24,6 @@ import (
2224

2325
"github.com/go-kit/log"
2426
"github.com/oklog/ulid"
25-
"github.com/pkg/errors"
2627
"github.com/prometheus/client_golang/prometheus"
2728
"github.com/prometheus/client_golang/prometheus/testutil"
2829
"github.com/prometheus/common/model"
@@ -40,6 +41,7 @@ import (
4041
"github.com/thanos-io/objstore"
4142
"github.com/thanos-io/thanos/pkg/runutil"
4243
"github.com/thanos-io/thanos/pkg/shipper"
44+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
4345
"github.com/weaveworks/common/httpgrpc"
4446
"github.com/weaveworks/common/middleware"
4547
"github.com/weaveworks/common/user"
@@ -3814,12 +3816,16 @@ func (w *wrappedExpandedPostingsCache) PurgeExpiredItems() {
38143816

38153817
type mockQueryStreamServer struct {
38163818
grpc.ServerStream
3817-
ctx context.Context
3819+
ctx context.Context
3820+
shouldPanic bool
38183821

38193822
series []client.TimeSeriesChunk
38203823
}
38213824

38223825
func (m *mockQueryStreamServer) Send(response *client.QueryStreamResponse) error {
3826+
if m.shouldPanic {
3827+
panic("runtime error")
3828+
}
38233829
m.series = append(m.series, response.Chunkseries...)
38243830
return nil
38253831
}
@@ -6937,6 +6943,74 @@ func TestIngester_UpdateLabelSetMetrics(t *testing.T) {
69376943
`), "cortex_discarded_samples_total", "cortex_discarded_samples_per_labelset_total"))
69386944
}
69396945

6946+
func TestIngesterPanicHandling(t *testing.T) {
6947+
ctx := context.Background()
6948+
cfg := defaultIngesterTestConfig(t)
6949+
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
6950+
cfg.BlocksStorageConfig.TSDB.KeepUserTSDBOpenOnShutdown = true
6951+
6952+
// Create ingester
6953+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
6954+
require.NoError(t, err)
6955+
6956+
// Induce panic in matchers cache calls
6957+
i.matchersCache = &panickingMatchersCache{}
6958+
6959+
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
6960+
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck
6961+
6962+
// Wait until it's ACTIVE
6963+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
6964+
return i.lifecycler.GetState()
6965+
})
6966+
6967+
pushSingleSampleAtTime(t, i, 1*time.Minute.Milliseconds())
6968+
6969+
ctx = user.InjectOrgID(context.Background(), userID)
6970+
checkRuntimeError := func(err error) {
6971+
var re runtime.Error
6972+
ok := errors.As(err, &re)
6973+
require.True(t, ok, "expected runtime.Error")
6974+
}
6975+
6976+
err = i.QueryStream(&client.QueryRequest{
6977+
StartTimestampMs: 0,
6978+
EndTimestampMs: math.MaxInt64,
6979+
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
6980+
}, &mockQueryStreamServer{ctx: ctx})
6981+
require.Error(t, err)
6982+
checkRuntimeError(err)
6983+
6984+
_, err = i.LabelNames(ctx, &client.LabelNamesRequest{
6985+
Matchers: &client.LabelMatchers{
6986+
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
6987+
},
6988+
Limit: int64(1),
6989+
})
6990+
require.Error(t, err)
6991+
checkRuntimeError(err)
6992+
6993+
_, err = i.LabelValues(ctx, &client.LabelValuesRequest{
6994+
Matchers: &client.LabelMatchers{
6995+
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: "foo"}},
6996+
},
6997+
Limit: int64(1),
6998+
})
6999+
require.Error(t, err)
7000+
checkRuntimeError(err)
7001+
7002+
_, err = i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
7003+
MatchersSet: []*client.LabelMatchers{{
7004+
Matchers: []*client.LabelMatcher{
7005+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "unknown"},
7006+
},
7007+
}},
7008+
Limit: int64(1),
7009+
})
7010+
require.Error(t, err)
7011+
checkRuntimeError(err)
7012+
}
7013+
69407014
// mockTenantLimits exposes per-tenant limits based on a provided map
69417015
type mockTenantLimits struct {
69427016
limits map[string]*validation.Limits
@@ -7004,3 +7078,11 @@ func CreateBlock(t *testing.T, ctx context.Context, dir string, mint, maxt int64
70047078

70057079
return block
70067080
}
7081+
7082+
type panickingMatchersCache struct{}
7083+
7084+
func (_ *panickingMatchersCache) GetOrSet(_ storecache.ConversionLabelMatcher, _ storecache.NewItemFunc) (*labels.Matcher, error) {
7085+
var a []int
7086+
a[1] = 2 // index out of range
7087+
return nil, nil
7088+
}

0 commit comments

Comments
 (0)