Commit 120d328
Initial implementation of blocking expensive ad-hoc queries in the frontend (#40)
2 parents 94eb766 + aff27a7 commit 120d328

File tree: 3 files changed (+153, -12 lines)

cmd/thanos/query_frontend.go

Lines changed: 5 additions & 1 deletion
@@ -8,6 +8,8 @@ import (
     "net/http"
     "time"
 
+    "gopkg.in/yaml.v2"
+
     extflag "github.com/efficientgo/tools/extkingpin"
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
@@ -19,7 +21,6 @@ import (
     "github.com/prometheus/prometheus/promql/parser"
     "github.com/thanos-io/promql-engine/execution/parse"
     "github.com/weaveworks/common/user"
-    "gopkg.in/yaml.v2"
 
     cortexfrontend "github.com/thanos-io/thanos/internal/cortex/frontend"
     "github.com/thanos-io/thanos/internal/cortex/frontend/transport"
@@ -148,6 +149,9 @@ func registerQueryFrontend(app *extkingpin.App) {
 
     cmd.Flag("query-frontend.log-failed-queries", "Log failed queries due to any reason").Default("true").BoolVar(&cfg.CortexHandlerConfig.LogFailedQueries)
 
+    cmd.Flag("failed-query-cache-capacity", "Capacity of cache for failed queries. 0 means this feature is disabled.").
+        Default("0").IntVar(&cfg.CortexHandlerConfig.FailedQueryCacheCapacity)
+
     cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
         " and both flags cannot be used at the same time. "+
         "Request header names used to identify the source of slow queries (repeated flag). "+

go.mod

Lines changed: 1 addition & 0 deletions
@@ -116,6 +116,7 @@ require (
 
 require (
     github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
+    github.com/hashicorp/golang-lru v0.6.0
     github.com/hashicorp/golang-lru/v2 v2.0.7
     github.com/mitchellh/go-ps v1.0.0
     github.com/onsi/gomega v1.29.0
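The failed-query cache is backed by the original, non-generic github.com/hashicorp/golang-lru module, added here alongside the v2 module that was already a dependency. A minimal standalone sketch of the small part of its API the handler relies on (keys and values below are illustrative, not from this commit):

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

func main() {
	// New creates a fixed-size cache that evicts the least recently used entry.
	cache, err := lru.New(128)
	if err != nil {
		panic(err)
	}

	// ContainsOrAdd adds the entry only if the key is absent and reports
	// whether the key was already present.
	existed, _ := cache.ContainsOrAdd("up == 0", 3600)
	fmt.Println(existed) // false on the first insertion

	// Values come back as interface{} and must be type-asserted,
	// as the handler does with value.(int).
	if v, ok := cache.Get("up == 0"); ok {
		fmt.Println(v.(int)) // 3600
	}
}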

internal/cortex/frontend/transport/handler.go

Lines changed: 147 additions & 11 deletions
@@ -11,22 +11,23 @@ import (
     "io"
     "net/http"
     "net/url"
+    "regexp"
     "strconv"
     "strings"
     "syscall"
     "time"
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
+    "github.com/hashicorp/golang-lru"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
-    "github.com/weaveworks/common/httpgrpc"
-    "github.com/weaveworks/common/httpgrpc/server"
-
     querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
     "github.com/thanos-io/thanos/internal/cortex/tenant"
     "github.com/thanos-io/thanos/internal/cortex/util"
     util_log "github.com/thanos-io/thanos/internal/cortex/util/log"
+    "github.com/weaveworks/common/httpgrpc"
+    "github.com/weaveworks/common/httpgrpc/server"
 )
 
 const (
@@ -39,14 +40,16 @@ var (
     errCanceled              = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
     errDeadlineExceeded      = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
     errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
+    cacheableResponseCodes   = []int{http.StatusRequestTimeout, http.StatusGatewayTimeout, http.StatusBadRequest}
 )
 
-// Config for a Handler.
+// HandlerConfig Config for a Handler.
 type HandlerConfig struct {
-    LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
-    MaxBodySize          int64         `yaml:"max_body_size"`
-    QueryStatsEnabled    bool          `yaml:"query_stats_enabled"`
-    LogFailedQueries     bool          `yaml:"log_failed_queries"`
+    LogQueriesLongerThan     time.Duration `yaml:"log_queries_longer_than"`
+    MaxBodySize              int64         `yaml:"max_body_size"`
+    QueryStatsEnabled        bool          `yaml:"query_stats_enabled"`
+    LogFailedQueries         bool          `yaml:"log_failed_queries"`
+    FailedQueryCacheCapacity int           `yaml:"failed_query_cache_capacity"`
 }
 
 // Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -55,20 +58,40 @@ type Handler struct {
     cfg          HandlerConfig
     log          log.Logger
     roundTripper http.RoundTripper
+    lruCache     *lru.Cache
+    regex        *regexp.Regexp
+    errorExtract *regexp.Regexp
 
     // Metrics.
     querySeconds *prometheus.CounterVec
     querySeries  *prometheus.CounterVec
     queryBytes   *prometheus.CounterVec
+    cachedHits   prometheus.Counter
     activeUsers  *util.ActiveUsersCleanupService
 }
 
 // NewHandler creates a new frontend handler.
 func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
+    var (
+        LruCache *lru.Cache
+        err      error
+    )
+
+    if cfg.FailedQueryCacheCapacity > 0 {
+        LruCache, err = lru.New(cfg.FailedQueryCacheCapacity)
+        if err != nil {
+            LruCache = nil
+            level.Warn(log).Log("msg", "Failed to create LruCache", "error", err)
+        }
+    }
+
     h := &Handler{
         cfg:          cfg,
         log:          log,
         roundTripper: roundTripper,
+        lruCache:     LruCache,
+        regex:        regexp.MustCompile(`[\s\n\t]+`),
+        errorExtract: regexp.MustCompile(`Code\((\d+)\)`),
     }
 
     if cfg.QueryStatsEnabled {
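The regex field initialised above collapses runs of whitespace, so the same PromQL expression submitted with different formatting normalizes to a single cache key. A standalone sketch of that normalization (the query text is an illustrative example):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as the handler's regex field: any run of whitespace
	// is replaced by a single space.
	normalize := regexp.MustCompile(`[\s\n\t]+`)

	q := "sum(rate(http_requests_total[5m]))\n  by (job)"
	fmt.Println(normalize.ReplaceAllString(q, " "))
	// Output: sum(rate(http_requests_total[5m])) by (job)
}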
@@ -92,17 +115,25 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
             h.querySeries.DeleteLabelValues(user)
             h.queryBytes.DeleteLabelValues(user)
         })
+
         // If cleaner stops or fail, we will simply not clean the metrics for inactive users.
         _ = h.activeUsers.StartAsync(context.Background())
     }
 
+    h.cachedHits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
+        Name: "cached_failed_queries_count",
+        Help: "Total number of queries that hit the failed query cache.",
+    })
+
     return h
 }
 
 func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     var (
-        stats       *querier_stats.Stats
-        queryString url.Values
+        stats                      *querier_stats.Stats
+        queryString                url.Values
+        queryExpressionNormalized  string
+        queryExpressionRangeLength int
     )
 
     // Initialise the stats in the context and make sure it's propagated
@@ -122,14 +153,41 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
     r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
 
+    // Check if caching is enabled.
+    if f.lruCache != nil {
+        // Store query expression.
+        queryExpressionNormalized = f.regex.ReplaceAllString(r.URL.Query().Get("query"), " ")
+
+        // Store query time range length.
+        queryExpressionRangeLength = getQueryRangeSeconds(r)
+
+        // Check if query in cache and whether value exceeds time range length.
+        if value, ok := f.lruCache.Get(queryExpressionNormalized); ok && value.(int) >= queryExpressionRangeLength {
+            w.WriteHeader(http.StatusForbidden)
+            level.Info(util_log.WithContext(r.Context(), f.log)).Log(
+                "msg", "Retrieved query from cache",
+                "normalized_query", queryExpressionNormalized,
+                "range_seconds", queryExpressionRangeLength,
+            )
+            f.cachedHits.Inc()
+            return
+        }
+    }
+
     startTime := time.Now()
     resp, err := f.roundTripper.RoundTrip(r)
     queryResponseTime := time.Since(startTime)
 
     if err != nil {
         writeError(w, err)
+        queryString = f.parseRequestQueryString(r, buf)
+
+        // Check if caching is enabled.
+        if f.lruCache != nil {
+            f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, r)
+        }
+
         if f.cfg.LogFailedQueries {
-            queryString = f.parseRequestQueryString(r, buf)
             f.reportFailedQuery(r, queryString, err)
         }
         return
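Taken together, the two paths above mean a request is rejected with 403 Forbidden only when its normalized expression is in the cache and the cached range length (in seconds) is at least as large as the range of the incoming request; on a downstream error the query string is now parsed unconditionally and the cache is updated before the failure is optionally logged. Note that getQueryRangeSeconds (defined further down) returns 0 when either start or end is missing, so a cached instant-query failure (value 0) also blocks repeated instant queries of the same expression, since 0 >= 0. The hit-path comparison reduces to a check like this (standalone sketch):

// block reports whether a request should be rejected, given the range length
// stored for a previously failed query and the range of the incoming request.
func block(cachedRangeSeconds, requestedRangeSeconds int) bool {
	return cachedRangeSeconds >= requestedRangeSeconds
}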
@@ -165,6 +223,84 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     }
 }
 
+func (f *Handler) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, r *http.Request) {
+    // Extracting error code from error string.
+    codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
+
+    // Checking if error code extracted successfully.
+    if codeExtract == nil || len(codeExtract) < 2 {
+        level.Error(util_log.WithContext(r.Context(), f.log)).Log(
+            "msg", "Error string regex conversion error",
+            "normalized_query", queryExpressionNormalized,
+            "range_seconds", queryExpressionRangeLength,
+            "error", err)
+        return
+    }
+
+    // Converting error code to int.
+    errCode, strConvError := strconv.Atoi(codeExtract[1])
+
+    // Checking if error code extracted properly from string.
+    if strConvError != nil {
+        level.Error(util_log.WithContext(r.Context(), f.log)).Log(
+            "msg", "String to int conversion error",
+            "normalized_query", queryExpressionNormalized,
+            "range_seconds", queryExpressionRangeLength,
+            "error", err)
+        return
+    }
+
+    // If error should be cached, store it in cache.
+    if !isCacheableError(errCode) {
+        level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
+            "msg", "Query not cached due to non-cacheable error code",
+            "normalized_query", queryExpressionNormalized,
+            "range_seconds", queryExpressionRangeLength,
+            "error", err,
+        )
+        return
+    }
+
+    // Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
+    if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
+        if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
+            queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
+        }
+        f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
+    }
+
+    level.Debug(util_log.WithContext(r.Context(), f.log)).Log(
+        "msg", "Cached a failed query",
+        "normalized_query", queryExpressionNormalized,
+        "range_seconds", queryExpressionRangeLength,
+        "error", err,
+    )
+
+}
+
+// isCacheableError Returns true if response code is in pre-defined cacheable errors list, else returns false.
+func isCacheableError(statusCode int) bool {
+    for _, errStatusCode := range cacheableResponseCodes {
+        if errStatusCode == statusCode {
+            return true
+        }
+    }
+    return false
+}
+
+// Time range length for queries, if either of "start" or "end" are not present, return 0.
+func getQueryRangeSeconds(r *http.Request) int {
+    start, err := strconv.Atoi(r.URL.Query().Get("start"))
+    if err != nil {
+        return 0
+    }
+    end, err := strconv.Atoi(r.URL.Query().Get("end"))
+    if err != nil {
+        return 0
+    }
+    return end - start
+}
+
 func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
     // NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
     grafanaDashboardUID := "-"
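The errorExtract pattern compiled in NewHandler pulls the numeric status code out of the error text returned by the round tripper, which updateFailedQueryCache then checks against cacheableResponseCodes. A standalone sketch of that extraction; the sample error string is an assumption for illustration, only the Code(NNN) fragment matters to the match:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	errorExtract := regexp.MustCompile(`Code\((\d+)\)`)

	// Assumed sample error text; only the "Code(504)" fragment is significant.
	msg := "rpc error: code = Code(504) desc = context deadline exceeded"

	if m := errorExtract.FindStringSubmatch(msg); len(m) == 2 {
		code, _ := strconv.Atoi(m[1])
		fmt.Println(code) // 504, a cacheable status per cacheableResponseCodes
	}
}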
