Skip to content

Commit 91e657e

Browse files
committed
[SPARK-52274] Update ArrowReader/Writer with GH-44910
### What changes were proposed in this pull request? This PR aims to update `ArrowReader` and `ArrowWriter` to apply the upstream Apache Arrow change, GH-44910. ### Why are the changes needed? We need to keep syncing with the upstream in order to use Apache Arrow Swift library when it's released. This is a part of preparation to remove these files from this repository in the end. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #170 from dongjoon-hyun/SPARK-52274. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4c4aa90 commit 91e657e

File tree

3 files changed

+127
-12
lines changed

3 files changed

+127
-12
lines changed

Sources/SparkConnect/ArrowReader.swift

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import FlatBuffers
1919
import Foundation
2020

2121
let FILEMARKER = "ARROW1"
22-
let CONTINUATIONMARKER = -1
22+
let CONTINUATIONMARKER = UInt32(0xFFFF_FFFF)
2323

2424
/// @nodoc
2525
public class ArrowReader { // swiftlint:disable:this type_body_length
@@ -240,7 +240,78 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
240240
return .success(RecordBatch(arrowSchema, columns: columns))
241241
}
242242

243-
public func fromStream( // swiftlint:disable:this function_body_length
243+
/*
244+
This is for reading the Arrow streaming format. The Arrow streaming format
245+
is slightly different from the Arrow File format as it doesn't contain a header
246+
and footer.
247+
*/
248+
/// Decodes a buffer in the Arrow streaming format into a schema and record batches.
///
/// The buffer is read as a sequence of messages. Each message starts with an optional
/// continuation marker (`CONTINUATIONMARKER`), followed by a 32-bit metadata length, the
/// FlatBuffers-encoded message metadata, and `bodyLength` bytes of body. Reading stops at
/// a zero length (the end-of-stream marker) or when the buffer is exhausted.
///
/// - Parameters:
///   - input: The raw bytes of the stream.
///   - useUnalignedBuffers: Currently not consulted by this method; the metadata buffer is
///     always opened with unaligned reads allowed (presumably because a message can start
///     at an arbitrary byte offset of `input` — TODO confirm before wiring this through).
/// - Returns: `.success` with the schema and batches decoded so far, or `.failure` when the
///   stream is truncated, a record batch arrives before a schema, or a message cannot be
///   decoded.
public func readStreaming(  // swiftlint:disable:this function_body_length
  _ input: Data,
  useUnalignedBuffers: Bool = false
) -> Result<ArrowReaderResult, ArrowError> {
  let result = ArrowReaderResult()
  let prefixSize = MemoryLayout<UInt32>.size
  var schemaMessage: org_apache_arrow_flatbuf_Schema?
  var offset = 0

  // Reads the 32-bit prefix at `position`, or returns nil when fewer than four bytes remain.
  func prefix(at position: Int) -> UInt32? {
    guard input.count - position >= prefixSize else { return nil }
    return getUInt32(input, offset: position)
  }

  // Running out of bytes exactly on a message boundary is treated as a normal end of
  // stream, so a stream that was closed without the explicit end marker is still accepted.
  while offset < input.count {
    guard var length = prefix(at: offset) else {
      return .failure(.invalid("Truncated Arrow stream: incomplete message prefix"))
    }
    if length == CONTINUATIONMARKER {
      offset += prefixSize
      guard let metadataLength = prefix(at: offset) else {
        return .failure(.invalid("Truncated Arrow stream: missing message length"))
      }
      length = metadataLength
    }
    if length == 0 {
      // Zero length is the end-of-stream marker.
      return .success(result)
    }

    offset += prefixSize
    let metadataEnd = Int64(offset) + Int64(length)
    guard metadataEnd <= Int64(input.count) else {
      return .failure(.invalid("Truncated Arrow stream: message metadata exceeds input"))
    }

    let dataBuffer = ByteBuffer(
      data: input[offset...],
      allowReadingUnalignedBuffers: true)
    let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: dataBuffer)

    // Validate the body length up front so a corrupt value can neither overflow the
    // offset arithmetic nor move the cursor backwards or past the end of the input.
    let (nextOffset, overflowed) = metadataEnd.addingReportingOverflow(message.bodyLength)
    guard message.bodyLength >= 0, !overflowed, nextOffset <= Int64(input.count) else {
      return .failure(.invalid("Truncated Arrow stream: message body exceeds input"))
    }

    switch message.headerType {
    case .recordbatch:
      guard let schema = schemaMessage, let arrowSchema = result.schema else {
        return .failure(.invalid("Record batch message received before a schema message"))
      }
      guard
        let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)
      else {
        return .failure(.invalid("Unable to read record batch message header"))
      }
      do {
        let recordBatch = try loadRecordBatch(
          rbMessage,
          schema: schema,
          arrowSchema: arrowSchema,
          data: input,
          messageEndOffset: metadataEnd
        ).get()
        result.batches.append(recordBatch)
      } catch let error as ArrowError {
        return .failure(error)
      } catch {
        return .failure(.unknownError("Unexpected error: \(error)"))
      }
    case .schema:
      guard let header = message.header(type: org_apache_arrow_flatbuf_Schema.self) else {
        return .failure(.invalid("Unable to read schema message header"))
      }
      schemaMessage = header
      switch loadSchema(header) {
      case .success(let schema):
        result.schema = schema
      case .failure(let error):
        return .failure(error)
      }
    default:
      return .failure(.unknownError("Unhandled header type: \(message.headerType)"))
    }

    // Skip the metadata and the body to land on the next message prefix.
    offset = Int(nextOffset)
  }
  return .success(result)
}
308+
309+
/*
310+
This is for reading the Arrow file format. The Arrow file format supports
311+
random accessing the data. The Arrow file format contains a header and
312+
footer around the Arrow streaming format.
313+
*/
314+
public func readFile( // swiftlint:disable:this function_body_length
244315
_ fileData: Data,
245316
useUnalignedBuffers: Bool = false
246317
) -> Result<ArrowReaderResult, ArrowError> {
@@ -266,7 +337,7 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
266337
for index in 0..<footer.recordBatchesCount {
267338
let recordBatch = footer.recordBatches(at: index)!
268339
var messageLength = fileData.withUnsafeBytes { rawBuffer in
269-
rawBuffer.loadUnaligned(fromByteOffset: Int(recordBatch.offset), as: Int32.self)
340+
rawBuffer.loadUnaligned(fromByteOffset: Int(recordBatch.offset), as: UInt32.self)
270341
}
271342

272343
var messageOffset: Int64 = 1
@@ -275,7 +346,7 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
275346
messageLength = fileData.withUnsafeBytes { rawBuffer in
276347
rawBuffer.loadUnaligned(
277348
fromByteOffset: Int(recordBatch.offset + Int64(MemoryLayout<Int32>.size)),
278-
as: Int32.self)
349+
as: UInt32.self)
279350
}
280351
}
281352

