Skip to content

Commit acd9215

Browse files
authored
MQE: shift responsibilities of Materializer (#12377)
#### What this PR does This PR changes the responsibilities of MQE's `Materializer` type. Previously, `Materializer.ConvertNodeToOperator` would materialize all of the given node's children and pass it to `Node.OperatorFactory`, and `OperatorFactory` would have no access to the `Materializer`. With the existing set of query plan nodes, this was not an issue, but it is problematic for some scenarios, for example: * for remote execution, there's no point materializing operators for the parts of the query plan that will be executed remotely * for caching and splitting, there's also no point materializing operators for cached results, but we'll then need to materialize the child operator if there is a cache miss, or only materialize the child operator for a subset of the time range This PR also moves `Materializer` to the `planning` package to prevent an import cycle (and seems like a more natural fit for there anyway). Depending on which PR merges first, either this PR or #12302 will need changes to account for the changes in the other. This PR has no user-visible impact and so I've chosen not to add a changelog entry. #### Which issue(s) this PR fixes or relates to (none) #### Checklist - [n/a] Tests updated. - [n/a] Documentation added. - [n/a] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [n/a] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.
1 parent 540f431 commit acd9215

File tree

15 files changed

+160
-144
lines changed

15 files changed

+160
-144
lines changed

pkg/streamingpromql/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (e *Engine) newQueryFromPlanner(ctx context.Context, queryable storage.Quer
168168
EagerLoadSelectors: e.eagerLoadSelectors,
169169
}
170170

171-
materializer := NewMaterializer(operatorParams)
171+
materializer := planning.NewMaterializer(operatorParams)
172172
root, err := materializer.ConvertNodeToOperator(plan.Root, plan.TimeRange)
173173
if err != nil {
174174
return nil, err

pkg/streamingpromql/materialize.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

pkg/streamingpromql/optimize/plan/commonsubexpressionelimination/node.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ func (d *Duplicate) ResultType() (parser.ValueType, error) {
6767
return d.Inner.ResultType()
6868
}
6969

70-
func (d *Duplicate) OperatorFactory(children []types.Operator, _ types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
71-
if len(children) != 1 {
72-
return nil, fmt.Errorf("expected exactly 1 child for Duplicate, got %v", len(children))
70+
func (d *Duplicate) OperatorFactory(materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
71+
inner, err := materializer.ConvertNodeToOperator(d.Inner, timeRange)
72+
if err != nil {
73+
return nil, err
7374
}
7475

75-
switch inner := children[0].(type) {
76+
switch inner := inner.(type) {
7677
case types.InstantVectorOperator:
7778
return &InstantVectorDuplicationConsumerOperatorFactory{
7879
Buffer: NewInstantVectorDuplicationBuffer(inner, params.MemoryConsumptionTracker),
@@ -82,7 +83,7 @@ func (d *Duplicate) OperatorFactory(children []types.Operator, _ types.QueryTime
8283
Buffer: NewRangeVectorDuplicationBuffer(inner, params.MemoryConsumptionTracker),
8384
}, nil
8485
default:
85-
return nil, fmt.Errorf("expected InstantVectorOperator as child of Duplicate, got %T", children[0])
86+
return nil, fmt.Errorf("expected InstantVectorOperator or RangeVectorOperator as child of Duplicate, got %T", inner)
8687
}
8788
}
8889

pkg/streamingpromql/planning/core/aggregate_expression.go

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -100,64 +100,43 @@ func (a *AggregateExpression) ChildrenLabels() []string {
100100
return []string{"expression", "parameter"}
101101
}
102102

103-
func (a *AggregateExpression) OperatorFactory(children []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
104-
if len(children) < 1 {
105-
return nil, fmt.Errorf("expected at least 1 child for AggregateExpression, got %v", len(children))
106-
}
107-
108-
inner, ok := children[0].(types.InstantVectorOperator)
109-
if !ok {
110-
return nil, fmt.Errorf("expected InstantVectorOperator as expression child of AggregateExpression, got %T", children[0])
103+
func (a *AggregateExpression) OperatorFactory(materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
104+
inner, err := materializer.ConvertNodeToInstantVectorOperator(a.Inner, timeRange)
105+
if err != nil {
106+
return nil, fmt.Errorf("could not create inner operator for AggregateExpression: %w", err)
111107
}
112108

113109
var o types.InstantVectorOperator
114110

115111
switch a.Op {
116112
case AGGREGATION_TOPK, AGGREGATION_BOTTOMK:
117-
if len(children) != 2 {
118-
return nil, fmt.Errorf("expected exactly 2 children for AggregateExpression with operation %s, got %v", a.Op.String(), len(children))
119-
}
120-
121-
param, ok := children[1].(types.ScalarOperator)
122-
if !ok {
123-
return nil, fmt.Errorf("expected ScalarOperator as parameter child of AggregateExpression with operation %s, got %T", a.Op.String(), children[0])
113+
param, err := materializer.ConvertNodeToScalarOperator(a.Param, timeRange)
114+
if err != nil {
115+
return nil, fmt.Errorf("could not create parameter operator for AggregateExpression %s: %w", a.Op.String(), err)
124116
}
125117

126118
o = topkbottomk.New(inner, param, timeRange, a.Grouping, a.Without, a.Op == AGGREGATION_TOPK, params.MemoryConsumptionTracker, params.Annotations, a.ExpressionPosition.ToPrometheusType())
127119

128120
case AGGREGATION_QUANTILE:
129-
if len(children) != 2 {
130-
return nil, fmt.Errorf("expected exactly 2 children for AggregateExpression with operation %s, got %v", a.Op.String(), len(children))
131-
}
132-
133-
param, ok := children[1].(types.ScalarOperator)
134-
if !ok {
135-
return nil, fmt.Errorf("expected ScalarOperator as parameter child of AggregateExpression with operation %s, got %T", a.Op.String(), children[0])
121+
param, err := materializer.ConvertNodeToScalarOperator(a.Param, timeRange)
122+
if err != nil {
123+
return nil, fmt.Errorf("could not create parameter operator for AggregateExpression %s: %w", a.Op.String(), err)
136124
}
137125

138-
var err error
139126
o, err = aggregations.NewQuantileAggregation(inner, param, timeRange, a.Grouping, a.Without, params.MemoryConsumptionTracker, params.Annotations, a.ExpressionPosition.ToPrometheusType())
140127
if err != nil {
141128
return nil, err
142129
}
143130

144131
case AGGREGATION_COUNT_VALUES:
145-
if len(children) != 2 {
146-
return nil, fmt.Errorf("expected exactly 2 children for AggregateExpression with operation %s, got %v", a.Op.String(), len(children))
147-
}
148-
149-
param, ok := children[1].(types.StringOperator)
150-
if !ok {
151-
return nil, fmt.Errorf("expected StringOperator as parameter child of AggregateExpression with operation %s, got %T", a.Op.String(), children[0])
132+
param, err := materializer.ConvertNodeToStringOperator(a.Param, timeRange)
133+
if err != nil {
134+
return nil, fmt.Errorf("could not create parameter operator for AggregateExpression %s: %w", a.Op.String(), err)
152135
}
153136

154137
o = aggregations.NewCountValues(inner, param, timeRange, a.Grouping, a.Without, params.MemoryConsumptionTracker, a.ExpressionPosition.ToPrometheusType())
155138

156139
default:
157-
if len(children) != 1 {
158-
return nil, fmt.Errorf("expected exactly 1 child for AggregateExpression with operation %s, got %v", a.Op.String(), len(children))
159-
}
160-
161140
itemType, ok := a.Op.ToItemType()
162141
if !ok {
163142
return nil, fmt.Errorf("unknown aggregation operation %s", a.Op.String())

pkg/streamingpromql/planning/core/binary_expression.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,25 +117,20 @@ func (b *BinaryExpression) ChildrenLabels() []string {
117117
return []string{"LHS", "RHS"}
118118
}
119119

120-
func (b *BinaryExpression) OperatorFactory(children []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
120+
func (b *BinaryExpression) OperatorFactory(materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
121121
op, ok := b.Op.ToItemType()
122122
if !ok {
123123
return nil, compat.NewNotSupportedError(fmt.Sprintf("'%v' binary expression", b.Op.String()))
124124
}
125125

126-
if len(children) != 2 {
127-
return nil, fmt.Errorf("expected exactly 2 children for BinaryExpression, got %v", len(children))
128-
}
129-
130-
lhsVector, lhsScalar := b.getChildOperator(children[0])
131-
rhsVector, rhsScalar := b.getChildOperator(children[1])
132-
133-
if lhsVector == nil && lhsScalar == nil {
134-
return nil, fmt.Errorf("expected either InstantVectorOperator or ScalarOperator on left-hand side of BinaryExpression, got %T", children[0])
126+
lhsVector, lhsScalar, err := b.getChildOperator(b.LHS, timeRange, materializer, "left")
127+
if err != nil {
128+
return nil, err
135129
}
136130

137-
if rhsVector == nil && rhsScalar == nil {
138-
return nil, fmt.Errorf("expected either InstantVectorOperator or ScalarOperator on right-hand side of BinaryExpression, got %T", children[1])
131+
rhsVector, rhsScalar, err := b.getChildOperator(b.RHS, timeRange, materializer, "right")
132+
if err != nil {
133+
return nil, err
139134
}
140135

141136
if lhsScalar != nil && rhsScalar != nil {
@@ -177,14 +172,19 @@ func (b *BinaryExpression) OperatorFactory(children []types.Operator, timeRange
177172
return planning.NewSingleUseOperatorFactory(operators.NewDeduplicateAndMerge(o, params.MemoryConsumptionTracker)), nil
178173
}
179174

180-
func (b *BinaryExpression) getChildOperator(child types.Operator) (types.InstantVectorOperator, types.ScalarOperator) {
181-
switch child := child.(type) {
175+
func (b *BinaryExpression) getChildOperator(node planning.Node, timeRange types.QueryTimeRange, materializer *planning.Materializer, side string) (types.InstantVectorOperator, types.ScalarOperator, error) {
176+
o, err := materializer.ConvertNodeToOperator(node, timeRange)
177+
if err != nil {
178+
return nil, nil, err
179+
}
180+
181+
switch o := o.(type) {
182182
case types.InstantVectorOperator:
183-
return child, nil
183+
return o, nil, nil
184184
case types.ScalarOperator:
185-
return nil, child
185+
return nil, o, nil
186186
default:
187-
return nil, nil
187+
return nil, nil, fmt.Errorf("expected either InstantVectorOperator or ScalarOperator on %s-hand side of BinaryExpression, got %T", side, o)
188188
}
189189
}
190190

pkg/streamingpromql/planning/core/function_call.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,22 @@ func (f *FunctionCall) ChildrenLabels() []string {
8282
return l
8383
}
8484

85-
func (f *FunctionCall) OperatorFactory(children []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
85+
func (f *FunctionCall) OperatorFactory(materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
8686
fnc, ok := functions.RegisteredFunctions[f.Function]
8787
if !ok {
8888
return nil, compat.NewNotSupportedError(fmt.Sprintf("'%v' function", f.Function.PromQLName()))
8989
}
9090

91+
children := make([]types.Operator, 0, len(f.Args))
92+
for _, arg := range f.Args {
93+
o, err := materializer.ConvertNodeToOperator(arg, timeRange)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
children = append(children, o)
99+
}
100+
91101
var absentLabels labels.Labels
92102

93103
if f.Function == functions.FUNCTION_ABSENT || f.Function == functions.FUNCTION_ABSENT_OVER_TIME {

pkg/streamingpromql/planning/core/matrix_selector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (m *MatrixSelector) ChildrenLabels() []string {
6161
return nil
6262
}
6363

64-
func (m *MatrixSelector) OperatorFactory(_ []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
64+
func (m *MatrixSelector) OperatorFactory(_ *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
6565
matchers, err := LabelMatchersToPrometheusType(m.Matchers)
6666
if err != nil {
6767
return nil, err

pkg/streamingpromql/planning/core/number_literal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (n *NumberLiteral) ChildrenLabels() []string {
5656
return nil
5757
}
5858

59-
func (n *NumberLiteral) OperatorFactory(_ []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
59+
func (n *NumberLiteral) OperatorFactory(_ *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
6060
o := scalars.NewScalarConstant(n.Value, timeRange, params.MemoryConsumptionTracker, n.ExpressionPosition.ToPrometheusType())
6161

6262
return planning.NewSingleUseOperatorFactory(o), nil

pkg/streamingpromql/planning/core/string_literal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (s *StringLiteral) ChildrenLabels() []string {
5656
return nil
5757
}
5858

59-
func (s *StringLiteral) OperatorFactory(_ []types.Operator, _ types.QueryTimeRange, _ *planning.OperatorParameters) (planning.OperatorFactory, error) {
59+
func (s *StringLiteral) OperatorFactory(_ *planning.Materializer, _ types.QueryTimeRange, _ *planning.OperatorParameters) (planning.OperatorFactory, error) {
6060
o := operators.NewStringLiteral(s.Value, s.ExpressionPosition.ToPrometheusType())
6161

6262
return planning.NewSingleUseOperatorFactory(o), nil

pkg/streamingpromql/planning/core/subquery.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,6 @@ func (s *Subquery) Describe() string {
4848
}
4949

5050
func (s *Subquery) ChildrenTimeRange(timeRange types.QueryTimeRange) types.QueryTimeRange {
51-
return SubqueryTimeRange(timeRange, s.Range, s.Step, s.Timestamp, s.Offset)
52-
}
53-
54-
// FIXME: inline this into the method above once we no longer support directly converting from PromQL expressions to operators
55-
func SubqueryTimeRange(parentTimeRange types.QueryTimeRange, rng time.Duration, step time.Duration, ts *time.Time, offset time.Duration) types.QueryTimeRange {
5651
// Subqueries are evaluated as a single range query with steps aligned to Unix epoch time 0.
5752
// They are not evaluated as queries aligned to the individual step timestamps.
5853
// See https://www.robustperception.io/promql-subqueries-and-alignment/ for an explanation.
@@ -67,24 +62,23 @@ func SubqueryTimeRange(parentTimeRange types.QueryTimeRange, rng time.Duration,
6762
// This is relatively uncommon, and Prometheus' engine does the same thing. In the future, we
6863
// could be smarter about this if it turns out to be a big problem.
6964

70-
start := parentTimeRange.StartT
71-
end := parentTimeRange.EndT
72-
stepMilliseconds := step.Milliseconds()
65+
start := timeRange.StartT
66+
end := timeRange.EndT
67+
stepMilliseconds := s.Step.Milliseconds()
7368

74-
if ts != nil {
75-
start = timestamp.FromTime(*ts)
69+
if s.Timestamp != nil {
70+
start = timestamp.FromTime(*s.Timestamp)
7671
end = start
7772
}
7873

7974
// Find the first timestamp inside the subquery range that is aligned to the step.
80-
alignedStart := stepMilliseconds * ((start - offset.Milliseconds() - rng.Milliseconds()) / stepMilliseconds)
81-
if alignedStart < start-offset.Milliseconds()-rng.Milliseconds() {
75+
alignedStart := stepMilliseconds * ((start - s.Offset.Milliseconds() - s.Range.Milliseconds()) / stepMilliseconds)
76+
if alignedStart < start-s.Offset.Milliseconds()-s.Range.Milliseconds() {
8277
alignedStart += stepMilliseconds
8378
}
8479

85-
end = end - offset.Milliseconds()
86-
87-
return types.NewRangeQueryTimeRange(timestamp.Time(alignedStart), timestamp.Time(end), step)
80+
end = end - s.Offset.Milliseconds()
81+
return types.NewRangeQueryTimeRange(timestamp.Time(alignedStart), timestamp.Time(end), s.Step)
8882
}
8983

9084
func (s *Subquery) Details() proto.Message {
@@ -124,16 +118,14 @@ func (s *Subquery) ChildrenLabels() []string {
124118
return []string{""}
125119
}
126120

127-
func (s *Subquery) OperatorFactory(children []types.Operator, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
128-
if len(children) != 1 {
129-
return nil, fmt.Errorf("expected exactly 1 child for Subquery, got %v", len(children))
121+
func (s *Subquery) OperatorFactory(materializer *planning.Materializer, timeRange types.QueryTimeRange, params *planning.OperatorParameters) (planning.OperatorFactory, error) {
122+
innerTimeRange := s.ChildrenTimeRange(timeRange)
123+
inner, err := materializer.ConvertNodeToInstantVectorOperator(s.Inner, innerTimeRange)
124+
if err != nil {
125+
return nil, fmt.Errorf("could not create inner operator for Subquery: %w", err)
130126
}
131127

132-
inner, ok := children[0].(types.InstantVectorOperator)
133-
if !ok {
134-
return nil, fmt.Errorf("expected InstantVectorOperator as child of Subquery, got %T", children[0])
135-
}
136-
o, err := operators.NewSubquery(inner, timeRange, s.ChildrenTimeRange(timeRange), TimestampFromTime(s.Timestamp), s.Offset, s.Range, s.ExpressionPosition.ToPrometheusType(), params.MemoryConsumptionTracker)
128+
o, err := operators.NewSubquery(inner, timeRange, innerTimeRange, TimestampFromTime(s.Timestamp), s.Offset, s.Range, s.ExpressionPosition.ToPrometheusType(), params.MemoryConsumptionTracker)
137129
if err != nil {
138130
return nil, err
139131
}

0 commit comments

Comments
 (0)