Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions arrow/array/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type simpleRecord struct {
refCount atomic.Int64

schema *arrow.Schema
meta arrow.Metadata

rows int64
arrs []arrow.Array
Expand All @@ -131,8 +132,19 @@ type simpleRecord struct {
// NewRecordBatch panics if the columns and schema are inconsistent.
// NewRecordBatch panics if rows is larger than the height of the columns.
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
	// Delegate to the metadata-aware constructor; the zero-value
	// Metadata means "no batch-level custom metadata attached".
	var noMeta arrow.Metadata
	return NewRecordBatchWithMetadata(schema, cols, nrows, noMeta)
}

// NewRecordBatchWithMetadata returns a basic, non-lazy in-memory record batch
// with custom metadata. The metadata is preserved during IPC serialization
// at the Message level.
//
// NewRecordBatchWithMetadata panics if the columns and schema are inconsistent.
// NewRecordBatchWithMetadata panics if rows is larger than the height of the columns.
func NewRecordBatchWithMetadata(schema *arrow.Schema, cols []arrow.Array, nrows int64, meta arrow.Metadata) arrow.RecordBatch {
rec := &simpleRecord{
schema: schema,
meta: meta,
rows: nrows,
arrs: make([]arrow.Array, len(cols)),
}
Expand Down Expand Up @@ -189,7 +201,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, e
copy(arrs, rec.arrs)
arrs[i] = arr

return NewRecordBatch(rec.schema, arrs, rec.rows), nil
return NewRecordBatchWithMetadata(rec.schema, arrs, rec.rows, rec.meta), nil
}

func (rec *simpleRecord) validate() error {
Expand Down Expand Up @@ -240,6 +252,7 @@ func (rec *simpleRecord) Release() {
}

// Schema returns the schema the record batch was created with.
func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
// Metadata returns the batch-level custom metadata; it is preserved at
// the Message level during IPC serialization (see NewRecordBatchWithMetadata).
func (rec *simpleRecord) Metadata() arrow.Metadata { return rec.meta }
// NumRows returns the number of rows in the record batch.
func (rec *simpleRecord) NumRows() int64 { return rec.rows }
// NumCols returns the number of columns in the record batch.
func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) }
// Columns returns the columns of the record batch. Callers must not
// mutate the returned slice.
func (rec *simpleRecord) Columns() []arrow.Array { return rec.arrs }
Expand All @@ -262,7 +275,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
arr.Release()
}
}()
return NewRecordBatch(rec.schema, arrs, j-i)
return NewRecordBatchWithMetadata(rec.schema, arrs, j-i, rec.meta)
}

func (rec *simpleRecord) String() string {
Expand Down Expand Up @@ -504,6 +517,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
}

var (
_ arrow.RecordBatch = (*simpleRecord)(nil)
_ RecordReader = (*simpleRecords)(nil)
_ arrow.RecordBatchWithMetadata = (*simpleRecord)(nil)
_ RecordReader = (*simpleRecords)(nil)
)
7 changes: 6 additions & 1 deletion arrow/ipc/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,11 @@ func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buf
defer codec.Close()
}

customMeta, err := metadataFromFB(msg)
if err != nil {
panic(err)
}

ctx := &arrayLoaderContext{
src: ipcSource{
meta: &md,
Expand Down Expand Up @@ -488,7 +493,7 @@ func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buf
defer cols[i].Release()
}

return array.NewRecordBatch(schema, cols, rows)
return array.NewRecordBatchWithMetadata(schema, cols, rows, customMeta)
}

type ipcSource struct {
Expand Down
257 changes: 257 additions & 0 deletions arrow/ipc/ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math/rand"
"os"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -688,3 +689,259 @@ func TestArrowBinaryIPCWriterTruncatedVOffsets(t *testing.T) {

require.False(t, reader.Next())
}

// TestRecordBatchCustomMetadataRoundtrip verifies that batch-level custom
// metadata survives a write/read cycle through the IPC stream format.
func TestRecordBatchCustomMetadataRoundtrip(t *testing.T) {
	alloc := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}}, nil)

	// Build a single int32 column with three values.
	builder := array.NewInt32Builder(alloc)
	defer builder.Release()
	builder.AppendValues([]int32{1, 2, 3}, nil)
	col := builder.NewArray()
	defer col.Release()

	meta := arrow.NewMetadata([]string{"k1", "k2"}, []string{"v1", "v2"})
	rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 3, meta)
	defer rec.Release()

	// Serialize the batch into an in-memory IPC stream.
	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
	require.NoError(t, w.Write(rec))
	require.NoError(t, w.Close())

	// Deserialize and confirm the metadata came back intact.
	rdr, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer rdr.Release()

	require.True(t, rdr.Next())
	got := rdr.RecordBatch()

	rm, ok := got.(arrow.RecordBatchWithMetadata)
	require.True(t, ok, "record batch should implement RecordBatchWithMetadata")

	require.Equal(t, meta.Keys(), rm.Metadata().Keys())
	require.Equal(t, meta.Values(), rm.Metadata().Values())
}

