
Commit cc22ed6

merge db_main into release (#70)
2 parents babcbee + 936f992 commit cc22ed6

File tree: 12 files changed (+567, -97 lines)

cmd/thanos/query.go

Lines changed: 5 additions & 0 deletions
@@ -129,6 +129,8 @@ func registerQuery(app *extkingpin.App) {
 	enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
 		Default("false").Bool()
+	enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
+		Default("false").Bool()
 
 	instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
 
@@ -378,6 +380,7 @@ func registerQuery(app *extkingpin.App) {
 			*tenantLabel,
 			*enableGroupReplicaPartialStrategy,
 			*enableDedupMerge,
+			*enableQuorumChunkDedup,
 		)
 	})
 }
@@ -462,6 +465,7 @@ func runQuery(
 	tenantLabel string,
 	groupReplicaPartialResponseStrategy bool,
 	enableDedupMerge bool,
+	enableQuorumChunkDedup bool,
 ) error {
 	if alertQueryURL == "" {
 		lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -536,6 +540,7 @@ func runQuery(
 	options := []store.ProxyStoreOption{
 		store.WithTSDBSelector(tsdbSelector),
 		store.WithProxyStoreDebugLogging(debugLogging),
+		store.WithQuorumChunkDedup(enableQuorumChunkDedup),
 	}
 
 	var (
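
Note: the new --query.quorum-chunk-dedup flag is threaded from registerQuery through runQuery and appended to the proxy store options in the last hunk above. The option's definition is not part of this commit; the sketch below only shows an assumed shape of such a functional option, not the actual store package implementation.

package store

// Hypothetical sketch only: the real ProxyStore and its options are defined
// elsewhere in this repository and may be shaped differently.
type ProxyStore struct {
	quorumChunkDedup bool
}

// ProxyStoreOption mutates ProxyStore configuration at construction time.
type ProxyStoreOption func(*ProxyStore)

// WithQuorumChunkDedup enables quorum-based deduplication of chunks coming
// from replicas (assumed behaviour of the option added in this commit).
func WithQuorumChunkDedup(enable bool) ProxyStoreOption {
	return func(s *ProxyStore) {
		s.quorumChunkDedup = enable
	}
}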

cmd/thanos/query_frontend.go

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ func runQueryFrontend(
 	roundTripper = tripperWare(roundTripper)
 
 	// Create the query frontend transport.
-	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, nil)
+	handler := transport.NewHandler(*cfg.CortexHandlerConfig, roundTripper, logger, reg)
 	if cfg.CompressResponses {
 		handler = gzhttp.GzipHandler(handler)
 	}
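
Note: the only change here is passing the Prometheus registerer reg instead of nil to transport.NewHandler; without a registerer, the failed-query-cache metrics added in internal/cortex/frontend/transport/utils/utils.go (see below) would never be exported. A minimal, self-contained illustration of that behaviour using the standard client_golang API (metric names here are illustrative only):

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// With a nil registerer the counter is created but never registered,
	// so it never shows up in a scrape.
	_ = promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "example_unregistered_total"})

	// With a real registerer the counter is registered and exported.
	_ = promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "example_registered_total"})
}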

cmd/thanos/receive.go

Lines changed: 13 additions & 5 deletions
@@ -286,7 +286,7 @@ func runReceive(
 
 	level.Debug(logger).Log("msg", "setting up TSDB")
 	{
-		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
+		if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.tsdbDisableFlushOnShutdown); err != nil {
 			return err
 		}
 	}
@@ -578,6 +578,7 @@ func startTSDBAndUpload(g *run.Group,
 	statusProber prober.Probe,
 	bkt objstore.Bucket,
 	hashringAlgorithm receive.HashringAlgorithm,
+	disableFlushOnShutDown bool,
 ) error {
 
 	log.With(logger, "component", "storage")
@@ -603,10 +604,12 @@ func startTSDBAndUpload(g *run.Group,
 		// Before quitting, ensure the WAL is flushed and the DBs are closed.
 		defer func() {
 			level.Info(logger).Log("msg", "shutting down storage")
-			if err := dbs.Flush(); err != nil {
-				level.Error(logger).Log("err", err, "msg", "failed to flush storage")
-			} else {
-				level.Info(logger).Log("msg", "storage is flushed successfully")
+			if !disableFlushOnShutDown {
+				if err := dbs.Flush(); err != nil {
+					level.Error(logger).Log("err", err, "msg", "failed to flush storage")
+				} else {
+					level.Info(logger).Log("msg", "storage is flushed successfully")
+				}
 			}
 			if err := dbs.Close(); err != nil {
 				level.Error(logger).Log("err", err, "msg", "failed to close storage")
@@ -822,6 +825,7 @@ type receiveConfig struct {
 	tsdbMaxBytes                 units.Base2Bytes
 	tsdbWriteQueueSize           int64
 	tsdbMemorySnapshotOnShutdown bool
+	tsdbDisableFlushOnShutdown   bool
 	tsdbEnableNativeHistograms   bool
 
 	walCompression bool
@@ -958,6 +962,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 		"[EXPERIMENTAL] Enables feature to snapshot in-memory chunks on shutdown for faster restarts.").
 		Default("false").Hidden().BoolVar(&rc.tsdbMemorySnapshotOnShutdown)
 
+	cmd.Flag("tsdb.disable-flush-on-shutdown",
+		"[EXPERIMENTAL] disable flush on receive shutdown for rebuilding mempostings on restart.").
+		Default("false").Hidden().BoolVar(&rc.tsdbDisableFlushOnShutdown)
+
 	cmd.Flag("tsdb.enable-native-histograms",
 		"[EXPERIMENTAL] Enables the ingestion of native histograms.").
 		Default("false").Hidden().BoolVar(&rc.tsdbEnableNativeHistograms)

internal/cortex/frontend/transport/handler.go

Lines changed: 15 additions & 9 deletions
@@ -74,6 +74,7 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger
 	}
 
 	if cfg.FailedQueryCacheCapacity > 0 {
+		level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity)
 		FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, reg)
 		if errQueryCache != nil {
 			level.Warn(log).Log(errQueryCache.Error())
@@ -113,7 +114,9 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger
 func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	var (
 		stats *querier_stats.Stats
+		// For failed/slow query logging and query stats.
 		queryString url.Values
+		// For failed query cache
 		urlQuery url.Values
 	)
 
@@ -134,14 +137,16 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
 	r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
 
-	urlQuery = r.URL.Query()
-
 	// Check if query is cached
 	if f.failedQueryCache != nil {
+		// NB: don't call f.parseRequestQueryString(r, buf) before f.roundTripper.RoundTrip(r)
+		// because the call closes the buffer which has content if the request is a POST with
+		// form data in body.
+		urlQuery = r.URL.Query()
 		cached, message := f.failedQueryCache.QueryHitCache(urlQuery)
 		if cached {
 			w.WriteHeader(http.StatusForbidden)
-			level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
+			level.Info(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			return
 		}
 	}
@@ -152,20 +157,20 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	if err != nil {
 		writeError(w, err)
-		queryString = f.parseRequestQueryString(r, buf)
 
 		// Update cache for failed queries.
 		if f.failedQueryCache != nil {
-			success, message := f.failedQueryCache.UpdateFailedQueryCache(err, urlQuery)
+			success, message := f.failedQueryCache.UpdateFailedQueryCache(err, urlQuery, queryResponseTime)
 			if success {
-				level.Info(util_log.WithContext(r.Context(), f.log)).Log(message)
+				level.Info(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			} else {
-				level.Debug(util_log.WithContext(r.Context(), f.log)).Log(message)
+				level.Debug(util_log.WithContext(r.Context(), f.log)).Log("msg", message)
 			}
 		}
 
 		if f.cfg.LogFailedQueries {
-			f.reportFailedQuery(r, queryString, err)
+			queryString = f.parseRequestQueryString(r, buf)
+			f.reportFailedQuery(r, queryString, err, queryResponseTime)
 		}
 		return
 	}
@@ -200,7 +205,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error) {
+func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err error, queryResponseTime time.Duration) {
 	// NOTE(GiedriusS): see https://github.com/grafana/grafana/pull/60301 for more info.
 	grafanaDashboardUID := "-"
 	if dashboardUID := r.Header.Get("X-Dashboard-Uid"); dashboardUID != "" {
@@ -222,6 +227,7 @@ func (f *Handler) reportFailedQuery(r *http.Request, queryString url.Values, err
 		"error", err.Error(),
 		"grafana_dashboard_uid", grafanaDashboardUID,
 		"grafana_panel_id", grafanaPanelID,
+		"query_response_time", queryResponseTime.String(),
 	}, formatQueryString(queryString)...)
 
 	level.Error(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
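
Note: the reordering in ServeHTTP matters because the request body is teed into a buffer; parsing the query string drains that buffer, so it must not happen before the round trip when the request is a POST carrying form data. A small, self-contained sketch of the TeeReader pattern (illustrative names only):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Demonstrates the buffering pattern used above: the request body is teed into
// buf so a copy can still be read (e.g. to parse POST form data) after the
// upstream round trip has consumed the original body.
func main() {
	body := strings.NewReader("query=up&start=0&end=60")

	var buf bytes.Buffer
	tee := io.NopCloser(io.TeeReader(body, &buf))

	// The round trip consumes the body; the tee keeps a copy in buf.
	consumed, _ := io.ReadAll(tee)
	fmt.Println("sent upstream:", string(consumed))

	// Only now is it safe to parse the buffered copy.
	fmt.Println("buffered copy:", buf.String())
}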

internal/cortex/frontend/transport/utils/utils.go

Lines changed: 53 additions & 24 deletions
@@ -10,6 +10,7 @@ import (
 	"net/url"
 	"regexp"
 	"strconv"
+	"time"
 
 	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
@@ -22,10 +23,11 @@ var (
 
 // FailedQueryCache Handler holds an instance of FailedQueryCache and calls its methods
 type FailedQueryCache struct {
-	regex        *regexp.Regexp
-	errorExtract *regexp.Regexp
-	lruCache     *lru.Cache
-	cachedHits   *prometheus.CounterVec
+	regex         *regexp.Regexp
+	errorExtract  *regexp.Regexp
+	lruCache      *lru.Cache
+	cachedHits    prometheus.Counter
+	cachedQueries prometheus.Gauge
 }
 
 func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryCache, error) {
@@ -34,24 +36,32 @@ func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryC
 	lruCache, err := lru.New(capacity)
 	if err != nil {
 		lruCache = nil
-		err = fmt.Errorf("Failed to create lru cache: %s", err)
+		err = fmt.Errorf("failed to create lru cache: %s", err)
 		return nil, err
 	}
-	cachedHits := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+	cachedHits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Namespace: "cortex",
 		Name:      "cached_failed_queries_count",
 		Help:      "Total number of queries that hit the failed query cache.",
-	}, []string{})
+	})
+	cachedQueries := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+		Namespace: "cortex",
+		Name:      "failed_query_cache_size",
+		Help:      "How many queries are cached in the failed query cache.",
+	})
+	cachedQueries.Set(0)
 
 	return &FailedQueryCache{
-		regex:        regex,
-		errorExtract: errorExtract,
-		lruCache:     lruCache,
-		cachedHits:   cachedHits,
+		regex:         regex,
+		errorExtract:  errorExtract,
+		lruCache:      lruCache,
+		cachedHits:    cachedHits,
+		cachedQueries: cachedQueries,
 	}, err
 }
 
 // UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
-func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache) (bool, string) {
+func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNormalized string, queryExpressionRangeLength int) (bool, string) {
 	// Extracting error code from error string.
 	codeExtract := f.errorExtract.FindStringSubmatch(err.Error())
 
@@ -75,25 +85,28 @@ func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNorm
 		message := createLogMessage("Query not cached due to non-cacheable error code", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
 		return false, message
 	}
+	f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
+	message := createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
+	return true, message
+}
 
+func (f *FailedQueryCache) addCacheEntry(queryExpressionNormalized string, queryExpressionRangeLength int) {
 	// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
-	if contains, _ := lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
-		if oldValue, ok := lruCache.Get(queryExpressionNormalized); ok {
+	if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
+		if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
 			queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
 		}
-		lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
+		f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
 	}
-
-	message := createLogMessage("Cached a failed query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
-	return true, message
+	f.cachedQueries.Set(float64(f.lruCache.Len()))
 }
 
 // QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
-func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits *prometheus.CounterVec) (bool, string) {
+func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits prometheus.Counter) (bool, string) {
 	if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
 		cachedQueryRangeSeconds := value.(int)
 		message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
-		cachedHits.WithLabelValues().Inc()
+		cachedHits.Inc()
 		return true, message
 	}
 	return false, ""
@@ -129,27 +142,43 @@ func (f *FailedQueryCache) normalizeQueryString(query url.Values) string {
 func createLogMessage(message string, queryExpressionNormalized string, cachedQueryRangeSeconds int, queryExpressionRangeLength int, err error) string {
 	if err == nil {
 		return fmt.Sprintf(
-			`%s: %s, %s: %s, %s: %d, %s: %d`, "msg", message,
+			`%s, %s: %s, %s: %d, %s: %d`, message,
 			"cached_query", queryExpressionNormalized,
 			"cached_range_seconds", cachedQueryRangeSeconds,
 			"query_range_seconds", queryExpressionRangeLength)
 	}
 	return fmt.Sprintf(
-		`%s: %s, %s: %s, %s: %d, %s: %s`, "msg", message,
+		`%s, %s: %s, %s: %d, %s: %s`, message,
 		"cached_query", queryExpressionNormalized,
 		"query_range_seconds", queryExpressionRangeLength,
 		"cached_error", err)
 }
 
-func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values) (bool, string) {
+func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values, queryResponseTime time.Duration) (bool, string) {
 	queryExpressionNormalized := f.normalizeQueryString(query)
 	queryExpressionRangeLength := getQueryRangeSeconds(query)
-	success, message := f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength, f.lruCache)
+	// TODO(hc.zhu): add a flag for the threshold
+	// The current gateway timeout is 5 minutes, so we cache the failed query running longer than 5 minutes - 10 seconds.
+	if queryResponseTime > time.Second*(60*5-10) {
+		// Cache long running queries regardless of the error code. The most common case is "context canceled".
+		f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
+		message := createLogMessage("Cached a failed long running query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
+		return true, message
+	}
+	if queryExpressionNormalized == "" {
+		// Other APIs don't have "query" parameter e.g., /api/v1/series, /api/v1/labels
		return false, "Query parameter is empty"
+	}
+	success, message := f.updateFailedQueryCache(err, queryExpressionNormalized, queryExpressionRangeLength)
 	return success, message
 }
 
 func (f *FailedQueryCache) QueryHitCache(query url.Values) (bool, string) {
 	queryExpressionNormalized := f.normalizeQueryString(query)
+	if queryExpressionNormalized == "" {
+		// Other APIs don't have "query" parameter e.g., /api/v1/series, /api/v1/labels
+		return false, "Query parameter is empty"
+	}
 	queryExpressionRangeLength := getQueryRangeSeconds(query)
 	cached, message := queryHitCache(queryExpressionNormalized, queryExpressionRangeLength, f.lruCache, f.cachedHits)
 	return cached, message
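
Note: UpdateFailedQueryCache now also caches queries whose response time exceeds the gateway timeout minus a small margin, regardless of error code. The threshold arithmetic in the hunk above works out to 4m50s; a quick check, assuming the 5-minute gateway timeout mentioned in the TODO comment:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Threshold used above: 5-minute gateway timeout minus a 10-second margin.
	threshold := time.Second * (60*5 - 10)
	fmt.Println(threshold) // 4m50s

	// A query that ran 4m55s before failing would be cached regardless of its error code.
	fmt.Println(4*time.Minute+55*time.Second > threshold) // true
}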
