Skip to content

Commit 189efbf

Browse files
committed
chore(metastore): use indexpointers columnar reader
1 parent 6fca05f commit 189efbf

File tree

2 files changed

+124
-52
lines changed

2 files changed

+124
-52
lines changed

pkg/dataobj/metastore/iter.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/grafana/dskit/user"
1313

1414
"github.com/grafana/loki/v3/pkg/dataobj"
15+
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
1516
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
1617
)
1718

@@ -332,6 +333,123 @@ func forEachMatchedPointerSectionKey(
332333
return nil
333334
}
334335

336+
func forEachIndexPointer(
337+
ctx context.Context,
338+
object *dataobj.Object,
339+
sStart, sEnd *scalar.Timestamp,
340+
f func(pointer indexpointers.IndexPointer),
341+
) error {
342+
targetTenant, err := user.ExtractOrgID(ctx)
343+
if err != nil {
344+
return fmt.Errorf("extracting org ID: %w", err)
345+
}
346+
var reader indexpointers.Reader
347+
defer reader.Close()
348+
349+
const batchSize = 1024
350+
buf := make([]indexpointers.IndexPointer, batchSize)
351+
352+
// iterate over the sections and fill buf column by column
353+
// once the read operation is over invoke client's [f] on every read row (numRows not always the same as len(buf))
354+
for _, section := range object.Sections().Filter(indexpointers.CheckSection) {
355+
if section.Tenant != targetTenant {
356+
continue
357+
}
358+
359+
sec, err := indexpointers.Open(ctx, section)
360+
if err != nil {
361+
return fmt.Errorf("opening section: %w", err)
362+
}
363+
364+
var (
365+
colPath *indexpointers.Column
366+
colMinTimestamp *indexpointers.Column
367+
colMaxTimestamp *indexpointers.Column
368+
)
369+
370+
for _, c := range sec.Columns() {
371+
if c.Type == indexpointers.ColumnTypePath {
372+
colPath = c
373+
}
374+
if c.Type == indexpointers.ColumnTypeMinTimestamp {
375+
colMinTimestamp = c
376+
}
377+
if c.Type == indexpointers.ColumnTypeMaxTimestamp {
378+
colMaxTimestamp = c
379+
}
380+
if colPath != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
381+
break
382+
}
383+
}
384+
385+
if colPath == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
386+
return fmt.Errorf("one of the mandatory columns is missing: (path=%t, minTimestamp=%t, maxTimestamp=%t)", colPath == nil, colMinTimestamp == nil, colMaxTimestamp == nil)
387+
}
388+
389+
reader.Reset(indexpointers.ReaderOptions{
390+
Columns: sec.Columns(),
391+
Predicates: []indexpointers.Predicate{
392+
indexpointers.WhereTimeRangeOverlapsWith(colMinTimestamp, colMaxTimestamp, sStart, sEnd),
393+
},
394+
})
395+
396+
for {
397+
rec, readErr := reader.Read(ctx, batchSize)
398+
if readErr != nil && !errors.Is(readErr, io.EOF) {
399+
return fmt.Errorf("reading recordBatch: %w", readErr)
400+
}
401+
numRows := int(rec.NumRows())
402+
if numRows == 0 && errors.Is(readErr, io.EOF) {
403+
break
404+
}
405+
406+
for colIdx := range int(rec.NumCols()) {
407+
col := rec.Column(colIdx)
408+
pointerCol := sec.Columns()[colIdx]
409+
410+
switch pointerCol.Type {
411+
case indexpointers.ColumnTypePath:
412+
values := col.(*array.String)
413+
for rIdx := range numRows {
414+
if col.IsNull(rIdx) {
415+
continue
416+
}
417+
buf[rIdx].Path = values.Value(rIdx)
418+
}
419+
case indexpointers.ColumnTypeMinTimestamp:
420+
values := col.(*array.Timestamp)
421+
for rIdx := range numRows {
422+
if col.IsNull(rIdx) {
423+
continue
424+
}
425+
buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx)))
426+
}
427+
case indexpointers.ColumnTypeMaxTimestamp:
428+
values := col.(*array.Timestamp)
429+
for rIdx := range numRows {
430+
if col.IsNull(rIdx) {
431+
continue
432+
}
433+
buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx)))
434+
}
435+
default:
436+
continue
437+
}
438+
}
439+
440+
for rowIdx := range numRows {
441+
f(buf[rowIdx])
442+
}
443+
444+
if errors.Is(readErr, io.EOF) {
445+
break
446+
}
447+
}
448+
}
449+
450+
return nil
451+
}
452+
335453
func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) {
336454
result := make([]*pointers.Column, 0, len(columnTypes))
337455

pkg/dataobj/metastore/object.go

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -401,10 +401,13 @@ func (m *ObjectMetastore) listObjectsFromTables(ctx context.Context, tablePaths
401401
objects := make([][]string, len(tablePaths))
402402
g, ctx := errgroup.WithContext(ctx)
403403

404+
sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
405+
sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
406+
404407
for i, path := range tablePaths {
405408
g.Go(func() error {
406409
var err error
407-
objects[i], err = m.listObjects(ctx, path, start, end)
410+
objects[i], err = m.listObjects(ctx, path, sStart, sEnd)
408411
// If the metastore object is not found, it means it's outside of any existing window
409412
// and we can safely ignore it.
410413
if err != nil && !m.bucket.IsObjNotFoundErr(err) {
@@ -627,7 +630,7 @@ func addLabels(mtx *sync.Mutex, streams map[uint64][]*labels.Labels, newLabels *
627630
streams[key] = append(streams[key], newLabels)
628631
}
629632

630-
func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, end time.Time) ([]string, error) {
633+
func (m *ObjectMetastore) listObjects(ctx context.Context, path string, sStart, sEnd *scalar.Timestamp) ([]string, error) {
631634
var buf bytes.Buffer
632635
objectReader, err := m.bucket.Get(ctx, path)
633636
if err != nil {
@@ -645,12 +648,7 @@ func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, e
645648
}
646649
var objectPaths []string
647650

648-
// Read all relevant entries from the table of contents
649-
predicate := indexpointers.TimeRangeRowPredicate{
650-
Start: start.UTC(),
651-
End: end.UTC(),
652-
}
653-
err = forEachIndexPointer(ctx, object, predicate, func(indexPointer indexpointers.IndexPointer) {
651+
err = forEachIndexPointer(ctx, object, sStart, sEnd, func(indexPointer indexpointers.IndexPointer) {
654652
objectPaths = append(objectPaths, indexPointer.Path)
655653
})
656654
if err != nil {
@@ -660,50 +658,6 @@ func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, e
660658
return objectPaths, nil
661659
}
662660

663-
func forEachIndexPointer(ctx context.Context, object *dataobj.Object, predicate indexpointers.RowPredicate, f func(indexpointers.IndexPointer)) error {
664-
targetTenant, err := user.ExtractOrgID(ctx)
665-
if err != nil {
666-
return fmt.Errorf("extracting org ID: %w", err)
667-
}
668-
var reader indexpointers.RowReader
669-
defer reader.Close()
670-
671-
buf := make([]indexpointers.IndexPointer, 1024)
672-
673-
for _, section := range object.Sections().Filter(indexpointers.CheckSection) {
674-
if section.Tenant != targetTenant {
675-
continue
676-
}
677-
sec, err := indexpointers.Open(ctx, section)
678-
if err != nil {
679-
return fmt.Errorf("opening section: %w", err)
680-
}
681-
682-
reader.Reset(sec)
683-
if predicate != nil {
684-
err := reader.SetPredicate(predicate)
685-
if err != nil {
686-
return err
687-
}
688-
}
689-
690-
for {
691-
num, err := reader.Read(ctx, buf)
692-
if err != nil && !errors.Is(err, io.EOF) {
693-
return err
694-
}
695-
if num == 0 && errors.Is(err, io.EOF) {
696-
break
697-
}
698-
for _, indexPointer := range buf[:num] {
699-
f(indexPointer)
700-
}
701-
}
702-
}
703-
704-
return nil
705-
}
706-
707661
func forEachStream(ctx context.Context, object *dataobj.Object, predicate streams.RowPredicate, f func(streams.Stream)) error {
708662
targetTenant, err := user.ExtractOrgID(ctx)
709663
if err != nil {

0 commit comments

Comments
 (0)