Commit 535e052

Fix blocking failed queries (#63)
2 parents babcbee + 26fe3c3 commit 535e052

File tree

4 files changed: +210 -56 lines changed

cmd/thanos/query_frontend.go

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ func runQueryFrontend(
 	roundTripper = tripperWare(roundTripper)
 
 	// Create the query frontend transport.
-	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, nil)
+	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, reg)
 	if cfg.CompressResponses {
 		handler = gzhttp.GzipHandler(handler)
 	}
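
Note: the only functional change above is passing the real Prometheus registerer (reg) into transport.NewHandler instead of nil, so the failed-query-cache metrics added in utils.go below are actually registered and exported. A minimal standalone sketch of that effect; the program itself is illustrative and not part of the commit, it only reuses the counter definition from the diff:

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/promauto"
	)

	func main() {
		reg := prometheus.NewRegistry()

		// Same promauto pattern as NewFailedQueryCache in this commit: with a
		// real registerer the metric is exported; with a nil registerer it
		// would be created but never registered anywhere.
		hits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "cached_failed_queries_count",
			Help:      "Total number of queries that hit the failed query cache.",
		})
		hits.Inc()

		mfs, _ := reg.Gather()
		for _, mf := range mfs {
			fmt.Println(mf.GetName()) // prints: cortex_cached_failed_queries_count
		}
	}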

internal/cortex/frontend/transport/handler.go

Lines changed: 10 additions & 10 deletions
@@ -74,6 +74,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
 	}
 
 	if cfg.FailedQueryCacheCapacity > 0 {
+		level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity)
 		FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, reg)
 		if errQueryCache != nil {
 			level.Warn(log).Log(errQueryCache.Error())
@@ -114,7 +115,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	var (
 		stats       *querier_stats.Stats
 		queryString url.Values
-		urlQuery    url.Values
 	)
 
 	// Initialise the stats in the context and make sure it's propagated
@@ -134,14 +134,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
 	r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
 
-	urlQuery = r.URL.Query()
+	queryString = f.parseRequestQueryString(r, buf)
 
 	// Check if query is cached
 	if f.failedQueryCache != nil {
-		cached, message := f.failedQueryCache.QueryHitCache(urlQuery)
+		cached, message := f.failedQueryCache.QueryHitCache(queryString)
 		if cached {
 			w.WriteHeader(http.StatusForbidden)
-			level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
+			level.Info(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			return
 		}
 	}
@@ -152,20 +152,19 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	if err != nil {
 		writeError(w, err)
-		queryString = f.parseRequestQueryString(r, buf)
 
 		// Update cache for failed queries.
 		if f.failedQueryCache != nil {
-			success, message := f.failedQueryCache.UpdateFailedQueryCache(err, urlQuery)
+			success, message := f.failedQueryCache.UpdateFailedQueryCache(err, queryString, queryResponseTime)
 			if success {
-				level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
+				level.Info(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			} else {
-				level.Debug(util_log.WithContext(r.Context(), f.log)).Log(message)
+				level.Debug(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			}
 		}
 
 		if f.cfg.LogFailedQueries {
-			f.reportFailedQuery(r, queryString, err)
+			f.reportFailedQuery(r, queryString, err, queryResponseTime)
 		}
 		return
 	}
@@ -200,7 +199,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
+func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error, queryResponseTime time.Duration) {
 	// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
 	grafanaDashboardUID := "-"
 	if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
@@ -222,6 +221,7 @@ func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err
 		"error", err.Error(),
 		"grafana_dashboard_uid", grafanaDashboardUID,
 		"grafana_panel_id", grafanaPanelID,
+		"query_response_time", queryResponseTime.String(),
 	}, formatQueryString(queryString)...)
 
 	level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
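
Note: the queryResponseTime value threaded through UpdateFailedQueryCache and reportFailedQuery above is not computed in any of the visible hunks; presumably it is measured around the round trip earlier in ServeHTTP. A hedged sketch of how such a measurement could look; the package and helper name are hypothetical, only the idea of timing the round trip comes from the diff:

	package transportsketch

	import (
		"net/http"
		"time"
	)

	// timedRoundTrip is a hypothetical helper: it times a single round trip so
	// the caller has a duration to hand to UpdateFailedQueryCache and
	// reportFailedQuery, mirroring how queryResponseTime is presumably
	// produced inside ServeHTTP.
	func timedRoundTrip(rt http.RoundTripper, r *http.Request) (*http.Response, time.Duration, error) {
		start := time.Now()
		resp, err := rt.RoundTrip(r)
		return resp, time.Since(start), err
	}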

internal/cortex/frontend/transport/utils/utils.go

Lines changed: 53 additions & 24 deletions
@@ -10,6 +10,7 @@ import (
 	"net/url"
 	"regexp"
 	"strconv"
+	"time"
 
 	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
@@ -22,10 +23,11 @@ var (
 
 // FailedQueryCache Handler holds an instance of FailedQueryCache and calls its methods
 type FailedQueryCache struct {
-	regex        *regexp.Regexp
-	errorExtract *regexp.Regexp
-	lruCache     *lru.Cache
-	cachedHits   *prometheus.CounterVec
+	regex         *regexp.Regexp
+	errorExtract  *regexp.Regexp
+	lruCache      *lru.Cache
+	cachedHits    prometheus.Counter
+	cachedQueries prometheus.Gauge
 }
 
 func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryCache, error) {
@@ -34,24 +36,32 @@ func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryC
 	lruCache, err := lru.New(capacity)
 	if err != nil {
 		lruCache = nil
-		err = fmt.Errorf("Failed to create lru cache: %s", err)
+		err = fmt.Errorf("failed to create lru cache: %s", err)
 		return nil, err
 	}
-	cachedHits := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+	cachedHits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Namespace: "cortex",
 		Name:      "cached_failed_queries_count",
 		Help:      "Total number of queries that hit the failed query cache.",
-	}, []string{})
+	})
+	cachedQueries := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+		Namespace: "cortex",
+		Name:      "failed_query_cache_size",
+		Help:      "How many queries are cached in the failed query cache.",
+	})
+	cachedQueries.Set(0)
 
 	return &FailedQueryCache{
-		regex:        regex,
-		errorExtract: errorExtract,
-		lruCache:     lruCache,
-		cachedHits:   cachedHits,
+		regex:         regex,
+		errorExtract:  errorExtract,
+		lruCache:      lruCache,
+		cachedHits:    cachedHits,
+		cachedQueries: cachedQueries,
 	}, err
 }
 
 // UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
-func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
+func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int) (bool, string) {
 	// Extracting error code from error string.
 	codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
 
@@ -75,25 +85,28 @@ func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNorm
 		message := createLogMessage("Query not cached due to non-cacheable error code", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
 		return false, message
 	}
+	f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
+	message := createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
+	return true, message
+}
 
+func (f *FailedQueryCache) addCacheEntry(queryExpressionNormalized string, queryExpressionRangeLength int) {
 	// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
-	if contains, _ := lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
-		if oldValue, ok := lruCache.Get(queryExpressionNormalized); ok {
+	if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
+		if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
 			queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
 		}
-		lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
+		f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
 	}
-
-	message := createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
-	return true, message
+	f.cachedQueries.Set(float64(f.lruCache.Len()))
 }
 
 // QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
-func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits *prometheus.CounterVec) (bool, string) {
+func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits prometheus.Counter) (bool, string) {
 	if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
 		cachedQueryRangeSeconds := value.(int)
 		message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
-		cachedHits.WithLabelValues().Inc()
+		cachedHits.Inc()
 		return true, message
 	}
 	return false, ""
@@ -129,27 +142,43 @@ func (f *FailedQueryCache) normalizeQueryString(query url.Values) string {
 func createLogMessage(message string, queryExpressionNormalized string, cachedQueryRangeSeconds int, queryExpressionRangeLength int, err error) string {
 	if err == nil {
 		return fmt.Sprintf(
-			`%s: %s, %s: %s, %s: %d, %s: %d`, "msg", message,
+			`%s, %s: %s, %s: %d, %s: %d`, message,
 			"cached_query", queryExpressionNormalized,
 			"cached_range_seconds", cachedQueryRangeSeconds,
 			"query_range_seconds", queryExpressionRangeLength)
 	}
 	return fmt.Sprintf(
-		`%s: %s, %s: %s, %s: %d, %s: %s`, "msg", message,
+		`%s, %s: %s, %s: %d, %s: %s`, message,
 		"cached_query", queryExpressionNormalized,
 		"query_range_seconds", queryExpressionRangeLength,
 		"cached_error", err)
 }
 
-func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values) (bool, string) {
+func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values, queryResponseTime time.Duration) (bool, string) {
 	queryExpressionNormalized := f.normalizeQueryString(query)
 	queryExpressionRangeLength := getQueryRangeSeconds(query)
-	success, message := f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, f.lruCache)
+	// TODO(hc.zhu): add a flag for the threshold
+	// The current gateway timeout is 5 minutes, so we cache the failed query running longer than 5 minutes - 10 seconds.
+	if queryResponseTime > time.Second*(60*5-10) {
+		// Cache long running queries regardless of the error code. The most common case is "context canceled".
+		f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
+		message := createLogMessage("Cached a failed long running query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
+		return true, message
+	}
+	if queryExpressionNormalized == "" {
+		// Other APIs don't have "query" parameter e.g., /api/v1/series, /api/v1/labels
+		return false, "Query parameter is empty"
	}
+	success, message := f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength)
 	return success, message
 }
 
 func (f *FailedQueryCache) QueryHitCache(query url.Values) (bool, string) {
 	queryExpressionNormalized := f.normalizeQueryString(query)
+	if queryExpressionNormalized == "" {
+		// Other APIs don't have "query" parameter e.g., /api/v1/series, /api/v1/labels
+		return false, "Query parameter is empty"
+	}
 	queryExpressionRangeLength := getQueryRangeSeconds(query)
 	cached, message := queryHitCache(queryExpressionNormalized, queryExpressionRangeLength, f.lruCache, f.cachedHits)
 	return cached, message
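
Note: taken together, the exported API after this change is NewFailedQueryCache(capacity, reg), UpdateFailedQueryCache(err, query, queryResponseTime), and QueryHitCache(query). A rough usage sketch, under the assumptions that the package is importable at the path shown in this diff and that getQueryRangeSeconds derives the range from the start/end parameters:

	package main

	import (
		"errors"
		"fmt"
		"net/url"
		"time"

		"github.com/prometheus/client_golang/prometheus"

		// Assumed import path, taken from the file path in this diff; as an
		// internal package it is only importable from within the same module.
		"github.com/thanos-io/thanos/internal/cortex/frontend/transport/utils"
	)

	func main() {
		cache, err := utils.NewFailedQueryCache(128, prometheus.NewRegistry())
		if err != nil {
			panic(err)
		}

		q := url.Values{}
		q.Set("query", `sum(rate(http_requests_total[5m]))`)
		q.Set("start", "1700000000")
		q.Set("end", "1700003600")

		// A query that failed after ~4m55s exceeds the 290s threshold in this
		// diff, so it is cached regardless of the extracted error code.
		cached, msg := cache.UpdateFailedQueryCache(errors.New("context canceled"), q, 295*time.Second)
		fmt.Println(cached, msg)

		// A repeat of the same query now hits the cache, letting the frontend
		// reject it with 403 Forbidden instead of blocking on it again.
		hit, msg := cache.QueryHitCache(q)
		fmt.Println(hit, msg)
	}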