// TestRecordBatchCustomMetadataFileRoundtrip verifies that batch-level custom
// metadata survives a write/read cycle through the IPC file (random-access) format.
func TestRecordBatchCustomMetadataFileRoundtrip(t *testing.T) {
	alloc := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}}, nil)

	// Build a two-row int32 column.
	builder := array.NewInt32Builder(alloc)
	defer builder.Release()
	builder.AppendValues([]int32{10, 20}, nil)
	col := builder.NewArray()
	defer col.Release()

	meta := arrow.NewMetadata([]string{"file_key"}, []string{"file_value"})
	rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 2, meta)
	defer rec.Release()

	// Serialize through the IPC file writer.
	var buf bytes.Buffer
	w, err := ipc.NewFileWriter(&buf, ipc.WithSchema(schema))
	require.NoError(t, err)
	require.NoError(t, w.Write(rec))
	require.NoError(t, w.Close())

	// Read the single record back by index.
	rdr, err := ipc.NewFileReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer rdr.Close()

	require.Equal(t, 1, rdr.NumRecords())
	got, err := rdr.RecordBatchAt(0)
	require.NoError(t, err)
	defer got.Release()

	rm, ok := got.(arrow.RecordBatchWithMetadata)
	require.True(t, ok, "record batch should implement RecordBatchWithMetadata")

	require.Equal(t, meta.Keys(), rm.Metadata().Keys())
	require.Equal(t, meta.Values(), rm.Metadata().Values())
}

func TestRecordBatchCustomMetadataInterop(t *testing.T) {
t.Run("file", func(t *testing.T) {
f, err := os.Open("testdata/custom_metadata.arrow")
require.NoError(t, err)
defer f.Close()

reader, err := ipc.NewFileReader(f)
require.NoError(t, err)
defer reader.Close()

// Verify schema metadata
schemaMeta := reader.Schema().Metadata()
idx := schemaMeta.FindKey("schema_key")
require.GreaterOrEqual(t, idx, 0)
require.Equal(t, "schema_value", schemaMeta.Values()[idx])

require.Equal(t, 2, reader.NumRecords())

// Batch 1
rec0, err := reader.RecordBatchAt(0)
require.NoError(t, err)
defer rec0.Release()
rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
require.True(t, ok)
m0 := rm0.Metadata()
require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])

// Batch 2
rec1, err := reader.RecordBatchAt(1)
require.NoError(t, err)
defer rec1.Release()
rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
require.True(t, ok)
m1 := rm1.Metadata()
require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
})

t.Run("stream", func(t *testing.T) {
data, err := os.ReadFile("testdata/custom_metadata_stream.arrows")
require.NoError(t, err)

reader, err := ipc.NewReader(bytes.NewReader(data))
require.NoError(t, err)
defer reader.Release()

// Verify schema metadata
schemaMeta := reader.Schema().Metadata()
idx := schemaMeta.FindKey("schema_key")
require.GreaterOrEqual(t, idx, 0)
require.Equal(t, "schema_value", schemaMeta.Values()[idx])

// Batch 1
require.True(t, reader.Next())
rec0 := reader.RecordBatch()
rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
require.True(t, ok)
m0 := rm0.Metadata()
require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])

// Batch 2
require.True(t, reader.Next())
rec1 := reader.RecordBatch()
rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
require.True(t, ok)
m1 := rm1.Metadata()
require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])

require.False(t, reader.Next())
})
}

// TestRecordBatchCustomMetadataSlice verifies that NewSlice carries the
// parent batch's custom metadata over to the sliced batch.
func TestRecordBatchCustomMetadataSlice(t *testing.T) {
	alloc := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}}, nil)

	// Four rows so the [1, 3) slice below has two.
	builder := array.NewInt32Builder(alloc)
	defer builder.Release()
	builder.AppendValues([]int32{1, 2, 3, 4}, nil)
	col := builder.NewArray()
	defer col.Release()

	meta := arrow.NewMetadata([]string{"slice_key"}, []string{"slice_value"})
	rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 4, meta)
	defer rec.Release()

	sliced := rec.NewSlice(1, 3)
	defer sliced.Release()

	rm, ok := sliced.(arrow.RecordBatchWithMetadata)
	require.True(t, ok, "sliced record should implement RecordBatchWithMetadata")
	require.EqualValues(t, 2, sliced.NumRows())
	require.Equal(t, meta.Keys(), rm.Metadata().Keys())
	require.Equal(t, meta.Values(), rm.Metadata().Values())
}

