@@ -2,6 +2,7 @@ package org.jetbrains.kotlinx.dataframe.io
2
2
3
3
import org.apache.arrow.memory.RootAllocator
4
4
import org.apache.arrow.vector.BigIntVector
5
+ import org.apache.arrow.vector.BitVector
5
6
import org.apache.arrow.vector.Decimal256Vector
6
7
import org.apache.arrow.vector.DecimalVector
7
8
import org.apache.arrow.vector.DurationVector
@@ -27,13 +28,16 @@ import org.apache.arrow.vector.complex.StructVector
27
28
import org.apache.arrow.vector.ipc.ArrowFileReader
28
29
import org.apache.arrow.vector.ipc.ArrowStreamReader
29
30
import org.apache.arrow.vector.types.pojo.Field
31
+ import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
30
32
import org.jetbrains.kotlinx.dataframe.AnyBaseColumn
31
33
import org.jetbrains.kotlinx.dataframe.AnyFrame
32
34
import org.jetbrains.kotlinx.dataframe.DataColumn
33
35
import org.jetbrains.kotlinx.dataframe.DataFrame
34
36
import org.jetbrains.kotlinx.dataframe.api.Infer
35
37
import org.jetbrains.kotlinx.dataframe.api.concat
36
38
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
39
+ import org.jetbrains.kotlinx.dataframe.codeGen.AbstractDefaultReadMethod
40
+ import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadDfMethod
37
41
import java.io.File
38
42
import java.io.InputStream
39
43
import java.math.BigDecimal
@@ -47,13 +51,34 @@ import java.time.Duration
47
51
import java.time.LocalDateTime
48
52
import kotlin.reflect.typeOf
49
53
54
/**
 * [SupportedFormat] implementation that reads Arrow Feather (random access format) data
 * by delegating to the `DataFrame.readArrowFeather` overloads.
 *
 * The `header` argument of both `readDataFrame` overloads is not used.
 */
public class ArrowFeather : SupportedFormat {
    /** Reads a frame from [stream]; [header] is ignored. */
    override fun readDataFrame(stream: InputStream, header: List<String>): AnyFrame =
        DataFrame.readArrowFeather(stream)

    /** Reads a frame from [file]; [header] is ignored. */
    override fun readDataFrame(file: File, header: List<String>): AnyFrame =
        DataFrame.readArrowFeather(file)

    /** Accepts only the exact extension `feather` (no surrounding whitespace, case-sensitive). */
    override fun acceptsExtension(ext: String): Boolean = ext == "feather"

    // NOTE(review): the meaning of this ordering value is defined by SupportedFormat — confirm there.
    override val testOrder: Int = 50000

    /** Creates the default read-method descriptor for the given [pathRepresentation]. */
    override fun createDefaultReadMethod(pathRepresentation: String?): DefaultReadDfMethod =
        DefaultReadArrowMethod(pathRepresentation)
}
67
+
68
// Name handed to AbstractDefaultReadMethod below; it must match the reader function name exactly,
// so it carries no surrounding whitespace.
private const val readArrowFeather = "readArrowFeather"

/**
 * Default read-method descriptor for Arrow Feather sources.
 *
 * Passes [path], empty method arguments and the [readArrowFeather] name to [AbstractDefaultReadMethod].
 * NOTE(review): presumably the name is emitted into generated code as the method to call — confirm
 * against [AbstractDefaultReadMethod].
 */
private class DefaultReadArrowMethod(path: String?) :
    AbstractDefaultReadMethod(path, MethodArguments.EMPTY, readArrowFeather)
