Skip to content

Commit d6f50ca

Browse files
authored
[TSDB] Use index sampling to determine shard factor (grafana#6396)
* use index sampling to determine shard factor * logging/tracing + better parallelism planning * queryrange downstreamer now checks max_query_parallelism * lint * handles interval, offset in tsdb planning and adds logging * fixes ns->ms confusion in index stats proto * handle zero shard value without panics * shardmapper will downstream a single unsharded query * uses concat expr with no shards to avoid query parsing errors * better logging * fixes wrong Size() method call and rounds to nearest KB when calculating chunk size * humanize bytes in log line * only adds defaultLookback to index sampling when interval is zero * removes comment * more logging * adjust through correctly * adds query length for index queries
1 parent 01fe534 commit d6f50ca

File tree

16 files changed

+394
-69
lines changed

16 files changed

+394
-69
lines changed

pkg/logql/downstream.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ operand expression can take advantage of the parallel execution model:
4343
// querying the underlying backend shards individually and re-aggregating them.
4444
type DownstreamEngine struct {
4545
logger log.Logger
46-
timeout time.Duration
46+
opts EngineOpts
4747
downstreamable Downstreamable
4848
limits Limits
4949
}
@@ -53,19 +53,21 @@ func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, limits
5353
opts.applyDefault()
5454
return &DownstreamEngine{
5555
logger: logger,
56-
timeout: opts.Timeout,
56+
opts: opts,
5757
downstreamable: downstreamable,
5858
limits: limits,
5959
}
6060
}
6161

