Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* [FEATURE] Ingester: Added experimental support to run ingesters with no tokens in the ring when ingest storage is enabled. You can set `-ingester.ring.num-tokens=0` to enable this feature. #14024
* [FEATURE] Store-gateway: Add `-store-gateway.sharding-ring.excluded-zones` flag to exclude specific zones from the store-gateway ring. #14120
* [FEATURE] Ingest storage: Add `-ingest-storage.kafka.sasl-mechanism` flag supporting more ways to authenticate with Kafka. #14307 #14344
* [FEATURE] MQE: Add experimental support for splitting and caching intermediate results for functions over range vectors in instant queries. #13472 #14479 #14506 #14499
* [FEATURE] MQE: Add experimental support for splitting and caching intermediate results for functions over range vectors in instant queries. #13472 #14479 #14506 #14499 #14517
* [ENHANCEMENT] Memberlist: Add experimental propagation delay tracker to measure gossip propagation delay across the memberlist cluster. Enable with `-memberlist.propagation-delay-tracker.enabled=true`. #14312 #14406
* [ENHANCEMENT] Compactor: Add 0-100% jitter to the first compaction interval to spread compactions when multiple compactors start simultaneously. #14280
* [ENHANCEMENT] Compactor, Store-gateway: Remove experimental setting `-compactor.upload-sparse-index-headers` and always upload sparse index-headers. This improves lazy loading performance in the store-gateway. #13089 #13882
Expand Down
8 changes: 1 addition & 7 deletions pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,8 @@ func NewEngineWithCache(opts EngineOpts, metrics *stats.QueryMetrics, planner *Q
planning.NODE_TYPE_STEP_INVARIANT_EXPRESSION: planning.NodeMaterializerFunc[*core.StepInvariantExpression](core.MaterializeStepInvariantExpression),
planning.NODE_TYPE_MULTI_AGGREGATION_GROUP: planning.NodeMaterializerFunc[*multiaggregation.MultiAggregationGroup](multiaggregation.MaterializeMultiAggregationGroup),
planning.NODE_TYPE_MULTI_AGGREGATION_INSTANCE: planning.NodeMaterializerFunc[*multiaggregation.MultiAggregationInstance](multiaggregation.MaterializeMultiAggregationInstance),
}

if opts.RangeVectorSplitting.Enabled {
nodeMaterializers[planning.NODE_TYPE_SPLIT_FUNCTION_OVER_RANGE_VECTOR] = rangevectorsplitting.NewMaterializer(intermediateCache)
} else {
nodeMaterializers[planning.NODE_TYPE_SPLIT_FUNCTION_OVER_RANGE_VECTOR] = planning.NewDisabledMaterializer(
errors.New("split function node is present but range vector splitting is disabled, this could happen if splitting is enabled on the query-frontend but not in the querier"),
)
planning.NODE_TYPE_SPLIT_FUNCTION_OVER_RANGE_VECTOR: rangevectorsplitting.NewMaterializer(opts.RangeVectorSplitting.Enabled, intermediateCache, opts.Logger),
}

return &Engine{
Expand Down
17 changes: 14 additions & 3 deletions pkg/streamingpromql/optimize/plan/rangevectorsplitting/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand Down Expand Up @@ -143,14 +145,18 @@ func (s *SplitFunctionCall) MinimumRequiredPlanVersion() planning.QueryPlanVersi
}

type Materializer struct {
cache *cache.CacheFactory
enabled bool
cache *cache.CacheFactory
logger log.Logger
}

var _ planning.NodeMaterializer = &Materializer{}

