@@ -6,26 +6,26 @@ import kotlinx.datetime.toJavaLocalDate
6
6
import org.apache.arrow.memory.RootAllocator
7
7
import org.apache.arrow.vector.BaseFixedWidthVector
8
8
import org.apache.arrow.vector.BaseVariableWidthVector
9
- import org.apache.arrow.vector.FieldVector
10
- import org.apache.arrow.vector.FixedWidthVector
11
- import org.apache.arrow.vector.LargeVarCharVector
12
- import org.apache.arrow.vector.TinyIntVector
13
- import org.apache.arrow.vector.SmallIntVector
14
- import org.apache.arrow.vector.IntVector
15
9
import org.apache.arrow.vector.BigIntVector
16
10
import org.apache.arrow.vector.BitVector
17
11
import org.apache.arrow.vector.DateDayVector
18
12
import org.apache.arrow.vector.DateMilliVector
19
- import org.apache.arrow.vector.DecimalVector
20
13
import org.apache.arrow.vector.Decimal256Vector
14
+ import org.apache.arrow.vector.DecimalVector
15
+ import org.apache.arrow.vector.FieldVector
16
+ import org.apache.arrow.vector.FixedWidthVector
21
17
import org.apache.arrow.vector.Float4Vector
22
18
import org.apache.arrow.vector.Float8Vector
19
+ import org.apache.arrow.vector.IntVector
20
+ import org.apache.arrow.vector.LargeVarCharVector
21
+ import org.apache.arrow.vector.SmallIntVector
23
22
import org.apache.arrow.vector.TimeMicroVector
24
23
import org.apache.arrow.vector.TimeMilliVector
25
24
import org.apache.arrow.vector.TimeNanoVector
26
25
import org.apache.arrow.vector.TimeSecVector
27
- import org.apache.arrow.vector.VariableWidthVector
26
+ import org.apache.arrow.vector.TinyIntVector
28
27
import org.apache.arrow.vector.VarCharVector
28
+ import org.apache.arrow.vector.VariableWidthVector
29
29
import org.apache.arrow.vector.VectorSchemaRoot
30
30
import org.apache.arrow.vector.ipc.ArrowFileWriter
31
31
import org.apache.arrow.vector.ipc.ArrowStreamWriter
@@ -41,22 +41,20 @@ import org.jetbrains.kotlinx.dataframe.AnyCol
41
41
import org.jetbrains.kotlinx.dataframe.AnyFrame
42
42
import org.jetbrains.kotlinx.dataframe.DataFrame
43
43
import org.jetbrains.kotlinx.dataframe.api.convertTo
44
- import org.jetbrains.kotlinx.dataframe.api.convertToBoolean
45
44
import org.jetbrains.kotlinx.dataframe.api.convertToBigDecimal
45
+ import org.jetbrains.kotlinx.dataframe.api.convertToBoolean
46
46
import org.jetbrains.kotlinx.dataframe.api.convertToDouble
47
47
import org.jetbrains.kotlinx.dataframe.api.convertToFloat
48
- import org.jetbrains.kotlinx.dataframe.api.convertToLong
49
48
import org.jetbrains.kotlinx.dataframe.api.convertToInt
50
49
import org.jetbrains.kotlinx.dataframe.api.convertToLocalDate
51
- import org.jetbrains.kotlinx.dataframe.api.convertToLocalTime
52
50
import org.jetbrains.kotlinx.dataframe.api.convertToLocalDateTime
51
+ import org.jetbrains.kotlinx.dataframe.api.convertToLocalTime
52
+ import org.jetbrains.kotlinx.dataframe.api.convertToLong
53
53
import org.jetbrains.kotlinx.dataframe.api.convertToString
54
54
import org.jetbrains.kotlinx.dataframe.api.forEachIndexed
55
55
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
56
56
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
57
57
import org.jetbrains.kotlinx.dataframe.typeClass
58
- import org.slf4j.Logger
59
- import org.slf4j.LoggerFactory
60
58
import java.io.ByteArrayOutputStream
61
59
import java.io.File
62
60
import java.io.FileOutputStream
@@ -69,8 +67,8 @@ import java.time.LocalTime
69
67
import kotlin.reflect.full.isSubtypeOf
70
68
import kotlin.reflect.typeOf
71
69
72
/**
 * Warning handler that discards every message it receives.
 */
