Skip to content

Commit 0606cfb

Browse files
refactor(arrow): fourth increment of the Record -> RecordBatch migration (#486)
### Rationale for this change

Rename the `Record` interface to `RecordBatch` for clarity, since "record" commonly means a single row, whereas this type represents a batch of rows. In addition, some names such as `NewRecord` have been changed to `NewRecordBatch`; the old names still exist as wrappers around the newly named ones to maintain backward compatibility.

### What changes are included in this PR?

The following packages now use `RecordBatch` instead of `Record`:

- arrow/ipc
- arrow/flight
- arrow/flight/flightsql
- arrow/internal/flight_integration
- arrow/array

In addition:

- Fixed interface compatibility issues in dependent packages (arrow/avro, arrow/csv, arrow/cdata, parquet/pqarrow)
- Updated all `Record()` methods to call `RecordBatch()` for consistency

### Are these changes tested?

All affected packages build successfully and core tests pass. Some tests requiring external dependencies were skipped.

This is the fourth increment of the Record → RecordBatch migration.

---------

Co-authored-by: MANDY Alimaa <alimaa@wisc.edu>
1 parent d715ac7 commit 0606cfb

File tree

21 files changed

+176
-93
lines changed

21 files changed

+176
-93
lines changed

arrow/array/json_reader.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type JSONReader struct {
7575
bldr *RecordBuilder
7676

7777
refs atomic.Int64
78-
cur arrow.Record
78+
cur arrow.RecordBatch
7979
err error
8080

8181
chunk int
@@ -124,9 +124,15 @@ func (r *JSONReader) Err() error { return r.err }
124124

125125
func (r *JSONReader) Schema() *arrow.Schema { return r.schema }
126126

127+
// RecordBatch returns the most recently read record batch. The returned record batch is only valid
128+
// until the next call to Next unless Retain is called on the record batch itself.
129+
func (r *JSONReader) RecordBatch() arrow.RecordBatch { return r.cur }
130+
127131
// Record returns the most recently read record. The returned record is only valid
128132
// until the next call to Next unless Retain is called on the record itself.
129-
func (r *JSONReader) Record() arrow.Record { return r.cur }
133+
//
134+
// Deprecated: Use [JSONReader.RecordBatch] instead.
135+
func (r *JSONReader) Record() arrow.Record { return r.RecordBatch() }
130136

131137
func (r *JSONReader) Retain() {
132138
r.refs.Add(1)
@@ -144,7 +150,7 @@ func (r *JSONReader) Release() {
144150
}
145151
}
146152

147-
// Next returns true if it read in a record, which will be available via Record
153+
// Next returns true if it read in a record batch, which will be available via RecordBatch
148154
// and false if there is either an error or the end of the reader.
149155
func (r *JSONReader) Next() bool {
150156
if r.cur != nil {

arrow/array/record.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type RecordReader interface {
3737
Schema() *arrow.Schema
3838

3939
Next() bool
40+
RecordBatch() arrow.RecordBatch
41+
// Deprecated: Use [RecordReader.RecordBatch] instead.
4042
Record() arrow.Record
4143
Err() error
4244
}
@@ -46,12 +48,12 @@ type simpleRecords struct {
4648
refCount atomic.Int64
4749

4850
schema *arrow.Schema
49-
recs []arrow.Record
50-
cur arrow.Record
51+
recs []arrow.RecordBatch
52+
cur arrow.RecordBatch
5153
}
5254

5355
// NewRecordReader returns a simple iterator over the given slice of records.
54-
func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader, error) {
56+
func NewRecordReader(schema *arrow.Schema, recs []arrow.RecordBatch) (RecordReader, error) {
5557
rs := &simpleRecords{
5658
schema: schema,
5759
recs: recs,
@@ -96,8 +98,11 @@ func (rs *simpleRecords) Release() {
9698
}
9799
}
98100

99-
func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
100-
func (rs *simpleRecords) Record() arrow.Record { return rs.cur }
101+
func (rs *simpleRecords) Schema() *arrow.Schema { return rs.schema }
102+
func (rs *simpleRecords) RecordBatch() arrow.RecordBatch { return rs.cur }
103+
104+
// Deprecated: Use [RecordBatch] instead.
105+
func (rs *simpleRecords) Record() arrow.Record { return rs.RecordBatch() }
101106
func (rs *simpleRecords) Next() bool {
102107
if len(rs.recs) == 0 {
103108
return false
@@ -121,11 +126,11 @@ type simpleRecord struct {
121126
arrs []arrow.Array
122127
}
123128

124-
// NewRecord returns a basic, non-lazy in-memory record batch.
129+
// NewRecordBatch returns a basic, non-lazy in-memory record batch.
125130
//
126-
// NewRecord panics if the columns and schema are inconsistent.
127-
// NewRecord panics if rows is larger than the height of the columns.
128-
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
131+
// NewRecordBatch panics if the columns and schema are inconsistent.
132+
// NewRecordBatch panics if rows is larger than the height of the columns.
133+
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
129134
rec := &simpleRecord{
130135
schema: schema,
131136
rows: nrows,
@@ -156,7 +161,12 @@ func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Reco
156161
return rec
157162
}
158163

159-
func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error) {
164+
// Deprecated: Use [NewRecordBatch] instead.
165+
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.Record {
166+
return NewRecordBatch(schema, cols, nrows)
167+
}
168+
169+
func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, error) {
160170
if i < 0 || i >= len(rec.arrs) {
161171
return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(rec.arrs), i)
162172
}
@@ -179,7 +189,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.Record, error)
179189
copy(arrs, rec.arrs)
180190
arrs[i] = arr
181191

182-
return NewRecord(rec.schema, arrs, rec.rows), nil
192+
return NewRecordBatch(rec.schema, arrs, rec.rows), nil
183193
}
184194

185195
func (rec *simpleRecord) validate() error {
@@ -242,7 +252,7 @@ func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).
242252
//
243253
// NewSlice panics if the slice is outside the valid range of the record array.
244254
// NewSlice panics if j < i.
245-
func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
255+
func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
246256
arrs := make([]arrow.Array, len(rec.arrs))
247257
for ii, arr := range rec.arrs {
248258
arrs[ii] = NewSlice(arr, i, j)
@@ -252,7 +262,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.Record {
252262
arr.Release()
253263
}
254264
}()
255-
return NewRecord(rec.schema, arrs, j-i)
265+
return NewRecordBatch(rec.schema, arrs, j-i)
256266
}
257267

258268
func (rec *simpleRecord) String() string {
@@ -325,13 +335,13 @@ func (b *RecordBuilder) Reserve(size int) {
325335
}
326336
}
327337

328-
// NewRecord creates a new record from the memory buffers and resets the
329-
// RecordBuilder so it can be used to build a new record.
338+
// NewRecordBatch creates a new record batch from the memory buffers and resets the
339+
// RecordBuilder so it can be used to build a new record batch.
330340
//
331-
// The returned Record must be Release()'d after use.
341+
// The returned RecordBatch must be Release()'d after use.
332342
//
333-
// NewRecord panics if the fields' builder do not have the same length.
334-
func (b *RecordBuilder) NewRecord() arrow.Record {
343+
// NewRecordBatch panics if the fields' builders do not have the same length.
344+
func (b *RecordBuilder) NewRecordBatch() arrow.RecordBatch {
335345
cols := make([]arrow.Array, len(b.fields))
336346
rows := int64(0)
337347

@@ -353,7 +363,12 @@ func (b *RecordBuilder) NewRecord() arrow.Record {
353363
rows = irow
354364
}
355365

356-
return NewRecord(b.schema, cols, rows)
366+
return NewRecordBatch(b.schema, cols, rows)
367+
}
368+
369+
// Deprecated: Use [RecordBuilder.NewRecordBatch] instead.
370+
func (b *RecordBuilder) NewRecord() arrow.Record {
371+
return b.NewRecordBatch()
357372
}
358373

359374
// UnmarshalJSON for record builder will read in a single object and add the values
@@ -411,9 +426,9 @@ type iterReader struct {
411426
refCount atomic.Int64
412427

413428
schema *arrow.Schema
414-
cur arrow.Record
429+
cur arrow.RecordBatch
415430

416-
next func() (arrow.Record, error, bool)
431+
next func() (arrow.RecordBatch, error, bool)
417432
stop func()
418433

419434
err error
@@ -434,7 +449,10 @@ func (ir *iterReader) Release() {
434449
}
435450
}
436451

437-
func (ir *iterReader) Record() arrow.Record { return ir.cur }
452+
func (ir *iterReader) RecordBatch() arrow.RecordBatch { return ir.cur }
453+
454+
// Deprecated: Use [RecordBatch] instead.
455+
func (ir *iterReader) Record() arrow.Record { return ir.RecordBatch() }
438456
func (ir *iterReader) Err() error { return ir.err }
439457

440458
func (ir *iterReader) Next() bool {
@@ -452,9 +470,9 @@ func (ir *iterReader) Next() bool {
452470
return ok
453471
}
454472

455-
// ReaderFromIter wraps a go iterator for arrow.Record + error into a RecordReader
473+
// ReaderFromIter wraps a go iterator for arrow.RecordBatch + error into a RecordReader
456474
// interface object for ease of use.
457-
func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) RecordReader {
475+
func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.RecordBatch, error]) RecordReader {
458476
next, stop := iter.Pull2(itr)
459477
rdr := &iterReader{
460478
schema: schema,
@@ -469,12 +487,12 @@ func ReaderFromIter(schema *arrow.Schema, itr iter.Seq2[arrow.Record, error]) Re
469487
// you can use range on. The semantics are still important: if a record
470488
// that is returned is desired to be utilized beyond the scope of an iteration
471489
// then Retain must be called on it.
472-
func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
490+
func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
473491
rdr.Retain()
474492
return func(yield func(arrow.RecordBatch, error) bool) {
475493
defer rdr.Release()
476494
for rdr.Next() {
477-
if !yield(rdr.Record(), nil) {
495+
if !yield(rdr.RecordBatch(), nil) {
478496
return
479497
}
480498
}
@@ -486,6 +504,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.Record, error] {
486504
}
487505

488506
var (
489-
_ arrow.Record = (*simpleRecord)(nil)
490-
_ RecordReader = (*simpleRecords)(nil)
507+
_ arrow.RecordBatch = (*simpleRecord)(nil)
508+
_ RecordReader = (*simpleRecords)(nil)
491509
)

arrow/array/table.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,11 @@ func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
320320
return tr
321321
}
322322

323-
func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
324-
func (tr *TableReader) Record() arrow.RecordBatch { return tr.rec }
323+
func (tr *TableReader) Schema() *arrow.Schema { return tr.tbl.Schema() }
324+
func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec }
325+
326+
// Deprecated: Use [TableReader.RecordBatch] instead.
327+
func (tr *TableReader) Record() arrow.Record { return tr.RecordBatch() }
325328

326329
func (tr *TableReader) Next() bool {
327330
if tr.cur >= tr.max {

arrow/array/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...From
180180

181181
// RecordToStructArray constructs a struct array from the columns of the record batch
182182
// by referencing them, zero-copy.
183-
func RecordToStructArray(rec arrow.Record) *Struct {
183+
func RecordToStructArray(rec arrow.RecordBatch) *Struct {
184184
cols := make([]arrow.ArrayData, rec.NumCols())
185185
for i, c := range rec.Columns() {
186186
cols[i] = c.Data()

arrow/avro/reader.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,17 @@ func (r *OCFReader) AvroSchema() string { return r.avroSchema }
189189
// Schema returns the converted Arrow schema of the Avro OCF
190190
func (r *OCFReader) Schema() *arrow.Schema { return r.schema }
191191

192+
// RecordBatch returns the current record batch that has been extracted from the
193+
// underlying Avro OCF file.
194+
// It is valid until the next call to Next.
195+
func (r *OCFReader) RecordBatch() arrow.RecordBatch { return r.cur }
196+
192197
// Record returns the current record that has been extracted from the
193198
// underlying Avro OCF file.
194199
// It is valid until the next call to Next.
195-
func (r *OCFReader) Record() arrow.RecordBatch { return r.cur }
200+
//
201+
// Deprecated: Use [OCFReader.RecordBatch] instead.
202+
func (r *OCFReader) Record() arrow.Record { return r.RecordBatch() }
196203

197204
// Metrics returns the maximum queue depth of the Avro record read cache and of the
198205
// converted Arrow record cache.

arrow/cdata/cdata.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -959,8 +959,11 @@ type nativeCRecordBatchReader struct {
959959
func (n *nativeCRecordBatchReader) Retain() {}
960960
func (n *nativeCRecordBatchReader) Release() {}
961961

962-
func (n *nativeCRecordBatchReader) Err() error { return n.err }
963-
func (n *nativeCRecordBatchReader) Record() arrow.RecordBatch { return n.cur }
962+
func (n *nativeCRecordBatchReader) Err() error { return n.err }
963+
func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return n.cur }
964+
965+
// Deprecated: Use [RecordBatch] instead.
966+
func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.RecordBatch() }
964967

965968
func (n *nativeCRecordBatchReader) Next() bool {
966969
err := n.next()

arrow/cdata/cdata_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -980,10 +980,13 @@ func (r *failingReader) Next() bool {
980980
r.opCount -= 1
981981
return r.opCount > 0
982982
}
983-
func (r *failingReader) Record() arrow.RecordBatch {
983+
func (r *failingReader) RecordBatch() arrow.RecordBatch {
984984
arrdata.Records["primitives"][0].Retain()
985985
return arrdata.Records["primitives"][0]
986986
}
987+
func (r *failingReader) Record() arrow.Record {
988+
return r.RecordBatch()
989+
}
987990
func (r *failingReader) Err() error {
988991
if r.opCount == 0 {
989992
return fmt.Errorf("Expected error message")

arrow/csv/reader.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,17 @@ func (r *Reader) Err() error { return r.err }
222222

223223
func (r *Reader) Schema() *arrow.Schema { return r.schema }
224224

225+
// RecordBatch returns the current record batch that has been extracted from the
226+
// underlying CSV file.
227+
// It is valid until the next call to Next.
228+
func (r *Reader) RecordBatch() arrow.RecordBatch { return r.cur }
229+
225230
// Record returns the current record that has been extracted from the
226231
// underlying CSV file.
227232
// It is valid until the next call to Next.
228-
func (r *Reader) Record() arrow.RecordBatch { return r.cur }
233+
//
234+
// Deprecated: Use [Reader.RecordBatch] instead.
235+
func (r *Reader) Record() arrow.Record { return r.RecordBatch() }
229236

230237
// Next returns whether a Record could be extracted from the underlying CSV file.
231238
//

arrow/flight/flightsql/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,7 +1097,7 @@ type PreparedStatement struct {
10971097
handle []byte
10981098
datasetSchema *arrow.Schema
10991099
paramSchema *arrow.Schema
1100-
paramBinding arrow.Record
1100+
paramBinding arrow.RecordBatch
11011101
streamBinding array.RecordReader
11021102
closed bool
11031103
}
@@ -1373,7 +1373,7 @@ func (p *PreparedStatement) clearParameters() {
13731373
// from under the statement. Release will be called on a previous binding
13741374
// record or reader if it existed, and will be called upon calling Close on the
13751375
// PreparedStatement.
1376-
func (p *PreparedStatement) SetParameters(binding arrow.Record) {
1376+
func (p *PreparedStatement) SetParameters(binding arrow.RecordBatch) {
13771377
p.clearParameters()
13781378
p.paramBinding = binding
13791379
if p.paramBinding != nil {

arrow/flight/flightsql/example/sql_batch_reader.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ type SqlBatchReader struct {
104104

105105
schema *arrow.Schema
106106
rows *sql.Rows
107-
record arrow.Record
107+
record arrow.RecordBatch
108108
bldr *array.RecordBuilder
109109
err error
110110

@@ -253,7 +253,10 @@ func (r *SqlBatchReader) Release() {
253253
}
254254
func (r *SqlBatchReader) Schema() *arrow.Schema { return r.schema }
255255

256-
func (r *SqlBatchReader) Record() arrow.Record { return r.record }
256+
func (r *SqlBatchReader) RecordBatch() arrow.RecordBatch { return r.record }
257+
258+
// Deprecated: Use [SqlBatchReader.RecordBatch] instead.
259+
func (r *SqlBatchReader) Record() arrow.Record { return r.RecordBatch() }
257260

258261
func (r *SqlBatchReader) Err() error { return r.err }
259262

0 commit comments

Comments
 (0)