Skip to content

Commit 6eea98d

Browse files
committed
Review-requested modifications
1 parent 9ff9de2 commit 6eea98d

File tree

5 files changed

+61
-40
lines changed

5 files changed

+61
-40
lines changed

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriter.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public interface ArrowWriter : AutoCloseable {
5050
* If [strictType] is true, [dataFrame] columns described in [targetSchema] with a non-compatible type will produce an exception (otherwise, they would be saved as is).
5151
* If [strictNullable] is true, [targetSchema] fields that are not nullable and contain nulls in [dataFrame] will produce an exception (otherwise, they would be saved as is with nullable = true).
5252
*/
53-
public class Mode(
53+
public data class Mode(
5454
public val restrictWidening: Boolean,
5555
public val restrictNarrowing: Boolean,
5656
public val strictType: Boolean,

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriterImpl.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ internal class ArrowWriterImpl(
7171
when (vector) {
7272
is FixedWidthVector -> vector.allocateNew(size)
7373
is VariableWidthVector -> vector.allocateNew(size)
74-
else -> TODO("Not implemented for ${vector.javaClass.canonicalName}")
74+
else -> throw IllegalArgumentException("Can not allocate ${vector.javaClass.canonicalName}")
7575
}
7676
}
7777

7878
private fun infillWithNulls(vector: FieldVector, size: Int) {
7979
when (vector) {
8080
is BaseFixedWidthVector -> for (i in 0 until size) { vector.setNull(i) }
8181
is BaseVariableWidthVector -> for (i in 0 until size) { vector.setNull(i) }
82-
else -> TODO("Not implemented for ${vector.javaClass.canonicalName}")
82+
else -> throw IllegalArgumentException("Can not infill ${vector.javaClass.canonicalName}")
8383
}
8484
vector.valueCount = size
8585
}
@@ -89,7 +89,7 @@ internal class ArrowWriterImpl(
8989
return when (targetFieldType) {
9090
ArrowType.Utf8() -> column.map { it?.toString() }
9191
ArrowType.LargeUtf8() -> column.map { it?.toString() }
92-
ArrowType.Binary(), ArrowType.LargeBinary() -> TODO("Saving var binary is currently not implemented")
92+
ArrowType.Binary(), ArrowType.LargeBinary() -> throw NotImplementedError("Saving var binary is currently not implemented")
9393
ArrowType.Bool() -> column.convertToBoolean()
9494
ArrowType.Int(8, true) -> column.convertToByte()
9595
ArrowType.Int(16, true) -> column.convertToShort()
@@ -105,7 +105,7 @@ internal class ArrowWriterImpl(
105105
// is ArrowType.Duration -> todo
106106
// is ArrowType.Struct -> todo
107107
else -> {
108-
TODO("Saving ${targetFieldType.javaClass.canonicalName} is not implemented")
108+
throw NotImplementedError("Saving ${targetFieldType.javaClass.canonicalName} is currently not implemented")
109109
}
110110
}
111111
}
@@ -140,7 +140,7 @@ internal class ArrowWriterImpl(
140140
is TimeSecVector -> column.convertToLocalTime().forEachIndexed { i, value -> value?.let { vector.set(i, (value.toNanoOfDay() / 1000 / 1000 / 1000).toInt()); value } ?: vector.setNull(i) }
141141
// is StructVector -> todo
142142
else -> {
143-
TODO("Saving to ${vector.javaClass.canonicalName} is not implemented")
143+
throw NotImplementedError("Saving to ${vector.javaClass.canonicalName} is currently not implemented")
144144
}
145145
}
146146

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

33
import org.apache.arrow.memory.RootAllocator
4-
import org.apache.arrow.vector.ipc.ArrowFileReader
5-
import org.apache.arrow.vector.ipc.ArrowStreamReader
64
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
75
import org.jetbrains.kotlinx.dataframe.AnyFrame
86
import org.jetbrains.kotlinx.dataframe.DataFrame
97
import org.jetbrains.kotlinx.dataframe.api.NullabilityOptions
10-
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
118
import org.jetbrains.kotlinx.dataframe.codeGen.AbstractDefaultReadMethod
129
import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadDfMethod
1310
import java.io.File
@@ -54,41 +51,15 @@ public fun DataFrame.Companion.readArrowIPC(
5451
channel: ReadableByteChannel,
5552
allocator: RootAllocator = Allocator.ROOT,
5653
nullability: NullabilityOptions = NullabilityOptions.Infer,
57-
): AnyFrame {
58-
ArrowStreamReader(channel, allocator).use { reader ->
59-
val dfs = buildList {
60-
val root = reader.vectorSchemaRoot
61-
val schema = root.schema
62-
while (reader.loadNextBatch()) {
63-
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
64-
add(df)
65-
}
66-
}
67-
return dfs.concatKeepingSchema()
68-
}
69-
}
70-
54+
): AnyFrame = readArrowIPCImpl(channel, allocator, nullability)
7155
/**
7256
* Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [channel]
7357
*/
7458
public fun DataFrame.Companion.readArrowFeather(
7559
channel: SeekableByteChannel,
7660
allocator: RootAllocator = Allocator.ROOT,
7761
nullability: NullabilityOptions = NullabilityOptions.Infer,
78-
): AnyFrame {
79-
ArrowFileReader(channel, allocator).use { reader ->
80-
val dfs = buildList {
81-
reader.recordBlocks.forEach { block ->
82-
reader.loadRecordBatch(block)
83-
val root = reader.vectorSchemaRoot
84-
val schema = root.schema
85-
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
86-
add(df)
87-
}
88-
}
89-
return dfs.concatKeepingSchema()
90-
}
91-
}
62+
): AnyFrame = readArrowFeatherImpl(channel, allocator, nullability)
9263

9364
// IPC reading block
9465

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

3+
import org.apache.arrow.memory.RootAllocator
34
import org.apache.arrow.vector.BigIntVector
45
import org.apache.arrow.vector.BitVector
56
import org.apache.arrow.vector.DateDayVector
@@ -26,9 +27,12 @@ import org.apache.arrow.vector.VarBinaryVector
2627
import org.apache.arrow.vector.VarCharVector
2728
import org.apache.arrow.vector.VectorSchemaRoot
2829
import org.apache.arrow.vector.complex.StructVector
30+
import org.apache.arrow.vector.ipc.ArrowFileReader
31+
import org.apache.arrow.vector.ipc.ArrowStreamReader
2932
import org.apache.arrow.vector.types.pojo.Field
3033
import org.apache.arrow.vector.util.DateUtility
3134
import org.jetbrains.kotlinx.dataframe.AnyBaseCol
35+
import org.jetbrains.kotlinx.dataframe.AnyFrame
3236
import org.jetbrains.kotlinx.dataframe.DataColumn
3337
import org.jetbrains.kotlinx.dataframe.DataFrame
3438
import org.jetbrains.kotlinx.dataframe.api.Infer
@@ -39,9 +43,12 @@ import org.jetbrains.kotlinx.dataframe.api.cast
3943
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
4044
import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
4145
import org.jetbrains.kotlinx.dataframe.api.getColumn
46+
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
4247
import org.jetbrains.kotlinx.dataframe.impl.asList
4348
import java.math.BigDecimal
4449
import java.math.BigInteger
50+
import java.nio.channels.ReadableByteChannel
51+
import java.nio.channels.SeekableByteChannel
4552
import java.time.Duration
4653
import java.time.LocalDate
4754
import java.time.LocalDateTime
@@ -197,11 +204,54 @@ internal fun readField(root: VectorSchemaRoot, field: Field, nullability: Nullab
197204
is TimeSecVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
198205
is StructVector -> vector.values(range).withTypeNullable(field.isNullable, nullability)
199206
else -> {
200-
TODO("not fully implemented")
207+
throw NotImplementedError("reading from ${vector.javaClass.canonicalName} is not implemented")
201208
}
202209
}
203210
return DataColumn.createValueColumn(field.name, list, type, Infer.None)
204211
} catch (unexpectedNull: NullabilityException) {
205212
throw IllegalArgumentException("Column `${field.name}` should be not nullable but has nulls")
206213
}
207214
}
215+
216+
/**
217+
* Read [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format) data from existing [channel]
218+
*/
219+
public fun DataFrame.Companion.readArrowIPCImpl(
220+
channel: ReadableByteChannel,
221+
allocator: RootAllocator = Allocator.ROOT,
222+
nullability: NullabilityOptions = NullabilityOptions.Infer,
223+
): AnyFrame {
224+
ArrowStreamReader(channel, allocator).use { reader ->
225+
val dfs = buildList {
226+
val root = reader.vectorSchemaRoot
227+
val schema = root.schema
228+
while (reader.loadNextBatch()) {
229+
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
230+
add(df)
231+
}
232+
}
233+
return dfs.concatKeepingSchema()
234+
}
235+
}
236+
237+
/**
238+
* Read [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) data from existing [channel]
239+
*/
240+
public fun DataFrame.Companion.readArrowFeatherImpl(
241+
channel: SeekableByteChannel,
242+
allocator: RootAllocator = Allocator.ROOT,
243+
nullability: NullabilityOptions = NullabilityOptions.Infer,
244+
): AnyFrame {
245+
ArrowFileReader(channel, allocator).use { reader ->
246+
val dfs = buildList {
247+
reader.recordBlocks.forEach { block ->
248+
reader.loadRecordBatch(block)
249+
val root = reader.vectorSchemaRoot
250+
val schema = root.schema
251+
val df = schema.fields.map { f -> readField(root, f, nullability) }.toDataFrame()
252+
add(df)
253+
}
254+
}
255+
return dfs.concatKeepingSchema()
256+
}
257+
}

