
Commit a935a63

chore: add support for fetching chunks with sizing info from the index to optimize GetShards calls (#19221)
Signed-off-by: Sandeep Sukhani <[email protected]>
Co-authored-by: Christian Haudum <[email protected]>
1 parent 17b1418 commit a935a63

26 files changed: +335, -307 lines changed

pkg/indexgateway/gateway.go

Lines changed: 18 additions & 78 deletions
@@ -418,7 +418,7 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
 		return err
 	}
 
-	forSeries, ok := g.indexQuerier.HasForSeries(request.From, request.Through)
+	ok := g.indexQuerier.HasChunkSizingInfo(request.From, request.Through)
 	if !ok {
 		sp.AddEvent("index does not support forSeries", trace.WithAttributes(
 			attribute.String("action", "falling back to indexQuerier.GetShards impl"),
@@ -438,7 +438,7 @@ func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.Ind
 		return server.Send(shards)
 	}
 
-	return g.boundedShards(ctx, request, server, instanceID, p, forSeries)
+	return g.boundedShards(ctx, request, server, instanceID, p)
 }
 
 // boundedShards handles bounded shard requests, optionally returning precomputed chunks.
@@ -448,7 +448,6 @@ func (g *Gateway) boundedShards(
 	server logproto.IndexGateway_GetShardsServer,
 	instanceID string,
 	p chunk.Predicate,
-	forSeries sharding.ForSeries,
 ) error {
 	// TODO(owen-d): instead of using GetChunks which buffers _all_ the chunks
 	// (expensive when looking at the full fingerprint space), we should
@@ -467,27 +466,16 @@ func (g *Gateway) boundedShards(
 	defer sp.End()
 
 	// 1) for all bounds, get chunk refs
-	grps, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, req.Through, p, nil)
+	refs, err := g.indexQuerier.GetChunkRefsWithSizingInfo(ctx, instanceID, req.From, req.Through, p)
 	if err != nil {
 		return err
 	}
 
-	var ct int
-	for _, g := range grps {
-		ct += len(g)
-	}
+	ct := len(refs)
 
 	sp.AddEvent("queried local index", trace.WithAttributes(
 		attribute.Int("index_chunks_resolved", ct),
 	))
-	// TODO(owen-d): pool
-	refs := make([]*logproto.ChunkRef, 0, ct)
-
-	for _, cs := range grps {
-		for j := range cs {
-			refs = append(refs, &cs[j].ChunkRef)
-		}
-	}
 
 	filtered := refs
 
@@ -523,7 +511,7 @@ func (g *Gateway) boundedShards(
 		}
 
 	} else {
-		shards, chunkGrps, err := accumulateChunksToShards(ctx, instanceID, forSeries, req, p, filtered)
+		shards, chunkGrps, err := accumulateChunksToShards(req, refs)
 		if err != nil {
 			return err
 		}
@@ -597,71 +585,14 @@ func ExtractShardRequestMatchersAndAST(query string) (chunk.Predicate, error) {
 	}), nil
 }
 
-// TODO(owen-d): consider extending index impl to support returning chunkrefs _with_ sizing info
-// TODO(owen-d): perf, this is expensive :(
 func accumulateChunksToShards(
-	ctx context.Context,
-	user string,
-	forSeries sharding.ForSeries,
 	req *logproto.ShardsRequest,
-	p chunk.Predicate,
-	filtered []*logproto.ChunkRef,
+	filtered []logproto.ChunkRefWithSizingInfo,
 ) ([]logproto.Shard, []logproto.ChunkRefGroup, error) {
 	// map for looking up post-filtered chunks in O(n) while iterating the index again for sizing info
-	filteredM := make(map[model.Fingerprint][]refWithSizingInfo, 1024)
+	filteredM := make(map[model.Fingerprint][]logproto.ChunkRefWithSizingInfo, 1024)
 	for _, ref := range filtered {
-		x := refWithSizingInfo{ref: ref}
-		filteredM[model.Fingerprint(ref.Fingerprint)] = append(filteredM[model.Fingerprint(ref.Fingerprint)], x)
-	}
-
-	var mtx sync.Mutex
-
-	if err := forSeries.ForSeries(
-		ctx,
-		user,
-		v1.NewBounds(filtered[0].FingerprintModel(), filtered[len(filtered)-1].FingerprintModel()),
-		req.From, req.Through,
-		func(l labels.Labels, fp model.Fingerprint, chks []tsdb_index.ChunkMeta) (stop bool) {
-			mtx.Lock()
-			defer mtx.Unlock()
-
-			// check if this is a fingerprint we need
-			if _, ok := filteredM[fp]; !ok {
-				return false
-			}
-
-			filteredChks := filteredM[fp]
-			var j int
-
-		outer:
-			for i := range filteredChks {
-				for j < len(chks) {
-					switch filteredChks[i].Cmp(chks[j]) {
-					case iter.Less:
-						// this chunk is not in the queried index, continue checking other chunks
-						continue outer
-					case iter.Greater:
-						// next chunk in index but didn't pass filter; continue
-						j++
-						continue
-					case iter.Eq:
-						// a match; set the sizing info
-						filteredChks[i].KB = chks[j].KB
-						filteredChks[i].Entries = chks[j].Entries
-						j++
-						continue outer
-					}
-				}
-
-				// we've finished this index's chunks; no need to keep checking filtered chunks
-				break
-			}
-
-			return false
-		},
-		p.Matchers...,
-	); err != nil {
-		return nil, nil, err
+		filteredM[model.Fingerprint(ref.Fingerprint)] = append(filteredM[model.Fingerprint(ref.Fingerprint)], ref)
 	}
 
 	collectedSeries := sharding.SizedFPs(sharding.SizedFPsPool.Get(len(filteredM)))
@@ -689,13 +620,22 @@ func accumulateChunksToShards(
 			return filtered[i].Fingerprint > uint64(s.Bounds.Max)
 		})
 		chkGrps = append(chkGrps, logproto.ChunkRefGroup{
-			Refs: filtered[from:through],
+			Refs: refsWithSizingInfoToRefs(filtered[from:through]),
 		})
 	}
 
 	return shards, chkGrps, nil
 }
 
+func refsWithSizingInfoToRefs(refsWithSizingInfo []logproto.ChunkRefWithSizingInfo) []*logproto.ChunkRef {
+	refs := make([]*logproto.ChunkRef, 0, len(refsWithSizingInfo))
+	for _, refWithSizingInfo := range refsWithSizingInfo {
+		refs = append(refs, &refWithSizingInfo.ChunkRef)
+	}
+
+	return refs
+}
+
 type refWithSizingInfo struct {
 	ref     *logproto.ChunkRef
 	KB      uint32
pkg/indexgateway/gateway_test.go

Lines changed: 23 additions & 112 deletions
@@ -16,7 +16,6 @@ import (
 
 	v2 "github.com/grafana/loki/v3/pkg/iter/v2"
 	"github.com/grafana/loki/v3/pkg/logproto"
-	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
 	tsdb_index "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
@@ -426,135 +425,47 @@ func TestRefWithSizingInfo(t *testing.T) {
 // TODO(owen-d): more testing for specific cases
 func TestAccumulateChunksToShards(t *testing.T) {
 	// only check eq by checksum for convenience -- we're not testing the comparison function here
-	mkRef := func(fp model.Fingerprint, checksum uint32) *logproto.ChunkRef {
-		return &logproto.ChunkRef{
+	mkRef := func(fp model.Fingerprint, checksum uint32) logproto.ChunkRef {
+		return logproto.ChunkRef{
 			Fingerprint: uint64(fp),
 			Checksum:    checksum,
 		}
 	}
 
-	sized := func(ref *logproto.ChunkRef, kb, entries uint32) refWithSizingInfo {
-		return refWithSizingInfo{
-			ref:     ref,
-			KB:      kb,
-			Entries: entries,
+	sized := func(ref logproto.ChunkRef, kb, entries uint32) logproto.ChunkRefWithSizingInfo {
+		return logproto.ChunkRefWithSizingInfo{
+			ChunkRef: ref,
+			KB:       kb,
+			Entries:  entries,
 		}
 
 	}
 
-	fsImpl := func(series [][]refWithSizingInfo) sharding.ForSeriesFunc {
-		return sharding.ForSeriesFunc(
-			func(
-				_ context.Context,
-				_ string,
-				_ tsdb_index.FingerprintFilter,
-				_, _ model.Time,
-				fn func(
-					_ labels.Labels,
-					fp model.Fingerprint,
-					chks []tsdb_index.ChunkMeta,
-				) (stop bool), _ ...*labels.Matcher) error {
-
-				for _, s := range series {
-					chks := []tsdb_index.ChunkMeta{}
-					for _, r := range s {
-						chks = append(chks, tsdb_index.ChunkMeta{
-							Checksum: r.ref.Checksum,
-							KB:       r.KB,
-							Entries:  r.Entries,
-						})
-					}
-
-					if stop := fn(labels.EmptyLabels(), s[0].ref.FingerprintModel(), chks); stop {
-						return nil
-					}
-				}
-				return nil
-			},
-		)
-	}
-
-	filtered := []*logproto.ChunkRef{
+	filtered := []logproto.ChunkRefWithSizingInfo{
 		// shard 0
-		mkRef(1, 0),
-		mkRef(1, 1),
-		mkRef(1, 2),
+		sized(mkRef(1, 0), 100, 1),
+		sized(mkRef(1, 1), 100, 1),
+		sized(mkRef(1, 2), 100, 1),
 
 		// shard 1
-		mkRef(2, 10),
-		mkRef(2, 20),
-		mkRef(2, 30),
+		sized(mkRef(2, 10), 100, 1),
+		sized(mkRef(2, 20), 100, 1),
+		sized(mkRef(2, 30), 100, 1),
 
 		// shard 2 split across multiple series
-		mkRef(3, 10),
-		mkRef(4, 10),
-		mkRef(4, 20),
+		sized(mkRef(3, 10), 50, 1),
+		sized(mkRef(4, 10), 30, 1),
+		sized(mkRef(4, 20), 30, 1),
 
 		// last shard contains leftovers + skip a few fps in between
-		mkRef(7, 10),
+		sized(mkRef(7, 10), 25, 1),
 	}
 
-	series := [][]refWithSizingInfo{
-		{
-			// first series creates one shard since a shard can't contain partial series.
-			// no chunks were filtered out
-			sized(mkRef(1, 0), 100, 1),
-			sized(mkRef(1, 1), 100, 1),
-			sized(mkRef(1, 2), 100, 1),
-		},
-		{
-			// second shard also contains one series, but this series has chunks filtered out.
-			sized(mkRef(2, 0), 100, 1),  // filtered out
-			sized(mkRef(2, 10), 100, 1), // included
-			sized(mkRef(2, 11), 100, 1), // filtered out
-			sized(mkRef(2, 20), 100, 1), // included
-			sized(mkRef(2, 21), 100, 1), // filtered out
-			sized(mkRef(2, 30), 100, 1), // included
-			sized(mkRef(2, 31), 100, 1), // filtered out
-		},
-
-		// third shard contains multiple series.
-		// combined they have 110kb, which is above the target of 100kb
-		// but closer than leaving the second series out which would create
-		// a shard with 50kb
-		{
-			// first series, 50kb
-			sized(mkRef(3, 10), 50, 1), // 50kb
-			sized(mkRef(3, 11), 50, 1), // 50kb, not included
-		},
-		{
-			// second series
-			sized(mkRef(4, 10), 30, 1), // 30kb
-			sized(mkRef(4, 11), 30, 1), // 30kb, not included
-			sized(mkRef(4, 20), 30, 1), // 30kb
-		},
-
-		// Fourth shard contains a single series with 25kb,
-		// but iterates over non-included fp(s) before it
-		{
-			// register a series in the index which is not included in the filtered list
-			sized(mkRef(6, 10), 100, 1), // not included
-			sized(mkRef(6, 11), 100, 1), // not included
-		},
-		{
-			// last shard contains leftovers
-			sized(mkRef(7, 10), 25, 1),
-			sized(mkRef(7, 11), 100, 1), // not included
-		},
-	}
-
-	shards, grps, err := accumulateChunksToShards(
-		context.Background(),
-		"",
-		fsImpl(series),
-		&logproto.ShardsRequest{
-			TargetBytesPerShard: 100 << 10,
-		},
-		chunk.NewPredicate(nil, nil), // we're not checking matcher injection here
-		filtered,
-	)
+	shards, grps, err := accumulateChunksToShards(&logproto.ShardsRequest{
+		TargetBytesPerShard: 100 << 10,
+	}, filtered)
 
-	expectedChks := [][]*logproto.ChunkRef{
+	expectedChks := [][]logproto.ChunkRefWithSizingInfo{
 		filtered[0:3],
 		filtered[3:6],
 		filtered[6:9],
@@ -604,7 +515,7 @@ func TestAccumulateChunksToShards(t *testing.T) {
 	for i := range shards {
 		require.Equal(t, exp[i], shards[i], "invalid shard at index %d", i)
 		for j := range grps[i].Refs {
-			require.Equal(t, expectedChks[i][j], grps[i].Refs[j], "invalid chunk in grp %d at index %d", i, j)
+			require.Equal(t, &expectedChks[i][j].ChunkRef, grps[i].Refs[j], "invalid chunk in grp %d at index %d", i, j)
 		}
 	}
 	require.Equal(t, len(exp), len(shards))
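
With sizing info carried on the refs themselves, the test drops both the fake ForSeries index (fsImpl) and the parallel series fixture: the filtered slice is now the single source of truth for sizes. For reference, a fixture like sized(mkRef(1, 0), 100, 1) expands to the struct literal below (values illustrative):

logproto.ChunkRefWithSizingInfo{
	ChunkRef: logproto.ChunkRef{Fingerprint: 1, Checksum: 0},
	KB:       100,
	Entries:  1,
}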

pkg/ingester/flush_test.go

Lines changed: 8 additions & 0 deletions
@@ -500,6 +500,14 @@ func (s *testStore) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) {
 	return nil, false
 }
 
+func (s *testStore) HasChunkSizingInfo(_, _ model.Time) bool {
+	return false
+}
+
+func (s *testStore) GetChunkRefsWithSizingInfo(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([]logproto.ChunkRefWithSizingInfo, error) {
+	return nil, nil
+}
+
 func (s *testStore) GetSchemaConfigs() []config.PeriodConfig {
 	return defaultPeriodConfigs
 }

pkg/ingester/ingester_test.go

Lines changed: 8 additions & 0 deletions
@@ -515,6 +515,14 @@ func (s *mockStore) HasForSeries(_, _ model.Time) (sharding.ForSeries, bool) {
 	return nil, false
 }
 
+func (s *mockStore) HasChunkSizingInfo(_, _ model.Time) bool {
+	return false
+}
+
+func (s *mockStore) GetChunkRefsWithSizingInfo(_ context.Context, _ string, _, _ model.Time, _ chunk.Predicate) ([]logproto.ChunkRefWithSizingInfo, error) {
+	return nil, nil
+}
+
 func (s *mockStore) Volume(_ context.Context, _ string, _, _ model.Time, limit int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
 	return &logproto.VolumeResponse{
 		Volumes: []logproto.Volume{
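
Taken together, the stubs added to testStore and mockStore suggest the store contract these tests satisfy grew two methods. A hedged reconstruction of that contract; the interface name and parameter names are guesses, while the signatures are copied from the stubs above:

type chunkSizingIndex interface {
	// HasChunkSizingInfo reports whether the index can serve sized refs
	// for the given time range.
	HasChunkSizingInfo(from, through model.Time) bool
	// GetChunkRefsWithSizingInfo returns chunk refs annotated with KB and
	// entry counts for a tenant, time range, and predicate.
	GetChunkRefsWithSizingInfo(ctx context.Context, userID string, from, through model.Time, p chunk.Predicate) ([]logproto.ChunkRefWithSizingInfo, error)
}

Both stubs return false from HasChunkSizingInfo, which keeps these test stores on the fallback path in GetShards (gateway.go above) rather than the new sized-refs path.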

pkg/logproto/types.go

Lines changed: 24 additions & 0 deletions
@@ -7,3 +7,27 @@ import (
 func (c *ChunkRef) FingerprintModel() model.Fingerprint {
 	return model.Fingerprint(c.Fingerprint)
 }
+
+type ChunkRefWithSizingInfo struct {
+	ChunkRef
+	KB      uint32
+	Entries uint32
+}
+
+// Less Compares chunks by (Fp, From, Through, checksum)
+// Assumes User is equivalent
+func (c *ChunkRef) Less(x ChunkRef) bool {
+	if c.Fingerprint != x.Fingerprint {
+		return c.Fingerprint < x.Fingerprint
+	}
+
+	if c.From != x.From {
+		return c.From < x.From
+	}
+
+	if c.Through != x.Through {
+		return c.Through < x.Through
+	}
+
+	return c.Checksum < x.Checksum
+}
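
Less defines a total order over (Fingerprint, From, Through, Checksum), matching the fingerprint-sorted input that accumulateChunksToShards bisects over. A small usage sketch with the standard library's sort package, assuming refs is a []logproto.ChunkRef (the pointer receiver is satisfied because slice elements are addressable):

sort.Slice(refs, func(i, j int) bool {
	return refs[i].Less(refs[j])
})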
