Skip to content

Commit 24c66fa

Browse files
authored
feat(arrow/ipc): support custom_metadata on RecordBatch messages (#669)
### Rationale for this change Arrow IPC Messages wrap RecordBatch data in a Message flatbuffer, which has a custom_metadata field (a vector of KeyValue pairs). PyArrow and other implementations use this to attach per-batch metadata, but the Go implementation previously ignored it on both the read and write paths. ### What changes are included in this PR? Add a RecordBatchWithMetadata optional interface to avoid breaking the existing RecordBatch interface. The simpleRecord implementation carries metadata through NewSlice, SetColumn, and IPC round-trips. Includes PyArrow-generated test fixtures for interoperability validation. ### Are these changes tested? Yes, tests are included. ### Are there any user-facing changes? Yes, custom_metadata is now supported on RecordBatches. This change was developed with AI assistance and manually reviewed.
1 parent a886a57 commit 24c66fa

File tree

8 files changed

+303
-13
lines changed

8 files changed

+303
-13
lines changed

arrow/array/record.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type simpleRecord struct {
121121
refCount atomic.Int64
122122

123123
schema *arrow.Schema
124+
meta arrow.Metadata
124125

125126
rows int64
126127
arrs []arrow.Array
@@ -131,8 +132,19 @@ type simpleRecord struct {
131132
// NewRecordBatch panics if the columns and schema are inconsistent.
132133
// NewRecordBatch panics if rows is larger than the height of the columns.
133134
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) arrow.RecordBatch {
135+
return NewRecordBatchWithMetadata(schema, cols, nrows, arrow.Metadata{})
136+
}
137+
138+
// NewRecordBatchWithMetadata returns a basic, non-lazy in-memory record batch
139+
// with custom metadata. The metadata is preserved during IPC serialization
140+
// at the Message level.
141+
//
142+
// NewRecordBatchWithMetadata panics if the columns and schema are inconsistent.
143+
// NewRecordBatchWithMetadata panics if rows is larger than the height of the columns.
144+
func NewRecordBatchWithMetadata(schema *arrow.Schema, cols []arrow.Array, nrows int64, meta arrow.Metadata) arrow.RecordBatch {
134145
rec := &simpleRecord{
135146
schema: schema,
147+
meta: meta,
136148
rows: nrows,
137149
arrs: make([]arrow.Array, len(cols)),
138150
}
@@ -189,7 +201,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) (arrow.RecordBatch, e
189201
copy(arrs, rec.arrs)
190202
arrs[i] = arr
191203

192-
return NewRecordBatch(rec.schema, arrs, rec.rows), nil
204+
return NewRecordBatchWithMetadata(rec.schema, arrs, rec.rows, rec.meta), nil
193205
}
194206

195207
func (rec *simpleRecord) validate() error {
@@ -240,6 +252,7 @@ func (rec *simpleRecord) Release() {
240252
}
241253

242254
func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
255+
func (rec *simpleRecord) Metadata() arrow.Metadata { return rec.meta }
243256
func (rec *simpleRecord) NumRows() int64 { return rec.rows }
244257
func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) }
245258
func (rec *simpleRecord) Columns() []arrow.Array { return rec.arrs }
@@ -262,7 +275,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) arrow.RecordBatch {
262275
arr.Release()
263276
}
264277
}()
265-
return NewRecordBatch(rec.schema, arrs, j-i)
278+
return NewRecordBatchWithMetadata(rec.schema, arrs, j-i, rec.meta)
266279
}
267280

268281
func (rec *simpleRecord) String() string {
@@ -504,6 +517,6 @@ func IterFromReader(rdr RecordReader) iter.Seq2[arrow.RecordBatch, error] {
504517
}
505518

506519
var (
507-
_ arrow.RecordBatch = (*simpleRecord)(nil)
508-
_ RecordReader = (*simpleRecords)(nil)
520+
_ arrow.RecordBatchWithMetadata = (*simpleRecord)(nil)
521+
_ RecordReader = (*simpleRecords)(nil)
509522
)

arrow/ipc/file_reader.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,11 @@ func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buf
458458
defer codec.Close()
459459
}
460460