docs/StardustDocs/topics/write.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[//]: # (title: Write)
22
<!---IMPORT org.jetbrains.kotlinx.dataframe.samples.api.Write-->
33

4-
`DataFrame` can be saved into CSV, TSV, JSON, XLS and XLSX, Apache Arrow formats.
4+
`DataFrame` instances can be saved in the following formats: CSV, TSV, JSON, XLS(X) and Apache Arrow.
55

66
### Writing to CSV
77

@@ -161,7 +161,7 @@ val featherByteArray: ByteArray = df.saveArrowFeatherToByteArray()
161161
The second way is a bit more tricky. You have to specify the schema itself and the casting behavior mode as `ArrowWriter` parameters.
162162
Behavior `Mode` has four independent switchers: `restrictWidening`, `restrictNarrowing`, `strictType`, `strictNullable`.
163163
You can use `Mode.STRICT` (this is the default), `Mode.LOYAL`, or any combination you want.
164-
`ArrowWriter` object should be closed after using because Arrow uses random access buffers not managed by Java GC.
164+
The `ArrowWriter` object should be closed after use because Arrow uses random-access buffers not managed by the Java GC.
165165
Finally, you can specify a callback to be invoked if some data is lost or cannot be saved according to your schema.
166166

167167
Here is a full example:

0 commit comments

Comments
 (0)