Skip to content

Commit d1885ac

Browse files
committed
Add RemoteEndpointsV2/V3 that requests pruned remote engines
1 parent 4124d3f commit d1885ac

File tree

7 files changed

+177
-32
lines changed

7 files changed

+177
-32
lines changed

api/remote.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package api
66
import (
77
"context"
88
"fmt"
9+
"math"
10+
"sync"
911
"time"
1012

1113
"github.com/prometheus/prometheus/model/labels"
@@ -16,10 +18,46 @@ type RemoteQuery interface {
1618
fmt.Stringer
1719
}
1820

21+
// RemoteEndpoints lists remote engines through a single Engines call that
// takes no pruning hints.
//
// Deprecated: RemoteEndpoints will be replaced with
22+
// RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change.
1923
type RemoteEndpoints interface {
2024
Engines() []RemoteEngine
2125
}
2226

27+
// RemoteEndpointsV2 describes endpoints that accept pruning hints when
28+
// selecting remote engines.
29+
//
30+
// For example, implementations may use the hints to prune the TSDBInfos, but
31+
// they may also safely ignore them and return all available remote engines.
32+
//
33+
// NOTE(Aleksandr Krivoshchekov):
34+
// We add a new interface as a temporary backward-compatibility measure.
35+
// RemoteEndpoints will be replaced with it in a future breaking change.
36+
type RemoteEndpointsV2 interface {
37+
EnginesV2(mint, maxt int64) []RemoteEngine
38+
}
39+
40+
// RemoteEndpointsQuery carries the pruning hints handed to
// RemoteEndpointsV3.EnginesV3.
type RemoteEndpointsQuery struct {
41+
// MinT is the lower time hint; getRemoteEngines fills it from MinMaxTime.
// NOTE(review): presumably a Unix timestamp in milliseconds — confirm.
MinT int64
42+
// MaxT is the upper time hint; getRemoteEngines fills it from MinMaxTime.
MaxT int64
43+
}
44+
45+
// RemoteEndpointsV3 describes endpoints that accept pruning hints when
46+
// selecting remote engines.
47+
//
48+
// For example, implementations may use the hints to prune the TSDBInfos, but
49+
// they may also safely ignore them and return all available remote engines.
50+
//
51+
// NOTE(Aleksandr Krivoshchekov):
52+
// We add a new interface as a temporary backward-compatibility measure.
53+
// RemoteEndpoints will be replaced with it in a future breaking change.
54+
//
55+
// Unlike RemoteEndpointsV2, this interface can be extended with more hints
56+
// in the future without making any breaking changes.
57+
type RemoteEndpointsV3 interface {
58+
EnginesV3(query RemoteEndpointsQuery) []RemoteEngine
59+
}
60+
2361
type RemoteEngine interface {
2462
MaxT() int64
2563
MinT() int64
@@ -44,6 +82,71 @@ func (m staticEndpoints) Engines() []RemoteEngine {
4482
return m.engines
4583
}
4684

85+
// EnginesV2 returns the stored engines unchanged; the mint and maxt hints
// are ignored.
func (m staticEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine {
86+
return m.engines
87+
}
88+
89+
// EnginesV3 returns the stored engines unchanged; the query hints are
// ignored.
func (m staticEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine {
90+
return m.engines
91+
}
92+
4793
// NewStaticEndpoints wraps a fixed slice of engines in a RemoteEndpoints.
// The slice is stored as passed in, without copying.
func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints {
4894
return &staticEndpoints{engines: engines}
4995
}
96+
97+
type cachedEndpoints struct {
98+
endpoints RemoteEndpoints
99+
100+
enginesOnce sync.Once
101+
engines []RemoteEngine
102+
}
103+
104+
func (l *cachedEndpoints) Engines() []RemoteEngine {
105+
return l.EnginesV3(RemoteEndpointsQuery{
106+
MaxT: math.MinInt64,
107+
MinT: math.MaxInt64,
108+
})
109+
}
110+
111+
func (l *cachedEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine {
112+
return l.EnginesV3(RemoteEndpointsQuery{
113+
MaxT: maxt,
114+
MinT: mint,
115+
})
116+
}
117+
118+
func (l *cachedEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine {
119+
l.enginesOnce.Do(func() {
120+
l.engines = getEngines(l.endpoints, query)
121+
})
122+
return l.engines
123+
}
124+
125+
func getEngines(endpoints RemoteEndpoints, query RemoteEndpointsQuery) []RemoteEngine {
126+
if v3, ok := endpoints.(RemoteEndpointsV3); ok {
127+
return v3.EnginesV3(query)
128+
}
129+
130+
if v2, ok := endpoints.(RemoteEndpointsV2); ok {
131+
return v2.EnginesV2(query.MinT, query.MaxT)
132+
}
133+
134+
return endpoints.Engines()
135+
}
136+
137+
// NewCachedEndpoints returns an endpoints wrapper that
138+
// resolves and caches engines on first access.
139+
//
140+
// All subsequent Engines calls return cached engines, ignoring any query
141+
// parameters.
142+
func NewCachedEndpoints(endpoints RemoteEndpoints) RemoteEndpoints {
143+
if endpoints == nil {
144+
panic("api.NewCachedEndpoints: endpoints is nil")
145+
}
146+
147+
if le, ok := endpoints.(*cachedEndpoints); ok {
148+
return le
149+
}
150+
151+
return &cachedEndpoints{endpoints: endpoints}
152+
}

engine/distributed.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q stora
6868
// Some clients might only support second precision when executing queries.
6969
ts = ts.Truncate(time.Second)
7070

71+
// Cache engines to give optimizers a consistent view of Engines().
72+
// Some RemoteEndpoints implementations also compute and cache
73+
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
74+
// recomputing those fields in each optimizer.
75+
e = api.NewCachedEndpoints(e)
76+
7177
qOpts := fromPromQLOpts(opts)
7278
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
7379
logicalplan.PassthroughOptimizer{Endpoints: e},
@@ -84,6 +90,12 @@ func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage
8490
end = end.Truncate(time.Second)
8591
interval = interval.Truncate(time.Second)
8692

93+
// Cache engines to give optimizers a consistent view of Engines().
94+
// Some RemoteEndpoints implementations also compute and cache
95+
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
96+
// recomputing those fields in each optimizer.
97+
e = api.NewCachedEndpoints(e)
98+
8799
qOpts := fromPromQLOpts(opts)
88100
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
89101
logicalplan.PassthroughOptimizer{Endpoints: e},
@@ -98,6 +110,12 @@ func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Query
98110
// Some clients might only support second precision when executing queries.
99111
ts = ts.Truncate(time.Second)
100112

113+
// Cache engines to give optimizers a consistent view of Engines().
114+
// Some RemoteEndpoints implementations also compute and cache
115+
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
116+
// recomputing those fields in each optimizer.
117+
e = api.NewCachedEndpoints(e)
118+
101119
qOpts := fromPromQLOpts(opts)
102120
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
103121
logicalplan.PassthroughOptimizer{Endpoints: e},
@@ -114,6 +132,12 @@ func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryab
114132
end = end.Truncate(time.Second)
115133
interval = interval.Truncate(time.Second)
116134

135+
// Cache engines to give optimizers a consistent view of Engines().
136+
// Some RemoteEndpoints implementations also compute and cache
137+
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
138+
// recomputing those fields in each optimizer.
139+
e = api.NewCachedEndpoints(e)
140+
117141
qOpts := fromPromQLOpts(opts)
118142
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
119143
logicalplan.PassthroughOptimizer{Endpoints: e},

logicalplan/distribute.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct {
157157
}
158158

159159
func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
160-
engines := m.Endpoints.Engines()
160+
engines := getRemoteEngines(m.Endpoints, plan, opts)
161161
sort.Slice(engines, func(i, j int) bool {
162162
return engines[i].MinT() < engines[j].MinT()
163163
})
@@ -858,3 +858,20 @@ func maxDuration(a, b time.Duration) time.Duration {
858858
}
859859
return b
860860
}
861+
862+
func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine {
863+
if v3, ok := endpoints.(api.RemoteEndpointsV3); ok {
864+
mint, maxt := MinMaxTime(plan, opts)
865+
return v3.EnginesV3(api.RemoteEndpointsQuery{
866+
MinT: mint,
867+
MaxT: maxt,
868+
})
869+
}
870+
871+
if v2, ok := endpoints.(api.RemoteEndpointsV2); ok {
872+
mint, maxt := MinMaxTime(plan, opts)
873+
return v2.EnginesV2(mint, maxt)
874+
}
875+
876+
return endpoints.Engines()
877+
}

logicalplan/passthrough.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,40 +43,45 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool {
4343
}
4444

4545
func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
46-
engines := m.Endpoints.Engines()
47-
if len(engines) == 1 {
48-
if !matchingEngineTime(engines[0], opts) {
49-
return plan, nil
50-
}
51-
return RemoteExecution{
52-
Engine: engines[0],
53-
Query: plan.Clone(),
54-
QueryRangeStart: opts.Start,
55-
QueryRangeEnd: opts.End,
56-
}, nil
57-
}
58-
46+
engines := getRemoteEngines(m.Endpoints, plan, opts)
5947
if len(engines) == 0 {
6048
return plan, nil
6149
}
6250

63-
matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines))
51+
var (
52+
hasSelector bool
53+
matchingEngines int
54+
firstMatchingEngine api.RemoteEngine
55+
)
6456
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
6557
if vs, ok := (*current).(*VectorSelector); ok {
58+
hasSelector = true
59+
6660
for _, e := range engines {
6761
if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) {
6862
continue
6963
}
7064

71-
matchingLabelsEngines = append(matchingLabelsEngines, e)
65+
matchingEngines++
66+
if matchingEngines > 1 {
67+
return true
68+
}
69+
70+
firstMatchingEngine = e
7271
}
7372
}
7473
return false
7574
})
7675

