@@ -53,6 +53,10 @@ import org.jetbrains.kotlinx.dataframe.api.map
53
53
import org.jetbrains.kotlinx.dataframe.exceptions.CellConversionException
54
54
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
55
55
import org.jetbrains.kotlinx.dataframe.name
56
+ import org.jetbrains.kotlinx.dataframe.values
57
+ import java.nio.charset.Charset
58
+ import kotlin.reflect.full.isSubtypeOf
59
+ import kotlin.reflect.typeOf
56
60
57
61
/* *
58
62
* Save [dataFrame] content in Apache Arrow format (can be written to File, ByteArray, OutputStream or raw Channel) with [targetSchema].
@@ -67,14 +71,25 @@ internal class ArrowWriterImpl(
67
71
68
72
private val allocator = RootAllocator ()
69
73
70
/**
 * Reserves buffers in [vector] so that it can hold [size] values.
 *
 * @param vector Arrow vector to allocate.
 * @param size number of values the vector must be able to hold.
 * @param totalBytes optional data-buffer size in bytes; only consulted when [vector] is a
 *   [VariableWidthVector]. When `null`, the single-argument `allocateNew(size)` is used.
 * @throws IllegalArgumentException if [vector] is neither a [FixedWidthVector] nor a [VariableWidthVector].
 */
private fun allocateVector(vector: FieldVector, size: Int, totalBytes: Long? = null) {
    // Fixed-width vectors only need the value count.
    if (vector is FixedWidthVector) {
        vector.allocateNew(size)
        return
    }
    if (vector !is VariableWidthVector) {
        throw IllegalArgumentException(" Can not allocate ${vector.javaClass.canonicalName} ")
    }
    // Variable-width vectors take the byte-size hint when the caller supplied one.
    if (totalBytes != null) {
        vector.allocateNew(totalBytes, size)
    } else {
        vector.allocateNew(size)
    }
}
77
81
82
/**
 * Estimates the data-buffer size, in bytes, to pre-allocate for [column].
 *
 * Only columns whose type is a subtype of `String?` get an estimate: 4 bytes are reserved
 * for every `Char` of each non-null value. Every other column type yields `null`, so the
 * caller can fall back to the vector's default allocation.
 *
 * @param column column that is about to be written into an Arrow vector.
 * @return the estimated total number of bytes for string columns, or `null` for any other type.
 */
private fun countTotalBytes(column: AnyCol): Long? {
    val columnType = column.type()
    return when {
        columnType.isSubtypeOf(typeOf<String?>()) ->
            column.values.fold(0L) { totalBytes, value ->
                if (value == null) {
                    // Do not size a null cell as the 4-character text "null".
                    // NOTE(review): assumes the fill step stores nulls without writing data bytes — confirm.
                    totalBytes
                } else {
                    // Widen to Long before multiplying so a very long string cannot overflow Int.
                    totalBytes + value.toString().length.toLong() * 4
                }
            }
        else -> null
    }
}
92
+
78
93
private fun infillWithNulls (vector : FieldVector , size : Int ) {
79
94
when (vector) {
80
95
is BaseFixedWidthVector -> for (i in 0 until size) { vector.setNull(i) }
@@ -189,11 +204,12 @@ internal class ArrowWriterImpl(
189
204
actualField.createVector(allocator)!!
190
205
}
191
206
192
- allocateVector(vector, dataFrame.rowsCount())
193
207
if (convertedColumn == null ) {
194
208
check(actualField.isNullable)
209
+ allocateVector(vector, dataFrame.rowsCount())
195
210
infillWithNulls(vector, dataFrame.rowsCount())
196
211
} else {
212
+ allocateVector(vector, dataFrame.rowsCount(), countTotalBytes(convertedColumn))
197
213
infillVector(vector, convertedColumn)
198
214
}
199
215
return vector
0 commit comments