@@ -299,8 +370,10 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
299370
messageEndOffset: messageEndOffset
300371
).get()
301372
result.batches.append(recordBatch)
302-
} catch let error {
373+
} catch let error as ArrowError {
303374
return .failure(error)
375+
} catch {
376+
return .failure(.unknownError("Unexpected error: \(error)"))
304377
}
305378
default:
306379
return .failure(.unknownError("Unhandled header type: \(message.headerType)"))
@@ -320,7 +393,7 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
320393
let markerLength = FILEMARKER.utf8.count
321394
let footerLengthEnd = Int(fileData.count - markerLength)
322395
let data = fileData[..<(footerLengthEnd)]
323-
return fromStream(data)
396+
return readFile(data)
324397
} catch {
325398
return .failure(.unknownError("Error loading file: \(error)"))
326399
}
@@ -360,13 +433,15 @@ public class ArrowReader { // swiftlint:disable:this type_body_length
360433
).get()
361434
result.batches.append(recordBatch)
362435
return .success(())
363-
} catch let error {
436+
} catch let error as ArrowError {
364437
return .failure(error)
438+
} catch {
439+
return .failure(.unknownError("Unexpected error: \(error)"))
365440
}
366-
367441
default:
368442
return .failure(.unknownError("Unhandled header type: \(message.headerType)"))
369443
}
370444
}
371445

372446
}
447+
// swiftlint:disable:this file_length

Sources/SparkConnect/ArrowReaderHelper.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,3 +312,10 @@ func validateFileData(_ data: Data) -> Bool {
312312
let endString = String(decoding: data[(data.count - markerLength)...], as: UTF8.self)
313313
return startString == FILEMARKER && endString == FILEMARKER
314314
}
315+
316+
/// Reads a little-endian `UInt32` located `offset` bytes after the first byte of `data`.
///
/// The value is decoded as little-endian, matching how `ArrowWriter` emits its 32-bit
/// prefixes (`.littleEndian`), so the result does not depend on the host byte order.
///
/// - Parameters:
///   - data: The bytes to read from. `offset` is relative to the first byte of `data`,
///     also when `data` is a slice.
///   - offset: Zero-based byte offset of the value; it does not need to be aligned.
/// - Returns: The decoded value, or `0` when fewer than four bytes are available at
///   `offset`. The streaming reader interprets a zero length as end of stream, so a
///   truncated buffer ends the read instead of reading out of bounds.
func getUInt32(_ data: Data, offset: Int) -> UInt32 {
  let width = MemoryLayout<UInt32>.size
  guard offset >= 0, data.count >= width, offset <= data.count - width else {
    return 0
  }
  let raw = data.withUnsafeBytes { rawBuffer in
    rawBuffer.loadUnaligned(fromByteOffset: offset, as: UInt32.self)
  }
  return UInt32(littleEndian: raw)
}

