Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

A Swift implementation of Apache Arrow, the universal columnar format for fast data interchange and in-memory analytics.

This is a **work in progress**. Do not use it in production. Progress is fast, however; expect a beta in December.

This project is based on Arrow-Swift, the official Swift implementation of Apache Arrow. The decision was made to at least temporarily operate independently of the Apache Software Foundation (ASF). Currently there are no active ASF maintainers with knowledge of Swift, and the only [Apache approved CI for Swift](https://github.com/apache/infrastructure-actions/blob/main/approved_patterns.yml) is [setup-swift which is unmaintained](https://github.com/swift-actions/setup-swift/issues), leading to intermittent CI failures. This has led to delays in much-needed fixes being implemented.

The intention is to continue contributing to the official Arrow-Swift repository; however, changes can be iterated on more quickly here.
Expand Down
106 changes: 27 additions & 79 deletions Sources/Arrow/ArrowArray.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ public protocol AnyArrowArray {
var length: UInt { get }
var nullCount: UInt { get }
var arrowData: ArrowData { get }
var bufferData: [Data] { get }
var bufferDataSizes: [Int] { get }
var bufferData: [Data] { get } // TODO: remove
var bufferDataSizes: [Int] { get } // TODO: remove
func asAny(_ index: UInt) -> Any?
func asString(_ index: UInt) -> String
func setCArrayPtr(_ cArrayPtr: UnsafePointer<ArrowC.ArrowArray>?)
}

// MARK: Core Protocol

/// The interface for Arrow array types.
public protocol ArrowArray<ItemType>: AnyArrowArray {
associatedtype ItemType
Expand Down Expand Up @@ -86,16 +84,14 @@ extension ArrowArrayBase {
arrowData.type
}

// TODO: Remove
public var bufferData: [Data] {
arrowData.buffers.map { buffer in
var data = Data()
buffer.append(to: &data)
return data
}
arrowData.bufferData
}

// TODO: Remove
public var bufferDataSizes: [Int] {
arrowData.buffers.map { Int($0.capacity) }
arrowData.bufferDataSizes
}

public func isNull(at index: UInt) throws(ArrowError) -> Bool {
Expand All @@ -114,17 +110,8 @@ public class FixedArray<T>: ArrowArrayBase<T> where T: BitwiseCopyable {
if arrowData.isNull(index) {
return nil
}
let byteOffset = arrowData.stride * Int(index)

// FIXME: Can probably do this and remove BitwiseCopyable constraint.
// let buffer = UnsafeBufferPointer<ItemType>(
// start: arrowData.buffers[1].rawPointer.assumingMemoryBound(to: ItemType.self),
// count: Int(arrowData.length)
// )
// return buffer[Int(index)]
return arrowData.buffers[1].rawPointer
.advanced(by: byteOffset)
.load(as: ItemType.self)
let value: ItemType = arrowData.load(at: index)
return value
}
}

Expand All @@ -134,28 +121,12 @@ public class StringArray: ArrowArrayBase<String> {
if self.arrowData.isNull(index) {
return nil
}

let offsets = self.arrowData.buffers[1]
let offsetIndex = MemoryLayout<Int32>.stride * Int(index)
var startIndex: Int32 = 0
if index > 0 {
startIndex = offsets.rawPointer.advanced(by: offsetIndex)
.load(as: Int32.self)
}
let endIndex = offsets.rawPointer.advanced(
by: offsetIndex + MemoryLayout<Int32>.stride
)
.load(as: Int32.self)

let offsetBuffer: OffsetsBuffer = arrowData.offsets
let (startIndex, endIndex) = offsetBuffer.offsets(at: Int(index))
let arrayLength = Int(endIndex - startIndex)
let values = self.arrowData.buffers[2]
let rawPointer = values.rawPointer.advanced(by: Int(startIndex))
.bindMemory(to: UInt8.self, capacity: arrayLength)
let buffer = UnsafeBufferPointer<UInt8>(
start: rawPointer,
count: arrayLength
)
return String(bytes: buffer, encoding: .utf8)
let value: String = self.arrowData.loadVariable(
at: Int(startIndex), arrayLength: arrayLength)
return value
}
}

Expand All @@ -165,8 +136,7 @@ public class BoolArray: ArrowArrayBase<Bool> {
if self.arrowData.isNull(index) {
return nil
}
let valueBuffer = self.arrowData.buffers[1]
return BitUtility.isSet(index, buffer: valueBuffer)
return arrowData.isNullValue(at: index)
}
}

