From fb233e2f02b353462848d04fc0521d44af13b1e8 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 3 Jul 2025 15:43:38 -0700 Subject: [PATCH] [SPARK-52678] Update `ArrowReader.swift` with GH-54 --- Sources/SparkConnect/ArrowReader.swift | 65 ++++++++++++-------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/Sources/SparkConnect/ArrowReader.swift b/Sources/SparkConnect/ArrowReader.swift index 8af28ce..218351c 100644 --- a/Sources/SparkConnect/ArrowReader.swift +++ b/Sources/SparkConnect/ArrowReader.swift @@ -274,23 +274,22 @@ public class ArrowReader { // swiftlint:disable:this type_body_length let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: dataBuffer) switch message.headerType { case .recordbatch: - do { - let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)! - let recordBatch = try loadRecordBatch( - rbMessage, - schema: schemaMessage!, - arrowSchema: result.schema!, - data: input, - messageEndOffset: (Int64(offset) + Int64(length)) - ).get() + let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)! + let recordBatchResult = try loadRecordBatch( + rbMessage, + schema: schemaMessage!, + arrowSchema: result.schema!, + data: input, + messageEndOffset: (Int64(offset) + Int64(length)) + ) + switch recordBatchResult { + case .success(let recordBatch): result.batches.append(recordBatch) - offset += Int(message.bodyLength + Int64(length)) - length = getUInt32(input, offset: offset) - } catch let error as ArrowError { + case .failure(let error): return .failure(error) - } catch { - return .failure(.unknownError("Unexpected error: \(error)")) } + offset += Int(message.bodyLength + Int64(length)) + length = getUInt32(input, offset: offset) case .schema: schemaMessage = message.header(type: org_apache_arrow_flatbuf_Schema.self)! let schemaResult = loadSchema(schemaMessage!) @@ -363,20 +362,19 @@ public class ArrowReader { // swiftlint:disable:this type_body_length let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb) switch message.headerType { case .recordbatch: - do { - let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)! - let recordBatch = try loadRecordBatch( - rbMessage, - schema: footer.schema!, - arrowSchema: result.schema!, - data: fileData, - messageEndOffset: messageEndOffset - ).get() + let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)! + let recordBatchResult = try loadRecordBatch( + rbMessage, + schema: footer.schema!, + arrowSchema: result.schema!, + data: fileData, + messageEndOffset: messageEndOffset + ) + switch recordBatchResult { + case .success(let recordBatch): result.batches.append(recordBatch) - } catch let error as ArrowError { + case .failure(let error): return .failure(error) - } catch { - return .failure(.unknownError("Unexpected error: \(error)")) } default: return .failure(.unknownError("Unhandled header type: \(message.headerType)")) @@ -429,17 +427,16 @@ public class ArrowReader { // swiftlint:disable:this type_body_length } case .recordbatch: let rbMessage = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)! - do { - let recordBatch = try loadRecordBatch( - rbMessage, schema: result.messageSchema!, arrowSchema: result.schema!, - data: dataBody, messageEndOffset: 0 - ).get() + let recordBatchResult = try loadRecordBatch( + rbMessage, schema: result.messageSchema!, arrowSchema: result.schema!, + data: dataBody, messageEndOffset: 0 + ) + switch recordBatchResult { + case .success(let recordBatch): result.batches.append(recordBatch) return .success(()) - } catch let error as ArrowError { + case .failure(let error): return .failure(error) - } catch { - return .failure(.unknownError("Unexpected error: \(error)")) } default: return .failure(.unknownError("Unhandled header type: \(message.headerType)"))