@@ -20,33 +20,49 @@ let fileMarker = Data("ARROW1".utf8)
/// 32-bit sentinel (`0xFFFF_FFFF`) that the stream reader compares against the
/// length prefix it reads before each message.
let continuationMarker: UInt32 = 0xFFFF_FFFF
2121
2222public struct ArrowReader : Sendable {
23+
/// Builds an `ArrowBuffer` from the byte range that `buffer` describes inside `fileData`.
///
/// The range starts at `messageOffset + buffer.offset` and spans `buffer.length` bytes.
///
/// - Parameters:
///   - buffer: Buffer descriptor supplying `offset` and `length`.
///   - fileData: The data the bytes are sliced from.
///   - length: Forwarded unchanged to `ArrowBuffer.createBuffer` as its `length` argument.
///   - messageOffset: Base offset added to `buffer.offset` to locate the first byte.
/// - Returns: The buffer produced by `ArrowBuffer.createBuffer`.
func makeBuffer(
  _ buffer: FBuffer,
  fileData: Data,
  length: UInt,
  messageOffset: Int64
) -> ArrowBuffer {
  let lowerBound = messageOffset + buffer.offset
  let byteRange = lowerBound..<(lowerBound + buffer.length)

  // TODO: This should not copy.
  // Materialising the slice as `[UInt8]` duplicates the bytes.
  let bytes = [UInt8](fileData[byteRange])
  return ArrowBuffer.createBuffer(bytes, length: length)
}
38+
2339 private class RecordBatchData {
24- let schema : Schema
25- let recordBatch : FlatRecordBatch
40+ let schema : FSchema
41+ let recordBatch : FRecordBatch
2642 private var fieldIndex : Int32 = 0
2743 private var nodeIndex : Int32 = 0
2844 private var bufferIndex : Int32 = 0
/// Creates a cursor over `recordBatch`, paired with the `schema` whose fields it walks.
///
/// - Parameters:
///   - recordBatch: Record batch whose nodes and buffers are handed out in order.
///   - schema: Schema whose fields are handed out in order.
init(_ recordBatch: FRecordBatch, schema: FSchema) {
  self.schema = schema
  self.recordBatch = recordBatch
}
3652
/// Returns the node at the current node cursor and advances the cursor by one.
///
/// - Returns: `nil` once the cursor has reached `recordBatch.nodesCount`; otherwise
///   whatever `recordBatch.nodes(at:)` yields for the current position.
func nextNode() -> FFieldNode? {
  guard nodeIndex < recordBatch.nodesCount else { return nil }
  let current = nodeIndex
  nodeIndex += 1
  return recordBatch.nodes(at: current)
}
4258
/// Returns the buffer descriptor at the current buffer cursor and advances the cursor by one.
///
/// - Returns: `nil` once the cursor has reached `recordBatch.buffersCount`; otherwise
///   whatever `recordBatch.buffers(at:)` yields for the current position.
func nextBuffer() -> FBuffer? {
  guard bufferIndex < recordBatch.buffersCount else { return nil }
  let current = bufferIndex
  bufferIndex += 1
  return recordBatch.buffers(at: current)
}
4864
49- func nextField( ) -> FlatField ? {
65+ func nextField( ) -> FField ? {
5066 if fieldIndex >= self . schema. fieldsCount { return nil }
5167 defer { fieldIndex += 1 }
5268 return self . schema. fields ( at: fieldIndex)
@@ -64,15 +80,15 @@ public struct ArrowReader: Sendable {
6480 }
6581
/// Mutable container for what the reader produces: a schema and the record batches read so far.
public class ArrowReaderResult {
  /// The schema in its parsed-message form (`FSchema`); visible only within this file.
  fileprivate var messageSchema: FSchema?

  /// The schema as an `ArrowSchema`; `nil` until one has been set.
  public var schema: ArrowSchema?

  /// Record batches collected so far; starts out empty.
  public var batches: [RecordBatch] = []
}
7187
/// Creates a reader; no configuration is required.
public init() {}
7389
7490 private func loadSchema(
75- _ schema: Schema
91+ _ schema: FSchema
7692 ) -> Result < ArrowSchema , ArrowError > {
7793 let builder = ArrowSchema . Builder ( )
7894 for index in 0 ..< schema. fieldsCount {
@@ -100,7 +116,7 @@ public struct ArrowReader: Sendable {
100116
101117 private func loadStructData(
102118 _ loadInfo: DataLoadInfo ,
103- field: FlatField
119+ field: FField
104120 ) -> Result < AnyArrowArray , ArrowError > {
105121 guard let node = loadInfo. batchData. nextNode ( ) else {
106122 return . failure( . invalid( " Node not found " ) )
@@ -135,7 +151,7 @@ public struct ArrowReader: Sendable {
135151 )
136152 }
137153
138- private func loadListData( _ loadInfo: DataLoadInfo , field: FlatField )
154+ private func loadListData( _ loadInfo: DataLoadInfo , field: FField )
139155 -> Result < AnyArrowArray , ArrowError >
140156 {
141157 guard let node = loadInfo. batchData. nextNode ( ) else {
@@ -166,7 +182,8 @@ public struct ArrowReader: Sendable {
166182 switch loadField ( loadInfo, field: childField) {
167183 case . success( let childHolder) :
168184 return makeArrayHolder (
169- field, buffers: [ arrowNullBuffer, arrowOffsetBuffer] ,
185+ field,
186+ buffers: [ arrowNullBuffer, arrowOffsetBuffer] ,
170187 nullCount: UInt ( node. nullCount) ,
171188 children: [ childHolder. arrowData] ,
172189 rbLength: UInt ( loadInfo. batchData. recordBatch. length) )
@@ -177,7 +194,7 @@ public struct ArrowReader: Sendable {
177194
178195 private func loadPrimitiveData(
179196 _ loadInfo: DataLoadInfo ,
180- field: FlatField
197+ field: FField
181198 )
182199 -> Result < AnyArrowArray , ArrowError >
183200 {
@@ -206,9 +223,11 @@ public struct ArrowReader: Sendable {
206223 rbLength: UInt ( loadInfo. batchData. recordBatch. length) )
207224 }
208225
226+ // MARK: Variable data loading
227+
209228 private func loadVariableData(
210229 _ loadInfo: DataLoadInfo ,
211- field: FlatField
230+ field: FField
212231 )
213232 -> Result < AnyArrowArray , ArrowError >
214233 {
@@ -232,6 +251,7 @@ public struct ArrowReader: Sendable {
232251 let arrowNullBuffer = makeBuffer (
233252 nullBuffer, fileData: loadInfo. fileData,
234253 length: nullLength, messageOffset: loadInfo. messageOffset)
254+
235255 let arrowOffsetBuffer = makeBuffer (
236256 offsetBuffer, fileData: loadInfo. fileData,
237257 length: UInt ( node. length) , messageOffset: loadInfo. messageOffset)
@@ -246,7 +266,7 @@ public struct ArrowReader: Sendable {
246266
247267 private func loadField(
248268 _ loadInfo: DataLoadInfo ,
249- field: FlatField
269+ field: FField
250270 )
251271 -> Result < AnyArrowArray , ArrowError >
252272 {
@@ -265,8 +285,8 @@ public struct ArrowReader: Sendable {
265285 }
266286
267287 private func loadRecordBatch(
268- _ recordBatch: FlatRecordBatch ,
269- schema: Schema ,
288+ _ recordBatch: FRecordBatch ,
289+ schema: FSchema ,
270290 arrowSchema: ArrowSchema ,
271291 data: Data ,
272292 messageEndOffset: Int64
@@ -310,7 +330,7 @@ public struct ArrowReader: Sendable {
310330 var length = getUInt32 ( input, offset: offset)
311331 var streamData = input
312332 // TODO: The following assumes message order will populate schemaMessage first
313- var schemaMessage : Schema ?
333+ var schemaMessage : FSchema ?
314334 while length != 0 {
315335 if length == continuationMarker {
316336 offset += Int ( MemoryLayout< UInt32> . size)
@@ -325,10 +345,10 @@ public struct ArrowReader: Sendable {
325345 data: streamData,
326346 allowReadingUnalignedBuffers: true
327347 )
328- let message : Message = getRoot ( byteBuffer: & dataBuffer)
348+ let message : FMessage = getRoot ( byteBuffer: & dataBuffer)
329349 switch message. headerType {
330350 case . recordbatch:
331- guard let rbMessage = message. header ( type: FlatRecordBatch . self) else {
351+ guard let rbMessage = message. header ( type: FRecordBatch . self) else {
332352 return . failure( . invalid( " Failed to parse RecordBatch message " ) )
333353 }
334354 guard let schemaMessage else {
@@ -353,7 +373,7 @@ public struct ArrowReader: Sendable {
353373 offset += Int ( message. bodyLength + Int64( length) )
354374 length = getUInt32 ( input, offset: offset)
355375 case . schema:
356- schemaMessage = message. header ( type: Schema . self)
376+ schemaMessage = message. header ( type: FSchema . self)
357377 guard let schemaMessage else {
358378 return . failure( . invalid( " Schema message not found " ) )
359379 }
@@ -397,7 +417,7 @@ public struct ArrowReader: Sendable {
397417 var footerBuffer = ByteBuffer (
398418 data: footerData,
399419 allowReadingUnalignedBuffers: useUnalignedBuffers)
400- let footer : Footer = getRoot ( byteBuffer: & footerBuffer)
420+ let footer : FFooter = getRoot ( byteBuffer: & footerBuffer)
401421 guard let footerSchema = footer. schema else {
402422 return . failure( . invalid( " Missing schema in footer " ) )
403423 }
@@ -410,7 +430,7 @@ public struct ArrowReader: Sendable {
410430 }
411431
412432 for index in 0 ..< footer. recordBatchesCount {
413- guard let recordBatch: Block = footer. recordBatches ( at: index) else {
433+ guard let recordBatch: FBlock = footer. recordBatches ( at: index) else {
414434 return . failure( . invalid( " Missing record batch at index \( index) " ) )
415435 }
416436 var messageLength = fileData. withUnsafeBytes { rawBuffer in
@@ -436,10 +456,10 @@ public struct ArrowReader: Sendable {
436456 var mbb = ByteBuffer (
437457 data: recordBatchData,
438458 allowReadingUnalignedBuffers: useUnalignedBuffers)
439- let message : Message = getRoot ( byteBuffer: & mbb)
459+ let message : FMessage = getRoot ( byteBuffer: & mbb)
440460 switch message. headerType {
441461 case . recordbatch:
442- guard let rbMessage = message. header ( type: FlatRecordBatch . self) else {
462+ guard let rbMessage = message. header ( type: FRecordBatch . self) else {
443463 return . failure( . invalid( " Expected RecordBatch as message header " ) )
444464 }
445465 guard let footerSchema = footer. schema else {
@@ -475,7 +495,9 @@ public struct ArrowReader: Sendable {
475495 _ fileURL: URL
476496 ) -> Result < ArrowReaderResult , ArrowError > {
477497 do {
478- let fileData = try Data ( contentsOf: fileURL)
498+ // TODO: implement alignment checks.
499+ let fileData = try Data ( contentsOf: fileURL, options: . mappedIfSafe)
500+
479501 if !validateFileData( fileData) {
480502 return . failure( . ioError( " Not a valid arrow file. " ) )
481503 }
@@ -499,10 +521,10 @@ public struct ArrowReader: Sendable {
499521 var mbb = ByteBuffer (
500522 data: dataHeader,
501523 allowReadingUnalignedBuffers: useUnalignedBuffers)
502- let message : Message = getRoot ( byteBuffer: & mbb)
524+ let message : FMessage = getRoot ( byteBuffer: & mbb)
503525 switch message. headerType {
504526 case . schema:
505- guard let sMessage = message. header ( type: Schema . self) else {
527+ guard let sMessage = message. header ( type: FSchema . self) else {
506528 return . failure( . unknownError( " Expected a schema but found none " ) )
507529 }
508530 switch loadSchema ( sMessage) {
@@ -514,7 +536,7 @@ public struct ArrowReader: Sendable {
514536 return . failure( error)
515537 }
516538 case . recordbatch:
517- guard let rbMessage = message. header ( type: FlatRecordBatch . self) else {
539+ guard let rbMessage = message. header ( type: FRecordBatch . self) else {
518540 return . failure( . invalid( " Expected a RecordBatch but found none " ) )
519541 }
520542 // TODO: the result used here is also the return type. Ideally it would be constructed once as a struct (same issue as above)
0 commit comments