Skip to content

Commit 5970eff

Browse files
authored
pkg/query: Allow reader schema order to be dynamic (#6066)
* pkg/query: Allow reader schema order to be dynamic * Fix various lints
1 parent 13b76b4 commit 5970eff

File tree

9 files changed

+281
-107
lines changed

9 files changed

+281
-107
lines changed

pkg/cache/lru/lru.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,15 @@ func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
8383
return e.Value.(entry[K, V]).value, true
8484
}
8585
c.metrics.misses.Inc()
86-
return
86+
return value, ok
8787
}
8888

8989
// Peek returns the key value (or undefined if not found) without updating the "recently used"-ness of the key.
9090
func (c *LRU[K, V]) Peek(key K) (value V, ok bool) {
9191
if e, ok := c.items[key]; ok {
9292
return e.Value.(entry[K, V]).value, true
9393
}
94-
return
94+
return value, ok
9595
}
9696

9797
// Remove removes the provided key from the cache.

pkg/profile/reader.go

Lines changed: 149 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package profile
1515

1616
import (
17+
"fmt"
1718
"strings"
1819

1920
"github.com/apache/arrow-go/v18/arrow"
@@ -63,95 +64,170 @@ type RecordReader struct {
6364
Diff *array.Int64
6465
}
6566

66-
func NewReader(p Profile) Reader {
67+
func NewReader(p Profile) (Reader, error) {
6768
r := Reader{
6869
Profile: p,
6970
}
7071

7172
for _, ar := range p.Samples {
72-
r.RecordReaders = append(r.RecordReaders, NewRecordReader(ar))
73+
rr, err := NewRecordReader(ar)
74+
if err != nil {
75+
return Reader{}, err
76+
}
77+
r.RecordReaders = append(r.RecordReaders, rr)
7378
}
74-
return r
79+
return r, nil
7580
}
7681

77-
func NewRecordReader(ar arrow.RecordBatch) *RecordReader {
82+
func NewRecordReader(ar arrow.RecordBatch) (*RecordReader, error) {
7883
schema := ar.Schema()
7984

85+
rr := &RecordReader{
86+
Record: ar,
87+
}
88+
8089
labelFields := make([]arrow.Field, 0, schema.NumFields())
81-
for _, field := range schema.Fields() {
90+
labelColumns := make([]LabelColumn, 0, schema.NumFields())
91+
92+
// Iterate over schema fields once and populate the RecordReader
93+
for i, field := range schema.Fields() {
8294
if strings.HasPrefix(field.Name, ColumnLabelsPrefix) {
8395
labelFields = append(labelFields, field)
96+
col := ar.Column(i).(*array.Dictionary)
97+
labelColumns = append(labelColumns, LabelColumn{
98+
Col: col.Indices().(*array.Uint32),
99+
Dict: col.Dictionary().(*array.Binary),
100+
})
101+
continue
84102
}
85-
}
86103

87-
labelColumns := make([]LabelColumn, len(labelFields))
88-
for i := range labelFields {
89-
col := ar.Column(i).(*array.Dictionary)
90-
labelColumns[i] = LabelColumn{
91-
Col: col.Indices().(*array.Uint32),
92-
Dict: col.Dictionary().(*array.Binary),
104+
switch field.Name {
105+
case "locations":
106+
rr.Locations = ar.Column(i).(*array.List)
107+
rr.Location = rr.Locations.ListValues().(*array.Struct)
108+
109+
// Process location struct fields by name
110+
locationType := rr.Location.DataType().(*arrow.StructType)
111+
for j := 0; j < locationType.NumFields(); j++ {
112+
locField := locationType.Field(j)
113+
switch locField.Name {
114+
case "address":
115+
rr.Address = rr.Location.Field(j).(*array.Uint64)
116+
case "mapping_start":
117+
rr.MappingStart = rr.Location.Field(j).(*array.Uint64)
118+
case "mapping_limit":
119+
rr.MappingLimit = rr.Location.Field(j).(*array.Uint64)
120+
case "mapping_offset":
121+
rr.MappingOffset = rr.Location.Field(j).(*array.Uint64)
122+
case "mapping_file":
123+
mappingFile := rr.Location.Field(j).(*array.Dictionary)
124+
rr.MappingFileIndices = mappingFile.Indices().(*array.Uint32)
125+
rr.MappingFileDict = mappingFile.Dictionary().(*array.Binary)
126+
case "mapping_build_id":
127+
mappingBuildID := rr.Location.Field(j).(*array.Dictionary)
128+
rr.MappingBuildIDIndices = mappingBuildID.Indices().(*array.Uint32)
129+
rr.MappingBuildIDDict = mappingBuildID.Dictionary().(*array.Binary)
130+
case "lines":
131+
rr.Lines = rr.Location.Field(j).(*array.List)
132+
rr.Line = rr.Lines.ListValues().(*array.Struct)
133+
134+
// Process line struct fields by name
135+
lineType := rr.Line.DataType().(*arrow.StructType)
136+
for k := 0; k < lineType.NumFields(); k++ {
137+
lineField := lineType.Field(k)
138+
switch lineField.Name {
139+
case "line":
140+
rr.LineNumber = rr.Line.Field(k).(*array.Int64)
141+
case "function_name":
142+
lineFunctionName := rr.Line.Field(k).(*array.Dictionary)
143+
rr.LineFunctionNameIndices = lineFunctionName.Indices().(*array.Uint32)
144+
rr.LineFunctionNameDict = lineFunctionName.Dictionary().(*array.Binary)
145+
case "function_system_name":
146+
lineFunctionSystemName := rr.Line.Field(k).(*array.Dictionary)
147+
rr.LineFunctionSystemNameIndices = lineFunctionSystemName.Indices().(*array.Uint32)
148+
rr.LineFunctionSystemNameDict = lineFunctionSystemName.Dictionary().(*array.Binary)
149+
case "function_filename":
150+
lineFunctionFilename := rr.Line.Field(k).(*array.Dictionary)
151+
rr.LineFunctionFilenameIndices = lineFunctionFilename.Indices().(*array.Uint32)
152+
rr.LineFunctionFilenameDict = lineFunctionFilename.Dictionary().(*array.Binary)
153+
case "function_start_line":
154+
rr.LineFunctionStartLine = rr.Line.Field(k).(*array.Int64)
155+
}
156+
}
157+
}
158+
}
159+
case "value":
160+
rr.Value = ar.Column(i).(*array.Int64)
161+
case "diff":
162+
rr.Diff = ar.Column(i).(*array.Int64)
163+
case ColumnTimestamp:
164+
rr.Timestamp = ar.Column(i).(*array.Int64)
165+
case ColumnPeriod:
166+
rr.Period = ar.Column(i).(*array.Int64)
93167
}
94168
}
95-
labelNum := len(labelFields)
96-
97-
// Get readers from the unfiltered profile.
98-
locations := ar.Column(labelNum).(*array.List)
99-
location := locations.ListValues().(*array.Struct)
100-
address := location.Field(0).(*array.Uint64)
101-
mappingStart := location.Field(1).(*array.Uint64)
102-
mappingLimit := location.Field(2).(*array.Uint64)
103-
mappingOffset := location.Field(3).(*array.Uint64)
104-
mappingFile := location.Field(4).(*array.Dictionary)
105-
mappingFileIndices := mappingFile.Indices().(*array.Uint32)
106-
mappingFileDict := mappingFile.Dictionary().(*array.Binary)
107-
mappingBuildID := location.Field(5).(*array.Dictionary)
108-
mappingBuildIDIndices := mappingBuildID.Indices().(*array.Uint32)
109-
mappingBuildIDDict := mappingBuildID.Dictionary().(*array.Binary)
110-
lines := location.Field(6).(*array.List)
111-
line := lines.ListValues().(*array.Struct)
112-
lineNumber := line.Field(0).(*array.Int64)
113-
lineFunctionName := line.Field(1).(*array.Dictionary)
114-
lineFunctionNameIndices := lineFunctionName.Indices().(*array.Uint32)
115-
lineFunctionNameDict := lineFunctionName.Dictionary().(*array.Binary)
116-
lineFunctionSystemName := line.Field(2).(*array.Dictionary)
117-
lineFunctionSystemNameIndices := lineFunctionSystemName.Indices().(*array.Uint32)
118-
lineFunctionSystemNameDict := lineFunctionSystemName.Dictionary().(*array.Binary)
119-
lineFunctionFilename := line.Field(3).(*array.Dictionary)
120-
lineFunctionFilenameIndices := lineFunctionFilename.Indices().(*array.Uint32)
121-
lineFunctionFilenameDict := lineFunctionFilename.Dictionary().(*array.Binary)
122-
lineFunctionStartLine := line.Field(4).(*array.Int64)
123-
valueColumn := ar.Column(labelNum + 1).(*array.Int64)
124-
diffColumn := ar.Column(labelNum + 2).(*array.Int64)
125-
timestamp := ar.Column(labelNum + 3).(*array.Int64)
126-
period := ar.Column(labelNum + 4).(*array.Int64)
127-
128-
return &RecordReader{
129-
Record: ar,
130-
LabelFields: labelFields,
131-
LabelColumns: labelColumns,
132-
Locations: locations,
133-
Location: location,
134-
Address: address,
135-
MappingStart: mappingStart,
136-
MappingLimit: mappingLimit,
137-
MappingOffset: mappingOffset,
138-
MappingFileIndices: mappingFileIndices,
139-
MappingFileDict: mappingFileDict,
140-
MappingBuildIDIndices: mappingBuildIDIndices,
141-
MappingBuildIDDict: mappingBuildIDDict,
142-
Lines: lines,
143-
Line: line,
144-
LineNumber: lineNumber,
145-
LineFunctionNameIndices: lineFunctionNameIndices,
146-
LineFunctionNameDict: lineFunctionNameDict,
147-
LineFunctionSystemNameIndices: lineFunctionSystemNameIndices,
148-
LineFunctionSystemNameDict: lineFunctionSystemNameDict,
149-
LineFunctionFilenameIndices: lineFunctionFilenameIndices,
150-
LineFunctionFilenameDict: lineFunctionFilenameDict,
151-
LineFunctionStartLine: lineFunctionStartLine,
152-
Value: valueColumn,
153-
Diff: diffColumn,
154-
Timestamp: timestamp,
155-
Period: period,
169+
170+
rr.LabelFields = labelFields
171+
rr.LabelColumns = labelColumns
172+
173+
// Validate that all required fields were found
174+
if rr.Locations == nil {
175+
return nil, fmt.Errorf("missing required field: locations")
176+
}
177+
if rr.Location == nil {
178+
return nil, fmt.Errorf("missing required field: location")
179+
}
180+
if rr.Address == nil {
181+
return nil, fmt.Errorf("missing required field: address")
182+
}
183+
if rr.MappingStart == nil {
184+
return nil, fmt.Errorf("missing required field: mapping_start")
185+
}
186+
if rr.MappingLimit == nil {
187+
return nil, fmt.Errorf("missing required field: mapping_limit")
188+
}
189+
if rr.MappingOffset == nil {
190+
return nil, fmt.Errorf("missing required field: mapping_offset")
191+
}
192+
if rr.MappingFileIndices == nil || rr.MappingFileDict == nil {
193+
return nil, fmt.Errorf("missing required field: mapping_file")
156194
}
195+
if rr.MappingBuildIDIndices == nil || rr.MappingBuildIDDict == nil {
196+
return nil, fmt.Errorf("missing required field: mapping_build_id")
197+
}
198+
if rr.Lines == nil {
199+
return nil, fmt.Errorf("missing required field: lines")
200+
}
201+
if rr.Line == nil {
202+
return nil, fmt.Errorf("missing required field: line")
203+
}
204+
if rr.LineNumber == nil {
205+
return nil, fmt.Errorf("missing required field: line")
206+
}
207+
if rr.LineFunctionNameIndices == nil || rr.LineFunctionNameDict == nil {
208+
return nil, fmt.Errorf("missing required field: function_name")
209+
}
210+
if rr.LineFunctionSystemNameIndices == nil || rr.LineFunctionSystemNameDict == nil {
211+
return nil, fmt.Errorf("missing required field: function_system_name")
212+
}
213+
if rr.LineFunctionFilenameIndices == nil || rr.LineFunctionFilenameDict == nil {
214+
return nil, fmt.Errorf("missing required field: function_filename")
215+
}
216+
if rr.LineFunctionStartLine == nil {
217+
return nil, fmt.Errorf("missing required field: function_start_line")
218+
}
219+
if rr.Value == nil {
220+
return nil, fmt.Errorf("missing required field: value")
221+
}
222+
if rr.Diff == nil {
223+
return nil, fmt.Errorf("missing required field: diff")
224+
}
225+
if rr.Timestamp == nil {
226+
return nil, fmt.Errorf("missing required field: %s", ColumnTimestamp)
227+
}
228+
if rr.Period == nil {
229+
return nil, fmt.Errorf("missing required field: %s", ColumnPeriod)
230+
}
231+
232+
return rr, nil
157233
}

pkg/query/columnquery.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,10 @@ func filterRecord(
531531
_, span := tracer.Start(ctx, "filterRecord")
532532
defer span.End()
533533

534-
r := profile.NewRecordReader(rec)
534+
r, err := profile.NewRecordReader(rec)
535+
if err != nil {
536+
return nil, 0, 0, fmt.Errorf("failed to create record reader: %w", err)
537+
}
535538

536539
// If no filters, return all records
537540
if len(filters) == 0 {
@@ -623,7 +626,11 @@ func filterRecord(
623626

624627
filtered := int64(0)
625628
for _, r := range recs {
626-
filtered += math.Int64.Sum(profile.NewRecordReader(r).Value)
629+
rr, err := profile.NewRecordReader(r)
630+
if err != nil {
631+
return nil, 0, 0, fmt.Errorf("failed to create record reader: %w", err)
632+
}
633+
filtered += math.Int64.Sum(rr.Value)
627634
}
628635

629636
return recs, originalValueSum, filtered, nil

pkg/query/columnquery_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,8 @@ func TestFilterData(t *testing.T) {
14011401
r.Release()
14021402
}
14031403
}()
1404-
r := profile.NewRecordReader(recs[0])
1404+
r, err := profile.NewRecordReader(recs[0])
1405+
require.NoError(t, err)
14051406
valid := 0
14061407
for i := 0; i < r.Location.Len(); i++ {
14071408
if r.Location.IsValid(i) {
@@ -1464,7 +1465,8 @@ func TestFilterUnsymbolized(t *testing.T) {
14641465
r.Release()
14651466
}
14661467
}()
1467-
r := profile.NewRecordReader(recs[0])
1468+
r, err := profile.NewRecordReader(recs[0])
1469+
require.NoError(t, err)
14681470
valid := 0
14691471
for i := 0; i < r.Location.Len(); i++ {
14701472
if r.Location.IsValid(i) {
@@ -1560,7 +1562,8 @@ func TestFilterDataWithPath(t *testing.T) {
15601562
r.Release()
15611563
}
15621564
}()
1563-
r := profile.NewRecordReader(recs[0])
1565+
r, err := profile.NewRecordReader(recs[0])
1566+
require.NoError(t, err)
15641567
validIndexes := []uint32{}
15651568
for i := 0; i < r.Location.Len(); i++ {
15661569
if r.Location.IsValid(i) {
@@ -1662,7 +1665,8 @@ func TestFilterDataFrameFilter(t *testing.T) {
16621665
r.Release()
16631666
}
16641667
}()
1665-
r := profile.NewRecordReader(recs[0])
1668+
r, err := profile.NewRecordReader(recs[0])
1669+
require.NoError(t, err)
16661670
valid := 0
16671671
for i := 0; i < r.Location.Len(); i++ {
16681672
if r.Location.IsValid(i) {
@@ -1958,7 +1962,8 @@ func TestFilterDataExclude(t *testing.T) {
19581962
totalValue := int64(0)
19591963
for _, rec := range recs {
19601964
totalRows += rec.NumRows()
1961-
r := profile.NewRecordReader(rec)
1965+
r, err := profile.NewRecordReader(rec)
1966+
require.NoError(t, err)
19621967
totalValue += math.Int64.Sum(r.Value)
19631968
}
19641969
require.Equal(t, int64(2), totalRows)

0 commit comments

Comments
 (0)