public val ignoreWarningMessage: (String) -> Unit = { _ -> }

/**
 * Warning handler that prints each message on its own line to the standard error stream.
 */
public val writeWarningMessage: (String) -> Unit = { warning ->
    // System.err is looked up on every call, so a stream installed later via System.setErr is honoured.
    System.err.println(warning)
}
74
72
75
73
/* *
76
74
* Create Arrow [Schema] matching [this] actual data.
@@ -135,7 +133,7 @@ public class ArrowWriter(
135
133
private val targetSchema : Schema ,
136
134
private val mode : Mode ,
137
135
private val warningSubscriber : (String ) -> Unit = ignoreWarningMessage
138
- ): AutoCloseable {
136
+ ) : AutoCloseable {
139
137
140
138
public companion object {
141
139
/* *
@@ -169,8 +167,8 @@ public class ArrowWriter(
169
167
170
168
private fun infillWithNulls (vector : FieldVector , size : Int ) {
171
169
when (vector) {
172
- is BaseFixedWidthVector -> for ( i in 0 until size) { vector.setNull(i) }
173
- is BaseVariableWidthVector -> for ( i in 0 until size) { vector.setNull(i) }
170
+ is BaseFixedWidthVector -> for (i in 0 until size) { vector.setNull(i) }
171
+ is BaseVariableWidthVector -> for (i in 0 until size) { vector.setNull(i) }
174
172
else -> TODO (" Not implemented for ${vector.javaClass.canonicalName} " )
175
173
}
176
174
vector.valueCount = size
@@ -204,39 +202,38 @@ public class ArrowWriter(
204
202
205
203
/**
 * Copies every value of [column] into [vector].
 *
 * The column is first converted to the Kotlin type accepted by the concrete Arrow vector class,
 * then written row by row; a `null` value becomes an Arrow null at the same index.
 * Vector classes that have no branch below fall through to [TODO], which throws [NotImplementedError].
 * Finally the vector's value count is set to the row count of the data frame being written.
 */
