diff --git a/pkg/dataobj/metastore/iter.go b/pkg/dataobj/metastore/iter.go
index 414c9b581afa0..64c26f4f27c38 100644
--- a/pkg/dataobj/metastore/iter.go
+++ b/pkg/dataobj/metastore/iter.go
@@ -12,6 +12,7 @@ import (
 	"github.com/grafana/dskit/user"

 	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
 )

@@ -332,6 +333,123 @@ func forEachMatchedPointerSectionKey(
 	return nil
 }

+func forEachIndexPointer(
+	ctx context.Context,
+	object *dataobj.Object,
+	sStart, sEnd *scalar.Timestamp,
+	f func(pointer indexpointers.IndexPointer),
+) error {
+	targetTenant, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return fmt.Errorf("extracting org ID: %w", err)
+	}
+	var reader indexpointers.Reader
+	defer reader.Close()
+
+	const batchSize = 1024
+	buf := make([]indexpointers.IndexPointer, batchSize)
+
+	// Iterate over the matching sections and fill buf column by column. Once a
+	// batch has been read, invoke the caller's [f] for each row that was read
+	// (numRows is not always equal to len(buf)).
+	for _, section := range object.Sections().Filter(indexpointers.CheckSection) {
+		if section.Tenant != targetTenant {
+			continue
+		}
+
+		sec, err := indexpointers.Open(ctx, section)
+		if err != nil {
+			return fmt.Errorf("opening section: %w", err)
+		}
+
+		var (
+			colPath         *indexpointers.Column
+			colMinTimestamp *indexpointers.Column
+			colMaxTimestamp *indexpointers.Column
+		)
+
+		for _, c := range sec.Columns() {
+			if c.Type == indexpointers.ColumnTypePath {
+				colPath = c
+			}
+			if c.Type == indexpointers.ColumnTypeMinTimestamp {
+				colMinTimestamp = c
+			}
+			if c.Type == indexpointers.ColumnTypeMaxTimestamp {
+				colMaxTimestamp = c
+			}
+			if colPath != nil && colMinTimestamp != nil && colMaxTimestamp != nil {
+				break
+			}
+		}
+
+		if colPath == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
+			return fmt.Errorf("one of the mandatory columns is missing: (path=%t, minTimestamp=%t, maxTimestamp=%t)", colPath == nil, colMinTimestamp == nil, colMaxTimestamp == nil)
+		}
+
+		reader.Reset(indexpointers.ReaderOptions{
+			Columns: sec.Columns(),
+			Predicates: []indexpointers.Predicate{
+				indexpointers.WhereTimeRangeOverlapsWith(colMinTimestamp, colMaxTimestamp, sStart, sEnd),
+			},
+		})
+
+		for {
+			rec, readErr := reader.Read(ctx, batchSize)
+			if readErr != nil && !errors.Is(readErr, io.EOF) {
+				return fmt.Errorf("reading recordBatch: %w", readErr)
+			}
+			numRows := int(rec.NumRows())
+			if numRows == 0 && errors.Is(readErr, io.EOF) {
+				break
+			}
+
+			for colIdx := range int(rec.NumCols()) {
+				col := rec.Column(colIdx)
+				pointerCol := sec.Columns()[colIdx]
+
+				switch pointerCol.Type {
+				case indexpointers.ColumnTypePath:
+					values := col.(*array.String)
+					for rIdx := range numRows {
+						if col.IsNull(rIdx) {
+							continue
+						}
+						buf[rIdx].Path = values.Value(rIdx)
+					}
+				case indexpointers.ColumnTypeMinTimestamp:
+					values := col.(*array.Timestamp)
+					for rIdx := range numRows {
+						if col.IsNull(rIdx) {
+							continue
+						}
+						buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx)))
+					}
+				case indexpointers.ColumnTypeMaxTimestamp:
+					values := col.(*array.Timestamp)
+					for rIdx := range numRows {
+						if col.IsNull(rIdx) {
+							continue
+						}
+						buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx)))
+					}
+				default:
+					continue
+				}
+			}
+
+			for rowIdx := range numRows {
+				f(buf[rowIdx])
+			}
+
+			if errors.Is(readErr, io.EOF) {
+				break
+			}
+		}
+	}
+
+	return nil
+}
+
 func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes
...pointers.ColumnType) ([]*pointers.Column, error) { result := make([]*pointers.Column, 0, len(columnTypes)) diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index 525f5dabaf08d..d8bb98330c0ee 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -401,10 +401,13 @@ func (m *ObjectMetastore) listObjectsFromTables(ctx context.Context, tablePaths objects := make([][]string, len(tablePaths)) g, ctx := errgroup.WithContext(ctx) + sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns) + sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns) + for i, path := range tablePaths { g.Go(func() error { var err error - objects[i], err = m.listObjects(ctx, path, start, end) + objects[i], err = m.listObjects(ctx, path, sStart, sEnd) // If the metastore object is not found, it means it's outside of any existing window // and we can safely ignore it. if err != nil && !m.bucket.IsObjNotFoundErr(err) { @@ -627,7 +630,7 @@ func addLabels(mtx *sync.Mutex, streams map[uint64][]*labels.Labels, newLabels * streams[key] = append(streams[key], newLabels) } -func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, end time.Time) ([]string, error) { +func (m *ObjectMetastore) listObjects(ctx context.Context, path string, sStart, sEnd *scalar.Timestamp) ([]string, error) { var buf bytes.Buffer objectReader, err := m.bucket.Get(ctx, path) if err != nil { @@ -645,12 +648,7 @@ func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, e } var objectPaths []string - // Read all relevant entries from the table of contents - predicate := indexpointers.TimeRangeRowPredicate{ - Start: start.UTC(), - End: end.UTC(), - } - err = forEachIndexPointer(ctx, object, predicate, func(indexPointer indexpointers.IndexPointer) { + err = forEachIndexPointer(ctx, object, sStart, sEnd, func(indexPointer indexpointers.IndexPointer) { objectPaths = append(objectPaths, indexPointer.Path) }) if err != nil { @@ -660,50 +658,6 @@ func (m *ObjectMetastore) listObjects(ctx context.Context, path string, start, e return objectPaths, nil } -func forEachIndexPointer(ctx context.Context, object *dataobj.Object, predicate indexpointers.RowPredicate, f func(indexpointers.IndexPointer)) error { - targetTenant, err := user.ExtractOrgID(ctx) - if err != nil { - return fmt.Errorf("extracting org ID: %w", err) - } - var reader indexpointers.RowReader - defer reader.Close() - - buf := make([]indexpointers.IndexPointer, 1024) - - for _, section := range object.Sections().Filter(indexpointers.CheckSection) { - if section.Tenant != targetTenant { - continue - } - sec, err := indexpointers.Open(ctx, section) - if err != nil { - return fmt.Errorf("opening section: %w", err) - } - - reader.Reset(sec) - if predicate != nil { - err := reader.SetPredicate(predicate) - if err != nil { - return err - } - } - - for { - num, err := reader.Read(ctx, buf) - if err != nil && !errors.Is(err, io.EOF) { - return err - } - if num == 0 && errors.Is(err, io.EOF) { - break - } - for _, indexPointer := range buf[:num] { - f(indexPointer) - } - } - } - - return nil -} - func forEachStream(ctx context.Context, object *dataobj.Object, predicate streams.RowPredicate, f func(streams.Stream)) error { targetTenant, err := user.ExtractOrgID(ctx) if err != nil { diff --git a/pkg/dataobj/sections/indexpointers/builder_test.go b/pkg/dataobj/sections/indexpointers/builder_test.go index 
e09320c92b63a..0a8200e1be085 100644
--- a/pkg/dataobj/sections/indexpointers/builder_test.go
+++ b/pkg/dataobj/sections/indexpointers/builder_test.go
@@ -1,4 +1,4 @@
-package indexpointers
+package indexpointers_test

 import (
 	"context"
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
 )

 func TestBuilder(t *testing.T) {
@@ -22,7 +23,7 @@ func TestBuilder(t *testing.T) {
 		{path: "bar", start: unixTime(10), end: unixTime(20)},
 	}

-	ib := NewBuilder(nil, 1024, 0)
+	ib := indexpointers.NewBuilder(nil, 1024, 0)
 	for _, p := range pp {
 		ib.Append(p.path, p.start, p.end)
 	}
@@ -34,7 +35,7 @@ func TestBuilder(t *testing.T) {
 	require.NoError(t, err)
 	defer closer.Close()

-	expect := []IndexPointer{
+	expect := []indexpointers.IndexPointer{
 		{
 			Path:    "foo",
 			StartTs: unixTime(10),
@@ -47,8 +48,8 @@ func TestBuilder(t *testing.T) {
 		},
 	}

-	var actual []IndexPointer
-	for result := range Iter(context.Background(), obj) {
+	var actual []indexpointers.IndexPointer
+	for result := range indexpointers.Iter(context.Background(), obj) {
 		pointer, err := result.Value()
 		require.NoError(t, err)
 		actual = append(actual, pointer)
diff --git a/pkg/dataobj/sections/indexpointers/predicate.go b/pkg/dataobj/sections/indexpointers/predicate.go
new file mode 100644
index 0000000000000..3c90641bf0099
--- /dev/null
+++ b/pkg/dataobj/sections/indexpointers/predicate.go
@@ -0,0 +1,150 @@
+package indexpointers
+
+import (
+	"github.com/apache/arrow-go/v18/arrow/scalar"
+)
+
+// Predicate is an expression used to filter column values in a [Reader].
+type Predicate interface{ isPredicate() }
+
+// Supported predicates.
+type (
+	// An AndPredicate is a [Predicate] which asserts that a row may only be
+	// included if both the Left and Right Predicate are true.
+	AndPredicate struct{ Left, Right Predicate }
+
+	// An OrPredicate is a [Predicate] which asserts that a row may only be
+	// included if either the Left or Right Predicate are true.
+	OrPredicate struct{ Left, Right Predicate }
+
+	// A NotPredicate is a [Predicate] which asserts that a row may only be
+	// included if the inner Predicate is false.
+	NotPredicate struct{ Inner Predicate }
+
+	// TruePredicate is a [Predicate] which always returns true.
+	TruePredicate struct{}
+
+	// FalsePredicate is a [Predicate] which always returns false.
+	FalsePredicate struct{}
+
+	// An EqualPredicate is a [Predicate] which asserts that a row may only be
+	// included if the Value of the Column is equal to the Value.
+	EqualPredicate struct {
+		Column *Column       // Column to check.
+		Value  scalar.Scalar // Value to check equality for.
+	}
+
+	// An InPredicate is a [Predicate] which asserts that a row may only be
+	// included if the Value of the Column is present in the provided Values.
+	InPredicate struct {
+		Column *Column         // Column to check.
+		Values []scalar.Scalar // Values to check for inclusion.
+	}
+
+	// A GreaterThanPredicate is a [Predicate] which asserts that a row may only
+	// be included if the Value of the Column is greater than the provided Value.
+	GreaterThanPredicate struct {
+		Column *Column       // Column to check.
+		Value  scalar.Scalar // Value for which rows in Column must be greater than.
+	}
+
+	// A LessThanPredicate is a [Predicate] which asserts that a row may only be
+	// included if the Value of the Column is less than the provided Value.
+	LessThanPredicate struct {
+		Column *Column       // Column to check.
+		Value  scalar.Scalar // Value for which rows in Column must be less than.
+	}
+
+	// FuncPredicate is a [Predicate] which asserts that a row may only be
+	// included if the Value of the Column passes the Keep function.
+	//
+	// Instances of FuncPredicate are ineligible for page filtering and should
+	// only be used when there isn't a more explicit Predicate implementation.
+	FuncPredicate struct {
+		Column *Column // Column to check.
+
+		// Keep is invoked with the column and value pair to check. Keep is given
+		// the Column instance to allow for reusing the same function across
+		// multiple columns, if necessary.
+		//
+		// If Keep returns true, the row is kept.
+		Keep func(column *Column, value scalar.Scalar) bool
+	}
+)
+
+func (AndPredicate) isPredicate()         {}
+func (OrPredicate) isPredicate()          {}
+func (NotPredicate) isPredicate()         {}
+func (TruePredicate) isPredicate()        {}
+func (FalsePredicate) isPredicate()       {}
+func (EqualPredicate) isPredicate()       {}
+func (InPredicate) isPredicate()          {}
+func (GreaterThanPredicate) isPredicate() {}
+func (LessThanPredicate) isPredicate()    {}
+func (FuncPredicate) isPredicate()        {}
+
+// walkPredicate traverses a predicate in depth-first order: it starts by
+// calling fn(p). If fn(p) returns true, walkPredicate is invoked recursively
+// with fn for each of the non-nil children of p, followed by a call of
+// fn(nil).
+func walkPredicate(p Predicate, fn func(Predicate) bool) {
+	if p == nil || !fn(p) {
+		return
+	}
+
+	switch p := p.(type) {
+	case AndPredicate:
+		walkPredicate(p.Left, fn)
+		walkPredicate(p.Right, fn)
+
+	case OrPredicate:
+		walkPredicate(p.Left, fn)
+		walkPredicate(p.Right, fn)
+
+	case NotPredicate:
+		walkPredicate(p.Inner, fn)
+
+	case TruePredicate: // No children.
+	case FalsePredicate: // No children.
+	case EqualPredicate: // No children.
+	case InPredicate: // No children.
+	case GreaterThanPredicate: // No children.
+	case LessThanPredicate: // No children.
+	case FuncPredicate: // No children.
+
+	default:
+		panic("indexpointers.walkPredicate: unsupported predicate type")
+	}
+
+	fn(nil)
+}
+
+// WhereTimeRangeOverlapsWith returns a [Predicate] that keeps rows whose
+// [colMinTimestamp, colMaxTimestamp] range overlaps the [start, end] range;
+// that is, rows where colMaxTimestamp >= start AND colMinTimestamp <= end.
+func WhereTimeRangeOverlapsWith(
+	colMinTimestamp *Column,
+	colMaxTimestamp *Column,
+	start scalar.Scalar,
+	end scalar.Scalar,
+) Predicate {
+	return AndPredicate{
+		Left: OrPredicate{
+			Left: EqualPredicate{
+				Column: colMaxTimestamp,
+				Value:  start,
+			},
+			Right: GreaterThanPredicate{
+				Column: colMaxTimestamp,
+				Value:  start,
+			},
+		},
+		Right: OrPredicate{
+			Left: EqualPredicate{
+				Column: colMinTimestamp,
+				Value:  end,
+			},
+			Right: LessThanPredicate{
+				Column: colMinTimestamp,
+				Value:  end,
+			},
+		},
+	}
+}
diff --git a/pkg/dataobj/sections/indexpointers/reader.go b/pkg/dataobj/sections/indexpointers/reader.go
new file mode 100644
index 0000000000000..5234903611320
--- /dev/null
+++ b/pkg/dataobj/sections/indexpointers/reader.go
@@ -0,0 +1,481 @@
+package indexpointers
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	_ "io" // Used for documenting io.EOF.
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/arrow/scalar"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
+)
+
+// ReaderOptions customizes the behavior of a [Reader].
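Because these predicates are plain Go values, they compose freely, and ReaderOptions (defined next) accepts any number of them in its Predicates field. A hypothetical composition is sketched below; the column and timestamp-scalar variables are assumed to already exist, and the "excluded window" scenario is purely illustrative:

// Keep index pointers that overlap the query range but do NOT overlap an
// excluded window. minCol/maxCol are the section's min/max timestamp columns;
// queryStart, queryEnd, excludeStart, excludeEnd are *scalar.Timestamp values.
pred := indexpointers.AndPredicate{
	Left: indexpointers.WhereTimeRangeOverlapsWith(minCol, maxCol, queryStart, queryEnd),
	Right: indexpointers.NotPredicate{
		Inner: indexpointers.WhereTimeRangeOverlapsWith(minCol, maxCol, excludeStart, excludeEnd),
	},
}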
+type ReaderOptions struct { + // Columns to read. Each column must belong to the same [Section]. + Columns []*Column + + // Predicates holds a set of predicates to apply when reading the section. + // Columns referenced in Predicates must be in the set of Columns. + Predicates []Predicate + + // Allocator to use for allocating Arrow records. If nil, + // [memory.DefaultAllocator] is used. + Allocator memory.Allocator +} + +// Validate returns an error if the opts is not valid. ReaderOptions are only +// valid when: +// +// - Each [Column] in Columns belongs to the same [Section]. +// - Each [Predicate] in Predicates references a [Column] from Columns. +// - Scalar values used in predicates are of a supported type: an int64, +// uint64, timestamp, or a byte array. +func (opts *ReaderOptions) Validate() error { + columnLookup := make(map[*Column]struct{}, len(opts.Columns)) + + if len(opts.Columns) > 0 { + // Ensure all columns belong to the same section. + var checkSection *Section + + for _, col := range opts.Columns { + if checkSection != nil && col.Section != checkSection { + return fmt.Errorf("all columns must belong to the same section: got=%p want=%p", col.Section, checkSection) + } else if checkSection == nil { + checkSection = col.Section + } + columnLookup[col] = struct{}{} + } + } + + var errs []error + + validateColumn := func(col *Column) { + if col == nil { + errs = append(errs, fmt.Errorf("column is nil")) + } else if _, found := columnLookup[col]; !found { + errs = append(errs, fmt.Errorf("column %p not in Columns", col)) + } + } + + validateScalar := func(s scalar.Scalar) { + _, ok := arrowconv.DatasetType(s.DataType()) + if !ok { + errs = append(errs, fmt.Errorf("unsupported scalar type %s", s.DataType())) + } + } + + for _, p := range opts.Predicates { + walkPredicate(p, func(p Predicate) bool { + // Validate that predicates reference valid columns and use valid + // scalars. + switch p := p.(type) { + case nil: // End of walk; nothing to do. + + case AndPredicate: // Nothing to do. + case OrPredicate: // Nothing to do. + case NotPredicate: // Nothing to do. + case TruePredicate: // Nothing to do. + case FalsePredicate: // Nothing to do. + + case EqualPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case InPredicate: + validateColumn(p.Column) + for _, val := range p.Values { + validateScalar(val) + } + + case GreaterThanPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case LessThanPredicate: + validateColumn(p.Column) + validateScalar(p.Value) + + case FuncPredicate: + validateColumn(p.Column) + + default: + errs = append(errs, fmt.Errorf("unrecognized predicate type %T", p)) + } + + return true + }) + } + + return errors.Join(errs...) +} + +// A Reader reads batches of rows from a [Section]. +type Reader struct { + opts ReaderOptions + schema *arrow.Schema // Set on [Reader.Reset]. + + ready bool + inner *dataset.Reader + buf []dataset.Row + + builder *array.RecordBuilder +} + +// NewReader creates a new Reader from the provided options. Options are not +// validated until the first call to [Reader.Read]. +func NewReader(opts ReaderOptions) *Reader { + var r Reader + r.Reset(opts) + return &r +} + +// Schema returns the [arrow.Schema] used by the Reader. Fields in the schema +// match the order of columns listed in [ReaderOptions]. +// +// Names of fields in the schema are guaranteed to be unique per column but are +// not guaranteed to be stable. +// +// The returned Schema must not be modified. 
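The schema's field names are assembled by makeColumnName further down ("label.type.datatype"), which is why the reader tests later in this diff assert on keys such as path.path.utf8. A small illustration, assuming r was built with all three columns of a section:

// Print the generated field names.
for _, f := range r.Schema().Fields() {
	fmt.Println(f.Name)
}
// Expected output (per makeColumnName and the reader tests):
//   path.path.utf8
//   min_timestamp.min_timestamp.timestamp
//   max_timestamp.max_timestamp.timestamp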
+func (r *Reader) Schema() *arrow.Schema { return r.schema } + +// Read reads the batch of rows from the section, returning them as an Arrow +// record. +// +// If [ReaderOptions] has predicates, only rows that match the predicates are +// returned. If none of the next batchSize rows matched the predicate, Read +// returns a nil record with a nil error. +// +// Read will return an error if the next batch of rows could not be read due to +// invalid options or I/O errors. At the end of the section, Read returns nil, +// [io.EOF]. +// +// Read may return a non-nil record with a non-nil error, including if the +// error is [io.EOF]. Callers should always process the record before +// processing the error value. +// +// When a record is returned, it will match the schema specified by +// [Reader.Schema]. These records must always be released after use. +func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) { + if !r.ready { + err := r.init() + if err != nil { + return nil, fmt.Errorf("initializing Reader: %w", err) + } + } + + r.buf = slicegrow.GrowToCap(r.buf, batchSize) + r.buf = r.buf[:batchSize] + + n, readErr := r.inner.Read(ctx, r.buf) + r.builder.Reserve(n) + for rowIndex := range n { + row := r.buf[rowIndex] + + for columnIndex, val := range row.Values { + columnBuilder := r.builder.Field(columnIndex) + + if val.IsNil() { + columnBuilder.AppendNull() + continue + } + + // Append non-null values. We switch on [ColumnType] here so it's easier + // to follow the mapping of ColumnType to Arrow type. The mappings here + // should align with both [columnToField] (for Arrow type) and + // [Builder.encodeTo] (for dataset type). + // + // Passing our byte slices to [array.StringBuilder.BinaryBuilder.Append] are safe; it + // will copy the contents of the value and we can reuse the buffer on the + // next call to [dataset.Reader.Read]. + columnType := r.opts.Columns[columnIndex].Type + switch columnType { + case ColumnTypeInvalid: + columnBuilder.AppendNull() // Unsupported column + case ColumnTypePath: + columnBuilder.(*array.StringBuilder).BinaryBuilder.Append(val.Binary()) + case ColumnTypeMinTimestamp, ColumnTypeMaxTimestamp: // Values are nanosecond timestamps as int64 + columnBuilder.(*array.TimestampBuilder).Append(arrow.Timestamp(val.Int64())) + default: + // We'll only hit this if we added a new column type but forgot to + // support reading it. + return nil, fmt.Errorf("unsupported column type %s for column %d", columnType, columnIndex) + } + } + } + + // We only return readErr after processing n so that we properly handle n>0 + // while also getting an error such as io.EOF. 
+ return r.builder.NewRecordBatch(), readErr +} + +func (r *Reader) init() error { + if err := r.opts.Validate(); err != nil { + return fmt.Errorf("invalid options: %w", err) + } else if r.opts.Allocator == nil { + r.opts.Allocator = memory.DefaultAllocator + } + + var innerSection *columnar.Section + innerColumns := make([]*columnar.Column, len(r.opts.Columns)) + for i, column := range r.opts.Columns { + if innerSection == nil { + innerSection = column.Section.inner + } + innerColumns[i] = column.inner + } + + dset, err := columnar.MakeDataset(innerSection, innerColumns) + if err != nil { + return fmt.Errorf("creating dataset: %w", err) + } else if len(dset.Columns()) != len(r.opts.Columns) { + return fmt.Errorf("dataset has %d columns, expected %d", len(dset.Columns()), len(r.opts.Columns)) + } + + columnLookup := make(map[*Column]dataset.Column, len(r.opts.Columns)) + for i, col := range dset.Columns() { + columnLookup[r.opts.Columns[i]] = col + } + + preds, err := mapPredicates(r.opts.Predicates, columnLookup) + if err != nil { + return fmt.Errorf("mapping predicates: %w", err) + } + + innerOptions := dataset.ReaderOptions{ + Dataset: dset, + Columns: dset.Columns(), + Predicates: preds, + Prefetch: true, + } + if r.inner == nil { + r.inner = dataset.NewReader(innerOptions) + } else { + r.inner.Reset(innerOptions) + } + + if r.builder == nil { + r.builder = array.NewRecordBuilder(r.opts.Allocator, r.schema) + } + + r.ready = true + return nil +} + +func mapPredicates(ps []Predicate, columnLookup map[*Column]dataset.Column) (predicates []dataset.Predicate, err error) { + // For simplicity, [mapPredicate] and the functions it calls panic if they + // encounter an unsupported conversion. + // + // These should normally be handled by [ReaderOptions.Validate], but we catch + // any panics here to gracefully return an error to the caller instead of + // potentially crashing the goroutine. 
+ defer func() { + if r := recover(); r == nil { + return + } else if recoveredErr, ok := r.(error); ok { + err = recoveredErr + } else { + err = fmt.Errorf("error while mapping: %v", r) + } + }() + + for _, p := range ps { + predicates = append(predicates, mapPredicate(p, columnLookup)) + } + return +} + +func mapPredicate(p Predicate, columnLookup map[*Column]dataset.Column) dataset.Predicate { + switch p := p.(type) { + case AndPredicate: + return dataset.AndPredicate{ + Left: mapPredicate(p.Left, columnLookup), + Right: mapPredicate(p.Right, columnLookup), + } + + case OrPredicate: + return dataset.OrPredicate{ + Left: mapPredicate(p.Left, columnLookup), + Right: mapPredicate(p.Right, columnLookup), + } + + case NotPredicate: + return dataset.NotPredicate{ + Inner: mapPredicate(p.Inner, columnLookup), + } + + case TruePredicate: + return dataset.TruePredicate{} + + case FalsePredicate: + return dataset.FalsePredicate{} + + case EqualPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.EqualPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case InPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + + vals := make([]dataset.Value, len(p.Values)) + for i := range p.Values { + vals[i] = arrowconv.FromScalar(p.Values[i], mustConvertType(p.Values[i].DataType())) + } + + var valueSet dataset.ValueSet + switch col.ColumnDesc().Type.Physical { + case datasetmd.PHYSICAL_TYPE_INT64: + valueSet = dataset.NewInt64ValueSet(vals) + case datasetmd.PHYSICAL_TYPE_UINT64: + valueSet = dataset.NewUint64ValueSet(vals) + case datasetmd.PHYSICAL_TYPE_BINARY: + valueSet = dataset.NewBinaryValueSet(vals) + default: + panic("InPredicate not implemented for datatype") + } + + return dataset.InPredicate{ + Column: col, + Values: valueSet, + } + + case GreaterThanPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.GreaterThanPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case LessThanPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + return dataset.LessThanPredicate{ + Column: col, + Value: arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())), + } + + case FuncPredicate: + col, ok := columnLookup[p.Column] + if !ok { + panic(fmt.Sprintf("column %p not found in column lookup", p.Column)) + } + + fieldType := columnToField(p.Column).Type + + return dataset.FuncPredicate{ + Column: col, + Keep: func(_ dataset.Column, value dataset.Value) bool { + return p.Keep(p.Column, arrowconv.ToScalar(value, fieldType)) + }, + } + + default: + panic(fmt.Sprintf("unsupported predicate type %T", p)) + } +} + +func mustConvertType(dtype arrow.DataType) datasetmd.PhysicalType { + toType, ok := arrowconv.DatasetType(dtype) + if !ok { + panic(fmt.Sprintf("unsupported dataset type %s", dtype)) + } + return toType +} + +// Reset discards any state and resets r with a new set of optiosn. This +// permits reusing a Reader rather than allocating a new one. 
+func (r *Reader) Reset(opts ReaderOptions) { + r.opts = opts + r.schema = columnsSchema(opts.Columns) + + r.ready = false + + if r.inner != nil { + // Close our inner reader so it releases resources immediately. It'll be + // fully reset on the next call to [Reader.init]. + _ = r.inner.Close() + } + if r.builder != nil { + r.builder = nil + } +} + +// Close closes the Reader and releases any resources it holds. Closed Readers +// can be reused by calling [Reader.Reset]. +func (r *Reader) Close() error { + if r.inner != nil { + return r.inner.Close() + } + if r.builder != nil { + r.builder = nil + } + return nil +} + +func columnsSchema(cols []*Column) *arrow.Schema { + fields := make([]arrow.Field, 0, len(cols)) + for _, col := range cols { + fields = append(fields, columnToField(col)) + } + return arrow.NewSchema(fields, nil) +} + +var columnDatatypes = map[ColumnType]arrow.DataType{ + ColumnTypeInvalid: arrow.Null, + ColumnTypePath: arrow.BinaryTypes.String, + ColumnTypeMinTimestamp: arrow.FixedWidthTypes.Timestamp_ns, + ColumnTypeMaxTimestamp: arrow.FixedWidthTypes.Timestamp_ns, +} + +func columnToField(col *Column) arrow.Field { + dtype, ok := columnDatatypes[col.Type] + if !ok { + dtype = arrow.Null + } + + return arrow.Field{ + Name: makeColumnName(col.Name, col.Type.String(), dtype), + Type: dtype, + Nullable: true, // All columns are nullable. + } +} + +// makeColumnName returns a unique name for a [Column] and its expected data +// type. +// +// Unique names are used by unit tests to be able to produce expected rows. +func makeColumnName(label string, name string, dty arrow.DataType) string { + switch { + case label == "" && name == "": + return dty.Name() + case label == "" && name != "": + return name + "." + dty.Name() + default: + if name == "" { + name = "" + } + return label + "." + name + "." + dty.Name() + } +} diff --git a/pkg/dataobj/sections/indexpointers/reader_test.go b/pkg/dataobj/sections/indexpointers/reader_test.go new file mode 100644 index 0000000000000..4b691213aeb22 --- /dev/null +++ b/pkg/dataobj/sections/indexpointers/reader_test.go @@ -0,0 +1,181 @@ +package indexpointers_test + +import ( + "context" + "errors" + "io" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/arrow/scalar" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" + "github.com/grafana/loki/v3/pkg/util/arrowtest" +) + +// TestReader does a basic end-to-end test over a reader with a predicate applied. 
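For reference alongside these tests, this is the minimal shape of the read loop the new API expects from callers, following the contract documented on [Reader.Read]: read until io.EOF, process any returned record before inspecting the error, and release records after use. The function below is an illustrative sketch rather than part of the change; it assumes sec is an already-opened *indexpointers.Section and that context, errors, and io are imported:

// readAllPointers drains a section using the new Reader API.
func readAllPointers(ctx context.Context, sec *indexpointers.Section) error {
	r := indexpointers.NewReader(indexpointers.ReaderOptions{
		Columns: sec.Columns(),
	})
	defer r.Close()

	for {
		rec, err := r.Read(ctx, 1024)
		if rec != nil {
			// Process the batch here, then release it.
			rec.Release()
		}
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
	}
}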
+func TestReaderWithoutPredicate(t *testing.T) { + sec := buildSection(t, []indexpointers.IndexPointer{ + {Path: "path1", StartTs: unixTime(10), EndTs: unixTime(20)}, + {Path: "path2", StartTs: unixTime(30), EndTs: unixTime(40)}, + {Path: "path3", StartTs: unixTime(50), EndTs: unixTime(60)}, + }) + + var ( + pathCol = sec.Columns()[0] + minTimestampCol = sec.Columns()[1] + maxTimestampCol = sec.Columns()[2] + ) + + require.Equal(t, "path", pathCol.Name) + require.Equal(t, indexpointers.ColumnTypePath, pathCol.Type) + require.Equal(t, "min_timestamp", minTimestampCol.Name) + require.Equal(t, indexpointers.ColumnTypeMinTimestamp, minTimestampCol.Type) + require.Equal(t, "max_timestamp", maxTimestampCol.Name) + require.Equal(t, indexpointers.ColumnTypeMaxTimestamp, maxTimestampCol.Type) + + for _, tt := range []struct { + name string + columns []*indexpointers.Column + expected arrowtest.Rows + }{ + { + name: "basic reads with selected columns", + columns: []*indexpointers.Column{pathCol}, + expected: arrowtest.Rows{ + {"path.path.utf8": "path1"}, + {"path.path.utf8": "path2"}, + {"path.path.utf8": "path3"}, + }, + }, + { + name: "basic reads with all columns", + columns: []*indexpointers.Column{pathCol, minTimestampCol, maxTimestampCol}, + expected: arrowtest.Rows{ + {"path.path.utf8": "path1", "min_timestamp.min_timestamp.timestamp": unixTime(10).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(20).UTC()}, + {"path.path.utf8": "path2", "min_timestamp.min_timestamp.timestamp": unixTime(30).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(40).UTC()}, + {"path.path.utf8": "path3", "min_timestamp.min_timestamp.timestamp": unixTime(50).UTC(), "max_timestamp.max_timestamp.timestamp": unixTime(60).UTC()}, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + r := indexpointers.NewReader(indexpointers.ReaderOptions{ + Columns: tt.columns, + Allocator: memory.DefaultAllocator, + Predicates: nil, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err, "failed to get rows from table") + require.Equal(t, tt.expected, actual) + }) + } +} + +// TestReaderWithTimestampPredicates tests reading with timestamp predicates. 
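The expected rows in the next test follow directly from the overlap predicate: with start=25s and end=55s, a pointer is kept when max_timestamp >= start and min_timestamp <= end. Worked out against the three fixture rows (arithmetic only, not additional test code):

path1 [10s, 20s]: 20 >= 25 is false              -> excluded
path2 [30s, 40s]: 40 >= 25 and 30 <= 55 are true -> kept
path3 [50s, 60s]: 60 >= 25 and 50 <= 55 are true -> kept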
+func TestReaderWithTimestampPredicates(t *testing.T) { + var ( + t10 = unixTime(10) + t20 = unixTime(20) + t25 = unixTime(25) + t25s = scalar.NewTimestampScalar(arrow.Timestamp(t25.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond}) + t30 = unixTime(30) + t40 = unixTime(40) + t50 = unixTime(50) + t55 = unixTime(55) + t55s = scalar.NewTimestampScalar(arrow.Timestamp(t55.UnixNano()), &arrow.TimestampType{Unit: arrow.Nanosecond}) + t60 = unixTime(60) + ) + sec := buildSection(t, []indexpointers.IndexPointer{ + {Path: "path1", StartTs: t10, EndTs: t20}, + {Path: "path2", StartTs: t30, EndTs: t40}, + {Path: "path3", StartTs: t50, EndTs: t60}, + }) + + var ( + pathCol = sec.Columns()[0] + minTimestampCol = sec.Columns()[1] + maxTimestampCol = sec.Columns()[2] + ) + + r := indexpointers.NewReader(indexpointers.ReaderOptions{ + Columns: []*indexpointers.Column{pathCol, minTimestampCol, maxTimestampCol}, + Allocator: memory.DefaultAllocator, + Predicates: []indexpointers.Predicate{ + indexpointers.WhereTimeRangeOverlapsWith(minTimestampCol, maxTimestampCol, t25s, t55s), + }, + }) + + actualTable, err := readTable(context.Background(), r) + require.NoError(t, err) + + actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable) + require.NoError(t, err) + + expected := arrowtest.Rows{ + { + "path.path.utf8": "path2", + "min_timestamp.min_timestamp.timestamp": t30.UTC(), + "max_timestamp.max_timestamp.timestamp": t40.UTC(), + }, + { + "path.path.utf8": "path3", + "min_timestamp.min_timestamp.timestamp": t50.UTC(), + "max_timestamp.max_timestamp.timestamp": t60.UTC(), + }, + } + require.Equal(t, expected, actual) +} + +func buildSection(t *testing.T, ptrData []indexpointers.IndexPointer) *indexpointers.Section { + t.Helper() + + sectionBuilder := indexpointers.NewBuilder(nil, 0, 2) + + for _, ptr := range ptrData { + sectionBuilder.Append(ptr.Path, ptr.StartTs, ptr.EndTs) + } + + objectBuilder := dataobj.NewBuilder(nil) + require.NoError(t, objectBuilder.Append(sectionBuilder)) + + obj, closer, err := objectBuilder.Flush() + require.NoError(t, err) + t.Cleanup(func() { closer.Close() }) + + sec, err := indexpointers.Open(t.Context(), obj.Sections()[0]) + require.NoError(t, err) + return sec +} + +func readTable(ctx context.Context, r *indexpointers.Reader) (arrow.Table, error) { + var recs []arrow.RecordBatch + + for { + rec, err := r.Read(ctx, 128) + if rec != nil { + if rec.NumRows() > 0 { + recs = append(recs, rec) + } + } + + if err != nil && errors.Is(err, io.EOF) { + break + } else if err != nil { + return nil, err + } + } + + if len(recs) == 0 { + return nil, io.EOF + } + + return array.NewTableFromRecords(recs[0].Schema(), recs), nil +} diff --git a/pkg/dataobj/sections/indexpointers/row_predicate_test.go b/pkg/dataobj/sections/indexpointers/row_predicate_test.go index 3eb8294c9a46a..3cb927877c7df 100644 --- a/pkg/dataobj/sections/indexpointers/row_predicate_test.go +++ b/pkg/dataobj/sections/indexpointers/row_predicate_test.go @@ -3,6 +3,7 @@ package indexpointers import ( "fmt" "testing" + "time" "github.com/stretchr/testify/require" @@ -11,6 +12,8 @@ import ( type fakeColumn struct{ dataset.Column } +func unixTime(sec int64) time.Time { return time.Unix(sec, 0) } + var ( fakeMinTimestampColumn = &fakeColumn{ Column: &dataset.MemColumn{ diff --git a/pkg/dataobj/sections/indexpointers/row_reader_test.go b/pkg/dataobj/sections/indexpointers/row_reader_test.go index a45a6c415110a..9b9d0fd5052c7 100644 --- a/pkg/dataobj/sections/indexpointers/row_reader_test.go +++ 
b/pkg/dataobj/sections/indexpointers/row_reader_test.go @@ -1,4 +1,4 @@ -package indexpointers +package indexpointers_test import ( "context" @@ -10,9 +10,10 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" ) -var indexPointerTestData = []IndexPointer{ +var indexPointerTestData = []indexpointers.IndexPointer{ {Path: "/path/to/index/object/1", StartTs: unixTime(10), EndTs: unixTime(20)}, {Path: "/path/to/index/object/2", StartTs: unixTime(12), EndTs: unixTime(17)}, {Path: "/path/to/index/object/3", StartTs: unixTime(13), EndTs: unixTime(18)}, @@ -22,16 +23,16 @@ func unixTime(sec int64) time.Time { return time.Unix(sec, 0) } func TestRowReader(t *testing.T) { dec := buildIndexPointersDecoder(t, 100, 0) // Many pages - r := NewRowReader(dec) + r := indexpointers.NewRowReader(dec) actual, err := readAllIndexPointers(context.Background(), r) require.NoError(t, err) require.Equal(t, indexPointerTestData, actual) } -func buildIndexPointersDecoder(t *testing.T, pageSize, pageRows int) *Section { +func buildIndexPointersDecoder(t *testing.T, pageSize, pageRows int) *indexpointers.Section { t.Helper() - s := NewBuilder(nil, pageSize, pageRows) + s := indexpointers.NewBuilder(nil, pageSize, pageRows) for _, d := range indexPointerTestData { s.Append(d.Path, d.StartTs, d.EndTs) } @@ -43,15 +44,15 @@ func buildIndexPointersDecoder(t *testing.T, pageSize, pageRows int) *Section { require.NoError(t, err) t.Cleanup(func() { closer.Close() }) - sec, err := Open(t.Context(), obj.Sections()[0]) + sec, err := indexpointers.Open(t.Context(), obj.Sections()[0]) require.NoError(t, err) return sec } -func readAllIndexPointers(ctx context.Context, r *RowReader) ([]IndexPointer, error) { +func readAllIndexPointers(ctx context.Context, r *indexpointers.RowReader) ([]indexpointers.IndexPointer, error) { var ( - res []IndexPointer - buf = make([]IndexPointer, 128) + res []indexpointers.IndexPointer + buf = make([]indexpointers.IndexPointer, 128) ) for {