Skip to content

Commit a746109

Browse files
authored
Merge pull request #129 from Kopilov/arrow
Expand Arrow reading support
2 parents 4fc93d7 + 25a159b commit a746109

File tree

13 files changed

+398
-64
lines changed

13 files changed

+398
-64
lines changed

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/api/TypeConversions.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import org.jetbrains.kotlinx.dataframe.columns.ColumnSet
1717
import org.jetbrains.kotlinx.dataframe.columns.FrameColumn
1818
import org.jetbrains.kotlinx.dataframe.columns.ValueColumn
1919
import org.jetbrains.kotlinx.dataframe.impl.GroupByImpl
20+
import org.jetbrains.kotlinx.dataframe.impl.anyNull
2021
import org.jetbrains.kotlinx.dataframe.impl.asList
2122
import org.jetbrains.kotlinx.dataframe.impl.columnName
2223
import org.jetbrains.kotlinx.dataframe.impl.columns.ColumnAccessorImpl
@@ -189,6 +190,50 @@ public enum class Infer {
189190
Type
190191
}
191192

193+
/**
 * Strategy for deciding whether a column read from a schema-defined data format is marked nullable
 * (the value of [DataColumn.hasNulls], or more precisely of DataColumn.type.isMarkedNullable),
 * given the nullability declared by the schema and the values actually present in the data.
 */
public enum class NullabilityOptions {
    /**
     * Ignore the schema: [DataColumn.hasNulls] is true exactly when the column contains at least one null.
     * An empty dataset therefore yields false.
     */
    Infer,

    /**
     * Trust the schema: [DataColumn.hasNulls] takes the expected value.
     * An exception is thrown when the column is declared not nullable yet contains nulls.
     */
    Checking,

    /**
     * Start from the schema: [DataColumn.hasNulls] takes the expected value, except that a column
     * declared not nullable which nevertheless contains nulls is promoted from false to true.
     */
    Widening
}
214+
215+
/**
 * Signals that data contains null values although the expected schema declares it not nullable.
 *
 * @param message optional human-readable description of the violation; defaults to `null`, so the
 * parameterless form `NullabilityException()` keeps working.
 */
public class NullabilityException(message: String? = null) : Exception(message)

/**
 * Decides whether a column should be marked nullable under the receiver [NullabilityOptions].
 *
 * [data] is scanned for nulls only when the outcome depends on it: when [expectedNulls] is true,
 * [NullabilityOptions.Checking] and [NullabilityOptions.Widening] answer without inspecting the values.
 *
 * @param data actual values of the column.
 * @param expectedNulls whether the column is nullable according to some schema/signature.
 * @return true if the column should be marked nullable.
 * @throws [NullabilityException] for [NullabilityOptions.Checking] if [expectedNulls] is false and [data] contains nulls.
 */
public fun NullabilityOptions.applyNullability(data: List<Any?>, expectedNulls: Boolean): Boolean =
    when (this) {
        // Only the actual data matters.
        NullabilityOptions.Infer -> data.anyNull()

        // The schema wins; the data is inspected only to validate a "not nullable" declaration.
        NullabilityOptions.Checking -> {
            if (!expectedNulls && data.anyNull()) {
                throw NullabilityException("Data contains null values but is expected to be not nullable")
            }
            expectedNulls
        }

        // `||` short-circuits, so a column declared nullable is never scanned.
        NullabilityOptions.Widening -> expectedNulls || data.anyNull()
    }
