Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"runtime"
"runtime/debug"
"time"

"github.com/go-kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
Expand Down Expand Up @@ -533,13 +534,25 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec,
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
func(time.Duration) time.Duration {
return t.Cfg.Querier.DefaultEvaluationInterval
},
t.Cfg.Frontend.DistributedExecEnabled,
)
if err != nil {
return nil, err
}

instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta, t.Cfg.Frontend.DistributedExecEnabled)
instantQueryMiddlewares, err := instantquery.Middlewares(
util_log.Logger,
t.Overrides,
instantQueryCodec,
queryAnalyzer,
t.Cfg.Querier.LookbackDelta,
func(time.Duration) time.Duration {
return t.Cfg.Querier.DefaultEvaluationInterval
},
t.Cfg.Frontend.DistributedExecEnabled)
if err != nil {
return nil, err
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/querier/tripperware/distributed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ const (
stepBatch = 10
)

func DistributedQueryMiddleware(lookBackDelta time.Duration) Middleware {
func DistributedQueryMiddleware(noStepSubqueryIntervalFn func(time.Duration) time.Duration) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return distributedQueryMiddleware{
next: next,
lookBackDelta: lookBackDelta,
next: next,
noStepSubqueryIntervalFn: noStepSubqueryIntervalFn,
}
})
}
Expand All @@ -32,21 +32,23 @@ func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Ti
}

type distributedQueryMiddleware struct {
next Handler
lookBackDelta time.Duration
next Handler
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {

start, end = getStartAndEnd(start, end, step)

qOpts := query.Options{
Start: start,
End: end,
Step: step,
StepsBatch: stepBatch,
LookbackDelta: d.lookBackDelta,
EnablePerStepStats: false, // Hardcoded value that will be re-populated again in the querier stage
Start: start,
End: end,
Step: step,
StepsBatch: stepBatch,
NoStepSubqueryIntervalFn: d.noStepSubqueryIntervalFn,
// Hardcoded value for execution-time-params that will be re-populated again in the querier stage
LookbackDelta: 0,
EnablePerStepStats: false,
}

expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
Expand Down
4 changes: 3 additions & 1 deletion pkg/querier/tripperware/distributed_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func TestLogicalPlanGeneration(t *testing.T) {
t.Run(strconv.Itoa(i)+"_"+tc.name, func(t *testing.T) {
t.Parallel()

middleware := DistributedQueryMiddleware(5 * time.Minute)
middleware := DistributedQueryMiddleware(func(time.Duration) time.Duration {
return time.Minute
})

handler := middleware.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) {
return nil, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func Middlewares(
merger tripperware.Merger,
queryAnalyzer querysharding.Analyzer,
lookbackDelta time.Duration,
noStepSubqueryIntervalFn func(time.Duration) time.Duration,
distributedExecEnabled bool,
) ([]tripperware.Middleware, error) {
m := []tripperware.Middleware{
Expand All @@ -24,7 +25,7 @@ func Middlewares(

if distributedExecEnabled {
m = append(m,
tripperware.DistributedQueryMiddleware(lookbackDelta))
tripperware.DistributedQueryMiddleware(noStepSubqueryIntervalFn))
}

return m, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func TestRoundTrip(t *testing.T) {
nil,
qa,
5*time.Minute,
false, // distributedExecEnabled
func(time.Duration) time.Duration {
return time.Minute
},
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -172,6 +175,9 @@ func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) {
nil,
qa,
5*time.Minute,
func(time.Duration) time.Duration {
return 5 * time.Minute
},
tc.distributedEnabled,
)
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func Middlewares(
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
lookbackDelta time.Duration,
noStepSubqueryIntervalFn func(time.Duration) time.Duration,
distributedExecEnabled bool,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
Expand Down Expand Up @@ -141,7 +142,7 @@ func Middlewares(
if distributedExecEnabled {
queryRangeMiddleware = append(queryRangeMiddleware,
tripperware.InstrumentMiddleware("range_logical_plan_gen", metrics),
tripperware.DistributedQueryMiddleware(lookbackDelta))
tripperware.DistributedQueryMiddleware(noStepSubqueryIntervalFn))
}

return queryRangeMiddleware, c, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func TestRoundTrip(t *testing.T) {
PrometheusCodec,
ShardedPrometheusCodec,
5*time.Minute,
false, // distributedExecEnabled
func(time.Duration) time.Duration {
return time.Minute
},
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -191,6 +194,9 @@ func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) {
PrometheusCodec,
ShardedPrometheusCodec,
5*time.Minute,
func(time.Duration) time.Duration {
return time.Minute
},
tc.distributedEnabled,
)
require.NoError(t, err)
Expand Down
Loading