461+
customMeta, err := metadataFromFB(msg)
462+
if err != nil {
463+
panic(err)
464+
}
465+
461466
ctx := &arrayLoaderContext{
462467
src: ipcSource{
463468
meta: &md,
@@ -488,7 +493,7 @@ func newRecordBatch(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buf
488493
defer cols[i].Release()
489494
}
490495

491-
return array.NewRecordBatch(schema, cols, rows)
496+
return array.NewRecordBatchWithMetadata(schema, cols, rows, customMeta)
492497
}
493498

494499
type ipcSource struct {

arrow/ipc/ipc_test.go

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"io"
2424
"math/rand"
25+
"os"
2526
"strconv"
2627
"strings"
2728
"testing"
@@ -688,3 +689,259 @@ func TestArrowBinaryIPCWriterTruncatedVOffsets(t *testing.T) {
688689

689690
require.False(t, reader.Next())
690691
}
692+
693+
func TestRecordBatchCustomMetadataRoundtrip(t *testing.T) {
694+
mem := memory.NewGoAllocator()
695+
schema := arrow.NewSchema(
696+
[]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
697+
nil,
698+
)
699+
700+
bldr := array.NewInt32Builder(mem)
701+
defer bldr.Release()
702+
bldr.AppendValues([]int32{1, 2, 3}, nil)
703+
col := bldr.NewArray()
704+
defer col.Release()
705+
706+
meta := arrow.NewMetadata([]string{"k1", "k2"}, []string{"v1", "v2"})
707+
rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 3, meta)
708+
defer rec.Release()
709+
710+
// Write to IPC stream
711+
var buf bytes.Buffer
712+
writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
713+
require.NoError(t, writer.Write(rec))
714+
require.NoError(t, writer.Close())
715+
716+
// Read back
717+
reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
718+
require.NoError(t, err)
719+
defer reader.Release()
720+
721+
require.True(t, reader.Next())
722+
got := reader.RecordBatch()
723+
724+
rm, ok := got.(arrow.RecordBatchWithMetadata)
725+
require.True(t, ok, "record batch should implement RecordBatchWithMetadata")
726+
727+
require.Equal(t, meta.Keys(), rm.Metadata().Keys())
728+
require.Equal(t, meta.Values(), rm.Metadata().Values())
729+
}
730+
731+
func TestRecordBatchCustomMetadataFileRoundtrip(t *testing.T) {
732+
mem := memory.NewGoAllocator()
733+
schema := arrow.NewSchema(
734+
[]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
735+
nil,
736+
)
737+
738+
bldr := array.NewInt32Builder(mem)
739+
defer bldr.Release()
740+
bldr.AppendValues([]int32{10, 20}, nil)
741+
col := bldr.NewArray()
742+
defer col.Release()
743+
744+
meta := arrow.NewMetadata([]string{"file_key"}, []string{"file_value"})
745+
rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 2, meta)
746+
defer rec.Release()
747+
748+
// Write to IPC file
749+
var buf bytes.Buffer
750+
writer, err := ipc.NewFileWriter(&buf, ipc.WithSchema(schema))
751+
require.NoError(t, err)
752+
require.NoError(t, writer.Write(rec))
753+
require.NoError(t, writer.Close())
754+
755+
// Read back
756+
reader, err := ipc.NewFileReader(bytes.NewReader(buf.Bytes()))
757+
require.NoError(t, err)
758+
defer reader.Close()
759+
760+
require.Equal(t, 1, reader.NumRecords())
761+
got, err := reader.RecordBatchAt(0)
762+
require.NoError(t, err)
763+
defer got.Release()
764+
765+
rm, ok := got.(arrow.RecordBatchWithMetadata)
766+
require.True(t, ok, "record batch should implement RecordBatchWithMetadata")
767+
768+
require.Equal(t, meta.Keys(), rm.Metadata().Keys())
769+
require.Equal(t, meta.Values(), rm.Metadata().Values())
770+
}
771+
772+
func TestRecordBatchCustomMetadataInterop(t *testing.T) {
773+
t.Run("file", func(t *testing.T) {
774+
f, err := os.Open("testdata/custom_metadata.arrow")
775+
require.NoError(t, err)
776+
defer f.Close()
777+
778+
reader, err := ipc.NewFileReader(f)
779+
require.NoError(t, err)
780+
defer reader.Close()
781+
782+
// Verify schema metadata
783+
schemaMeta := reader.Schema().Metadata()
784+
idx := schemaMeta.FindKey("schema_key")
785+
require.GreaterOrEqual(t, idx, 0)
786+
require.Equal(t, "schema_value", schemaMeta.Values()[idx])
787+
788+
require.Equal(t, 2, reader.NumRecords())
789+
790+
// Batch 1
791+
rec0, err := reader.RecordBatchAt(0)
792+
require.NoError(t, err)
793+
defer rec0.Release()
794+
rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
795+
require.True(t, ok)
796+
m0 := rm0.Metadata()
797+
require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
798+
require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
799+
800+
// Batch 2
801+
rec1, err := reader.RecordBatchAt(1)
802+
require.NoError(t, err)
803+
defer rec1.Release()
804+
rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
805+
require.True(t, ok)
806+
m1 := rm1.Metadata()
807+
require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
808+
require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
809+
})
810+
811+
t.Run("stream", func(t *testing.T) {
812+
data, err := os.ReadFile("testdata/custom_metadata_stream.arrows")
813+
require.NoError(t, err)
814+
815+
reader, err := ipc.NewReader(bytes.NewReader(data))
816+
require.NoError(t, err)
817+
defer reader.Release()
818+
819+
// Verify schema metadata
820+
schemaMeta := reader.Schema().Metadata()
821+
idx := schemaMeta.FindKey("schema_key")
822+
require.GreaterOrEqual(t, idx, 0)
823+
require.Equal(t, "schema_value", schemaMeta.Values()[idx])
824+
825+
// Batch 1
826+
require.True(t, reader.Next())
827+
rec0 := reader.RecordBatch()
828+
rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
829+
require.True(t, ok)
830+
m0 := rm0.Metadata()
831+
require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
832+
require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
833+
834+
// Batch 2
835+
require.True(t, reader.Next())
836+
rec1 := reader.RecordBatch()
837+
rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
838+
require.True(t, ok)
839+
m1 := rm1.Metadata()
840+
require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
841+
require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
842+
843+
require.False(t, reader.Next())
844+
})
845+
}
846+
847+
func TestRecordBatchCustomMetadataSlice(t *testing.T) {
848+
mem := memory.NewGoAllocator()
849+
schema := arrow.NewSchema(
850+
[]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
851+
nil,
852+
)
853+
854+
bldr := array.NewInt32Builder(mem)
855+
defer bldr.Release()
856+
bldr.AppendValues([]int32{1, 2, 3, 4}, nil)
857+
col := bldr.NewArray()
858+
defer col.Release()
859+
860+
meta := arrow.NewMetadata([]string{"slice_key"}, []string{"slice_value"})
861+
rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 4, meta)
862+
defer rec.Release()
863+
864+
sliced := rec.NewSlice(1, 3)
865+
defer sliced.Release()
866+
867+
rm, ok := sliced.(arrow.RecordBatchWithMetadata)
868+
require.True(t, ok, "sliced record should implement RecordBatchWithMetadata")
869+
require.Equal(t, meta.Keys(), rm.Metadata().Keys())
870+
require.Equal(t, meta.Values(), rm.Metadata().Values())
871+
require.EqualValues(t, 2, sliced.NumRows())
872+
}
873+
874+
func TestRecordBatchCustomMetadataSetColumn(t *testing.T) {
875+
mem := memory.NewGoAllocator()
876+
schema := arrow.NewSchema(
877+
[]arrow.Field{
878+
{Name: "x", Type: arrow.PrimitiveTypes.Int32},
879+
{Name: "y", Type: arrow.PrimitiveTypes.Int32},
880+
},
881+
nil,
882+
)
883+
884+
bldr := array.NewInt32Builder(mem)
885+
defer bldr.Release()
886+
bldr.AppendValues([]int32{1, 2}, nil)
887+
col1 := bldr.NewArray()
888+
defer col1.Release()
889+
890+
bldr.AppendValues([]int32{3, 4}, nil)
891+
col2 := bldr.NewArray()
892+
defer col2.Release()
893+
894+
meta := arrow.NewMetadata([]string{"set_key"}, []string{"set_value"})
895+
rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col1, col2}, 2, meta)
896+
defer rec.Release()
897+
898+
bldr.AppendValues([]int32{5, 6}, nil)
899+
newCol := bldr.NewArray()
900+
defer newCol.Release()
901+
902+
updated, err := rec.SetColumn(0, newCol)
903+
require.NoError(t, err)
904+
defer updated.Release()
905+
906+
rm, ok := updated.(arrow.RecordBatchWithMetadata)
907+
require.True(t, ok, "updated record should implement RecordBatchWithMetadata")
908+
require.Equal(t, meta.Keys(), rm.Metadata().Keys())
909+
require.Equal(t, meta.Values(), rm.Metadata().Values())
910+
}
911+
912+
func TestRecordBatchNoMetadataRoundtrip(t *testing.T) {
913+
mem := memory.NewGoAllocator()
914+
schema := arrow.NewSchema(
915+
[]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
916+
nil,
917+
)
918+
919+
bldr := array.NewInt32Builder(mem)
920+
defer bldr.Release()
921+
bldr.AppendValues([]int32{1, 2, 3}, nil)
922+
col := bldr.NewArray()
923+
defer col.Release()
924+
925+
rec := array.NewRecordBatch(schema, []arrow.Array{col}, 3)
926+
defer rec.Release()
927+
928+
// Write to IPC stream
929+
var buf bytes.Buffer
930+
writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
931+
require.NoError(t, writer.Write(rec))
932+
require.NoError(t, writer.Close())
933+
934+
// Read back
935+
reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
936+
require.NoError(t, err)
937+
defer reader.Release()
938+
939+
require.True(t, reader.Next())
940+
got := reader.RecordBatch()
941+
942+
rm, ok := got.(arrow.RecordBatchWithMetadata)
943+
require.True(t, ok, "record batch should implement RecordBatchWithMetadata")
944+
945+
// Metadata should be empty, not nil
946+
require.Equal(t, 0, rm.Metadata().Len())
947+
}