private fun infillVector(vector: FieldVector, column: AnyCol) {
    when (vector) {
        is VarCharVector -> column.convertToString().forEachIndexed { row, text ->
            if (text != null) vector.set(row, Text(text)) else vector.setNull(row)
        }
        is LargeVarCharVector -> column.convertToString().forEachIndexed { row, text ->
            if (text != null) vector.set(row, Text(text)) else vector.setNull(row)
        }
        // is VarBinaryVector -> todo
        // is LargeVarBinaryVector -> todo
        is BitVector -> column.convertToBoolean().forEachIndexed { row, flag ->
            // The bit is passed to the vector as an Int: 1 for true, 0 for false.
            if (flag != null) vector.set(row, if (flag) 1 else 0) else vector.setNull(row)
        }
        is TinyIntVector -> column.convertToInt().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }
        is SmallIntVector -> column.convertToInt().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }
        is IntVector -> column.convertToInt().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }
        is BigIntVector -> column.convertToLong().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }
        // is UInt1Vector -> todo
        // is UInt2Vector -> todo
        // is UInt4Vector -> todo
        // is UInt8Vector -> todo
        is DecimalVector -> column.convertToBigDecimal().forEachIndexed { row, decimal ->
            if (decimal != null) vector.set(row, decimal) else vector.setNull(row)
        }
        is Decimal256Vector -> column.convertToBigDecimal().forEachIndexed { row, decimal ->
            if (decimal != null) vector.set(row, decimal) else vector.setNull(row)
        }
        is Float8Vector -> column.convertToDouble().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }
        is Float4Vector -> column.convertToFloat().forEachIndexed { row, number ->
            if (number != null) vector.set(row, number) else vector.setNull(row)
        }

        is DateDayVector -> column.convertToLocalDate().forEachIndexed { row, date ->
            // Stored as the number of days since the epoch.
            if (date != null) vector.set(row, date.toJavaLocalDate().toEpochDay().toInt()) else vector.setNull(row)
        }
        is DateMilliVector -> column.convertToLocalDateTime().forEachIndexed { row, dateTime ->
            // The local date-time is interpreted in UTC and stored as epoch milliseconds.
            if (dateTime != null) {
                vector.set(row, dateTime.toInstant(TimeZone.UTC).toEpochMilliseconds())
            } else {
                vector.setNull(row)
            }
        }
        // is DurationVector -> todo
        is TimeNanoVector -> column.convertToLocalTime().forEachIndexed { row, time ->
            if (time != null) vector.set(row, time.toNanoOfDay()) else vector.setNull(row)
        }
        is TimeMicroVector -> column.convertToLocalTime().forEachIndexed { row, time ->
            // Nanoseconds of day scaled down to microseconds.
            if (time != null) vector.set(row, time.toNanoOfDay() / 1_000) else vector.setNull(row)
        }
        is TimeMilliVector -> column.convertToLocalTime().forEachIndexed { row, time ->
            // Nanoseconds of day scaled down to milliseconds.
            if (time != null) vector.set(row, (time.toNanoOfDay() / 1_000_000).toInt()) else vector.setNull(row)
        }
        is TimeSecVector -> column.convertToLocalTime().forEachIndexed { row, time ->
            // Nanoseconds of day scaled down to whole seconds.
            if (time != null) vector.set(row, (time.toNanoOfDay() / 1_000_000_000).toInt()) else vector.setNull(row)
        }
        // is StructVector -> todo
        else -> TODO("Saving to ${vector.javaClass.canonicalName} is not implemented")
    }

    vector.valueCount = dataFrame.rowsCount()
}
241
238
242
239
/* *
@@ -259,7 +256,7 @@ public class ArrowWriter(
259
256
}
260
257
}
261
258
262
- val (convertedColumn, actualField) = try {
259
+ val (convertedColumn, actualField) = try {
263
260
convertColumnToTarget(column, field.type) to field
264
261
} catch (e: TypeConversionException ) {
265
262
handleConversionFail(e)
@@ -313,7 +310,7 @@ public class ArrowWriter(
313
310
mainVectors[field.name] = vector
314
311
}
315
312
} catch (e: Exception ) {
316
- mainVectors.values.forEach { it.close() } // Clear buffers before throwing exception
313
+ mainVectors.values.forEach { it.close() } // Clear buffers before throwing exception
317
314
throw e
318
315
}
319
316
val vectors = ArrayList <FieldVector >()
@@ -322,7 +319,9 @@ public class ArrowWriter(
322
319
if (! mode.restrictWidening) {
323
320
vectors.addAll(otherColumns.toVectors())
324
321
} else {
325
- otherColumns.forEach { warningSubscriber(" Column \" ${it.name()} \" is not described in target schema and was ignored" ) }
322
+ otherColumns.forEach {
323
+ warningSubscriber(" Column \" ${it.name()} \" is not described in target schema and was ignored" )
324
+ }
326
325
}
327
326
return VectorSchemaRoot (vectors)
328
327
}
@@ -335,7 +334,7 @@ public class ArrowWriter(
335
334
public fun writeArrowIPC(channel: WritableByteChannel) {
    // Both the vector schema root and the stream writer are closed by `use`, even when writing fails.
    allocateVectorSchemaRoot().use { root ->
        // All allocated vectors are written to the channel as a single batch.
        ArrowStreamWriter(root, null, channel).use { streamWriter -> streamWriter.writeBatch() }
    }
}
@@ -372,7 +371,7 @@ public class ArrowWriter(
372
371
public fun writeArrowFeather(channel: WritableByteChannel) {
    // Both the vector schema root and the file writer are closed by `use`, even when writing fails.
    allocateVectorSchemaRoot().use { root ->
        // All allocated vectors are written to the channel as a single batch.
        ArrowFileWriter(root, null, channel).use { fileWriter -> fileWriter.writeBatch() }
    }
}
0 commit comments