Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api
import (
"context"
"fmt"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -16,8 +17,20 @@ type RemoteQuery interface {
fmt.Stringer
}

// RemoteEndpoints returns remote engines.
//
// Implementations should use mint and maxt to prune engine metadata
// (e.g., filter TSDBInfos to only those overlapping the time range),
// reducing unnecessary computations in subsequent calls to methods like
// RemoteEngine.LabelSets().
//
// All available engines should be returned regardless of pruning.
type RemoteEndpoints interface {
	// Engines returns remote engines.
	//
	// If mint and/or maxt of the query is unknown, the caller must pass
	// math.MinInt64 and math.MaxInt64 respectively to retrieve unpruned engines.
	Engines(mint, maxt int64) []RemoteEngine
}

type RemoteEngine interface {
Expand All @@ -40,10 +53,41 @@ type staticEndpoints struct {
engines []RemoteEngine
}

func (m staticEndpoints) Engines() []RemoteEngine {
func (m staticEndpoints) Engines(mint, maxt int64) []RemoteEngine {
return m.engines
}

// NewStaticEndpoints wraps a fixed set of engines in a RemoteEndpoints
// implementation. The slice is retained as-is, not copied.
func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints {
	endpoints := &staticEndpoints{engines: engines}
	return endpoints
}

// cachedEndpoints memoizes the engine list of a wrapped RemoteEndpoints:
// the first Engines call resolves engines from the underlying endpoints
// and every later call returns the stored slice unchanged.
type cachedEndpoints struct {
	endpoints RemoteEndpoints

	// enginesOnce guards the one-time resolution of engines below.
	enginesOnce sync.Once
	engines     []RemoteEngine
}

// Engines resolves engines from the wrapped endpoints exactly once and
// returns the cached slice on every subsequent call.
//
// Only the mint/maxt of the first call are passed through; later calls
// ignore their arguments and reuse the cached result.
func (l *cachedEndpoints) Engines(mint, maxt int64) []RemoteEngine {
	l.enginesOnce.Do(func() {
		l.engines = l.endpoints.Engines(mint, maxt)
	})
	return l.engines
}

// NewCachedEndpoints returns an endpoints wrapper that
// resolves and caches engines on first access.
//
// All subsequent Engines calls return cached engines, ignoring any query
// parameters.
func NewCachedEndpoints(endpoints RemoteEndpoints) RemoteEndpoints {
	switch ep := endpoints.(type) {
	case nil:
		// Fail loudly on programmer error rather than deferring a nil
		// dereference to the first Engines call.
		panic("api.NewCachedEndpoints: endpoints is nil")
	case *cachedEndpoints:
		// Already cached; wrapping again would add nothing.
		return ep
	default:
		return &cachedEndpoints{endpoints: endpoints}
	}
}
72 changes: 72 additions & 0 deletions api/remote_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package api

import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
)

func TestCachedEndpoints(t *testing.T) {
	// The underlying endpoints must observe the time range of the first call.
	inner := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine {
		testutil.Equals(t, int64(10), mint)
		testutil.Equals(t, int64(20), maxt)
		return []RemoteEngine{newEngineMock(0, 1, nil)}
	})
	cached := NewCachedEndpoints(inner)

	got := cached.Engines(10, 20)
	testutil.Equals(t, 1, len(got))
}

func TestCachedEndpointsCachesEngines(t *testing.T) {
	// Count how often the underlying endpoints are resolved; the cache
	// must resolve them exactly once, no matter how often it is queried.
	resolveCount := 0
	inner := remoteEndpointsFunc(func(mint, maxt int64) []RemoteEngine {
		resolveCount++
		c := int64(resolveCount)
		return []RemoteEngine{
			newEngineMock(100*c, 1000*c, nil),
			newEngineMock(200*c, 2000*c, nil),
		}
	})
	cached := NewCachedEndpoints(inner)

	first := cached.Engines(10, 10000)
	testutil.Equals(t, 2, len(first))

	second := cached.Engines(20, 20000)
	testutil.Equals(t, 2, len(second))

	testutil.Equals(t, 1, resolveCount)
	testutil.Equals(t, first, second)

	// Engines must be mutable.
	first[0].(*engineMock).maxT = 1337
	testutil.Equals(t, int64(1337), first[0].MaxT())
	testutil.Equals(t, int64(1337), second[0].MaxT())
}

// remoteEndpointsFunc adapts a plain function to the RemoteEndpoints
// interface, mirroring the http.HandlerFunc pattern, so tests can supply
// inline engine resolvers.
type remoteEndpointsFunc func(mint, maxt int64) []RemoteEngine

// Engines implements RemoteEndpoints by delegating to the function itself.
func (f remoteEndpointsFunc) Engines(mint, maxt int64) []RemoteEngine {
	return f(mint, maxt)
}

// engineMock is a test double for RemoteEngine. It embeds the interface so
// any method without an override panics if called, and overrides only the
// accessors the tests exercise.
type engineMock struct {
	RemoteEngine
	minT               int64
	maxT               int64
	labelSets          []labels.Labels
	partitionLabelSets []labels.Labels
}

// MaxT returns the mock's configured maximum timestamp.
func (e engineMock) MaxT() int64 { return e.maxT }

// MinT returns the mock's configured minimum timestamp.
func (e engineMock) MinT() int64 { return e.minT }

// LabelSets returns the mock's configured label sets.
func (e engineMock) LabelSets() []labels.Labels { return e.labelSets }

// PartitionLabelSets returns the mock's configured partition label sets.
func (e engineMock) PartitionLabelSets() []labels.Labels { return e.partitionLabelSets }

// newEngineMock builds an engineMock covering [mint, maxt] whose label
// sets and partition label sets are both set to labelSets.
func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock {
	m := &engineMock{minT: mint, maxT: maxt}
	m.labelSets = labelSets
	m.partitionLabelSets = labelSets
	return m
}
24 changes: 24 additions & 0 deletions engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q stora
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -84,6 +90,12 @@ func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -98,6 +110,12 @@ func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Query
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -114,6 +132,12 @@ func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryab
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand Down
2 changes: 1 addition & 1 deletion logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct {
}

func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
engines := m.Endpoints.Engines()
engines := m.Endpoints.Engines(MinMaxTime(plan, opts))
sort.Slice(engines, func(i, j int) bool {
return engines[i].MinT() < engines[j].MinT()
})
Expand Down
39 changes: 22 additions & 17 deletions logicalplan/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,45 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool {
}

func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
engines := m.Endpoints.Engines()
if len(engines) == 1 {
if !matchingEngineTime(engines[0], opts) {
return plan, nil
}
return RemoteExecution{
Engine: engines[0],
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
}, nil
}

engines := m.Endpoints.Engines(MinMaxTime(plan, opts))
if len(engines) == 0 {
return plan, nil
}

matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines))
var (
hasSelector bool
matchingEngines int
firstMatchingEngine api.RemoteEngine
)
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if vs, ok := (*current).(*VectorSelector); ok {
hasSelector = true

for _, e := range engines {
if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) {
continue
}

matchingLabelsEngines = append(matchingLabelsEngines, e)
matchingEngines++
if matchingEngines > 1 {
return true
}

firstMatchingEngine = e
}
}
return false
})

