Commit 3583c91

Merge pull request #169 from Kopilov/arrow
Arrow writing
2 parents f219cc6 + ea1d516

19 files changed: +1486, -281 lines

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/api/convert.kt

Lines changed: 8 additions & 0 deletions
@@ -119,6 +119,14 @@ public fun <T : Any> DataColumn<T?>.convertToLocalDate(): DataColumn<LocalDate?>
 public fun <T : Any> DataColumn<T>.convertToLocalTime(): DataColumn<LocalTime> = convertTo()
 public fun <T : Any> DataColumn<T?>.convertToLocalTime(): DataColumn<LocalTime?> = convertTo()
 
+@JvmName("convertToByteFromT")
+public fun <T : Any> DataColumn<T>.convertToByte(): DataColumn<Byte> = convertTo()
+public fun <T : Any> DataColumn<T?>.convertToByte(): DataColumn<Byte?> = convertTo()
+
+@JvmName("convertToShortFromT")
+public fun <T : Any> DataColumn<T>.convertToShort(): DataColumn<Short> = convertTo()
+public fun <T : Any> DataColumn<T?>.convertToShort(): DataColumn<Short?> = convertTo()
+
 @JvmName("convertToIntFromT")
 public fun <T : Any> DataColumn<T>.convertToInt(): DataColumn<Int> = convertTo()
 public fun <T : Any> DataColumn<T?>.convertToInt(): DataColumn<Int?> = convertTo()
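The new convertToByte / convertToShort pairs follow the same pattern as the existing convertToInt pair below them: one overload for non-nullable columns (given a distinct JVM name to avoid a signature clash after erasure) and one for nullable columns. A minimal usage sketch, assuming the library's standard numeric converters cover Int to Byte/Short; the column values are made up for illustration:

import org.jetbrains.kotlinx.dataframe.api.columnOf
import org.jetbrains.kotlinx.dataframe.api.convertToByte
import org.jetbrains.kotlinx.dataframe.api.convertToShort

fun main() {
    val ints = columnOf(1, 2, 3)                // DataColumn<Int>
    val bytes = ints.convertToByte()            // DataColumn<Byte>

    val nullableInts = columnOf(1, 2, null)     // DataColumn<Int?>
    val shorts = nullableInts.convertToShort()  // DataColumn<Short?>, via the nullable overload

    println(bytes.type())   // kotlin.Byte
    println(shorts.type())  // kotlin.Short?
}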

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/exceptions/TypeConverterNotFoundException.kt

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 package org.jetbrains.kotlinx.dataframe.exceptions
 
-import kotlin.reflect.*
+import kotlin.reflect.KType
 
 public class TypeConverterNotFoundException(public val from: KType, public val to: KType) : IllegalArgumentException() {
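For context, TypeConverterNotFoundException is what convertTo-based helpers (including the new convertToByte / convertToShort above) surface when no converter is registered between the source and target types. A hedged sketch; the Unsupported class is hypothetical and assumed to have no registered converter:

import org.jetbrains.kotlinx.dataframe.api.columnOf
import org.jetbrains.kotlinx.dataframe.api.convertToByte
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException

class Unsupported(val payload: String)  // hypothetical type with no converter to Byte

fun main() {
    val column = columnOf(Unsupported("a"), Unsupported("b"))
    try {
        column.convertToByte()
    } catch (e: TypeConverterNotFoundException) {
        // from and to carry the KTypes that could not be bridged
        println("No converter from ${e.from} to ${e.to}")
    }
}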

dataframe-arrow/build.gradle.kts

Lines changed: 5 additions & 0 deletions
@@ -14,6 +14,7 @@ dependencies {
     implementation(libs.arrow.memory)
     implementation(libs.commonsCompress)
     implementation(libs.kotlin.reflect)
+    implementation(libs.kotlin.datetimeJvm)
 
     testApi(project(":core"))
     testImplementation(libs.junit)
@@ -34,3 +35,7 @@ kotlinPublications {
 kotlin {
     explicitApi()
 }
+
+tasks.test {
+    jvmArgs = listOf("--add-opens", "java.base/java.nio=ALL-UNNAMED")
+}
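The new test JVM argument is the usual workaround for Apache Arrow's off-heap memory management on modern JDKs: arrow-memory reflectively accesses java.nio internals, which strong encapsulation (JDK 16+) blocks unless the package is opened explicitly. Applications that consume dataframe-arrow typically need the same flag on their own run tasks; a hedged build.gradle.kts sketch (the task selection depends on how the application is launched):

// In the consuming application's build.gradle.kts
tasks.withType<JavaExec>().configureEach {
    jvmArgs("--add-opens", "java.base/java.nio=ALL-UNNAMED")
}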
New file: ArrowWriter.kt (dataframe-arrow module, package org.jetbrains.kotlinx.dataframe.io)

Lines changed: 144 additions & 0 deletions
@@ -0,0 +1,144 @@
+package org.jetbrains.kotlinx.dataframe.io
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowFileWriter
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.apache.arrow.vector.types.pojo.Schema
+import org.jetbrains.kotlinx.dataframe.AnyFrame
+import org.jetbrains.kotlinx.dataframe.DataFrame
+import org.slf4j.LoggerFactory
+import java.io.ByteArrayOutputStream
+import java.io.File
+import java.io.FileOutputStream
+import java.io.OutputStream
+import java.nio.channels.Channels
+import java.nio.channels.WritableByteChannel
+
+public val ignoreMismatchMessage: (ConvertingMismatch) -> Unit = { message: ConvertingMismatch -> }
+public val writeMismatchMessage: (ConvertingMismatch) -> Unit = { message: ConvertingMismatch ->
+    System.err.println(message)
+}
+
+private val logger = LoggerFactory.getLogger(ArrowWriter::class.java)
+
+public val logMismatchMessage: (ConvertingMismatch) -> Unit = { message: ConvertingMismatch ->
+    logger.debug(message.toString())
+}
+
+/**
+ * Save [dataFrame] content in Apache Arrow format (can be written to File, ByteArray, OutputStream or raw Channel) with [targetSchema].
+ * If [dataFrame] content does not match [targetSchema], the behaviour is specified by [mode] and mismatches are reported to [mismatchSubscriber].
+ */
+public interface ArrowWriter : AutoCloseable {
+    public val dataFrame: DataFrame<*>
+    public val targetSchema: Schema
+    public val mode: Mode
+    public val mismatchSubscriber: (ConvertingMismatch) -> Unit
+
+    public companion object {
+
+        public fun create(
+            dataFrame: AnyFrame,
+            targetSchema: Schema,
+            mode: Mode,
+            mismatchSubscriber: (ConvertingMismatch) -> Unit = ignoreMismatchMessage
+        ): ArrowWriter = ArrowWriterImpl(dataFrame, targetSchema, mode, mismatchSubscriber)
+
+        /**
+         * If [restrictWidening] is true, [dataFrame] columns not described in [targetSchema] are not saved (otherwise, they are saved as is).
+         * If [restrictNarrowing] is true, [targetSchema] fields that are not nullable and do not exist in [dataFrame] produce an exception (otherwise, they are not saved).
+         * If [strictType] is true, [dataFrame] columns described in [targetSchema] with a non-compatible type produce an exception (otherwise, they are saved as is).
+         * If [strictNullable] is true, [targetSchema] fields that are not nullable but contain nulls in [dataFrame] produce an exception (otherwise, they are saved as is with nullable = true).
+         */
+        public data class Mode(
+            public val restrictWidening: Boolean,
+            public val restrictNarrowing: Boolean,
+            public val strictType: Boolean,
+            public val strictNullable: Boolean
+        ) {
+            public companion object {
+                public val STRICT: Mode = Mode(true, true, true, true)
+                public val LOYAL: Mode = Mode(false, false, false, false)
+            }
+        }
+    }
+
+    /**
+     * Create Arrow [VectorSchemaRoot] with [dataFrame] content cast to [targetSchema] according to the [mode].
+     */
+    public fun allocateVectorSchemaRoot(): VectorSchemaRoot
+
+    // IPC saving block
+
+    /**
+     * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [channel].
+     */
+    public fun writeArrowIPC(channel: WritableByteChannel) {
+        allocateVectorSchemaRoot().use { vectorSchemaRoot ->
+            ArrowStreamWriter(vectorSchemaRoot, null, channel).use { writer ->
+                writer.writeBatch()
+            }
+        }
+    }
+
+    /**
+     * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [stream].
+     */
+    public fun writeArrowIPC(stream: OutputStream) {
+        writeArrowIPC(Channels.newChannel(stream))
+    }
+
+    /**
+     * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new or existing [file].
+     * If the file exists, it is either recreated or appended to, depending on [append].
+     */
+    public fun writeArrowIPC(file: File, append: Boolean = true) {
+        writeArrowIPC(FileOutputStream(file, append))
+    }
+
+    /**
+     * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new [ByteArray].
+     */
+    public fun saveArrowIPCToByteArray(): ByteArray {
+        val stream = ByteArrayOutputStream()
+        writeArrowIPC(stream)
+        return stream.toByteArray()
+    }
+
+    // Feather saving block
+
+    /**
+     * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [channel].
+     */
+    public fun writeArrowFeather(channel: WritableByteChannel) {
+        allocateVectorSchemaRoot().use { vectorSchemaRoot ->
+            ArrowFileWriter(vectorSchemaRoot, null, channel).use { writer ->
+                writer.writeBatch()
+            }
+        }
+    }
+
+    /**
+     * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [stream].
+     */
+    public fun writeArrowFeather(stream: OutputStream) {
+        writeArrowFeather(Channels.newChannel(stream))
+    }
+
+    /**
+     * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new or existing [file].
+     * If the file exists, it is recreated.
+     */
+    public fun writeArrowFeather(file: File) {
+        writeArrowFeather(FileOutputStream(file))
+    }
+
+    /**
+     * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new [ByteArray].
+     */
+    public fun saveArrowFeatherToByteArray(): ByteArray {
+        val stream = ByteArrayOutputStream()
+        writeArrowFeather(stream)
+        return stream.toByteArray()
+    }
+}
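Taken together, the new file gives dataframe-arrow a writing API to mirror its reading support. A hedged end-to-end sketch of the surface defined above; the sample frame, the hand-built schema, and the output path are made up for illustration:

import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.FieldType
import org.apache.arrow.vector.types.pojo.Schema
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
import org.jetbrains.kotlinx.dataframe.io.ArrowWriter
import org.jetbrains.kotlinx.dataframe.io.ArrowWriter.Companion.Mode
import org.jetbrains.kotlinx.dataframe.io.writeMismatchMessage
import java.io.File

fun main() {
    // Illustrative frame; any AnyFrame works.
    val df = dataFrameOf("id", "name")(1, "Alice", 2, "Bob")

    // Hand-built target schema matching the frame above.
    val targetSchema = Schema(
        listOf(
            Field("id", FieldType.notNullable(ArrowType.Int(32, true)), emptyList()),
            Field("name", FieldType.notNullable(ArrowType.Utf8()), emptyList())
        )
    )

    // STRICT fails fast on widening/narrowing/type/nullability mismatches;
    // LOYAL saves what it can and only reports mismatches to the subscriber.
    ArrowWriter.create(df, targetSchema, Mode.STRICT, writeMismatchMessage).use { writer ->
        writer.writeArrowFeather(File("frame.feather"))      // random access (Feather) file
        val ipcBytes = writer.saveArrowIPCToByteArray()      // streaming format, in memory
        println("IPC payload: ${ipcBytes.size} bytes")
    }
}

As the interface above shows, writeArrowIPC(file, append = true) covers appending to an existing stream file, and both saveArrow...ToByteArray helpers are simply the stream overloads backed by a ByteArrayOutputStream.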