236+
192237
public inline fun <reified T> Iterable<T>.toColumn(
193238
name: String = "",
194239
infer: Infer = Infer.None

dataframe-arrow/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies {
1212
implementation(libs.arrow.format)
1313
implementation(libs.arrow.memory)
1414
implementation(libs.commonsCompress)
15+
implementation(libs.kotlin.reflect)
1516

1617
testApi(project(":core"))
1718
testImplementation(libs.junit)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrow.kt

Lines changed: 137 additions & 64 deletions
Large diffs are not rendered by default.

dataframe-arrow/src/test/kotlin/ArrowKtTest.kt

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import io.kotest.assertions.throwables.shouldThrow
12
import io.kotest.matchers.shouldBe
23
import org.apache.arrow.vector.util.Text
34
import org.jetbrains.kotlinx.dataframe.DataFrame
5+
import org.jetbrains.kotlinx.dataframe.api.NullabilityOptions
46
import org.jetbrains.kotlinx.dataframe.api.columnOf
57
import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
68
import org.jetbrains.kotlinx.dataframe.api.toColumn
79
import org.jetbrains.kotlinx.dataframe.io.readArrowFeather
10+
import org.jetbrains.kotlinx.dataframe.io.readArrowIPC
811
import org.junit.Test
912
import java.net.URL
1013

@@ -13,6 +16,7 @@ internal class ArrowKtTest {
1316
/**
 * Resolves [resourcePath] through the class loader of [ArrowKtTest].
 *
 * Fails with an [IllegalStateException] naming the missing path instead of a bare NPE.
 * [IllegalArgumentException] is deliberately not used: tests in this class expect that type from
 * the readers, so using it here could hide a missing fixture.
 */
fun testResource(resourcePath: String): URL =
    ArrowKtTest::class.java.classLoader.getResource(resourcePath)
        ?: error("Test resource not found on classpath: $resourcePath")
1417

1518
/** Returns the URL of the test resource named [name] with the `.feather` extension appended. */
fun testArrowFeather(name: String): URL {
    val resourcePath = "$name.feather"
    return testResource(resourcePath)
}
19+
/** Returns the URL of the test resource named [name] with the `.ipc` extension appended. */
fun testArrowIPC(name: String): URL {
    val resourcePath = "$name.ipc"
    return testResource(resourcePath)
}
1620

1721
@Test
1822
fun testReadingFromFile() {
@@ -31,4 +35,56 @@ internal class ArrowKtTest {
3135
val expected = dataFrameOf(a, b, c, d)
3236
df shouldBe expected
3337
}
38+
39+
/**
 * Reads the `test.arrow` fixture in both formats under every [NullabilityOptions] value.
 * No nulls are expected in the data; columns are expected to be non-nullable under
 * [NullabilityOptions.Infer] and nullable under the two schema-driven modes.
 */
@Test
fun testReadingAllTypesAsEstimated() {
    // Mode paired with the nullability each column type is expected to report.
    val expectedNullableByMode = listOf(
        NullabilityOptions.Infer to false,
        NullabilityOptions.Checking to true,
        NullabilityOptions.Widening to true
    )
    for ((mode, expectedNullable) in expectedNullableByMode) {
        val fromFeather = DataFrame.readArrowFeather(testArrowFeather("test.arrow"), mode)
        assertEstimations(fromFeather, expectedNullable, false)

        val fromIpc = DataFrame.readArrowIPC(testArrowIPC("test.arrow"), mode)
        assertEstimations(fromIpc, expectedNullable, false)
    }
}
50+
51+
/**
 * Reads the `test-with-nulls.arrow` fixture in both formats under every [NullabilityOptions] value.
 * In every mode the columns are expected to be nullable and to contain nulls.
 */
@Test
fun testReadingAllTypesAsEstimatedWithNulls() {
    val modes = listOf(NullabilityOptions.Infer, NullabilityOptions.Checking, NullabilityOptions.Widening)
    for (mode in modes) {
        val fromFeather = DataFrame.readArrowFeather(testArrowFeather("test-with-nulls.arrow"), mode)
        assertEstimations(fromFeather, true, true)

        val fromIpc = DataFrame.readArrowIPC(testArrowIPC("test-with-nulls.arrow"), mode)
        assertEstimations(fromIpc, true, true)
    }
}
62+
63+
/**
 * Reads the `test-not-nullable.arrow` fixture in both formats under every [NullabilityOptions] value.
 * In every mode the columns are expected to be non-nullable and free of nulls.
 */
@Test
fun testReadingAllTypesAsEstimatedNotNullable() {
    val modes = listOf(NullabilityOptions.Infer, NullabilityOptions.Checking, NullabilityOptions.Widening)
    for (mode in modes) {
        val fromFeather = DataFrame.readArrowFeather(testArrowFeather("test-not-nullable.arrow"), mode)
        assertEstimations(fromFeather, false, false)

        val fromIpc = DataFrame.readArrowIPC(testArrowIPC("test-not-nullable.arrow"), mode)
        assertEstimations(fromIpc, false, false)
    }
}
74+
75+
/**
 * Reads the `test-illegal.arrow` fixture in both formats under every [NullabilityOptions] value.
 *
 * Under [NullabilityOptions.Infer] and [NullabilityOptions.Widening] the columns are expected to be
 * nullable and to contain nulls. Under [NullabilityOptions.Checking] the read is expected to fail
 * with [IllegalArgumentException].
 */
@Test
fun testReadingAllTypesAsEstimatedNotNullableWithNulls() {
    // Resolved up front so that a missing fixture can never be mistaken for the expected failure.
    val featherUrl = testArrowFeather("test-illegal.arrow")
    val ipcUrl = testArrowIPC("test-illegal.arrow")

    assertEstimations(DataFrame.readArrowFeather(featherUrl, NullabilityOptions.Infer), true, true)
    assertEstimations(DataFrame.readArrowIPC(ipcUrl, NullabilityOptions.Infer), true, true)

    // Only the read is wrapped: an IllegalArgumentException raised by the assertion helper
    // must not satisfy the expectation.
    shouldThrow<IllegalArgumentException> {
        DataFrame.readArrowFeather(featherUrl, NullabilityOptions.Checking)
    }
    shouldThrow<IllegalArgumentException> {
        DataFrame.readArrowIPC(ipcUrl, NullabilityOptions.Checking)
    }

    assertEstimations(DataFrame.readArrowFeather(featherUrl, NullabilityOptions.Widening), true, true)
    assertEstimations(DataFrame.readArrowIPC(ipcUrl, NullabilityOptions.Widening), true, true)
}
3490
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import io.kotest.matchers.shouldBe
2+
import org.jetbrains.kotlinx.dataframe.AnyFrame
3+
import org.jetbrains.kotlinx.dataframe.DataColumn
4+
import org.jetbrains.kotlinx.dataframe.api.forEachIndexed
5+
import java.math.BigInteger
6+
import java.time.LocalDate
7+
import java.time.LocalDateTime
8+
import java.time.LocalTime
9+
import java.time.ZoneOffset
10+
import kotlin.math.absoluteValue
11+
import kotlin.math.pow
12+
import kotlin.reflect.full.withNullability
13+
import kotlin.reflect.typeOf
14+
15+
/**
 * Assert that we have got the same data that was originally saved on example creation.
 *
 * For every known column this checks the column type (with nullability set to [expectedNullable])
 * and every value against the formula the example was generated with.
 *
 * @param exampleFrame frame read from one of the example files.
 * @param expectedNullable whether every checked column type is expected to be marked nullable.
 * @param hasNulls whether the example is expected to contain nulls; if so, every fifth row of a
 * batch (batch rows 4, 9, 14, ...) must be null instead of the generated value.
 */
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
    /**
     * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should
     * transform frame row number to batch row number.
     */
    fun iBatch(iFrame: Int): Int {
        val firstBatchSize = 100
        return if (iFrame < firstBatchSize) iFrame else iFrame - firstBatchSize
    }

    /** Whether the batch row [rowNumber] holds a null in the examples that contain nulls. */
    fun expectedNull(rowNumber: Int): Boolean = (rowNumber + 1) % 5 == 0

    /** Asserts that [actual] is null where a null is expected and equals [expected] otherwise. */
    fun assertValueOrNull(rowNumber: Int, actual: Any?, expected: Any) {
        if (hasNulls && expectedNull(rowNumber)) {
            actual shouldBe null
        } else {
            actual shouldBe expected
        }
    }

    /**
     * Checks one column: its type must be [baseType] with the expected nullability, and each
     * element must match [expectedValue] evaluated for the element's batch row number.
     */
    fun assertColumn(name: String, baseType: kotlin.reflect.KType, expectedValue: (Int) -> Any) {
        val column: DataColumn<*> = exampleFrame[name]
        column.type() shouldBe baseType.withNullability(expectedNullable)
        column.forEachIndexed { i, element ->
            val row = iBatch(i)
            assertValueOrNull(row, element, expectedValue(row))
        }
    }

    // Strings
    assertColumn("asciiString", typeOf<String>()) { row -> "Test Example $row" }
    assertColumn("utf8String", typeOf<String>()) { row -> "Тестовый пример $row" }
    assertColumn("largeString", typeOf<String>()) { row -> "Test Example Should Be Large $row" }

    // Boolean and signed integers
    assertColumn("boolean", typeOf<Boolean>()) { row -> row % 2 == 0 }
    assertColumn("byte", typeOf<Byte>()) { row -> (row * 10).toByte() }
    assertColumn("short", typeOf<Short>()) { row -> (row * 1000).toShort() }
    assertColumn("int", typeOf<Int>()) { row -> row * 100000000 }
    assertColumn("longInt", typeOf<Long>()) { row -> row * 100000000000000000L }

    // Unsigned integers are checked as the next wider signed type, reduced modulo the unsigned range
    assertColumn("unsigned_byte", typeOf<Short>()) { row ->
        (row * 10 % (Byte.MIN_VALUE.toShort() * 2).absoluteValue).toShort()
    }
    assertColumn("unsigned_short", typeOf<Int>()) { row ->
        row * 1000 % (Short.MIN_VALUE.toInt() * 2).absoluteValue
    }
    assertColumn("unsigned_int", typeOf<Long>()) { row ->
        row.toLong() * 100000000 % (Int.MIN_VALUE.toLong() * 2).absoluteValue
    }
    assertColumn("unsigned_longInt", typeOf<BigInteger>()) { row ->
        row.toBigInteger() * 100000000000000000L.toBigInteger() %
            (Long.MIN_VALUE.toBigInteger() * 2.toBigInteger()).abs()
    }

    // Floating point
    assertColumn("float", typeOf<Float>()) { row -> 2.0f.pow(row.toFloat()) }
    assertColumn("double", typeOf<Double>()) { row -> 2.0.pow(row) }

    // Dates
    assertColumn("date32", typeOf<LocalDate>()) { row -> LocalDate.ofEpochDay(row.toLong() * 30) }
    assertColumn("date64", typeOf<LocalDateTime>()) { row ->
        LocalDateTime.ofEpochSecond(row.toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
    }

    // Times of day at the four resolutions
    assertColumn("time32_seconds", typeOf<LocalTime>()) { row -> LocalTime.ofSecondOfDay(row.toLong()) }
    assertColumn("time32_milli", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong() * 1000_000) }
    assertColumn("time64_micro", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong() * 1000) }
    assertColumn("time64_nano", typeOf<LocalTime>()) { row -> LocalTime.ofNanoOfDay(row.toLong()) }
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)