Commit f0bf1a4

Merge db_main/ into release (#74)

2 parents: cc22ed6 + eddbbb3

5 files changed: 46 additions, 15 deletions

internal/cortex/frontend/transport/handler.go (18 additions, 6 deletions)

@@ -59,10 +59,12 @@ type Handler struct {
     failedQueryCache *utils.FailedQueryCache
 
     // Metrics.
-    querySeconds *prometheus.CounterVec
-    querySeries  *prometheus.CounterVec
-    queryBytes   *prometheus.CounterVec
-    activeUsers  *util.ActiveUsersCleanupService
+    querySeconds     *prometheus.CounterVec
+    querySeries      *prometheus.CounterVec
+    queryBytes       *prometheus.CounterVec
+    activeUsers      *util.ActiveUsersCleanupService
+    slowQueryCount   prometheus.Counter
+    failedQueryCount prometheus.Counter
 }
 
 // NewHandler creates a new frontend handler.
@@ -108,16 +110,24 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
         _ = h.activeUsers.StartAsync(context.Background())
     }
 
+    h.slowQueryCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+        Name: "cortex_slow_query_total",
+        Help: "Total number of slow queries detected.",
+    })
+    h.failedQueryCount = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+        Name: "cortex_failed_query_total",
+        Help: "Total number of failed queries detected.",
+    })
     return h
 }
 
 func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     var (
-        stats       *querier_stats.Stats
+        stats *querier_stats.Stats
         // For failed/slow query logging and query stats.
         queryString url.Values
         // For failed query cache
-        urlQuery    url.Values
+        urlQuery url.Values
     )
 
     // Initialise the stats in the context and make sure it's propagated
@@ -206,6 +216,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error, queryResponseTime time.Duration) {
+    f.failedQueryCount.Inc()
     // NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
     grafanaDashboardUID := "-"
     if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
@@ -235,6 +246,7 @@ func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err
 
 // reportSlowQuery reports slow queries.
 func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryResponseTime time.Duration) {
+    f.slowQueryCount.Inc()
     // NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
     grafanaDashboardUID := "-"
     if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
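
For reference, a minimal runnable sketch of the counter pattern introduced above (promauto registration plus Inc on the failure path), using a throwaway registry; testutil is only used here to read the value back and is not part of the handler change.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Isolated registry, standing in for the `reg` passed to NewHandler.
    reg := prometheus.NewRegistry()

    // Same registration pattern as the new handler metric.
    failedQueryCount := promauto.With(reg).NewCounter(prometheus.CounterOpts{
        Name: "cortex_failed_query_total",
        Help: "Total number of failed queries detected.",
    })

    // reportFailedQuery increments this once per failed query.
    failedQueryCount.Inc()
    failedQueryCount.Inc()

    fmt.Println(testutil.ToFloat64(failedQueryCount)) // 2
}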

pkg/query/endpointset.go (7 additions, 1 deletion)

@@ -551,7 +551,13 @@ func (e *EndpointSet) Update(ctx context.Context) {
 func (e *EndpointSet) updateEndpoint(ctx context.Context, spec *GRPCEndpointSpec, er *endpointRef) {
     metadata, err := er.Metadata(ctx, infopb.NewInfoClient(er.cc), storepb.NewStoreClient(er.cc))
     if err != nil {
-        level.Warn(e.logger).Log("msg", "update of endpoint failed", "err", errors.Wrap(err, "getting metadata"), "address", spec.Addr())
+        level.Warn(e.logger).Log(
+            "msg", "update of endpoint failed",
+            "err", errors.Wrap(err, "getting metadata"),
+            "address", spec.Addr(),
+            "group_key", spec.groupKey,
+            "replica_key", spec.replicaKey,
+        )
     }
     er.update(e.now, metadata, err)
 }
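
For context, a small standalone sketch of what the expanded go-kit log call emits; the address, group_key, and replica_key values below are placeholders, not taken from the commit.

package main

import (
    "os"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
)

func main() {
    logger := log.NewLogfmtLogger(os.Stderr)

    // Same call shape as the patched updateEndpoint; all values are placeholders.
    level.Warn(logger).Log(
        "msg", "update of endpoint failed",
        "err", "getting metadata: context deadline exceeded",
        "address", "10.0.0.12:10901",
        "group_key", "store-gateway",
        "replica_key", "replica-0",
    )
    // Emits one logfmt line:
    // level=warn msg="update of endpoint failed" err="getting metadata: context deadline exceeded" address=10.0.0.12:10901 group_key=store-gateway replica_key=replica-0
}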

pkg/receive/multitsdb.go (1 addition, 1 deletion)

@@ -141,7 +141,7 @@ func (l *localClient) String() string {
     mint, maxt := l.store.TimeRange()
     return fmt.Sprintf(
         "LabelSets: %v MinTime: %d MaxTime: %d",
-        labelpb.PromLabelSetsToStringN(l.LabelSets(), 10), mint, maxt,
+        labelpb.PromLabelSetsToStringN(l.LabelSets(), 500), mint, maxt,
     )
 }

pkg/store/labelpb/label.go (7 additions, 4 deletions)

@@ -303,15 +303,18 @@ func ExtendSortedLabels(lset, extend labels.Labels) labels.Labels {
 }
 
 func PromLabelSetsToString(lsets []labels.Labels) string {
-    return PromLabelSetsToStringN(lsets, 1000000)
+    return PromLabelSetsToStringN(lsets, 500)
 }
 
-func PromLabelSetsToStringN(lsets []labels.Labels, maxNumLabels int) string {
+func PromLabelSetsToStringN(lsets []labels.Labels, maxLength int) string {
+    if len(lsets) == 0 {
+        return ""
+    }
     s := []string{}
     for _, ls := range lsets {
         s = append(s, ls.String())
-        maxNumLabels--
-        if maxNumLabels <= 0 {
+        maxLength -= len(ls.String())
+        if maxLength <= 0 {
             break
         }
     }
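
Note the semantic change here: the second argument is now a rough byte budget over the rendered label sets rather than a cap on how many sets are printed, which is presumably why the multitsdb call above moves from 10 to 500. A standalone sketch of the new loop, assuming the function joins the collected strings with "," as in the unchanged surrounding code:

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/prometheus/model/labels"
)

// promLabelSetsToStringN mirrors the patched loop above; the trailing
// strings.Join separator is assumed from the surrounding (unchanged) code.
func promLabelSetsToStringN(lsets []labels.Labels, maxLength int) string {
    if len(lsets) == 0 {
        return ""
    }
    s := []string{}
    for _, ls := range lsets {
        s = append(s, ls.String())
        maxLength -= len(ls.String())
        if maxLength <= 0 {
            break
        }
    }
    return strings.Join(s, ",")
}

func main() {
    lsets := []labels.Labels{
        labels.FromStrings("cluster", "a", "replica", "0"),
        labels.FromStrings("cluster", "a", "replica", "1"),
        labels.FromStrings("cluster", "b", "replica", "0"),
    }
    // A 10-byte budget is exhausted by the first rendered label set, so only
    // one set is kept; the new 500-byte default keeps all three here.
    fmt.Println(promLabelSetsToStringN(lsets, 10))
    fmt.Println(promLabelSetsToStringN(lsets, 500))
}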

pkg/store/proxy.go (13 additions, 3 deletions)

@@ -99,6 +99,7 @@ type ProxyStore struct {
 
 type proxyStoreMetrics struct {
     emptyStreamResponses prometheus.Counter
+    storeFailureCount    *prometheus.CounterVec
 }
 
 func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
@@ -108,6 +109,10 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
         Name: "thanos_proxy_store_empty_stream_responses_total",
         Help: "Total number of empty responses received.",
     })
+    m.storeFailureCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+        Name: "thanos_proxy_store_failure_total",
+        Help: "Total number of store failures.",
+    }, []string{"group", "replica"})
 
     return &m
 }
@@ -431,7 +436,8 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
         if err != nil {
             // NB: respSet is nil in case of error.
             level.Error(reqLogger).Log("err", err)
-            level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey())
+            level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
+            s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()
             bumpCounter(st.GroupKey(), st.ReplicaKey(), failedStores)
             totalFailedStores++
             if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA {
@@ -464,14 +470,18 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 
         if resp.GetWarning() != "" {
             totalFailedStores++
-            level.Error(s.logger).Log("msg", "Series: warning from store", "warning", resp.GetWarning())
+            maxWarningBytes := 2000
+            warning := resp.GetWarning()[:min(maxWarningBytes, len(resp.GetWarning()))]
+            level.Error(s.logger).Log("msg", "Store failure with warning", "warning", warning)
+            // Don't have group/replica keys here, so we can't attribute the warning to a specific store.
+            s.metrics.storeFailureCount.WithLabelValues("", "").Inc()
             if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA {
                 // TODO: attribute the warning to the store(group key and replica key) that produced it.
                 // Each client streams a sequence of time series, so it's not trivial to attribute the warning to a specific client.
                 if totalFailedStores > 1 {
                     level.Error(reqLogger).Log("msg", "more than one stores have failed")
                     // If we don't know which store has failed, we can tolerate at most one failed store.
-                    return status.Error(codes.Aborted, resp.GetWarning())
+                    return status.Error(codes.Aborted, warning)
                 }
             } else if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
                 return status.Error(codes.Aborted, resp.GetWarning())
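
For reference, a minimal sketch of the two patterns added above: the group/replica-labelled CounterVec (with empty labels on the unattributed warning path) and the byte-capped warning string. The registry and label values are placeholders, and min over ints is a builtin only since Go 1.21.

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    reg := prometheus.NewRegistry()

    // Same shape as the new proxy metric: failures keyed by group and replica.
    storeFailureCount := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
        Name: "thanos_proxy_store_failure_total",
        Help: "Total number of store failures.",
    }, []string{"group", "replica"})

    // Attributed failure (Series error path) vs. unattributed warning path.
    storeFailureCount.WithLabelValues("store-group-1", "replica-a").Inc()
    storeFailureCount.WithLabelValues("", "").Inc()

    // Two label combinations have been written.
    fmt.Println(testutil.CollectAndCount(storeFailureCount, "thanos_proxy_store_failure_total"))

    // Warning truncation as in the diff: cap the logged/returned message.
    warning := strings.Repeat("x", 5000)
    maxWarningBytes := 2000
    truncated := warning[:min(maxWarningBytes, len(warning))]
    fmt.Println(len(truncated)) // 2000
}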
