Skip to content

Commit c24675f

Browse files
authored
chore: adds separate v2 engine config section (#19423)
1 parent 815072f commit c24675f

File tree

9 files changed

+64
-53
lines changed

9 files changed

+64
-53
lines changed

docs/sources/shared/configuration.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4839,39 +4839,40 @@ engine:
48394839
# CLI flag: -querier.engine.max-count-min-sketch-heap-size
48404840
[max_count_min_sketch_heap_size: <int> | default = 10000]
48414841
4842+
engine_v2:
48424843
# Experimental: Enable next generation query engine for supported queries.
4843-
# CLI flag: -querier.engine.enable-v2-engine
4844-
[enable_v2_engine: <boolean> | default = false]
4844+
# CLI flag: -querier.engine-v2.enable
4845+
[enable: <boolean> | default = false]
48454846
48464847
# Experimental: Batch size of the next generation query engine.
4847-
# CLI flag: -querier.engine.batch-size
4848+
# CLI flag: -querier.engine-v2.batch-size
48484849
[batch_size: <int> | default = 100]
48494850
48504851
# Experimental: The number of inputs that are prefetched simultaneously by any
48514852
# Merge node. A value of 0 means that only the currently processed input is
48524853
# prefetched, 1 means that only the next input is prefetched, and so on. A
48534854
# negative value means that all inputs are be prefetched in parallel.
4854-
# CLI flag: -querier.engine.merge-prefetch-count
4855+
# CLI flag: -querier.engine-v2.merge-prefetch-count
48554856
[merge_prefetch_count: <int> | default = 0]
48564857
48574858
# Configures how to read byte ranges from object storage when using the V2
48584859
# engine.
48594860
range_reads:
48604861
# Experimental: maximum number of parallel reads
4861-
# CLI flag: -querier.engine.range-reads.max-parallelism
4862+
# CLI flag: -querier.engine-v2.range-reads.max-parallelism
48624863
[max_parallelism: <int> | default = 10]
48634864
48644865
# Experimental: maximum distance (in bytes) between ranges that causes them
48654866
# to be coalesced into a single range
4866-
# CLI flag: -querier.engine.range-reads.coalesce-size
4867+
# CLI flag: -querier.engine-v2.range-reads.coalesce-size
48674868
[coalesce_size: <int> | default = 1048576]
48684869
48694870
# Experimental: maximum size of a byte range
4870-
# CLI flag: -querier.engine.range-reads.max-range-size
4871+
# CLI flag: -querier.engine-v2.range-reads.max-range-size
48714872
[max_range_size: <int> | default = 8388608]
48724873
48734874
# Experimental: minimum size of a byte range
4874-
# CLI flag: -querier.engine.range-reads.min-range-size
4875+
# CLI flag: -querier.engine-v2.range-reads.min-range-size
48754876
[min_range_size: <int> | default = 1048576]
48764877
48774878
# The maximum number of queries that can be simultaneously processed by the

pkg/engine/engine.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package engine
22

33
import (
44
"context"
5+
"flag"
56
"fmt"
67
"strings"
78
"time"
@@ -35,21 +36,21 @@ var tracer = otel.Tracer("pkg/engine")
3536
var ErrNotSupported = errors.New("feature not supported in new query engine")
3637

3738
// New creates a new instance of the query engine that implements the [logql.Engine] interface.
38-
func New(opts logql.EngineOpts, cfg metastore.Config, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *QueryEngine {
39+
func New(cfg Config, metastoreCfg metastore.Config, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *QueryEngine {
3940
var ms metastore.Metastore
4041
if bucket != nil {
4142
indexBucket := bucket
42-
if cfg.IndexStoragePrefix != "" {
43-
indexBucket = objstore.NewPrefixedBucket(bucket, cfg.IndexStoragePrefix)
43+
if metastoreCfg.IndexStoragePrefix != "" {
44+
indexBucket = objstore.NewPrefixedBucket(bucket, metastoreCfg.IndexStoragePrefix)
4445
}
4546
ms = metastore.NewObjectMetastore(indexBucket, logger, reg)
4647
}
4748

48-
if opts.BatchSize <= 0 {
49-
panic(fmt.Sprintf("invalid batch size for query engine. must be greater than 0, got %d", opts.BatchSize))
49+
if cfg.BatchSize <= 0 {
50+
panic(fmt.Sprintf("invalid batch size for query engine. must be greater than 0, got %d", cfg.BatchSize))
5051
}
51-
if opts.RangeConfig.IsZero() {
52-
opts.RangeConfig = rangeio.DefaultConfig
52+
if cfg.RangeConfig.IsZero() {
53+
cfg.RangeConfig = rangeio.DefaultConfig
5354
}
5455

5556
return &QueryEngine{
@@ -58,18 +59,40 @@ func New(opts logql.EngineOpts, cfg metastore.Config, bucket objstore.Bucket, li
5859
limits: limits,
5960
metastore: ms,
6061
bucket: bucket,
61-
opts: opts,
62+
cfg: cfg,
6263
}
6364
}
6465

66+
// Config holds the configuration options to use with the next generation Loki Query Engine.
67+
type Config struct {
68+
// Enable the next generation Loki Query Engine for supported queries.
69+
Enable bool `yaml:"enable" category:"experimental"`
70+
71+
// Batch size of the v2 execution engine.
72+
BatchSize int `yaml:"batch_size" category:"experimental"`
73+
74+
// MergePrefetchCount controls the number of inputs that are prefetched simultaneously by any Merge node.
75+
MergePrefetchCount int `yaml:"merge_prefetch_count" category:"experimental"`
76+
77+
// RangeConfig determines how to optimize range reads in the V2 engine.
78+
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
79+
}
80+
81+
func (opts *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
82+
f.BoolVar(&opts.Enable, prefix+"enable", false, "Experimental: Enable next generation query engine for supported queries.")
83+
f.IntVar(&opts.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
84+
f.IntVar(&opts.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel.")
85+
opts.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
86+
}
87+
6588
// QueryEngine combines logical planning, physical planning, and execution to evaluate LogQL queries.
6689
type QueryEngine struct {
6790
logger log.Logger
6891
metrics *metrics
6992
limits logql.Limits
7093
metastore metastore.Metastore
7194
bucket objstore.Bucket
72-
opts logql.EngineOpts
95+
cfg Config
7396
}
7497

7598
// Query implements [logql.Engine].
@@ -115,7 +138,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
115138

116139
// Inject the range config into the context for any calls to
117140
// [rangeio.ReadRanges] to make use of.
118-
ctx = rangeio.WithConfig(ctx, &e.opts.RangeConfig)
141+
ctx = rangeio.WithConfig(ctx, &e.cfg.RangeConfig)
119142

120143
logicalPlan, err := func() (*logical.Plan, error) {
121144
_, span := tracer.Start(ctx, "QueryEngine.Execute.logicalPlan")
@@ -196,8 +219,8 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
196219
timer := prometheus.NewTimer(e.metrics.execution)
197220

198221
cfg := executor.Config{
199-
BatchSize: int64(e.opts.BatchSize),
200-
MergePrefetchCount: e.opts.MergePrefetchCount,
222+
BatchSize: int64(e.cfg.BatchSize),
223+
MergePrefetchCount: e.cfg.MergePrefetchCount,
201224
Bucket: e.bucket,
202225
}
203226
pipeline := executor.Run(ctx, cfg, physicalPlan, logger)

pkg/engine/internal/semconv/identifier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func MustParseFQN(fqn string) *Identifier {
126126
type scopeOrigin string
127127
type scopeType string
128128

129-
// Scope decribes the origin and type of an identifier.
129+
// Scope describes the origin and type of an identifier.
130130
type Scope struct {
131131
Origin scopeOrigin
132132
Type scopeType

pkg/logql/bench/store_dataobj_v2_engine.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ type DataObjV2EngineStore struct {
2828
// NewDataObjV2EngineStore creates a new store that uses the v2 dataobj engine.
2929
func NewDataObjV2EngineStore(dir string, tenantID string) (*DataObjV2EngineStore, error) {
3030
storageDir := filepath.Join(dir, storageDir)
31-
return dataobjV2StoreWithOpts(storageDir, tenantID, logql.EngineOpts{
32-
EnableV2Engine: true,
33-
BatchSize: 512,
34-
RangeConfig: rangeio.DefaultConfig,
31+
return dataobjV2StoreWithOpts(storageDir, tenantID, engine.Config{
32+
Enable: true,
33+
BatchSize: 512,
34+
RangeConfig: rangeio.DefaultConfig,
3535
}, metastore.Config{
3636
IndexStoragePrefix: "index/v0",
3737
})
3838
}
3939

40-
func dataobjV2StoreWithOpts(dataDir string, tenantID string, engineOpts logql.EngineOpts, cfg metastore.Config) (*DataObjV2EngineStore, error) {
40+
func dataobjV2StoreWithOpts(dataDir string, tenantID string, cfg engine.Config, metastoreCfg metastore.Config) (*DataObjV2EngineStore, error) {
4141
logger := log.NewNopLogger()
4242

4343
// Setup filesystem client as objstore.Bucket
@@ -57,7 +57,7 @@ func dataobjV2StoreWithOpts(dataDir string, tenantID string, engineOpts logql.En
5757
// or derived from the bucket structure if it's multi-tenant aware.
5858
// This might require adjustment based on how pkg/engine/engine actually handles multi-tenancy
5959
// with a generic objstore.Bucket.
60-
queryEngine := engine.New(engineOpts, cfg, bucketClient, logql.NoLimits, nil, logger)
60+
queryEngine := engine.New(cfg, metastoreCfg, bucketClient, logql.NoLimits, nil, logger)
6161

6262
return &DataObjV2EngineStore{
6363
engine: queryEngine,

pkg/logql/engine.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/grafana/loki/v3/pkg/util/constants"
3838
"github.com/grafana/loki/v3/pkg/util/httpreq"
3939
logutil "github.com/grafana/loki/v3/pkg/util/log"
40-
"github.com/grafana/loki/v3/pkg/util/rangeio"
4140
"github.com/grafana/loki/v3/pkg/util/server"
4241
"github.com/grafana/loki/v3/pkg/util/validation"
4342
)
@@ -161,31 +160,14 @@ type EngineOpts struct {
161160
// MaxCountMinSketchHeapSize is the maximum number of labels the heap for a topk query using a count min sketch
162161
// can track. This impacts the memory usage and accuracy of a sharded probabilistic topk query.
163162
MaxCountMinSketchHeapSize int `yaml:"max_count_min_sketch_heap_size"`
164-
165-
// Enable the next generation Loki Query Engine for supported queries.
166-
EnableV2Engine bool `yaml:"enable_v2_engine" category:"experimental"`
167-
168-
// Batch size of the v2 execution engine.
169-
BatchSize int `yaml:"batch_size" category:"experimental"`
170-
171-
// MergePrefetchCount controls the number of inputs that are prefetched simultaneously by any Merge node.
172-
MergePrefetchCount int `yaml:"merge_prefetch_count" category:"experimental"`
173-
174-
// RangeConfig determines how to optimize range reads in the V2 engine.
175-
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
176163
}
177164

178165
func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
179166
f.DurationVar(&opts.MaxLookBackPeriod, prefix+"max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.")
180167
f.IntVar(&opts.MaxCountMinSketchHeapSize, prefix+"max-count-min-sketch-heap-size", 10_000, "The maximum number of labels the heap of a topk query using a count min sketch can track.")
181-
f.BoolVar(&opts.EnableV2Engine, prefix+"enable-v2-engine", false, "Experimental: Enable next generation query engine for supported queries.")
182-
f.IntVar(&opts.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.")
183-
f.IntVar(&opts.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel.")
184168

185169
// Log executing query by default
186170
opts.LogExecutingQuery = true
187-
188-
opts.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
189171
}
190172

191173
func (opts *EngineOpts) applyDefault() {

pkg/loki/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
577577
}
578578

579579
var store objstore.Bucket
580-
if t.Cfg.Querier.Engine.EnableV2Engine {
580+
if t.Cfg.Querier.EngineV2.Enable {
581581
store, err = t.createDataObjBucket("dataobj-querier")
582582
if err != nil {
583583
return nil, err

pkg/querier/http.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,19 @@ type QuerierAPI struct {
6363

6464
// NewQuerierAPI returns an instance of the QuerierAPI.
6565
func NewQuerierAPI(cfg Config, mCfg metastore.Config, querier Querier, limits querier_limits.Limits, store objstore.Bucket, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
66-
return &QuerierAPI{
66+
q := &QuerierAPI{
6767
cfg: cfg,
6868
limits: limits,
6969
querier: querier,
7070
engineV1: logql.NewEngine(cfg.Engine, querier, limits, logger),
71-
engineV2: engine.New(cfg.Engine, mCfg, store, limits, reg, logger),
7271
logger: logger,
7372
}
73+
74+
if cfg.EngineV2.Enable {
75+
q.engineV2 = engine.New(cfg.EngineV2, mCfg, store, limits, reg, logger)
76+
}
77+
78+
return q
7479
}
7580

7681
// RangeQueryHandler is a http.HandlerFunc for range queries and legacy log queries
@@ -87,7 +92,7 @@ func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.Loki
8792
return result, err
8893
}
8994

90-
if q.cfg.Engine.EnableV2Engine && hasDataObjectsAvailable(params.Start(), params.End()) {
95+
if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(params.Start(), params.End()) {
9196
query := q.engineV2.Query(params)
9297
result, err = query.Exec(ctx)
9398
if err == nil {
@@ -126,7 +131,7 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo
126131
return logqlmodel.Result{}, err
127132
}
128133

129-
if q.cfg.Engine.EnableV2Engine && hasDataObjectsAvailable(params.Start(), params.End()) {
134+
if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(params.Start(), params.End()) {
130135
query := q.engineV2.Query(params)
131136
result, err := query.Exec(ctx)
132137
if err == nil {

pkg/querier/querier.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"golang.org/x/sync/errgroup"
2626
"google.golang.org/grpc/health/grpc_health_v1"
2727

28+
"github.com/grafana/loki/v3/pkg/engine"
2829
"github.com/grafana/loki/v3/pkg/indexgateway"
2930
"github.com/grafana/loki/v3/pkg/iter"
3031
"github.com/grafana/loki/v3/pkg/loghttp"
@@ -55,6 +56,7 @@ type Config struct {
5556
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
5657
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within,omitempty"`
5758
Engine logql.EngineOpts `yaml:"engine,omitempty"`
59+
EngineV2 engine.Config `yaml:"engine_v2,omitempty" category:"experimental"`
5860
MaxConcurrent int `yaml:"max_concurrent"`
5961
QueryStoreOnly bool `yaml:"query_store_only"`
6062
QueryIngesterOnly bool `yaml:"query_ingester_only"`
@@ -76,6 +78,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
7678
f.DurationVar(&cfg.ExtraQueryDelay, prefix+"extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
7779
f.DurationVar(&cfg.QueryIngestersWithin, prefix+"query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
7880
cfg.Engine.RegisterFlagsWithPrefix(prefix+"engine.", f)
81+
cfg.EngineV2.RegisterFlagsWithPrefix(prefix+"engine-v2.", f)
7982
f.IntVar(&cfg.MaxConcurrent, prefix+"max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.")
8083
f.BoolVar(&cfg.QueryStoreOnly, prefix+"query-store-only", false, "Only query the store, and not attempt any ingesters. This is useful for running a standalone querier pool operating only against stored data.")
8184
f.BoolVar(&cfg.QueryIngesterOnly, prefix+"query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")

pkg/querier/querier_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,6 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {
8989
func mockQuerierConfig() Config {
9090
return Config{
9191
TailMaxDuration: 1 * time.Minute,
92-
Engine: logql.EngineOpts{
93-
BatchSize: 1,
94-
},
9592
}
9693
}
9794

0 commit comments

Comments
 (0)