if len(matchingLabelsEngines) == 1 && matchingEngineTime(matchingLabelsEngines[0], opts) {
// Fallback to all engines.
if !hasSelector && matchingEngines == 0 {
matchingEngines = len(engines)
firstMatchingEngine = engines[0]
}

if matchingEngines == 1 && matchingEngineTime(firstMatchingEngine, opts) {
return RemoteExecution{
Engine: matchingLabelsEngines[0],
Engine: firstMatchingEngine,
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
Expand Down
20 changes: 8 additions & 12 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ var DefaultOptimizers = []Optimizer{
type Plan interface {
Optimize([]Optimizer) (Plan, annotations.Annotations)
Root() Node
MinMaxTime(*query.Options) (int64, int64)
}

type Optimizer interface {
Expand Down Expand Up @@ -152,15 +151,19 @@ func extractFuncFromPath(p []*Node) string {
return extractFuncFromPath(p[:len(p)-1])
}

func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) {
func (p *plan) Root() Node {
return p.expr
}

// MinMaxTime returns the min and max timestamp that any selector in the query
// can read.
func MinMaxTime(root Node, qOpts *query.Options) (int64, int64) {
var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time.Duration

root := p.Root()

TraverseWithParents(nil, &root, func(parents []*Node, node *Node) {
switch n := (*node).(type) {
case *VectorSelector:
Expand Down Expand Up @@ -205,10 +208,6 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations)
return &plan{expr: expr, opts: p.opts}, *annos
}

func (p *plan) Root() Node {
return p.expr
}

func Traverse(expr *Node, transform func(*Node)) {
children := (*expr).Children()
transform(expr)
Expand All @@ -230,10 +229,7 @@ func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node,
for _, c := range (*current).Children() {
stop = TraverseBottomUp(current, c, transform) || stop
}
if stop {
return stop
}
return transform(parent, current)
return stop || transform(parent, current)
}

func replacePrometheusNodes(plan parser.Expr) Node {
Expand Down
2 changes: 1 addition & 1 deletion storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Scanners) Close() error {
func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) {
var min, max int64
if lplan != nil {
min, max = lplan.MinMaxTime(qOpts)
min, max = logicalplan.MinMaxTime(lplan.Root(), qOpts)
} else {
min, max = qOpts.Start.UnixMilli(), qOpts.End.UnixMilli()
}
Expand Down
2 changes: 1 addition & 1 deletion storage/prometheus/scanners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestScannersMinMaxTime(t *testing.T) {

plan, _ := logicalplan.NewFromAST(p, qOpts, logicalplan.PlanOptions{})

min, max := plan.MinMaxTime(qOpts)
min, max := logicalplan.MinMaxTime(plan.Root(), qOpts)

require.Equal(t, tcase.min, min)
require.Equal(t, tcase.max, max)
Expand Down