62+
func (ng *DownstreamEngine) Opts() EngineOpts { return ng.opts }
63+
6264
// Query constructs a Query
63-
func (ng *DownstreamEngine) Query(p Params, mapped syntax.Expr) Query {
65+
func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.Expr) Query {
6466
return &query{
6567
logger: ng.logger,
66-
timeout: ng.timeout,
68+
timeout: ng.opts.Timeout,
6769
params: p,
68-
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
70+
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
6971
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
7072
return mapped, nil
7173
},
@@ -158,7 +160,7 @@ func ParseShards(strs []string) (Shards, error) {
158160
}
159161

160162
type Downstreamable interface {
161-
Downstreamer() Downstreamer
163+
Downstreamer(context.Context) Downstreamer
162164
}
163165

164166
type DownstreamQuery struct {

pkg/logql/downstream_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,11 @@ func TestMappingEquivalence(t *testing.T) {
7878
qry := regular.Query(params)
7979
ctx := user.InjectOrgID(context.Background(), "fake")
8080

81-
mapper, err := NewShardMapper(shards, nilShardMetrics)
82-
require.Nil(t, err)
81+
mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics)
8382
_, mapped, err := mapper.Parse(tc.query)
8483
require.Nil(t, err)
8584

86-
shardedQry := sharded.Query(params, mapped)
85+
shardedQry := sharded.Query(ctx, params, mapped)
8786

8887
res, err := qry.Exec(ctx)
8988
require.Nil(t, err)
@@ -331,7 +330,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
331330

332331
require.False(t, noop, "downstream engine cannot execute noop")
333332

334-
rangeQry := downstreamEngine.Query(params, rangeExpr)
333+
rangeQry := downstreamEngine.Query(ctx, params, rangeExpr)
335334
rangeRes, err := rangeQry.Exec(ctx)
336335
require.Nil(t, err)
337336

pkg/logql/shardmapper.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,20 @@ type ShardResolver interface {
1616
Shards(expr syntax.Expr) (int, error)
1717
}
1818

19-
type constantShards int
19+
type ConstantShards int
2020

21-
func (s constantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }
21+
func (s ConstantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }
2222

2323
type ShardMapper struct {
24-
shards constantShards
24+
shards ShardResolver
2525
metrics *MapperMetrics
2626
}
2727

28-
func NewShardMapper(shards int, metrics *MapperMetrics) (ShardMapper, error) {
29-
if shards < 2 {
30-
return ShardMapper{}, fmt.Errorf("cannot create ShardMapper with <2 shards. Received %d", shards)
31-
}
28+
func NewShardMapper(resolver ShardResolver, metrics *MapperMetrics) ShardMapper {
3229
return ShardMapper{
33-
shards: constantShards(shards),
30+
shards: resolver,
3431
metrics: metrics,
35-
}, nil
32+
}
3633
}
3734

3835
func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
@@ -116,6 +113,14 @@ func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstre
116113
if err != nil {
117114
return nil, err
118115
}
116+
if shards == 0 {
117+
return &ConcatLogSelectorExpr{
118+
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
119+
shard: nil,
120+
LogSelectorExpr: expr,
121+
},
122+
}, nil
123+
}
119124
for i := shards - 1; i >= 0; i-- {
120125
head = &ConcatLogSelectorExpr{
121126
DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
@@ -139,6 +144,14 @@ func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder
139144
if err != nil {
140145
return nil, err
141146
}
147+
if shards == 0 {
148+
return &ConcatSampleExpr{
149+
DownstreamSampleExpr: DownstreamSampleExpr{
150+
shard: nil,
151+
SampleExpr: expr,
152+
},
153+
}, nil
154+
}
142155
for i := shards - 1; i >= 0; i-- {
143156
head = &ConcatSampleExpr{
144157
DownstreamSampleExpr: DownstreamSampleExpr{

pkg/logql/shardmapper_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ func TestShardedStringer(t *testing.T) {
5151
}
5252

5353
func TestMapSampleExpr(t *testing.T) {
54-
m, err := NewShardMapper(2, nilShardMetrics)
55-
require.Nil(t, err)
54+
m := NewShardMapper(ConstantShards(2), nilShardMetrics)
5655

5756
for _, tc := range []struct {
5857
in syntax.SampleExpr
@@ -114,8 +113,7 @@ func TestMapSampleExpr(t *testing.T) {
114113
}
115114

116115
func TestMappingStrings(t *testing.T) {
117-
m, err := NewShardMapper(2, nilShardMetrics)
118-
require.Nil(t, err)
116+
m := NewShardMapper(ConstantShards(2), nilShardMetrics)
119117
for _, tc := range []struct {
120118
in string
121119
out string
@@ -279,8 +277,7 @@ func TestMappingStrings(t *testing.T) {
279277
}
280278

281279
func TestMapping(t *testing.T) {
282-
m, err := NewShardMapper(2, nilShardMetrics)
283-
require.Nil(t, err)
280+
m := NewShardMapper(ConstantShards(2), nilShardMetrics)
284281

285282
for _, tc := range []struct {
286283
in string

pkg/logql/syntax/ast.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ type SampleExpr interface {
707707
// Selector is the LogQL selector to apply when retrieving logs.
708708
Selector() LogSelectorExpr
709709
Extractor() (SampleExtractor, error)
710-
MatcherGroups() [][]*labels.Matcher
710+
MatcherGroups() []MatcherRange
711711
Expr
712712
}
713713

@@ -754,10 +754,16 @@ func (e *RangeAggregationExpr) Selector() LogSelectorExpr {
754754
return e.Left.Left
755755
}
756756

757-
func (e *RangeAggregationExpr) MatcherGroups() [][]*labels.Matcher {
757+
func (e *RangeAggregationExpr) MatcherGroups() []MatcherRange {
758758
xs := e.Left.Left.Matchers()
759759
if len(xs) > 0 {
760-
return [][]*labels.Matcher{xs}
760+
return []MatcherRange{
761+
{
762+
Matchers: xs,
763+
Interval: e.Left.Interval,
764+
Offset: e.Left.Offset,
765+
},
766+
}
761767
}
762768
return nil
763769
}
@@ -880,7 +886,7 @@ func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *Groupin
880886
}
881887
}
882888

883-
func (e *VectorAggregationExpr) MatcherGroups() [][]*labels.Matcher {
889+
func (e *VectorAggregationExpr) MatcherGroups() []MatcherRange {
884890
return e.Left.MatcherGroups()
885891
}
886892

@@ -1005,7 +1011,7 @@ type BinOpExpr struct {
10051011
Opts *BinOpOptions
10061012
}
10071013

1008-
func (e *BinOpExpr) MatcherGroups() [][]*labels.Matcher {
1014+
func (e *BinOpExpr) MatcherGroups() []MatcherRange {
10091015
return append(e.SampleExpr.MatcherGroups(), e.RHS.MatcherGroups()...)
10101016
}
10111017

@@ -1391,7 +1397,7 @@ func (e *LiteralExpr) Shardable() bool { return true }
13911397
func (e *LiteralExpr) Walk(f WalkFn) { f(e) }
13921398
func (e *LiteralExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
13931399
func (e *LiteralExpr) Matchers() []*labels.Matcher { return nil }
1394-
func (e *LiteralExpr) MatcherGroups() [][]*labels.Matcher { return nil }
1400+
func (e *LiteralExpr) MatcherGroups() []MatcherRange { return nil }
13951401
func (e *LiteralExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
13961402
func (e *LiteralExpr) Value() float64 { return e.Val }
13971403

@@ -1446,7 +1452,7 @@ func (e *LabelReplaceExpr) Selector() LogSelectorExpr {
14461452
return e.Left.Selector()
14471453
}
14481454

1449-
func (e *LabelReplaceExpr) MatcherGroups() [][]*labels.Matcher {
1455+
func (e *LabelReplaceExpr) MatcherGroups() []MatcherRange {
14501456
return e.Left.MatcherGroups()
14511457
}
14521458

@@ -1521,13 +1527,22 @@ var shardableOps = map[string]bool{
15211527
OpTypeMul: true,
15221528
}
15231529

1524-
func MatcherGroups(expr Expr) [][]*labels.Matcher {
1530+
type MatcherRange struct {
1531+
Matchers []*labels.Matcher
1532+
Interval, Offset time.Duration
1533+
}
1534+
1535+
func MatcherGroups(expr Expr) []MatcherRange {
15251536
switch e := expr.(type) {
15261537
case SampleExpr:
15271538
return e.MatcherGroups()
15281539
case LogSelectorExpr:
15291540
if xs := e.Matchers(); len(xs) > 0 {
1530-
return [][]*labels.Matcher{xs}
1541+
return []MatcherRange{
1542+
{
1543+
Matchers: xs,
1544+
},
1545+
}
15311546
}
15321547
return nil
15331548
default:

pkg/logql/syntax/ast_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package syntax
33
import (
44
"fmt"
55
"testing"
6+
"time"
67

78
"github.com/prometheus/prometheus/model/labels"
89
"github.com/prometheus/prometheus/promql"
@@ -182,19 +183,34 @@ func Test_SampleExpr_String(t *testing.T) {
182183
func TestMatcherGroups(t *testing.T) {
183184
for i, tc := range []struct {
184185
query string
185-
exp [][]*labels.Matcher
186+
exp []MatcherRange
186187
}{
187188
{
188189
query: `{job="foo"}`,
189-
exp: [][]*labels.Matcher{
190-
{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
190+
exp: []MatcherRange{
191+
{
192+
Matchers: []*labels.Matcher{
193+
labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
194+
},
195+
},
191196
},
192197
},
193198
{
194-
query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m])`,
195-
exp: [][]*labels.Matcher{
196-
{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
197-
{labels.MustNewMatcher(labels.MatchEqual, "job", "bar")},
199+
query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)`,
200+
exp: []MatcherRange{
201+
{
202+
Interval: 5 * time.Minute,
203+
Matchers: []*labels.Matcher{
204+
labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
205+
},
206+
},
207+
{
208+
Interval: 5 * time.Minute,
209+
Offset: 10 * time.Minute,
210+
Matchers: []*labels.Matcher{
211+
labels.MustNewMatcher(labels.MatchEqual, "job", "bar"),
212+
},
213+
},
198214
},
199215
},
200216
} {

pkg/logql/test_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ type MockDownstreamer struct {
212212
*Engine
213213
}
214214

215-
func (m MockDownstreamer) Downstreamer() Downstreamer { return m }
215+
func (m MockDownstreamer) Downstreamer(_ context.Context) Downstreamer { return m }
216216

217217
func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) {
218218
results := make([]logqlmodel.Result, 0, len(queries))

pkg/querier/queryrange/downstreamer.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/go-kit/log/level"
8+
"github.com/grafana/dskit/tenant"
89
"github.com/prometheus/prometheus/model/labels"
910
"github.com/prometheus/prometheus/promql"
1011
"github.com/prometheus/prometheus/promql/parser"
@@ -21,7 +22,8 @@ const (
2122
)
2223

2324
type DownstreamHandler struct {
24-
next queryrangebase.Handler
25+
limits Limits
26+
next queryrangebase.Handler
2527
}
2628

2729
func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebase.Request {
@@ -54,8 +56,19 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas
5456
// from creating an unreasonably large number of goroutines, such as
5557
// the case of a query like `a / a / a / a / a ..etc`, which could try
5658
// to shard each leg, quickly dispatching an unreasonable number of goroutines.
57-
func (h DownstreamHandler) Downstreamer() logql.Downstreamer {
59+
// In the future, it's probably better to replace this with a channel based API
60+
// so we don't have to do all this ugly edge case handling/accounting
61+
func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer {
5862
p := DefaultDownstreamConcurrency
63+
64+
// We may increase parallelism above the default,
65+
// ensure we don't end up bottlenecking here.
66+
if user, err := tenant.TenantID(ctx); err == nil {
67+
if x := h.limits.MaxQueryParallelism(user); x > 0 {
68+
p = x
69+
}
70+
}
71+
5972
locks := make(chan struct{}, p)
6073
for i := 0; i < p; i++ {
6174
locks <- struct{}{}

0 commit comments

Comments
 (0)