func NewMaterializer(cache *cache.CacheFactory) *Materializer {
func NewMaterializer(enabled bool, cache *cache.CacheFactory, logger log.Logger) *Materializer {
return &Materializer{
cache: cache,
enabled: enabled,
cache: cache,
logger: logger,
}
}

Expand All @@ -163,6 +169,11 @@ func (m Materializer) Materialize(n planning.Node, materializer *planning.Materi
return nil, fmt.Errorf("unexpected type passed to materializer: expected SplitFunctionCall, got %T", n)
}

if !m.enabled {
level.Warn(m.logger).Log("msg", "split function node is present but range vector splitting is disabled, falling back to unsplit execution; this can happen if splitting is enabled on the query-frontend but not yet on the querier")
return materializer.FactoryForNode(s.Inner, timeRange)
}

splitFactory, exists := SplitFunctionRegistry[s.Inner.Function]
if !exists {
return nil, fmt.Errorf("function %v does not support range vector splitting", s.Inner.Function.PromQLName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,8 @@ func (o *OptimizationPass) trySplitFunction(ctx context.Context, functionCall *c
SplitRanges: splitRanges,
InnerNodeCacheKey: inner.SplittingCacheKey(),
},
Inner: functionCall,
}
if err := n.SetChildren([]planning.Node{functionCall}); err != nil {
return nil, "", err
}

return n, "", nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ func TestQuerySplitting_WithCSE(t *testing.T) {
ts := baseT.Add(6 * time.Hour)
ctx := user.InjectOrgID(context.Background(), "test-user")

opts := streamingpromql.NewTestEngineOpts()
opts.RangeVectorSplitting.Enabled = true
opts.RangeVectorSplitting.SplitInterval = 2 * time.Hour
opts := defaultSplittingOpts()
require.True(t, opts.EnableCommonSubexpressionElimination, "CSE should be enabled")

planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
Expand Down Expand Up @@ -442,11 +440,7 @@ func TestQuerySplitting_ProjectionNotApplied(t *testing.T) {
ctx := context.Background()
evalTime := timestamp.Time(0).Add(6 * time.Hour)

opts := streamingpromql.NewTestEngineOpts()
opts.RangeVectorSplitting.Enabled = true
opts.RangeVectorSplitting.SplitInterval = 2 * time.Hour

planner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
planner, err := streamingpromql.NewQueryPlanner(defaultSplittingOpts(), streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)

p, err := planner.NewQueryPlan(ctx, `sum by (job) (rate(some_metric[5h]))`, types.NewInstantQueryTimeRange(evalTime), false, &streamingpromql.NoopPlanningObserver{})
Expand Down Expand Up @@ -620,9 +614,7 @@ func TestQuerySplitting_WithOOOWindow(t *testing.T) {
backend := newTestCacheBackend()
irCache := cache.NewCacheFactoryWithBackend(backend, streamingpromql.NewStaticQueryLimitsProvider(), prometheus.NewRegistry(), log.NewNopLogger())

opts := streamingpromql.NewTestEngineOpts()
opts.RangeVectorSplitting.Enabled = true
opts.RangeVectorSplitting.SplitInterval = 2 * time.Hour
opts := defaultSplittingOpts()
limits := streamingpromql.NewStaticQueryLimitsProvider()
limits.MaxOutOfOrderTimeWindow = 3 * time.Hour
opts.Limits = limits
Expand Down Expand Up @@ -1035,6 +1027,32 @@ func TestQuerySplitting_SubquerySpinoff_SkipsSplitting(t *testing.T) {
verifyCacheStats(t, testCache, 0, 0, 0)
}

// TestQuerySplitting_SplittingDisabledOnQuerier_FallsBackToRegularNode covers the
// mixed-rollout scenario: the planner (standing in for the query-frontend) produces
// a plan containing a split-function node, but the engine (standing in for the
// querier) has range vector splitting disabled. The query must still succeed by
// falling back to regular, unsplit execution instead of returning an error.
func TestQuerySplitting_SplittingDisabledOnQuerier_FallsBackToRegularNode(t *testing.T) {
// Simulate query-frontend with splitting enabled
plannerOpts := defaultSplittingOpts()
planner, err := streamingpromql.NewQueryPlanner(plannerOpts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)

// Simulate querier with splitting disabled
engineOpts := streamingpromql.NewTestEngineOpts()
engineOpts.RangeVectorSplitting.Enabled = false
// The intermediate-results cache is nil: the disabled fallback path must not
// need it — presumably it never touches the cache; confirm in the materializer.
engine, err := streamingpromql.NewEngineWithCache(engineOpts, stats.NewQueryMetrics(nil), planner, nil)
require.NoError(t, err)

// 61 samples at 10m spacing starting at t=0 with values 0,1,2,...,60.
promStorage := promqltest.LoadedStorage(t, `
load 10m
some_metric{env="1"} 0+1x60
`)
t.Cleanup(func() { require.NoError(t, promStorage.Close()) })

baseT := timestamp.Time(0)
ts := baseT.Add(6 * time.Hour)

// A 5h range at t=6h selects the samples in (1h, 6h], i.e. values 7..36,
// whose sum is 645 — the query must produce the same answer as unsplit execution.
result := runInstantQuery(t, engine, promStorage, "sum_over_time(some_metric[5h])", ts)
require.NoError(t, result.Err)
require.Equal(t, expectedScalarResult(ts, 645, "env", "1"), result)
}

func createSplittingEngineWithCache(t *testing.T, registry *prometheus.Registry, splitInterval time.Duration, enableDelayedNameRemoval bool, enableEliminateDeduplicateAndMerge bool) (promql.QueryEngine, *testCacheBackend) {
t.Helper()

Expand Down Expand Up @@ -1073,9 +1091,7 @@ func setupEngineAndCache(t *testing.T) (*testCacheBackend, promql.QueryEngine) {
backend := newTestCacheBackend()
irCache := cache.NewCacheFactoryWithBackend(backend, streamingpromql.NewStaticQueryLimitsProvider(), prometheus.NewRegistry(), log.NewNopLogger())

opts := streamingpromql.NewTestEngineOpts()
opts.RangeVectorSplitting.Enabled = true
opts.RangeVectorSplitting.SplitInterval = 2 * time.Hour
opts := defaultSplittingOpts()

queryPlanner, err := streamingpromql.NewQueryPlanner(opts, streamingpromql.NewMaximumSupportedVersionQueryPlanVersionProvider())
require.NoError(t, err)
Expand Down Expand Up @@ -1123,6 +1139,13 @@ func verifyCacheStats(t *testing.T, backend *testCacheBackend, expectedGets, exp
require.Equal(t, expectedSets, backend.sets, "Expected %d cache sets, got %d", expectedSets, backend.sets)
}

// defaultSplittingOpts returns the standard test engine options used by the
// splitting tests: range vector splitting switched on with a 2-hour split interval.
func defaultSplittingOpts() streamingpromql.EngineOpts {
	o := streamingpromql.NewTestEngineOpts()
	o.RangeVectorSplitting.SplitInterval = 2 * time.Hour
	o.RangeVectorSplitting.Enabled = true
	return o
}

// storageQueryRange records the min/max time bounds of a query observed against
// the underlying storage — presumably milliseconds since epoch, following the
// usual Prometheus timestamp convention; confirm at the recording call sites.
type storageQueryRange struct {
mint, maxt int64
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/streamingpromql/planning/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,3 @@ func (f RangeAwareNodeMaterializerFunc[T]) Materialize(n Node, materializer *Mat

return f(node, materializer, timeRange, params, overrideRangeParams)
}

type DisabledMaterializer struct {
err error
}

var _ NodeMaterializer = DisabledMaterializer{}

func NewDisabledMaterializer(err error) DisabledMaterializer {
return DisabledMaterializer{err: err}
}

func (d DisabledMaterializer) Materialize(Node, *Materializer, types.QueryTimeRange, *OperatorParameters, RangeParams) (OperatorFactory, error) {
return nil, d.err
}
Loading