71
+
50
72
/** Holder of the shared Arrow allocator that the channel-based readers use as their default. */
internal object Allocator {
    /** Root allocator with a limit of [Long.MAX_VALUE], created lazily on first access. */
    val ROOT: RootAllocator by lazy { RootAllocator(Long.MAX_VALUE) }
}
55
77
56
- private fun readArrow (channel : ReadableByteChannel , allocator : RootAllocator = Allocator .ROOT ): AnyFrame {
78
+ /* *
79
+ * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [channel]
80
+ */
81
+ public fun readArrowIPC (channel : ReadableByteChannel , allocator : RootAllocator = Allocator .ROOT ): AnyFrame {
57
82
ArrowStreamReader (channel, allocator).use { reader ->
58
83
val dfs = buildList {
59
84
val root = reader.vectorSchemaRoot
@@ -67,7 +92,10 @@ private fun readArrow(channel: ReadableByteChannel, allocator: RootAllocator = A
67
92
}
68
93
}
69
94
70
- private fun readArrow (channel : SeekableByteChannel , allocator : RootAllocator = Allocator .ROOT ): AnyFrame {
95
+ /* *
96
+ * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [channel]
97
+ */
98
+ public fun readArrowFeather (channel : SeekableByteChannel , allocator : RootAllocator = Allocator .ROOT ): AnyFrame {
71
99
ArrowFileReader (channel, allocator).use { reader ->
72
100
val dfs = buildList {
73
101
reader.recordBlocks.forEach { block ->
@@ -82,6 +110,8 @@ private fun readArrow(channel: SeekableByteChannel, allocator: RootAllocator = A
82
110
}
83
111
}
84
112
113
/** Returns the (nullable) boolean values stored at the indices of [range], in range order. */
private fun BitVector.values(range: IntRange): List<Boolean?> {
    return range.map { index -> getObject(index) }
}
114
+
85
115
/** Returns the (nullable) values stored at the indices of [range], in range order, as [Byte]s. */
private fun UInt1Vector.values(range: IntRange): List<Byte?> {
    return range.map { index -> getObject(index) }
}
86
116
/** Returns the (nullable) values stored at the indices of [range], in range order, as [Char]s. */
private fun UInt2Vector.values(range: IntRange): List<Char?> {
    return range.map { index -> getObject(index) }
}
87
117
/**
 * Returns the (nullable) values stored at the indices of [range], in range order.
 * Values are read through `getObjectNoOverflow`, hence the [Long] element type.
 */
private fun UInt4Vector.values(range: IntRange): List<Long?> {
    return range.map { index -> getObjectNoOverflow(index) }
}
@@ -146,6 +176,7 @@ private fun readField(root: VectorSchemaRoot, field: Field): AnyBaseColumn {
146
176
is LargeVarCharVector -> vector.values(range).withType()
147
177
is VarBinaryVector -> vector.values(range).withType()
148
178
is LargeVarBinaryVector -> vector.values(range).withType()
179
+ is BitVector -> vector.values(range).withType()
149
180
is SmallIntVector -> vector.values(range).withType()
150
181
is TinyIntVector -> vector.values(range).withType()
151
182
is UInt1Vector -> vector.values(range).withType()
@@ -171,23 +202,75 @@ private fun readField(root: VectorSchemaRoot, field: Field): AnyBaseColumn {
171
202
return DataColumn .createValueColumn(field.name, list, type, Infer .Nulls )
172
203
}
173
204
174
- public fun DataFrame.Companion.readArrow (file : File ): AnyFrame {
175
- return Files .newByteChannel(file.toPath()).use { readArrow(it) }
205
+ // IPC reading block
206
+
207
/**
 * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [file].
 *
 * A byte channel is opened on the file and closed once reading has finished.
 */
public fun DataFrame.Companion.readArrowIPC(file: File): AnyFrame {
    val channel = Files.newByteChannel(file.toPath())
    return channel.use { opened -> readArrowIPC(opened) }
}
211
+
212
/**
 * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [byteArray].
 *
 * The array is wrapped in an in-memory channel that is closed once reading has finished.
 */
public fun DataFrame.Companion.readArrowIPC(byteArray: ByteArray): AnyFrame {
    val channel = SeekableInMemoryByteChannel(byteArray)
    return channel.use { opened -> readArrowIPC(opened) }
}
216
+
217
/**
 * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [stream].
 *
 * The stream is wrapped in a channel, and that channel is closed once reading has finished.
 */
public fun DataFrame.Companion.readArrowIPC(stream: InputStream): AnyFrame {
    val channel = Channels.newChannel(stream)
    return channel.use { opened -> readArrowIPC(opened) }
}
221
+
222
/**
 * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [url].
 *
 * File URLs are converted to a [File] and read through the file overload. URLs with another
 * supported protocol are opened as a stream, which is closed once reading has finished.
 *
 * @throws IllegalArgumentException if [url] is neither a file URL nor uses a supported protocol
 */
public fun DataFrame.Companion.readArrowIPC(url: URL): AnyFrame =
    when {
        isFile(url) -> readArrowIPC(urlAsFile(url))
        isProtocolSupported(url) -> url.openStream().use { stream -> readArrowIPC(stream) }
        // Message carries no padding so it reads cleanly in logs and can be matched exactly.
        else -> throw IllegalArgumentException("Invalid protocol for url $url")
    }
233
+
234
/**
 * Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [path].
 *
 * When [path] is recognised as a URL it is read through the [URL] overload, otherwise it is treated as a file path.
 */
public fun DataFrame.Companion.readArrowIPC(path: String): AnyFrame =
    when {
        isURL(path) -> readArrowIPC(URL(path))
        else -> readArrowIPC(File(path))
    }
177
239
178
- public fun DataFrame.Companion.readArrow (stream : InputStream ): AnyFrame = Channels .newChannel(stream).use { readArrow(it) }
240
+ // Feather reading block
241
+
242
/**
 * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [file].
 *
 * A byte channel is opened on the file and closed once reading has finished.
 */
public fun DataFrame.Companion.readArrowFeather(file: File): AnyFrame {
    val channel = Files.newByteChannel(file.toPath())
    return channel.use { opened -> readArrowFeather(opened) }
}
246
+
247
/**
 * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [byteArray].
 *
 * The array is wrapped in an in-memory seekable channel that is closed once reading has finished.
 */
public fun DataFrame.Companion.readArrowFeather(byteArray: ByteArray): AnyFrame {
    val channel = SeekableInMemoryByteChannel(byteArray)
    return channel.use { opened -> readArrowFeather(opened) }
}
251
+
252
/**
 * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [stream].
 *
 * The remaining content of [stream] is read fully into memory and handed to the byte-array overload.
 * This function does not close [stream]; that stays the caller's responsibility.
 */
public fun DataFrame.Companion.readArrowFeather(stream: InputStream): AnyFrame =
    // Kotlin's readBytes() is used instead of InputStream.readAllBytes() so the code also runs on JDK 8,
    // and it mirrors the url.readBytes() call used by the URL overload.
    readArrowFeather(stream.readBytes())
179
256
180
- public fun DataFrame.Companion.readArrow (url : URL ): AnyFrame =
257
/**
 * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [url].
 *
 * File URLs are converted to a [File] and read through the file overload. URLs with another
 * supported protocol are downloaded fully into memory and read through the byte-array overload.
 *
 * @throws IllegalArgumentException if [url] is neither a file URL nor uses a supported protocol
 */
public fun DataFrame.Companion.readArrowFeather(url: URL): AnyFrame =
    when {
        isFile(url) -> readArrowFeather(urlAsFile(url))
        isProtocolSupported(url) -> readArrowFeather(url.readBytes())
        // Message carries no padding so it reads cleanly in logs and can be matched exactly.
        else -> throw IllegalArgumentException("Invalid protocol for url $url")
    }
188
268
189
- public fun DataFrame.Companion.readArrow (path : String ): AnyFrame = if (isURL(path)) {
190
- readArrow(URL (path))
269
/**
 * Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [path].
 *
 * When [path] is recognised as a URL it is read through the [URL] overload, otherwise it is treated as a file path.
 */
public fun DataFrame.Companion.readArrowFeather(path: String): AnyFrame =
    when {
        isURL(path) -> readArrowFeather(URL(path))
        else -> readArrowFeather(File(path))
    }
0 commit comments