-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathdistributed.go
More file actions
148 lines (119 loc) · 5.31 KB
/
distributed.go
File metadata and controls
148 lines (119 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.
package engine
import (
"context"
"time"
"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
)
// remoteEngine adapts a storage.Queryable backed by a local Engine so it can
// act as a remote execution target for distributed query planning.
type remoteEngine struct {
	q      storage.Queryable
	engine *Engine

	// labelSets describes the external label sets served by this engine.
	labelSets []labels.Labels
	// mint and maxt bound the time range (in milliseconds) this engine can answer.
	mint int64
	maxt int64
}
// NewRemoteEngine constructs a remoteEngine over the given queryable,
// advertising the provided time bounds and label sets, and backed by a fresh
// Engine built from opts.
func NewRemoteEngine(opts Opts, q storage.Queryable, mint, maxt int64, labelSets []labels.Labels) *remoteEngine {
	eng := New(opts)
	return &remoteEngine{
		q:         q,
		engine:    eng,
		labelSets: labelSets,
		mint:      mint,
		maxt:      maxt,
	}
}
// MaxT returns the maximum timestamp (milliseconds) this engine can serve.
func (l remoteEngine) MaxT() int64 { return l.maxt }
// MinT returns the minimum timestamp (milliseconds) this engine can serve.
func (l remoteEngine) MinT() int64 { return l.mint }
// LabelSets returns the external label sets advertised by this engine.
func (l remoteEngine) LabelSets() []labels.Labels { return l.labelSets }
// PartitionLabelSets returns the label sets used for partitioning; for this
// engine they are identical to LabelSets().
func (l remoteEngine) PartitionLabelSets() []labels.Labels { return l.labelSets }
// NewRangeQuery compiles the remote query's string form into a range query
// executed by the local engine against this engine's queryable.
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
	qs := plan.String()
	return l.engine.NewRangeQuery(ctx, l.q, opts, qs, start, end, interval)
}
// DistributedEngine fans queries out across remote engines by injecting
// distributed-execution optimizers into the logical plan before delegating to
// an embedded local Engine.
type DistributedEngine struct {
	engine *Engine
}
// NewDistributedEngine builds a DistributedEngine whose local execution is
// handled by an Engine created from opts.
func NewDistributedEngine(opts Opts) *DistributedEngine {
	return &DistributedEngine{engine: New(opts)}
}
// MakeInstantQueryFromPlan builds a distributed instant query from an already
// prepared logical plan, evaluated at ts.
func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
	// Drop sub-second precision so remote and local engines agree on the
	// evaluation timestamp; some clients only support second precision.
	ts = ts.Truncate(time.Second)

	// Wrap the endpoints in a cache so every optimizer sees a consistent
	// Engines() view and MinT()/MaxT()/LabelSets() are not recomputed per
	// optimizer by implementations that derive them on the fly.
	cached := api.NewCachedEndpoints(e)

	qOpts := fromPromQLOpts(opts)
	qOpts.LogicalOptimizers = []logicalplan.Optimizer{
		logicalplan.PassthroughOptimizer{Endpoints: cached},
		logicalplan.DistributedExecutionOptimizer{Endpoints: cached},
	}

	return l.engine.MakeInstantQueryFromPlan(ctx, q, qOpts, plan, ts)
}
// MakeRangeQueryFromPlan builds a distributed range query from an already
// prepared logical plan over [start, end] with the given step interval.
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
	// Drop sub-second precision so remote and local engines agree on the
	// evaluated timestamps; some clients only support second precision.
	start = start.Truncate(time.Second)
	end = end.Truncate(time.Second)
	interval = interval.Truncate(time.Second)

	// Wrap the endpoints in a cache so every optimizer sees a consistent
	// Engines() view and MinT()/MaxT()/LabelSets() are not recomputed per
	// optimizer by implementations that derive them on the fly.
	cached := api.NewCachedEndpoints(e)

	qOpts := fromPromQLOpts(opts)
	qOpts.LogicalOptimizers = []logicalplan.Optimizer{
		logicalplan.PassthroughOptimizer{Endpoints: cached},
		logicalplan.DistributedExecutionOptimizer{Endpoints: cached},
	}

	return l.engine.MakeRangeQueryFromPlan(ctx, q, qOpts, plan, start, end, interval)
}
// MakeInstantQuery parses qs and builds a distributed instant query evaluated
// at ts.
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
	// Drop sub-second precision so remote and local engines agree on the
	// evaluation timestamp; some clients only support second precision.
	ts = ts.Truncate(time.Second)

	// Wrap the endpoints in a cache so every optimizer sees a consistent
	// Engines() view and MinT()/MaxT()/LabelSets() are not recomputed per
	// optimizer by implementations that derive them on the fly.
	cached := api.NewCachedEndpoints(e)

	qOpts := fromPromQLOpts(opts)
	qOpts.LogicalOptimizers = []logicalplan.Optimizer{
		logicalplan.PassthroughOptimizer{Endpoints: cached},
		logicalplan.DistributedExecutionOptimizer{Endpoints: cached},
	}

	return l.engine.MakeInstantQuery(ctx, q, qOpts, qs, ts)
}
// MakeRangeQuery parses qs and builds a distributed range query over
// [start, end] with the given step interval.
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
	// Drop sub-second precision so remote and local engines agree on the
	// evaluated timestamps; some clients only support second precision.
	start = start.Truncate(time.Second)
	end = end.Truncate(time.Second)
	interval = interval.Truncate(time.Second)

	// Wrap the endpoints in a cache so every optimizer sees a consistent
	// Engines() view and MinT()/MaxT()/LabelSets() are not recomputed per
	// optimizer by implementations that derive them on the fly.
	cached := api.NewCachedEndpoints(e)

	qOpts := fromPromQLOpts(opts)
	qOpts.LogicalOptimizers = []logicalplan.Optimizer{
		logicalplan.PassthroughOptimizer{Endpoints: cached},
		logicalplan.DistributedExecutionOptimizer{Endpoints: cached},
	}

	return l.engine.MakeRangeQuery(ctx, q, qOpts, qs, start, end, interval)
}