Sources/SparkConnect/ArrowWriter.swift

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length
132132
let startIndex = writer.count
133133
switch writeRecordBatch(batch: batch) {
134134
case .success(let rbResult):
135+
withUnsafeBytes(of: CONTINUATIONMARKER.littleEndian) { writer.append(Data($0)) }
135136
withUnsafeBytes(of: rbResult.1.o.littleEndian) { writer.append(Data($0)) }
136137
writer.append(rbResult.0)
137138
switch writeRecordBatchData(&writer, batch: batch) {
@@ -250,7 +251,7 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length
250251
return .success(fbb.data)
251252
}
252253

253-
private func writeStream(_ writer: inout DataWriter, info: ArrowWriter.Info) -> Result<
254+
private func writeFile(_ writer: inout DataWriter, info: ArrowWriter.Info) -> Result<
254255
Bool, ArrowError
255256
> {
256257
var fbb: FlatBufferBuilder = FlatBufferBuilder()
@@ -284,9 +285,41 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length
284285
return .success(true)
285286
}
286287

287-
public func toStream(_ info: ArrowWriter.Info) -> Result<Data, ArrowError> {
288+
/// Serializes `info` into an in-memory buffer using the Arrow streaming format.
///
/// The output is the schema message followed by one message per record batch. Every
/// message is prefixed with `CONTINUATIONMARKER` and its 32-bit little-endian metadata
/// length, and the stream is closed with the marker followed by a zero length.
///
/// The spelling of the method name is kept as-is because it is the public name callers use.
///
/// - Parameter info: The schema and record batches to write.
/// - Returns: `.success` with the encoded bytes, or `.failure` when the schema or a batch
///   cannot be converted to a message or a message does not fit the stream format.
public func writeSteaming(_ info: ArrowWriter.Info) -> Result<Data, ArrowError> {
  let writer = InMemDataWriter()

  // Appends `value` to the output as four little-endian bytes.
  func appendUInt32(_ value: UInt32) {
    withUnsafeBytes(of: value.littleEndian) { writer.append(Data($0)) }
  }

  switch toMessage(info.schema) {
  case .success(let schemaData):
    guard let schemaLength = UInt32(exactly: schemaData.count) else {
      return .failure(.invalid("Schema message is too large for the streaming format"))
    }
    appendUInt32(CONTINUATIONMARKER)
    appendUInt32(schemaLength)
    writer.append(schemaData)
  case .failure(let error):
    return .failure(error)
  }

  for batch in info.batches {
    switch toMessage(batch) {
    case .success(let batchData):
      // The first element is written as the length-prefixed message metadata and the
      // second element is appended right after it as the message body.
      guard batchData.count >= 2 else {
        return .failure(.invalid("Record batch message is missing its metadata or body"))
      }
      let metadata = batchData[0]
      let body = batchData[1]
      guard let metadataLength = UInt32(exactly: metadata.count) else {
        return .failure(.invalid("Record batch message is too large for the streaming format"))
      }
      appendUInt32(CONTINUATIONMARKER)
      appendUInt32(metadataLength)
      writer.append(metadata)
      writer.append(body)
    case .failure(let error):
      return .failure(error)
    }
  }

  // End-of-stream marker: continuation marker followed by a zero length.
  appendUInt32(CONTINUATIONMARKER)
  appendUInt32(0)
  return .success(writer.data)
}
319+
320+
public func writeFile(_ info: ArrowWriter.Info) -> Result<Data, ArrowError> {
288321
var writer: any DataWriter = InMemDataWriter()
289-
switch writeStream(&writer, info: info) {
322+
switch writeFile(&writer, info: info) {
290323
case .success:
291324
if let memWriter = writer as? InMemDataWriter {
292325
return .success(memWriter.data)
@@ -313,7 +346,7 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length
313346

314347
var writer: any DataWriter = FileDataWriter(fileHandle)
315348
writer.append(FILEMARKER.data(using: .utf8)!)
316-
switch writeStream(&writer, info: info) {
349+
switch writeFile(&writer, info: info) {
317350
case .success:
318351
writer.append(FILEMARKER.data(using: .utf8)!)
319352
case .failure(let error):

0 commit comments

Comments
 (0)