Skip to content

Commit 95e4da3

Browse files
Moving code for cache functions to utils file (#48)
2 parents 994422f + 49c736b commit 95e4da3

File tree

3 files changed

+372
-124
lines changed

3 files changed

+372
-124
lines changed

internal/cortex/frontend/transport/handler.go

Lines changed: 30 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,16 @@ import (
1111
"io"
1212
"net/http"
1313
"net/url"
14-
"regexp"
1514
"strconv"
1615
"strings"
1716
"syscall"
1817
"time"
1918

2019
"github.com/go-kit/log"
2120
"github.com/go-kit/log/level"
22-
"github.com/hashicorp/golang-lru"
2321
"github.com/prometheus/client_golang/prometheus"
2422
"github.com/prometheus/client_golang/prometheus/promauto"
23+
"github.com/thanos-io/thanos/internal/cortex/frontend/transport/utils"
2524
querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
2625
"github.com/thanos-io/thanos/internal/cortex/tenant"
2726
"github.com/thanos-io/thanos/internal/cortex/util"
@@ -40,7 +39,6 @@ var (
4039
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
4140
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
4241
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
43-
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
4442
)
4543

4644
// HandlerConfig Config for a Handler.
@@ -55,12 +53,10 @@ type HandlerConfig struct {
5553
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
5654
// but all other logic is inside the RoundTripper.
5755
type Handler struct {
58-
cfg HandlerConfig
59-
log log.Logger
60-
roundTripper http.RoundTripper
61-
lruCache *lru.Cache
62-
regex *regexp.Regexp
63-
errorExtract *regexp.Regexp
56+
cfg HandlerConfig
57+
log log.Logger
58+
roundTripper http.RoundTripper
59+
failedQueryCache *utils.FailedQueryCache
6460

6561
// Metrics.
6662
querySeconds *prometheus.CounterVec
@@ -72,26 +68,18 @@ type Handler struct {
7268

7369
// NewHandler creates a new frontend handler.
7470
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
75-
var (
76-
LruCache *lru.Cache
77-
err error
78-
)
79-
80-
if cfg.FailedQueryCacheCapacity > 0 {
81-
LruCache, err = lru.New(cfg.FailedQueryCacheCapacity)
82-
if err != nil {
83-
LruCache = nil
84-
level.Warn(log).Log("msg", "Failed to create LruCache", "error", err)
85-
}
86-
}
87-
8871
h := &Handler{
8972
cfg: cfg,
9073
log: log,
9174
roundTripper: roundTripper,
92-
lruCache: LruCache,
93-
regex: regexp.MustCompile(`[\s\n\t]+`),
94-
errorExtract: regexp.MustCompile(`Code\((\d+)\)`),
75+
}
76+
77+
if cfg.FailedQueryCacheCapacity > 0 {
78+
FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity)
79+
if errQueryCache != nil {
80+
level.Warn(log).Log(errQueryCache.Error())
81+
}
82+
h.failedQueryCache = FailedQueryCache
9583
}
9684

9785
if cfg.QueryStatsEnabled {
@@ -130,10 +118,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
130118

131119
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
132120
var (
133-
stats *querier_stats.Stats
134-
queryString url.Values
135-
queryExpressionNormalized string
136-
queryExpressionRangeLength int
121+
stats *querier_stats.Stats
122+
queryString url.Values
123+
urlQuery url.Values
137124
)
138125

139126
// Initialise the stats in the context and make sure it's propagated
@@ -153,22 +140,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
153140
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
154141
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
155142

156-
// Check if caching is enabled.
157-
if f.lruCache != nil {
158-
// Store query expression.
159-
queryExpressionNormalized = f.regex.ReplaceAllString(r.URL.Query().Get("query"), " ")
160-
161-
// Store query time range length.
162-
queryExpressionRangeLength = getQueryRangeSeconds(r)
143+
urlQuery = r.URL.Query()
163144

164-
// Check if query in cache and whether value exceeds time range length.
165-
if value, ok := f.lruCache.Get(queryExpressionNormalized); ok && value.(int) >= queryExpressionRangeLength {
145+
// Check if query is cached
146+
if f.failedQueryCache != nil {
147+
cached, message := f.failedQueryCache.QueryHitCache(urlQuery)
148+
if cached {
166149
w.WriteHeader(http.StatusForbidden)
167-
level.Info(util_log.WithContext(r.Context(), f.log)).Log(
168-
"msg", "Retrieved query from cache",
169-
"normalized_query", queryExpressionNormalized,
170-
"range_seconds", queryExpressionRangeLength,
171-
)
150+
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
172151
f.cachedHits.Inc()
173152
return
174153
}
@@ -182,9 +161,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
182161
writeError(w, err)
183162
queryString = f.parseRequestQueryString(r, buf)
184163

185-
// Check if caching is enabled.
186-
if f.lruCache != nil {
187-
f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, r)
164+
// Update cache for failed queries.
165+
if f.failedQueryCache != nil {
166+
success, message := f.failedQueryCache.UpdateFailedQueryCache(err, urlQuery)
167+
if success {
168+
level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
169+
} else {
170+
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(message)
171+
}
188172
}
189173

190174
if f.cfg.LogFailedQueries {
@@ -223,84 +207,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
223207
}
224208
}
225209

226-
func (f *Handler) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, r *http.Request) {
227-
// Extracting error code from error string.
228-
codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
229-
230-
// Checking if error code extracted successfully.
231-
if codeExtract == nil || len(codeExtract) < 2 {
232-
level.Error(util_log.WithContext(r.Context(), f.log)).Log(
233-
"msg", "Error string regex conversion error",
234-
"normalized_query", queryExpressionNormalized,
235-
"range_seconds", queryExpressionRangeLength,
236-
"error", err)
237-
return
238-
}
239-
240-
// Converting error code to int.
241-
errCode, strConvError := strconv.Atoi(codeExtract[1])
242-
243-
// Checking if error code extracted properly from string.
244-
if strConvError != nil {
245-
level.Error(util_log.WithContext(r.Context(), f.log)).Log(
246-
"msg", "String to int conversion error",
247-
"normalized_query", queryExpressionNormalized,
248-
"range_seconds", queryExpressionRangeLength,
249-
"error", err)
250-
return
251-
}
252-
253-
// If error should be cached, store it in cache.
254-
if !isCacheableError(errCode) {
255-
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
256-
"msg", "Query not cached due to non-cacheable error code",
257-
"normalized_query", queryExpressionNormalized,
258-
"range_seconds", queryExpressionRangeLength,
259-
"error", err,
260-
)
261-
return
262-
}
263-
264-
// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
265-
if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
266-
if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
267-
queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
268-
}
269-
f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
270-
}
271-
272-
level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
273-
"msg", "Cached a failed query",
274-
"normalized_query", queryExpressionNormalized,
275-
"range_seconds", queryExpressionRangeLength,
276-
"error", err,
277-
)
278-
279-
}
280-
281-
// isCacheableError Returns true if response code is in pre-defined cacheable errors list, else returns false.
282-
func isCacheableError(statusCode int) bool {
283-
for _, errStatusCode := range cacheableResponseCodes {
284-
if errStatusCode == statusCode {
285-
return true
286-
}
287-
}
288-
return false
289-
}
290-
291-
// Time range length for queries, if either of "start" or "end" are not present, return 0.
292-
func getQueryRangeSeconds(r *http.Request) int {
293-
start, err := strconv.Atoi(r.URL.Query().Get("start"))
294-
if err != nil {
295-
return 0
296-
}
297-
end, err := strconv.Atoi(r.URL.Query().Get("end"))
298-
if err != nil {
299-
return 0
300-
}
301-
return end - start
302-
}
303-
304210
func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
305211
// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
306212
grafanaDashboardUID := "-"
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright (c) The Cortex Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
// Package utils contains the monitoring platform team's helper resources for the frontend.
5+
package utils
6+
7+
import (
8+
"fmt"
9+
"net/http"
10+
"net/url"
11+
"regexp"
12+
"strconv"
13+
14+
lru "github.com/hashicorp/golang-lru"
15+
)
16+
17+
// cacheableResponseCodes lists the HTTP status codes (request timeout,
// gateway timeout and bad request) that isCacheableError accepts as
// cacheable failures.
var (
18+
cacheableResponseCodes = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
19+
)
20+
21+
// FailedQueryCache keeps track of query expressions that failed with a
// cacheable error, keyed by their whitespace-normalized form. The frontend
// Handler holds an instance of FailedQueryCache and calls its methods.
22+
type FailedQueryCache struct {
23+
// regex matches runs of whitespace; normalizeQueryString replaces each run with a single space.
regex *regexp.Regexp
24+
// errorExtract captures the digits inside "Code(<digits>)" in an error string.
errorExtract *regexp.Regexp
25+
// lruCache maps a normalized query expression to an int time-range length in seconds.
lruCache *lru.Cache
26+
}
27+
28+
func NewFailedQueryCache(capacity int) (*FailedQueryCache, error) {
29+
regex := regexp.MustCompile(`[\s\n\t]+`)
30+
errorExtract := regexp.MustCompile(`Code\((\d+)\)`)
31+
lruCache, err := lru.New(capacity)
32+
if err != nil {
33+
lruCache = nil
34+
err = fmt.Errorf("Failed to create lru cache: %s", err)
35+
return nil, err
36+
}
37+
return &FailedQueryCache{
38+
regex: regex,
39+
errorExtract: errorExtract,
40+
lruCache: lruCache}, err
41+
}
42+
43+
// UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
44+
func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
45+
// Extracting error code from error string.
46+
codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
47+
48+
// Checking if error code extracted successfully.
49+
if codeExtract == nil || len(codeExtract) < 2 {
50+
message := createLogMessage("String to regex conversion error", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
51+
return false, message
52+
}
53+
54+
// Converting error code to int.
55+
errCode, strConvError := strconv.Atoi(codeExtract[1])
56+
57+
// Checking if error code extracted properly from string.
58+
if strConvError != nil {
59+
message := createLogMessage("String to int conversion error", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
60+
return false, message
61+
}
62+
63+
// If error should be cached, store it in cache.
64+
if !isCacheableError(errCode) {
65+
message := createLogMessage("Query not cached due to non-cacheable error code", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
66+
return false, message
67+
}
68+
69+
// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
70+
if contains, _ := lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
71+
if oldValue, ok := lruCache.Get(queryExpressionNormalized); ok {
72+
queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
73+
}
74+
lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
75+
}
76+
77+
message := createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
78+
return true, message
79+
}
80+
81+
// QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
82+
func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
83+
if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
84+
cachedQueryRangeSeconds := value.(int)
85+
message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
86+
return true, message
87+
}
88+
return false, ""
89+
}
90+
91+
// isCacheableError Returns true if response code is in pre-defined cacheable errors list, else returns false.
92+
func isCacheableError(statusCode int) bool {
93+
for _, errStatusCode := range cacheableResponseCodes {
94+
if errStatusCode == statusCode {
95+
return true
96+
}
97+
}
98+
return false
99+
}
100+
101+
// getQueryRangeSeconds returns the length in seconds (end - start) of the
// time range described by the "start" and "end" query parameters.
//
// Whole-second Unix timestamps are subtracted exactly. Fractional Unix
// timestamps (for example "1700000000.250"), which the Prometheus HTTP API
// also accepts, are subtracted as floats and truncated toward zero. It
// returns 0 when either parameter is missing, is not numeric, or when the
// floating-point difference is not finite or falls outside the 32-bit
// integer range.
func getQueryRangeSeconds(query url.Values) int {
	rawStart, rawEnd := query.Get("start"), query.Get("end")

	// Integer timestamps: keep exact integer arithmetic.
	start, startErr := strconv.Atoi(rawStart)
	end, endErr := strconv.Atoi(rawEnd)
	if startErr == nil && endErr == nil {
		return end - start
	}

	// At least one value is not a plain integer; fall back to float parsing
	// so fractional timestamps do not collapse the range to 0.
	startSeconds, err := strconv.ParseFloat(rawStart, 64)
	if err != nil {
		return 0
	}
	endSeconds, err := strconv.ParseFloat(rawEnd, 64)
	if err != nil {
		return 0
	}

	// ParseFloat accepts "NaN" and "Inf". Written as a negated conjunction so
	// that NaN (for which every comparison is false) is rejected as well, and
	// bounded so the conversion to int is well defined on every platform.
	const limit = 1 << 31
	diff := endSeconds - startSeconds
	if !(diff > -limit && diff < limit) {
		return 0
	}
	return int(diff)
}
113+
114+
func (f *FailedQueryCache) normalizeQueryString(query url.Values) string {
115+
return f.regex.ReplaceAllString(query.Get("query"), " ")
116+
}
117+
118+
// createLogMessage renders a single-line "key: value" description of a cache
// outcome.
//
// With a non-nil err the line carries msg, cached_query, query_range_seconds
// and cached_error; cachedQueryRangeSeconds is not used. With a nil err it
// carries msg, cached_query, cached_range_seconds and query_range_seconds.
func createLogMessage(message string, queryExpressionNormalized string, cachedQueryRangeSeconds int, queryExpressionRangeLength int, err error) string {
	if err != nil {
		return fmt.Sprintf(
			`%s: %s, %s: %s, %s: %d, %s: %s`,
			"msg", message,
			"cached_query", queryExpressionNormalized,
			"query_range_seconds", queryExpressionRangeLength,
			"cached_error", err,
		)
	}

	return fmt.Sprintf(
		`%s: %s, %s: %s, %s: %d, %s: %d`,
		"msg", message,
		"cached_query", queryExpressionNormalized,
		"cached_range_seconds", cachedQueryRangeSeconds,
		"query_range_seconds", queryExpressionRangeLength,
	)
}
132+
133+
func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values) (bool, string) {
134+
queryExpressionNormalized := f.normalizeQueryString(query)
135+
queryExpressionRangeLength := getQueryRangeSeconds(query)
136+
success, message := f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, f.lruCache)
137+
return success, message
138+
}
139+
140+
func (f *FailedQueryCache) QueryHitCache(query url.Values) (bool, string) {
141+
queryExpressionNormalized := f.normalizeQueryString(query)
142+
queryExpressionRangeLength := getQueryRangeSeconds(query)
143+
cached, message := queryHitCache(queryExpressionNormalized, queryExpressionRangeLength, f.lruCache)
144+
return cached, message
145+
}

0 commit comments

Comments
 (0)