77-
if len(matchingLabelsEngines) == 1 && matchingEngineTime(matchingLabelsEngines[0], opts) {
76+
// Fallback to all engines.
77+
if !hasSelector && matchingEngines == 0 {
78+
matchingEngines = len(engines)
79+
firstMatchingEngine = engines[0]
80+
}
81+
82+
if matchingEngines == 1 && matchingEngineTime(firstMatchingEngine, opts) {
7883
return RemoteExecution{
79-
Engine: matchingLabelsEngines[0],
84+
Engine: firstMatchingEngine,
8085
Query: plan.Clone(),
8186
QueryRangeStart: opts.Start,
8287
QueryRangeEnd: opts.End,

logicalplan/plan.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ var DefaultOptimizers = []Optimizer{
3030
type Plan interface {
3131
Optimize([]Optimizer) (Plan, annotations.Annotations)
3232
Root() Node
33-
MinMaxTime(*query.Options) (int64, int64)
3433
}
3534

3635
type Optimizer interface {
@@ -152,15 +151,19 @@ func extractFuncFromPath(p []*Node) string {
152151
return extractFuncFromPath(p[:len(p)-1])
153152
}
154153

155-
func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) {
154+
// Root returns the plan's expression node (p.expr).
func (p *plan) Root() Node {
155+
return p.expr
156+
}
157+
158+
// MinMaxTime returns the min and max timestamp that any selector in the query
159+
// can read.
160+
func MinMaxTime(root Node, qOpts *query.Options) (int64, int64) {
156161
var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64
157162
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
158163
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
159164
// the variable.
160165
var evalRange time.Duration
161166

162-
root := p.Root()
163-
164167
TraverseWithParents(nil, &root, func(parents []*Node, node *Node) {
165168
switch n := (*node).(type) {
166169
case *VectorSelector:
@@ -205,10 +208,6 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations)
205208
return &plan{expr: expr, opts: p.opts}, *annos
206209
}
207210

208-
func (p *plan) Root() Node {
209-
return p.expr
210-
}
211-
212211
func Traverse(expr *Node, transform func(*Node)) {
213212
children := (*expr).Children()
214213
transform(expr)
@@ -230,10 +229,7 @@ func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node,
230229
for _, c := range (*current).Children() {
231230
stop = TraverseBottomUp(current, c, transform) || stop
232231
}
233-
if stop {
234-
return stop
235-
}
236-
return transform(parent, current)
232+
return stop || transform(parent, current)
237233
}
238234

239235
func replacePrometheusNodes(plan parser.Expr) Node {

storage/prometheus/scanners.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (s *Scanners) Close() error {
3535
func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) {
3636
var min, max int64
3737
if lplan != nil {
38-
min, max = lplan.MinMaxTime(qOpts)
38+
min, max = logicalplan.MinMaxTime(lplan.Root(), qOpts)
3939
} else {
4040
min, max = qOpts.Start.UnixMilli(), qOpts.End.UnixMilli()
4141
}

storage/prometheus/scanners_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestScannersMinMaxTime(t *testing.T) {
8080

8181
plan, _ := logicalplan.NewFromAST(p, qOpts, logicalplan.PlanOptions{})
8282

83-
min, max := plan.MinMaxTime(qOpts)
83+
min, max := logicalplan.MinMaxTime(plan.Root(), qOpts)
8484

8585
require.Equal(t, tcase.min, min)
8686
require.Equal(t, tcase.max, max)

0 commit comments

Comments
 (0)