Skip to content

Commit 90cc6f8

Browse files
committed
ArrowWriter Mode, saving Feather/IPC to Channel/Stream/File/ByteArray
1 parent fcfae09 commit 90cc6f8

File tree

1 file changed

+183
-80
lines changed
  • dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io

1 file changed

+183
-80
lines changed

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowWriter.kt

Lines changed: 183 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@ import org.apache.arrow.vector.types.pojo.FieldType
1616
import org.apache.arrow.vector.types.pojo.Schema
1717
import org.apache.arrow.vector.util.Text
1818
import org.jetbrains.kotlinx.dataframe.AnyCol
19+
import org.jetbrains.kotlinx.dataframe.AnyFrame
1920
import org.jetbrains.kotlinx.dataframe.DataFrame
2021
import org.jetbrains.kotlinx.dataframe.api.*
2122
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
2223
import java.io.ByteArrayOutputStream
24+
import java.io.File
25+
import java.io.FileOutputStream
26+
import java.io.OutputStream
2327
import java.nio.channels.Channels
2428
import java.nio.channels.WritableByteChannel
2529
import java.time.LocalDate
2630
import java.time.LocalDateTime
2731
import java.time.LocalTime
2832
import kotlin.reflect.typeOf
2933

34+
/**
35+
* Create Arrow [Schema] matching [this] actual data.
36+
* Columns with not supported types will be interpreted as String
37+
*/
3038
public fun List<AnyCol>.toArrowSchema(): Schema {
3139
val fields = this.map { column ->
3240
when (column.type()) {
@@ -69,15 +77,42 @@ public fun List<AnyCol>.toArrowSchema(): Schema {
6977
return Schema(fields)
7078
}
7179

72-
public fun DataFrame<*>.arrowWriter(): ArrowWriter = ArrowWriter(this, this.columns().toArrowSchema())
80+
/**
81+
* Create [ArrowWriter] for [this] DataFrame with target schema matching actual data
82+
*/
83+
public fun DataFrame<*>.arrowWriter(): ArrowWriter = this.arrowWriter(this.columns().toArrowSchema())
7384

74-
public fun DataFrame<*>.arrowWriter(targetSchema: Schema): ArrowWriter = ArrowWriter(this, targetSchema)
85+
/**
86+
* Create [ArrowWriter] for [this] DataFrame with explicit [targetSchema]
87+
*/
88+
public fun DataFrame<*>.arrowWriter(targetSchema: Schema, mode: ArrowWriter.Companion.Mode = ArrowWriter.Companion.Mode.STRICT): ArrowWriter = ArrowWriter(this, targetSchema, mode)
7589

7690
/**
77-
* Save [dataFrame] content in Apache Arrow format (can be written to File, ByteArray or stream) with [targetSchema].
78-
*
91+
* Save [dataFrame] content in Apache Arrow format (can be written to File, ByteArray, OutputStream or raw Channel) with [targetSchema].
92+
* If [dataFrame] content does not match with [targetSchema], behaviour is specified by [mode]
7993
*/
80-
public class ArrowWriter(public val dataFrame: DataFrame<*>, public val targetSchema: Schema): AutoCloseable {
94+
public class ArrowWriter(private val dataFrame: DataFrame<*>, private val targetSchema: Schema, private val mode: Mode): AutoCloseable {
95+
96+
public companion object {
97+
/**
98+
* If [restrictWidening] is true, [dataFrame] columns not described in [targetSchema] would not be saved (otherwise, would be saved as is).
99+
* If [restrictNarrowing] is true, [targetSchema] fields that are not nullable and do not exist in [dataFrame] will produce an exception (otherwise, they will not be saved).
100+
* If [strictType] is true, [dataFrame] columns described in [targetSchema] with an incompatible type will produce an exception (otherwise, they will be saved as is).
101+
* If [strictNullable] is true, [targetSchema] fields that are not nullable but contain nulls in [dataFrame] will produce an exception (otherwise, they will be saved as is with nullable = true).
102+
*/
103+
public class Mode(
104+
public val restrictWidening: Boolean,
105+
public val restrictNarrowing: Boolean,
106+
public val strictType: Boolean,
107+
public val strictNullable: Boolean
108+
) {
109+
public companion object {
110+
public val STRICT: Mode = Mode(true, true, true, true)
111+
public val LOYAL: Mode = Mode(false, false, false, false)
112+
}
113+
}
114+
}
115+
81116
private val allocator = RootAllocator()
82117

83118
private fun allocateVector(vector: FieldVector, size: Int) {
@@ -201,117 +236,185 @@ public class ArrowWriter(public val dataFrame: DataFrame<*>, public val targetSc
201236
}
202237

203238
/**
204-
* Create Arrow VectorSchemaRoot with [dataFrame] content cast to [targetSchema].
205-
* If [restrictWidening] is true, [dataFrame] columns not described in [targetSchema] would not be saved (otherwise, would be saved as is).
206-
* If [restrictNarrowing] is true, [targetSchema] fields that are not nullable and do not exist in [dataFrame] will produce exception (otherwise, would not be saved).
207-
* If [strictType] is true, [dataFrame] columns described in [targetSchema] with non-compatible type will produce exception (otherwise, would be saved as is).
208-
* If [strictNullable] is true, [targetSchema] fields that are not nullable and contain nulls in [dataFrame] will produce exception (otherwise, would be saved as is with nullable = true).
239+
* Create Arrow VectorSchemaRoot with [dataFrame] content cast to [targetSchema] according to the [mode].
209240
*/
210-
private fun allocateVectorSchemaRoot(
211-
restrictWidening: Boolean = true,
212-
restrictNarrowing: Boolean = true,
213-
strictType: Boolean = true,
214-
strictNullable: Boolean = true
215-
): VectorSchemaRoot {
241+
private fun allocateVectorSchemaRoot(): VectorSchemaRoot {
216242
val mainVectors = LinkedHashMap<String, FieldVector>()
217243
for (field in targetSchema.fields) {
218244
val column = dataFrame.getColumnOrNull(field.name)
219245
if (column == null && !field.isNullable) {
220-
if (restrictNarrowing) {
246+
if (mode.restrictNarrowing) {
221247
throw Exception("${field.name} column is not presented")
222248
} else {
223249
continue
224250
}
225251
}
226252

227-
val vector = allocateVectorAndInfill(field, column, strictType, strictNullable)
253+
val vector = allocateVectorAndInfill(field, column, mode.strictType, mode.strictNullable)
228254
mainVectors[field.name] = vector
229255
}
230256
val vectors = ArrayList<FieldVector>()
231257
vectors.addAll(mainVectors.values)
232-
if (!restrictWidening) {
258+
if (!mode.restrictWidening) {
233259
val otherVectors = dataFrame.columns().filter { column -> !mainVectors.containsKey(column.name()) }.toVectors()
234260
vectors.addAll(otherVectors)
235261
}
236262
return VectorSchemaRoot(vectors)
237263
}
238264

239-
public fun featherToChannel(channel: WritableByteChannel) {
240-
allocateVectorSchemaRoot(false, false, false, false).use { vectorSchemaRoot ->
241-
ArrowFileWriter(vectorSchemaRoot, null, channel).use { writer ->
265+
// IPC saving block
266+
267+
/**
268+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [channel].
269+
*/
270+
public fun writeArrowIPC(channel: WritableByteChannel) {
271+
allocateVectorSchemaRoot().use { vectorSchemaRoot ->
272+
ArrowStreamWriter(vectorSchemaRoot, null, channel).use { writer ->
242273
writer.writeBatch();
243274
}
244275
}
245276
}
246277

247-
public fun ipcToChannel(channel: WritableByteChannel) {
248-
allocateVectorSchemaRoot(false, false, false, false).use { vectorSchemaRoot ->
249-
ArrowStreamWriter(vectorSchemaRoot, null, channel).use { writer ->
278+
/**
279+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [stream].
280+
*/
281+
public fun writeArrowIPC(stream: OutputStream) {
282+
writeArrowIPC(Channels.newChannel(stream))
283+
}
284+
285+
/**
286+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new or existing [file].
287+
* If file exists, it can be recreated or expanded.
288+
*/
289+
public fun writeArrowIPC(file: File, append: Boolean = true) {
290+
writeArrowIPC(FileOutputStream(file, append))
291+
}
292+
293+
/**
294+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new [ByteArray]
295+
*/
296+
public fun saveArrowIPCToByteArray(): ByteArray {
297+
val stream = ByteArrayOutputStream()
298+
writeArrowIPC(stream)
299+
return stream.toByteArray()
300+
}
301+
302+
// Feather saving block
303+
304+
/**
305+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [channel].
306+
*/
307+
public fun writeArrowFeather(channel: WritableByteChannel) {
308+
allocateVectorSchemaRoot().use { vectorSchemaRoot ->
309+
ArrowFileWriter(vectorSchemaRoot, null, channel).use { writer ->
250310
writer.writeBatch();
251311
}
252312
}
253313
}
254314

255-
public fun featherToByteArray(): ByteArray {
256-
ByteArrayOutputStream().use { byteArrayStream ->
257-
Channels.newChannel(byteArrayStream).use { channel ->
258-
featherToChannel(channel)
259-
return byteArrayStream.toByteArray()
260-
}
261-
}
315+
/**
316+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [stream].
317+
*/
318+
public fun writeArrowFeather(stream: OutputStream) {
319+
writeArrowFeather(Channels.newChannel(stream))
262320
}
263321

264-
public fun iptToByteArray(): ByteArray {
265-
ByteArrayOutputStream().use { byteArrayStream ->
266-
Channels.newChannel(byteArrayStream).use { channel ->
267-
ipcToChannel(channel)
268-
return byteArrayStream.toByteArray()
269-
}
270-
}
322+
/**
323+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new or existing [file].
324+
* If file exists, it would be recreated.
325+
*/
326+
public fun writeArrowFeather(file: File) {
327+
writeArrowFeather(FileOutputStream(file))
328+
}
329+
330+
/**
331+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new [ByteArray]
332+
*/
333+
public fun saveArrowFeatherToByteArray(): ByteArray {
334+
val stream = ByteArrayOutputStream()
335+
writeArrowFeather(stream)
336+
return stream.toByteArray()
271337
}
272338

273339
override fun close() {
274340
allocator.close()
275341
}
276342
}
277-
//
278-
//// IPC saving block
279-
//
280-
///**
281-
// * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new or existing [file].
282-
// * If file exists, it can be recreated or expanded.
283-
// */
284-
//public fun AnyFrame.writeArrowIPC(file: File, append: Boolean = true) {
285-
//
286-
//}
287-
//
288-
///**
289-
// * Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to [ByteArray]
290-
// */
291-
//public fun AnyFrame.writeArrowIPCToByteArray() {
292-
//
293-
//}
294-
//
295-
//// Feather saving block
296-
//
297-
///**
298-
// * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new or existing [file].
299-
// * If file exists, it would be recreated.
300-
// */
301-
//public fun AnyFrame.writeArrowFeather(file: File) {
302-
//
303-
//}
304-
//
305-
///**
306-
// * Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to [ByteArray]
307-
// */
308-
//public fun DataFrame.Companion.writeArrowFeatherToByteArray(): ByteArray {
309-
//
310-
//}
311-
//
312-
///**
313-
// * Write [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files) from existing [stream]
314-
// */
315-
//public fun DataFrame.Companion.writeArrowFeather(stream: OutputStream) {
316-
//
317-
//}
343+
344+
// IPC saving block with default parameters
345+
346+
/**
347+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [channel].
348+
*/
349+
public fun AnyFrame.writeArrowIPC(channel: WritableByteChannel) {
350+
this.arrowWriter().use { writer ->
351+
writer.writeArrowIPC(channel)
352+
}
353+
}
354+
355+
/**
356+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to opened [stream].
357+
*/
358+
public fun AnyFrame.writeArrowIPC(stream: OutputStream) {
359+
this.arrowWriter().use { writer ->
360+
writer.writeArrowIPC(stream)
361+
}
362+
}
363+
364+
/**
365+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new or existing [file].
366+
* If file exists, it can be recreated or expanded.
367+
*/
368+
public fun AnyFrame.writeArrowIPC(file: File, append: Boolean = true) {
369+
this.arrowWriter().use { writer ->
370+
writer.writeArrowIPC(file, append)
371+
}
372+
}
373+
374+
/**
375+
* Save data to [Arrow interprocess streaming format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-streaming-format), write to new [ByteArray]
376+
*/
377+
public fun AnyFrame.saveArrowIPCToByteArray(): ByteArray {
378+
return this.arrowWriter().use { writer ->
379+
writer.saveArrowIPCToByteArray()
380+
}
381+
}
382+
383+
// Feather saving block with default parameters
384+
385+
/**
386+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [channel].
387+
*/
388+
public fun AnyFrame.writeArrowFeather(channel: WritableByteChannel) {
389+
this.arrowWriter().use { writer ->
390+
writer.writeArrowFeather(channel)
391+
}
392+
}
393+
394+
/**
395+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to opened [stream].
396+
*/
397+
public fun AnyFrame.writeArrowFeather(stream: OutputStream) {
398+
this.arrowWriter().use { writer ->
399+
writer.writeArrowFeather(stream)
400+
}
401+
}
402+
403+
/**
404+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new or existing [file].
405+
* If file exists, it would be recreated.
406+
*/
407+
public fun AnyFrame.writeArrowFeather(file: File) {
408+
this.arrowWriter().use { writer ->
409+
writer.writeArrowFeather(file)
410+
}
411+
}
412+
413+
/**
414+
* Save data to [Arrow random access format](https://arrow.apache.org/docs/java/ipc.html#writing-and-reading-random-access-files), write to new [ByteArray]
415+
*/
416+
public fun AnyFrame.saveArrowFeatherToByteArray(): ByteArray {
417+
return this.arrowWriter().use { writer ->
418+
writer.saveArrowFeatherToByteArray()
419+
}
420+
}

0 commit comments

Comments
 (0)