// TestRecordBatchCustomMetadataSetColumn verifies that SetColumn preserves
// the original batch's custom metadata on the returned batch.
func TestRecordBatchCustomMetadataSetColumn(t *testing.T) {
	alloc := memory.NewGoAllocator()
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "x", Type: arrow.PrimitiveTypes.Int32},
			{Name: "y", Type: arrow.PrimitiveTypes.Int32},
		},
		nil,
	)

	// One builder is reused for all three arrays; NewArray resets it.
	builder := array.NewInt32Builder(alloc)
	defer builder.Release()

	builder.AppendValues([]int32{1, 2}, nil)
	colX := builder.NewArray()
	defer colX.Release()

	builder.AppendValues([]int32{3, 4}, nil)
	colY := builder.NewArray()
	defer colY.Release()

	meta := arrow.NewMetadata([]string{"set_key"}, []string{"set_value"})
	rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{colX, colY}, 2, meta)
	defer rec.Release()

	builder.AppendValues([]int32{5, 6}, nil)
	replacement := builder.NewArray()
	defer replacement.Release()

	updated, err := rec.SetColumn(0, replacement)
	require.NoError(t, err)
	defer updated.Release()

	rm, ok := updated.(arrow.RecordBatchWithMetadata)
	require.True(t, ok, "updated record should implement RecordBatchWithMetadata")
	require.Equal(t, meta.Keys(), rm.Metadata().Keys())
	require.Equal(t, meta.Values(), rm.Metadata().Values())
}

// TestRecordBatchNoMetadataRoundtrip verifies that a batch created without
// custom metadata round-trips through the IPC stream format with an empty
// (zero-length) Metadata value.
func TestRecordBatchNoMetadataRoundtrip(t *testing.T) {
	alloc := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}}, nil)

	builder := array.NewInt32Builder(alloc)
	defer builder.Release()
	builder.AppendValues([]int32{1, 2, 3}, nil)
	col := builder.NewArray()
	defer col.Release()

	// Plain constructor: no metadata attached.
	rec := array.NewRecordBatch(schema, []arrow.Array{col}, 3)
	defer rec.Release()

	// Serialize into an in-memory IPC stream.
	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(schema))
	require.NoError(t, w.Write(rec))
	require.NoError(t, w.Close())

	// Deserialize.
	rdr, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)
	defer rdr.Release()

	require.True(t, rdr.Next())
	got := rdr.RecordBatch()

	rm, ok := got.(arrow.RecordBatchWithMetadata)
	require.True(t, ok, "record batch should implement RecordBatchWithMetadata")

	// Metadata should be empty, not nil
	require.Equal(t, 0, rm.Metadata().Len())
}
12 changes: 7 additions & 5 deletions arrow/ipc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,13 +1141,15 @@ func writeFBBuilder(b *flatbuffers.Builder, mem memory.Allocator) *memory.Buffer
return buf
}

func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64, customMetadata arrow.Metadata) *memory.Buffer {
metaFB := metadataToFB(b, customMetadata, flatbuf.MessageStartCustomMetadataVector)

flatbuf.MessageStart(b)
flatbuf.MessageAddVersion(b, flatbuf.MetadataVersion(currentMetadataVersion))
flatbuf.MessageAddHeaderType(b, hdrType)
flatbuf.MessageAddHeader(b, hdr)
flatbuf.MessageAddBodyLength(b, bodyLen)
flatbuf.MessageAddCustomMetadata(b, metaFB)
msg := flatbuf.MessageEnd(b)
b.Finish(msg)

Expand All @@ -1157,7 +1159,7 @@ func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbu
func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictutils.Mapper) *memory.Buffer {
b := flatbuffers.NewBuilder(1024)
schemaFB := schemaToFB(b, schema, dict)
return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0, arrow.Metadata{})
}

func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w io.Writer) error {
Expand All @@ -1184,10 +1186,10 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w io.Writer)
return err
}

func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64, customMetadata arrow.Metadata) *memory.Buffer {
b := flatbuffers.NewBuilder(0)
recFB := recordToFB(b, size, bodyLength, fields, meta, codec, variadicCounts)
return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength)
return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength, customMetadata)
}

func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
Expand All @@ -1199,7 +1201,7 @@ func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size,
flatbuf.DictionaryBatchAddData(b, recFB)
flatbuf.DictionaryBatchAddIsDelta(b, isDelta)
dictFB := flatbuf.DictionaryBatchEnd(b)
return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, dictFB, bodyLength)
return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, dictFB, bodyLength, arrow.Metadata{})
}

func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) flatbuffers.UOffsetT {
Expand Down
Binary file added arrow/ipc/testdata/custom_metadata.arrow
Binary file not shown.
Binary file added arrow/ipc/testdata/custom_metadata_stream.arrows
Binary file not shown.
Loading