@@ -24,6 +24,7 @@ import (
2424 "github.com/cortexproject/cortex/pkg/cortexpb"
2525 "github.com/cortexproject/cortex/pkg/querier"
2626 "github.com/cortexproject/cortex/pkg/querier/stats"
27+ "github.com/cortexproject/cortex/pkg/ring/client"
2728 util_log "github.com/cortexproject/cortex/pkg/util/log"
2829 promql_util "github.com/cortexproject/cortex/pkg/util/promql"
2930 "github.com/cortexproject/cortex/pkg/util/validation"
@@ -157,7 +158,7 @@ type RulesLimits interface {
157158// EngineQueryFunc returns a new engine query function validating max queryLength.
158159// Modified from Prometheus rules.EngineQueryFunc
159160// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
160- func EngineQueryFunc (engine promql.QueryEngine , q storage.Queryable , overrides RulesLimits , userID string , lookbackDelta time.Duration ) rules.QueryFunc {
161+ func EngineQueryFunc (engine promql.QueryEngine , frontendClient * frontendClient , q storage.Queryable , overrides RulesLimits , userID string , lookbackDelta time.Duration ) rules.QueryFunc {
161162 return func (ctx context.Context , qs string , t time.Time ) (promql.Vector , error ) {
162163 // Enforce the max query length.
163164 maxQueryLength := overrides .MaxQueryLength (userID )
@@ -174,25 +175,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
174175 }
175176 }
176177
177- q , err := engine .NewInstantQuery (ctx , q , nil , qs , t )
178- if err != nil {
179- return nil , err
180- }
181- res := q .Exec (ctx )
182- if res .Err != nil {
183- return nil , res .Err
184- }
185- switch v := res .Value .(type ) {
186- case promql.Vector :
178+ if frontendClient != nil {
179+ v , err := frontendClient .InstantQuery (ctx , qs , t )
180+ if err != nil {
181+ return nil , err
182+ }
183+
187184 return v , nil
188- case promql.Scalar :
189- return promql.Vector {promql.Sample {
190- T : v .T ,
191- F : v .V ,
192- Metric : labels.Labels {},
193- }}, nil
194- default :
195- return nil , errors .New ("rule result is not a vector or scalar" )
185+ } else {
186+ q , err := engine .NewInstantQuery (ctx , q , nil , qs , t )
187+ if err != nil {
188+ return nil , err
189+ }
190+ res := q .Exec (ctx )
191+ if res .Err != nil {
192+ return nil , res .Err
193+ }
194+ switch v := res .Value .(type ) {
195+ case promql.Vector :
196+ return v , nil
197+ case promql.Scalar :
198+ return promql.Vector {promql.Sample {
199+ T : v .T ,
200+ F : v .V ,
201+ Metric : labels.Labels {},
202+ }}, nil
203+ default :
204+ return nil , errors .New ("rule result is not a vector or scalar" )
205+ }
196206 }
197207 }
198208}
@@ -300,22 +310,30 @@ type RulesManager interface {
300310}
301311
302312// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
303- type ManagerFactory func (ctx context.Context , userID string , notifier * notifier.Manager , logger log.Logger , reg prometheus.Registerer ) RulesManager
313+ type ManagerFactory func (ctx context.Context , userID string , notifier * notifier.Manager , logger log.Logger , frontendPool * client. Pool , reg prometheus.Registerer ) ( RulesManager , error )
304314
305315func DefaultTenantManagerFactory (cfg Config , p Pusher , q storage.Queryable , engine promql.QueryEngine , overrides RulesLimits , evalMetrics * RuleEvalMetrics , reg prometheus.Registerer ) ManagerFactory {
306316 // Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
307317 // and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
308318 // Errors from PromQL are always "user" errors.
309319 q = querier .NewErrorTranslateQueryableWithFn (q , WrapQueryableErrors )
310320
311- return func (ctx context.Context , userID string , notifier * notifier.Manager , logger log.Logger , reg prometheus.Registerer ) RulesManager {
321+ return func (ctx context.Context , userID string , notifier * notifier.Manager , logger log.Logger , frontendPool * client.Pool , reg prometheus.Registerer ) (RulesManager , error ) {
322+ var client * frontendClient
312323 failedQueries := evalMetrics .FailedQueriesVec .WithLabelValues (userID )
313324 totalQueries := evalMetrics .TotalQueriesVec .WithLabelValues (userID )
314325 totalWrites := evalMetrics .TotalWritesVec .WithLabelValues (userID )
315326 failedWrites := evalMetrics .FailedWritesVec .WithLabelValues (userID )
316327
328+ if cfg .FrontendAddress != "" {
329+ c , err := frontendPool .GetClientFor (cfg .FrontendAddress )
330+ if err != nil {
331+ return nil , err
332+ }
333+ client = c .(* frontendClient )
334+ }
317335 var queryFunc rules.QueryFunc
318- engineQueryFunc := EngineQueryFunc (engine , q , overrides , userID , cfg .LookbackDelta )
336+ engineQueryFunc := EngineQueryFunc (engine , client , q , overrides , userID , cfg .LookbackDelta )
319337 metricsQueryFunc := MetricsQueryFunc (engineQueryFunc , totalQueries , failedQueries )
320338 if cfg .EnableQueryStats {
321339 queryFunc = RecordAndReportRuleQueryMetrics (metricsQueryFunc , userID , evalMetrics , logger )
@@ -340,7 +358,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
340358 DefaultRuleQueryOffset : func () time.Duration {
341359 return overrides .RulerQueryOffset (userID )
342360 },
343- })
361+ }), nil
344362 }
345363}
346364
0 commit comments