Skip to content

Commit ee58c33

Browse files
authored
chore(metastore): use column reader for blooms (#20053)
This PR is a continuation of #19992. I use `pointers.Reader` to refine the sections that match bloom filters. I also fix two minor issues that I introduced in the previous PR: (1) the columns are now converted only once per column (instead of once for every row), and (2) sections that are of the wrong "type" are now ignored.
1 parent 6a81802 commit ee58c33

File tree

4 files changed

+340
-120
lines changed

4 files changed

+340
-120
lines changed

pkg/dataobj/metastore/iter.go

Lines changed: 164 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,8 @@ func forEachStreamSectionPointer(
9090
}
9191

9292
if colStreamID == nil || colMinTimestamp == nil || colMaxTimestamp == nil {
93-
return fmt.Errorf(
94-
"one of mandatory columns is missing: (streamID=%t, minTimestamp=%t, maxTimestamp=%t)",
95-
colStreamID == nil, colMinTimestamp == nil, colMaxTimestamp == nil,
96-
)
93+
// the section has no rows with stream-based indices and can be ignored completely
94+
continue
9795
}
9896

9997
reader.Reset(pointers.ReaderOptions{
@@ -123,60 +121,68 @@ func forEachStreamSectionPointer(
123121

124122
switch pointerCol.Type {
125123
case pointers.ColumnTypePath:
124+
values := col.(*array.String)
126125
for rIdx := range numRows {
127126
if col.IsNull(rIdx) {
128127
continue
129128
}
130-
buf[rIdx].Path = col.(*array.String).Value(rIdx)
129+
buf[rIdx].Path = values.Value(rIdx)
131130
}
132131
case pointers.ColumnTypeSection:
132+
values := col.(*array.Int64)
133133
for rIdx := range numRows {
134134
if col.IsNull(rIdx) {
135135
continue
136136
}
137-
buf[rIdx].Section = col.(*array.Int64).Value(rIdx)
137+
buf[rIdx].Section = values.Value(rIdx)
138138
}
139139
case pointers.ColumnTypeStreamID:
140+
values := col.(*array.Int64)
140141
for rIdx := range numRows {
141142
if col.IsNull(rIdx) {
142143
continue
143144
}
144-
buf[rIdx].StreamID = col.(*array.Int64).Value(rIdx)
145+
buf[rIdx].StreamID = values.Value(rIdx)
145146
}
146147
case pointers.ColumnTypeStreamIDRef:
148+
values := col.(*array.Int64)
147149
for rIdx := range numRows {
148150
if col.IsNull(rIdx) {
149151
continue
150152
}
151-
buf[rIdx].StreamIDRef = col.(*array.Int64).Value(rIdx)
153+
buf[rIdx].StreamIDRef = values.Value(rIdx)
152154
}
153155
case pointers.ColumnTypeMinTimestamp:
156+
values := col.(*array.Timestamp)
154157
for rIdx := range numRows {
155158
if col.IsNull(rIdx) {
156159
continue
157160
}
158-
buf[rIdx].StartTs = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
161+
buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx)))
159162
}
160163
case pointers.ColumnTypeMaxTimestamp:
164+
values := col.(*array.Timestamp)
161165
for rIdx := range numRows {
162166
if col.IsNull(rIdx) {
163167
continue
164168
}
165-
buf[rIdx].EndTs = time.Unix(0, int64(col.(*array.Timestamp).Value(rIdx)))
169+
buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx)))
166170
}
167171
case pointers.ColumnTypeRowCount:
172+
values := col.(*array.Int64)
168173
for rIdx := range numRows {
169174
if col.IsNull(rIdx) {
170175
continue
171176
}
172-
buf[rIdx].LineCount = col.(*array.Int64).Value(rIdx)
177+
buf[rIdx].LineCount = values.Value(rIdx)
173178
}
174179
case pointers.ColumnTypeUncompressedSize:
180+
values := col.(*array.Int64)
175181
for rIdx := range numRows {
176182
if col.IsNull(rIdx) {
177183
continue
178184
}
179-
buf[rIdx].UncompressedSize = col.(*array.Int64).Value(rIdx)
185+
buf[rIdx].UncompressedSize = values.Value(rIdx)
180186
}
181187
default:
182188
continue
@@ -195,3 +201,149 @@ func forEachStreamSectionPointer(
195201

196202
return nil
197203
}
204+
205+
func forEachMatchedPointerSectionKey(
206+
ctx context.Context,
207+
object *dataobj.Object,
208+
columnName scalar.Scalar,
209+
matchColumnValue string,
210+
f func(key SectionKey),
211+
) error {
212+
targetTenant, err := user.ExtractOrgID(ctx)
213+
if err != nil {
214+
return fmt.Errorf("extracting org ID: %w", err)
215+
}
216+
217+
var reader pointers.Reader
218+
defer reader.Close()
219+
220+
const batchSize = 128
221+
buf := make([]SectionKey, batchSize)
222+
223+
for _, section := range object.Sections().Filter(pointers.CheckSection) {
224+
if section.Tenant != targetTenant {
225+
continue
226+
}
227+
228+
sec, err := pointers.Open(ctx, section)
229+
if err != nil {
230+
return fmt.Errorf("opening section: %w", err)
231+
}
232+
233+
pointerCols, err := findPointersColumnsByTypes(
234+
sec.Columns(),
235+
pointers.ColumnTypePath,
236+
pointers.ColumnTypeSection,
237+
pointers.ColumnTypeColumnName,
238+
pointers.ColumnTypeValuesBloomFilter,
239+
)
240+
if err != nil {
241+
return fmt.Errorf("finding pointers columns: %w", err)
242+
}
243+
244+
var (
245+
colColumnName *pointers.Column
246+
colBloom *pointers.Column
247+
)
248+
249+
for _, c := range pointerCols {
250+
if c.Type == pointers.ColumnTypeColumnName {
251+
colColumnName = c
252+
}
253+
if c.Type == pointers.ColumnTypeValuesBloomFilter {
254+
colBloom = c
255+
}
256+
if colColumnName != nil && colBloom != nil {
257+
break
258+
}
259+
}
260+
261+
if colColumnName == nil || colBloom == nil {
262+
// the section has no rows for blooms and can be ignored completely
263+
continue
264+
}
265+
266+
reader.Reset(
267+
pointers.ReaderOptions{
268+
Columns: pointerCols,
269+
Predicates: []pointers.Predicate{
270+
pointers.WhereBloomFilterMatches(colColumnName, colBloom, columnName, matchColumnValue),
271+
},
272+
},
273+
)
274+
275+
for {
276+
rec, readErr := reader.Read(ctx, batchSize)
277+
if readErr != nil && !errors.Is(readErr, io.EOF) {
278+
return fmt.Errorf("reading record batch: %w", readErr)
279+
}
280+
if rec == nil {
281+
if errors.Is(readErr, io.EOF) {
282+
break
283+
}
284+
continue
285+
}
286+
287+
numRows := int(rec.NumRows())
288+
if numRows == 0 {
289+
rec.Release()
290+
if errors.Is(readErr, io.EOF) {
291+
break
292+
}
293+
continue
294+
}
295+
296+
for colIdx := range int(rec.NumCols()) {
297+
col := rec.Column(colIdx)
298+
pointerCol := pointerCols[colIdx]
299+
300+
switch pointerCol.Type {
301+
case pointers.ColumnTypePath:
302+
values := col.(*array.String)
303+
for rowIdx := range numRows {
304+
if col.IsNull(rowIdx) {
305+
continue
306+
}
307+
buf[rowIdx].ObjectPath = values.Value(rowIdx)
308+
}
309+
case pointers.ColumnTypeSection:
310+
values := col.(*array.Int64)
311+
for rowIdx := range numRows {
312+
if col.IsNull(rowIdx) {
313+
continue
314+
}
315+
buf[rowIdx].SectionIdx = values.Value(rowIdx)
316+
}
317+
default:
318+
continue
319+
}
320+
}
321+
322+
for _, sectionKey := range buf {
323+
f(sectionKey)
324+
}
325+
326+
if errors.Is(readErr, io.EOF) {
327+
break
328+
}
329+
}
330+
}
331+
332+
return nil
333+
}
334+
335+
func findPointersColumnsByTypes(allColumns []*pointers.Column, columnTypes ...pointers.ColumnType) ([]*pointers.Column, error) {
336+
result := make([]*pointers.Column, 0, len(columnTypes))
337+
338+
for _, c := range allColumns {
339+
for _, neededType := range columnTypes {
340+
if neededType != c.Type {
341+
continue
342+
}
343+
344+
result = append(result, c)
345+
}
346+
}
347+
348+
return result, nil
349+
}

0 commit comments

Comments
 (0)