Skip to content

Commit 28304a8

Browse files
cache schema fields to avoid memory allocation on every Next() call
1 parent 08f247d commit 28304a8

File tree

3 files changed

+20
-16
lines changed

3 files changed

+20
-16
lines changed

chdb/driver/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (d DriverType) PrepareRows(result chdbpurego.ChdbResult, buf []byte, bufSiz
5454
localResult: result, reader: reader,
5555
bufferSize: bufSize, needNewBuffer: true,
5656
useUnsafeStringReader: useUnsafe,
57+
schemaFields: reader.Schema().Fields(),
5758
}, nil
5859

5960
}
@@ -73,6 +74,7 @@ func (d DriverType) PrepareStreamingRows(result chdbpurego.ChdbStreamResult, buf
7374
stream: result, curChunk: nextRes, reader: reader,
7475
bufferSize: bufSize, needNewBuffer: true,
7576
useUnsafeStringReader: useUnsafe,
77+
schemaFields: reader.Schema().Fields(),
7678
}, nil
7779

7880
}

chdb/driver/parquet.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type parquetRows struct {
2828
reader *parquet.GenericReader[any] // parquet reader
2929
curRecord parquet.Row // TODO: delete this?
3030
buffer []parquet.Row // record buffer
31+
schemaFields []parquet.Field // schema fields
3132
bufferSize int // amount of records to preload into buffer
3233
bufferIndex int64 // index in the current buffer
3334
curRow int64 // row counter
@@ -36,8 +37,7 @@ type parquetRows struct {
3637
}
3738

3839
func (r *parquetRows) Columns() (out []string) {
39-
sch := r.reader.Schema()
40-
for _, f := range sch.Fields() {
40+
for _, f := range r.schemaFields {
4141
out = append(out, f.Name())
4242
}
4343

@@ -53,7 +53,7 @@ func (r *parquetRows) Close() error {
5353
r.reader = nil
5454
r.localResult.Free()
5555
r.localResult = nil
56-
56+
r.schemaFields = nil
5757
r.buffer = nil
5858
return nil
5959
}
@@ -90,7 +90,7 @@ func (r *parquetRows) Next(dest []driver.Value) error {
9090

9191
}
9292
r.curRecord = r.buffer[r.bufferIndex]
93-
if r.curRecord == nil || len(r.curRecord) == 0 {
93+
if len(r.curRecord) == 0 {
9494
return fmt.Errorf("empty row")
9595
}
9696
var scanError error
@@ -166,19 +166,19 @@ func (r *parquetRows) Next(dest []driver.Value) error {
166166
}
167167

168168
func (r *parquetRows) ColumnTypeDatabaseTypeName(index int) string {
169-
return r.reader.Schema().Fields()[index].Type().String()
169+
return r.schemaFields[index].Type().String()
170170
}
171171

172172
func (r *parquetRows) ColumnTypeNullable(index int) (nullable, ok bool) {
173-
return r.reader.Schema().Fields()[index].Optional(), true
173+
return r.schemaFields[index].Optional(), true
174174
}
175175

176176
func (r *parquetRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
177177
return 0, 0, false
178178
}
179179

180180
func (r *parquetRows) ColumnTypeScanType(index int) reflect.Type {
181-
switch r.reader.Schema().Fields()[index].Type().Kind() {
181+
switch r.schemaFields[index].Type().Kind() {
182182
case parquet.Boolean:
183183
return reflect.TypeOf(false)
184184
case parquet.Int32:

chdb/driver/parquet_streaming.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@ type parquetStreamingRows struct {
1818
curChunk chdbpurego.ChdbResult // current chunk
1919
reader *parquet.GenericReader[any] // parquet reader
2020
curRecord parquet.Row
21-
buffer []parquet.Row // record buffer
22-
bufferSize int // amount of records to preload into buffer
23-
bufferIndex int64 // index in the current buffer
24-
curRow int64 // row counter
21+
buffer []parquet.Row // record buffer
22+
schemaFields []parquet.Field // schema fields
23+
bufferSize int // amount of records to preload into buffer
24+
bufferIndex int64 // index in the current buffer
25+
curRow int64 // row counter
2526
needNewBuffer bool
2627
useUnsafeStringReader bool
2728
}
2829

2930
func (r *parquetStreamingRows) Columns() (out []string) {
30-
sch := r.reader.Schema()
31-
for _, f := range sch.Fields() {
31+
for _, f := range r.schemaFields {
3232
out = append(out, f.Name())
3333
}
3434

@@ -45,6 +45,7 @@ func (r *parquetStreamingRows) Close() error {
4545
r.stream.Free()
4646
r.curChunk = nil
4747
r.stream = nil
48+
r.schemaFields = nil
4849

4950
r.buffer = nil
5051
return nil
@@ -85,6 +86,7 @@ func (r *parquetStreamingRows) readNextChunkFromStream() error {
8586
return io.EOF
8687
}
8788
r.reader = parquet.NewGenericReader[any](bytes.NewReader(r.curChunk.Buf()))
89+
r.schemaFields = r.reader.Schema().Fields()
8890
return nil
8991
}
9092

@@ -182,19 +184,19 @@ func (r *parquetStreamingRows) Next(dest []driver.Value) error {
182184
}
183185

184186
func (r *parquetStreamingRows) ColumnTypeDatabaseTypeName(index int) string {
185-
return r.reader.Schema().Fields()[index].Type().String()
187+
return r.schemaFields[index].Type().String()
186188
}
187189

188190
func (r *parquetStreamingRows) ColumnTypeNullable(index int) (nullable, ok bool) {
189-
return r.reader.Schema().Fields()[index].Optional(), true
191+
return r.schemaFields[index].Optional(), true
190192
}
191193

192194
func (r *parquetStreamingRows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
193195
return 0, 0, false
194196
}
195197

196198
func (r *parquetStreamingRows) ColumnTypeScanType(index int) reflect.Type {
197-
switch r.reader.Schema().Fields()[index].Type().Kind() {
199+
switch r.schemaFields[index].Type().Kind() {
198200
case parquet.Boolean:
199201
return reflect.TypeOf(false)
200202
case parquet.Int32:

0 commit comments

Comments (0)