arrow/ipc/metadata.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,13 +1141,15 @@ func writeFBBuilder(b *flatbuffers.Builder, mem memory.Allocator) *memory.Buffer
11411141
return buf
11421142
}
11431143

1144-
func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
1144+
func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64, customMetadata arrow.Metadata) *memory.Buffer {
1145+
metaFB := metadataToFB(b, customMetadata, flatbuf.MessageStartCustomMetadataVector)
11451146

11461147
flatbuf.MessageStart(b)
11471148
flatbuf.MessageAddVersion(b, flatbuf.MetadataVersion(currentMetadataVersion))
11481149
flatbuf.MessageAddHeaderType(b, hdrType)
11491150
flatbuf.MessageAddHeader(b, hdr)
11501151
flatbuf.MessageAddBodyLength(b, bodyLen)
1152+
flatbuf.MessageAddCustomMetadata(b, metaFB)
11511153
msg := flatbuf.MessageEnd(b)
11521154
b.Finish(msg)
11531155

@@ -1157,7 +1159,7 @@ func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbu
11571159
func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictutils.Mapper) *memory.Buffer {
11581160
b := flatbuffers.NewBuilder(1024)
11591161
schemaFB := schemaToFB(b, schema, dict)
1160-
return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
1162+
return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0, arrow.Metadata{})
11611163
}
11621164

11631165
func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w io.Writer) error {
@@ -1184,10 +1186,10 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w io.Writer)
11841186
return err
11851187
}
11861188

1187-
func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
1189+
func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64, customMetadata arrow.Metadata) *memory.Buffer {
11881190
b := flatbuffers.NewBuilder(0)
11891191
recFB := recordToFB(b, size, bodyLength, fields, meta, codec, variadicCounts)
1190-
return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength)
1192+
return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength, customMetadata)
11911193
}
11921194

11931195
func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
@@ -1199,7 +1201,7 @@ func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, size,
11991201
flatbuf.DictionaryBatchAddData(b, recFB)
12001202
flatbuf.DictionaryBatchAddIsDelta(b, isDelta)
12011203
dictFB := flatbuf.DictionaryBatchEnd(b)
1202-
return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, dictFB, bodyLength)
1204+
return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, dictFB, bodyLength, arrow.Metadata{})
12031205
}
12041206

12051207
func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, variadicCounts []int64) flatbuffers.UOffsetT {
1.24 KB
Binary file not shown.
944 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)