Skip to content

Commit de81ab6

Browse files
committed
enabling column-parallel parsing with coroutines
1 parent fcfac95 commit de81ab6

File tree

4 files changed

+50
-19
lines changed

4 files changed

+50
-19
lines changed

core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ dependencies {
7373
implementation(libs.fuel)
7474

7575
api(libs.kotlin.datetimeJvm)
76+
implementation(libs.kotlin.coroutinesCore)
7677
implementation(libs.kotlinpoet)
7778
implementation(libs.sl4j)
7879
implementation(libs.kotlinLogging)

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/api/parse.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public data class ParserOptions(
5858
public fun DataColumn<String?>.tryParse(options: ParserOptions? = null): DataColumn<*> = tryParseImpl(options)
5959

6060
public fun <T> DataFrame<T>.parse(options: ParserOptions? = null): DataFrame<T> =
61-
parse(options) {
61+
parseImpl(options) {
6262
colsAtAnyDepth { !it.isColumnGroup() }
6363
}
6464

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/api/parse.kt

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package org.jetbrains.kotlinx.dataframe.impl.api
22

3+
import kotlinx.coroutines.async
4+
import kotlinx.coroutines.awaitAll
5+
import kotlinx.coroutines.coroutineScope
6+
import kotlinx.coroutines.runBlocking
37
import kotlinx.datetime.Instant
48
import kotlinx.datetime.LocalDate
59
import kotlinx.datetime.LocalDateTime
@@ -18,13 +22,16 @@ import org.jetbrains.kotlinx.dataframe.DataRow
1822
import org.jetbrains.kotlinx.dataframe.api.GlobalParserOptions
1923
import org.jetbrains.kotlinx.dataframe.api.ParserOptions
2024
import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
25+
import org.jetbrains.kotlinx.dataframe.api.asComparable
2126
import org.jetbrains.kotlinx.dataframe.api.asDataColumn
27+
import org.jetbrains.kotlinx.dataframe.api.asFrameColumn
2228
import org.jetbrains.kotlinx.dataframe.api.cast
23-
import org.jetbrains.kotlinx.dataframe.api.convert
29+
import org.jetbrains.kotlinx.dataframe.api.getColumns
2430
import org.jetbrains.kotlinx.dataframe.api.isColumnGroup
2531
import org.jetbrains.kotlinx.dataframe.api.isFrameColumn
26-
import org.jetbrains.kotlinx.dataframe.api.parse
27-
import org.jetbrains.kotlinx.dataframe.api.to
32+
import org.jetbrains.kotlinx.dataframe.api.name
33+
import org.jetbrains.kotlinx.dataframe.api.toColumn
34+
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
2835
import org.jetbrains.kotlinx.dataframe.api.tryParse
2936
import org.jetbrains.kotlinx.dataframe.columns.size
3037
import org.jetbrains.kotlinx.dataframe.columns.values
@@ -37,6 +44,7 @@ import org.jetbrains.kotlinx.dataframe.impl.javaDurationCanParse
3744
import org.jetbrains.kotlinx.dataframe.io.isURL
3845
import org.jetbrains.kotlinx.dataframe.io.readJsonStr
3946
import org.jetbrains.kotlinx.dataframe.typeClass
47+
import org.jetbrains.kotlinx.dataframe.values
4048
import java.math.BigDecimal
4149
import java.net.URL
4250
import java.text.NumberFormat
@@ -541,19 +549,39 @@ internal fun <T> DataColumn<String?>.parse(parser: StringParser<T>, options: Par
541549
return DataColumn.createValueColumn(name(), parsedValues, parser.type.withNullability(hasNulls)) as DataColumn<T?>
542550
}
543551

544-
internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>) =
545-
convert(columns).to {
546-
when {
547-
it.isFrameColumn() -> it.cast<AnyFrame?>().parse(options)
548-
549-
it.isColumnGroup() ->
550-
it.asColumnGroup()
551-
.parse(options) { all() }
552-
.asColumnGroup(it.name())
553-
.asDataColumn()
554-
555-
it.typeClass == String::class -> it.cast<String?>().tryParse(options)
556-
557-
else -> it
558-
}
552+
internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> =
553+
runBlocking { parseParallel(options, columns) }
554+
555+
private suspend fun <T> DataFrame<T>.parseParallel(
556+
options: ParserOptions?,
557+
columns: ColumnsSelector<T, Any?>,
558+
): DataFrame<T> =
559+
coroutineScope {
560+
getColumns(columns).map {
561+
async {
562+
when {
563+
it.isFrameColumn() ->
564+
it.asFrameColumn().values.map {
565+
async {
566+
it.parseParallel(options) {
567+
colsAtAnyDepth { !it.isColumnGroup() }
568+
}
569+
}
570+
}.awaitAll()
571+
.toColumn(it.name)
572+
573+
574+
it.isColumnGroup() ->
575+
it.asColumnGroup().parseParallel(options) { all() }
576+
.asColumnGroup(it.name())
577+
.asDataColumn()
578+
579+
it.typeClass == String::class -> it.cast<String?>().tryParse(options)
580+
581+
else -> it
582+
}
583+
}
584+
}.awaitAll()
585+
.toDataFrame()
586+
.cast()
559587
}

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ postgresql = "42.7.4"
3131
sqlite = "3.46.1.0"
3232
jtsCore = "1.19.0"
3333
kotlinDatetime = "0.6.1"
34+
coroutines = "1.9.0"
3435
openapi = "2.1.22"
3536
kotlinLogging = "7.0.0"
3637
sl4j = "2.0.16"
@@ -84,6 +85,7 @@ jts = { group = "org.locationtech.jts", name = "jts-core", version.ref = "jtsCor
8485

8586
poi-ooxml = { group = "org.apache.poi", name = "poi-ooxml", version.ref = "poi" }
8687
kotlin-datetimeJvm = { group = "org.jetbrains.kotlinx", name = "kotlinx-datetime-jvm", version.ref = "kotlinDatetime" }
88+
kotlin-coroutinesCore = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutines" }
8789

8890
junit = { group = "junit", name = "junit", version.ref = "junit" }
8991

0 commit comments

Comments
 (0)