Expand All @@ -176,10 +146,7 @@ public class Date32Array: ArrowArrayBase<Date> {
if self.arrowData.isNull(index) {
return nil
}
let byteOffset = self.arrowData.stride * Int(index)
let milliseconds = self.arrowData.buffers[1].rawPointer.advanced(
by: byteOffset
).load(as: UInt32.self)
let milliseconds: UInt32 = arrowData.load(at: index)
return Date(timeIntervalSince1970: TimeInterval(milliseconds * 86400))
}
}
Expand All @@ -190,10 +157,8 @@ public class Date64Array: ArrowArrayBase<Date> {
if self.arrowData.isNull(index) {
return nil
}
let byteOffset = self.arrowData.stride * Int(index)
let milliseconds = self.arrowData.buffers[1].rawPointer.advanced(
by: byteOffset
).load(as: UInt64.self)

let milliseconds: UInt64 = self.arrowData.load(at: index)
return Date(timeIntervalSince1970: TimeInterval(milliseconds / 1000))
}
}
Expand Down Expand Up @@ -298,28 +263,17 @@ public class BinaryArray: ArrowArrayBase<Data> {
public var options = Options()

public override subscript(_ index: UInt) -> Data? {
let offsetIndex = MemoryLayout<Int32>.stride * Int(index)
if self.arrowData.isNull(index) {
return nil
}
let offsets = self.arrowData.buffers[1]
let values = self.arrowData.buffers[2]
var startIndex: Int32 = 0
if index > 0 {
startIndex = offsets.rawPointer.advanced(by: offsetIndex)
.load(as: Int32.self)
}
let endIndex = offsets.rawPointer.advanced(
by: offsetIndex + MemoryLayout<Int32>.stride
)
.load(as: Int32.self)

let (startIndex, endIndex) = arrowData.offsets.offsets(at: Int(index))

let arrayLength = Int(endIndex - startIndex)
let rawPointer = values.rawPointer.advanced(by: Int(startIndex))
.bindMemory(to: UInt8.self, capacity: arrayLength)
let buffer = UnsafeBufferPointer<UInt8>(
start: rawPointer, count: arrayLength)
let byteArray = Array(buffer)
return Data(byteArray)

let data: Data = self.arrowData.loadVariable(
at: Int(startIndex), arrayLength: arrayLength)
return data
}

public override func asString(_ index: UInt) -> String {
Expand Down Expand Up @@ -381,16 +335,10 @@ public class NestedArray: ArrowArrayBase<[Any?]> {
switch arrowData.type {
case .list(let _):
guard let values = children.first else { return nil }
let offsets = self.arrowData.buffers[1]
let offsetIndex = Int(index) * MemoryLayout<Int32>.stride
let startOffset = offsets.rawPointer.advanced(by: offsetIndex)
.load(as: Int32.self)
let endOffset = offsets.rawPointer.advanced(
by: offsetIndex + MemoryLayout<Int32>.stride
)
.load(as: Int32.self)

let (startIndex, endIndex) = arrowData.offsets.offsets(at: Int(index))
var items: [Any?] = []
for i in startOffset..<endOffset {
for i in startIndex..<endIndex {
items.append(values.asAny(UInt(i)))
}
return items
Expand Down
9 changes: 6 additions & 3 deletions Sources/Arrow/ArrowCExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ public class ArrowCExporter {
// obj so the memory doesn't get
// deallocated
self.arrowData = arrowData
for arrowBuffer in arrowData.buffers {
self.data.append(arrowBuffer.rawPointer)
// for arrowBuffer in arrowData.buffers {
// self.data.append(arrowBuffer.rawPointer)
// }
for pointer in arrowData.data {
self.data.append(pointer)
}

self.buffers = UnsafeMutablePointer<UnsafeRawPointer?>.allocate(
Expand Down Expand Up @@ -132,7 +135,7 @@ public class ArrowCExporter {
cArray.buffers = exportArray.buffers
cArray.length = Int64(arrowData.length)
cArray.null_count = Int64(arrowData.nullCount)
cArray.n_buffers = Int64(arrowData.buffers.count)
cArray.n_buffers = Int64(arrowData.bufferCount)
// Swift Arrow does not currently support children or dictionaries
// This will need to be updated once support has been added
cArray.n_children = 0
Expand Down
107 changes: 101 additions & 6 deletions Sources/Arrow/ArrowData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,73 @@

import Foundation

/// A value that can be constructed from a contiguous run of raw bytes.
///
/// `ArrowData.loadVariable(at:arrayLength:)` is generic over this protocol and
/// builds its result through `init(_:)`.
protocol VariableLength {
/// Creates a value from the bytes in `value`.
init(_ value: UnsafeBufferPointer<UInt8>)
}

/// Lets `String` be produced by the generic variable-length loader.
extension String: VariableLength {
    /// Creates a string by decoding the bytes in `value` as UTF-8.
    init(_ value: UnsafeBufferPointer<UInt8>) {
        self = String(decoding: value, as: UTF8.self)
    }
}

// NOTE(review): this initializer is labelled `value:`, whereas the
// `VariableLength` requirement is the unlabelled `init(_:)`. Presumably the
// requirement is witnessed by an existing Foundation `Data.init(_:)` overload
// and this labelled form only forwards to it — confirm whether it is needed.
// Do not simply drop the label: `self.init(value)` would then call itself.
extension Data: VariableLength {
/// Creates `Data` from the bytes in `value` by forwarding to the unlabelled `init(_:)`.
init(value: UnsafeBufferPointer<UInt8>) {
self.init(value)
}
}

public struct ArrowData {

// FIXME: Remove
/// One `Data` per entry of `buffers`, in the same order.
///
/// Each element starts as an empty `Data` that the corresponding buffer fills
/// through `append(to:)`.
public var bufferData: [Data] {
    var copies: [Data] = []
    for buffer in buffers {
        var bytes = Data()
        buffer.append(to: &bytes)
        copies.append(bytes)
    }
    return copies
}

// FIXME: Remove
/// The `capacity` of every entry of `buffers`, converted to `Int`, in buffer order.
public var bufferDataSizes: [Int] {
    var sizes: [Int] = []
    for buffer in buffers {
        sizes.append(Int(buffer.capacity))
    }
    return sizes
}

// FIXME: Remove
/// The `rawPointer` of every entry of `buffers`, in buffer order.
public var data: [UnsafeMutableRawPointer] {
    buffers.map(\.rawPointer)
}

// FIXME: Remove
/// The number of entries in `buffers`.
public var bufferCount: Int {
buffers.count
}

// TODO: Typed accessors - migration
/// The offsets of a variable-length or nested array, backed by `buffers[1]`.
///
/// - Precondition: `type.isVariable || type.isNested`. Requesting offsets for
///   any other type is a programmer error and traps with a message naming
///   the offending type.
var offsets: OffsetsBuffer {
    guard type.isVariable || type.isNested else {
        fatalError(
            "offsets requested for \(type), which is neither variable-length nor nested")
    }
    return ArrowBufferBackedOffsets(buffers[1])
}

// TODO: this should replace nullBuffer
/// A `NullBuffer` over the memory of `buffers[0]`.
///
/// The result takes its length from that buffer and is created with
/// `capacity: 0` and `ownsMemory: false`.
var nulls: NullBuffer {
    let validity = buffers[0]
    let bytes = validity.rawPointer.assumingMemoryBound(to: UInt8.self)
    let byteCount = Int(validity.length)
    return NullBuffer(
        length: byteCount,
        capacity: 0,
        ownsMemory: false,
        buffer: bytes
    )
}

public let type: ArrowType
public let buffers: [ArrowBuffer]
public let children: [ArrowData]
public let nullCount: UInt
public let length: UInt
public let stride: Int

let nullBuffer: ArrowBuffer
private let nullBuffer: ArrowBuffer
// FIXME: Remove
private let buffers: [ArrowBuffer]

init(
_ arrowType: ArrowType,
Expand All @@ -49,12 +107,49 @@ public struct ArrowData {
self.children = children
self.nullCount = nullCount
self.length = length
self.stride = arrowType.getStride()

self.nullBuffer = buffers[0]
}

// TODO: Temporary while removing ArrowBuffer
/// Reads the fixed-width element at `index` from the second buffer (`buffers[1]`).
///
/// The element's byte position is `type.getStride() * index`; the bytes found
/// there are loaded as `T`.
///
/// - Parameter index: Zero-based element position. No bounds check is performed here.
/// - Returns: The value stored at that position, loaded as `T`.
public func load<T>(at index: UInt) -> T where T: BitwiseCopyable {
    let offset = type.getStride() * Int(index)
    let elementStart = buffers[1].rawPointer.advanced(by: offset)
    return elementStart.load(as: T.self)
}

// TODO: Temporary while removing ArrowBuffer
/// Builds a variable-length value from a byte range of the third buffer (`buffers[2]`).
///
/// - Parameters:
///   - startIndex: Byte offset into `buffers[2]` at which the value begins.
///   - arrayLength: Number of bytes that belong to the value.
/// - Returns: A `T` created from those bytes via `VariableLength.init(_:)`.
func loadVariable<T>(
    at startIndex: Int,
    arrayLength: Int
) -> T where T: VariableLength {
    let firstByte = buffers[2].rawPointer
        .advanced(by: startIndex)
        .bindMemory(to: UInt8.self, capacity: arrayLength)
    let bytes = UnsafeBufferPointer<UInt8>(start: firstByte, count: arrayLength)
    return T(bytes)
}

// TODO: Temporary while removing ArrowBuffer
public func isNull(_ at: UInt) -> Bool {
nullBuffer.length > 0 && !BitUtility.isSet(at, buffer: nullBuffer)
let a = nulls.length > 0 && !nulls.isSet(Int(at))
let b = nullBuffer.length > 0 && !BitUtility.isSet(at, buffer: nullBuffer)
if nulls.length != nullBuffer.length {
fatalError("Check new null handling")
}
if a != b {
fatalError("Check new null handling")
}
return a
}

// TODO: Temporary while removing ArrowBuffer
/// Returns whether bit `index` is set in the second buffer (`buffers[1]`).
///
/// Despite the name, this reads `buffers[1]`, not the null buffer stored at
/// `buffers[0]`; `BoolArray` returns the result as the element's boolean value.
///
/// - Parameter index: Zero-based bit position to test.
func isNullValue(at index: UInt) -> Bool {
    BitUtility.isSet(index, buffer: buffers[1])
}

}
25 changes: 18 additions & 7 deletions Sources/Arrow/ArrowReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public struct ArrowReader: Sendable {
let startOffset = messageOffset + buffer.offset
let endOffset = startOffset + buffer.length

let range = Int(startOffset)..<Int(endOffset)
// let offset = BorrowedOffsets(count: Int(length) / 4, data: fileData, range: range)
// TODO: This should not copy.

let bufferData = [UInt8](fileData[startOffset..<endOffset])
Expand Down Expand Up @@ -212,15 +214,24 @@ public struct ArrowReader: Sendable {

let nullLength = UInt(ceil(Double(node.length) / 8))
let arrowNullBuffer = makeBuffer(
nullBuffer, fileData: loadInfo.fileData,
length: nullLength, messageOffset: loadInfo.messageOffset)
nullBuffer,
fileData: loadInfo.fileData,
length: nullLength,
messageOffset: loadInfo.messageOffset
)
let arrowValueBuffer = makeBuffer(
valueBuffer, fileData: loadInfo.fileData,
length: UInt(node.length), messageOffset: loadInfo.messageOffset)
valueBuffer,
fileData: loadInfo.fileData,
length: UInt(node.length),
messageOffset: loadInfo.messageOffset
)
return makeArrayHolder(
field, buffers: [arrowNullBuffer, arrowValueBuffer],
nullCount: UInt(node.nullCount), children: nil,
rbLength: UInt(loadInfo.batchData.recordBatch.length))
field,
buffers: [arrowNullBuffer, arrowValueBuffer],
nullCount: UInt(node.nullCount),
children: nil,
rbLength: UInt(loadInfo.batchData.recordBatch.length)
)
}

// MARK: Variable data loading
Expand Down
2 changes: 1 addition & 1 deletion Sources/Arrow/ArrowType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public struct UnionField: Codable, Sendable, Equatable {
/// [the physical memory layout of Apache Arrow]: https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout
public indirect enum ArrowType: Codable, Sendable, Equatable {
/// Null type
case null
case null // TODO: Implement this
/// A boolean datatype representing the values `true` and `false`.
case boolean
/// A signed 8-bit integer.
Expand Down
Loading