Skip to content

Commit 913d6a0

Browse files
committed
Add custom fill of missing columns in convertTo
1 parent b59cfb6 commit 913d6a0

File tree

10 files changed

+144
-41
lines changed

10 files changed

+144
-41
lines changed

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/api/convertTo.kt

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package org.jetbrains.kotlinx.dataframe.api
22

33
import org.jetbrains.kotlinx.dataframe.AnyFrame
4+
import org.jetbrains.kotlinx.dataframe.ColumnsSelector
45
import org.jetbrains.kotlinx.dataframe.DataColumn
56
import org.jetbrains.kotlinx.dataframe.DataFrame
7+
import org.jetbrains.kotlinx.dataframe.RowExpression
68
import org.jetbrains.kotlinx.dataframe.exceptions.ColumnNotFoundException
79
import org.jetbrains.kotlinx.dataframe.exceptions.ExcessiveColumnsException
810
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
911
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConverterNotFoundException
12+
import org.jetbrains.kotlinx.dataframe.impl.api.ConvertSchemaDslInternal
1013
import org.jetbrains.kotlinx.dataframe.impl.api.convertToImpl
1114
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
1215
import kotlin.reflect.KProperty
@@ -15,16 +18,29 @@ import kotlin.reflect.typeOf
1518

1619
public enum class ExcessiveColumns { Remove, Keep, Fail }
1720

21+
/**
22+
* Holds the data context for a [fill] operation.
23+
*/
24+
public data class ConvertToFill<T, C>(
25+
internal val dsl: ConvertSchemaDsl<T>,
26+
val columns: ColumnsSelector<T, C>
27+
)
28+
1829
/** Provides access to [fromType] and [toSchema] in the flexible [ConvertSchemaDsl.convertIf] method. */
1930
public class ConverterScope(public val fromType: KType, public val toSchema: ColumnSchema)
2031

21-
/** Dsl to define how specific type conversion should occur.
32+
/**
33+
* DSL to customize column conversion.
2234
*
2335
* Example:
2436
* ```kotlin
2537
* df.convertTo<SomeSchema> {
2638
* // defines how to convert Int? -> String
2739
* convert<Int?>().with { it?.toString() ?: "No input given" }
40+
* // defines how to convert String -> SomeType
41+
* parser { SomeType(it) }
42+
* // fill missing column `sum` with expression `a + b`
43+
* fill { sum }.with { a + b }
2844
* }
2945
* ```
3046
*/
@@ -56,6 +72,17 @@ public interface ConvertSchemaDsl<in T> {
5672
)
5773
}
5874

75+
/**
76+
* Defines how to fill the specified columns of the destination schema that were not found in the original dataframe.
77+
* All [fill] operations for missing columns are executed after the matched columns have been successfully converted, so the converted values of matched columns can safely be used in the [with] expression.
78+
* @param columns target columns in destination dataframe schema to be filled
79+
*/
80+
public inline fun <T, reified C> ConvertSchemaDsl<T>.fill(noinline columns: ColumnsSelector<T, C>): ConvertToFill<T, C> = ConvertToFill(this, columns)
81+
82+
public fun <T, C> ConvertToFill<T, C>.with(expr: RowExpression<T, C>) {
83+
(dsl as ConvertSchemaDslInternal<T>).fill(columns as ColumnsSelector<*, C>, expr as RowExpression<*, C>)
84+
}
85+
5986
/**
6087
* Defines how to convert `String` values into given type [C].
6188
*/
@@ -95,6 +122,10 @@ public class ConvertType<T>(
95122
* df.convertTo<SomeSchema> {
96123
* // defines how to convert Int? -> String
97124
* convert<Int?>().with { it?.toString() ?: "No input given" }
125+
* // defines how to convert String -> SomeType
126+
* parser { SomeType(it) }
127+
* // fill missing column `sum` with expression `a + b`
128+
* fill { sum }.with { a + b }
98129
* }
99130
* ```
100131
*
@@ -109,8 +140,8 @@ public class ConvertType<T>(
109140
*/
110141
public inline fun <reified T : Any> AnyFrame.convertTo(
111142
excessiveColumnsBehavior: ExcessiveColumns = ExcessiveColumns.Keep,
112-
noinline body: ConvertSchemaDsl<T>.() -> Unit = {},
113-
): DataFrame<T> = convertTo(typeOf<T>(), excessiveColumnsBehavior, body).cast()
143+
noinline body: ConvertSchemaDsl<T>.() -> Unit = {}
144+
): DataFrame<T> = convertToImpl(typeOf<T>(), true, excessiveColumnsBehavior, body).cast()
114145

115146
/**
116147
* Converts values in [DataFrame] to match given column schema [schemaType].
@@ -126,6 +157,10 @@ public inline fun <reified T : Any> AnyFrame.convertTo(
126157
* df.convertTo<SomeSchema> {
127158
* // defines how to convert Int? -> String
128159
* convert<Int?>().with { it?.toString() ?: "No input given" }
160+
* // defines how to convert String -> SomeType
161+
* parser { SomeType(it) }
162+
* // fill missing column `sum` with expression `a + b`
163+
* fill { sum }.with { a + b }
129164
* }
130165
* ```
131166
*

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/columns/ColumnPath.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,7 @@ public data class ColumnPath(val path: List<String>) : List<String> by path, Col
5151

5252
override fun toString(): String = path.toString()
5353

54+
public fun joinToString(separator: String = "/"): String = path.joinToString(separator)
55+
5456
override fun <C> get(column: ColumnReference<C>): ColumnAccessor<C> = ColumnAccessorImpl(this + column.path())
5557
}
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package org.jetbrains.kotlinx.dataframe.exceptions
22

3+
import org.jetbrains.kotlinx.dataframe.AnyCol
4+
import org.jetbrains.kotlinx.dataframe.path
5+
import kotlin.reflect.*
36
import kotlin.reflect.KType
47

5-
public open class TypeConversionException(public val value: Any?, public val from: KType, public val to: KType) : RuntimeException() {
8+
public open class TypeConversionException(
9+
public val value: Any?,
10+
public val from: KType,
11+
public val to: KType,
12+
public val column: AnyCol?
13+
) : RuntimeException() {
614

715
override val message: String
8-
get() = "Failed to convert '$value' from $from to $to"
16+
get() = "Failed to convert '$value' from $from to $to" + (column?.let { " in column ${it.path.joinToString()}" } ?: "")
917
}
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package org.jetbrains.kotlinx.dataframe.exceptions
22

3+
import org.jetbrains.kotlinx.dataframe.AnyCol
4+
import org.jetbrains.kotlinx.dataframe.path
5+
import kotlin.reflect.*
36
import kotlin.reflect.KType
47

5-
public class TypeConverterNotFoundException(public val from: KType, public val to: KType) : IllegalArgumentException() {
8+
public class TypeConverterNotFoundException(public val from: KType, public val to: KType, public val column: AnyCol?) : IllegalArgumentException() {
69

710
override val message: String
8-
get() = "Type converter from $from to $to is not found"
11+
get() = "Type converter from $from to $to is not found" + (column?.let { " for column ${it.path.joinToString()}" } ?: "")
912
}

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/api/convert.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ internal fun AnyCol.convertToTypeImpl(to: KType): AnyCol {
7878
nullsFound = true
7979
null
8080
}
81-
82-
else -> throw TypeConversionException(null, from, to)
81+
else -> throw TypeConversionException(null, from, to, this)
8382
}
8483

8584
fun applyConverter(converter: TypeConverter): AnyCol {
@@ -107,13 +106,13 @@ internal fun AnyCol.convertToTypeImpl(to: KType): AnyCol {
107106
val clazz = it.javaClass.kotlin
108107
val type = clazz.createStarProjectedType(false)
109108
val converter = getConverter(type, to, ParserOptions(locale = Locale.getDefault()))
110-
?: throw TypeConverterNotFoundException(from, to)
109+
?: throw TypeConverterNotFoundException(from, to, this)
111110
converter(it)
112111
}.checkNulls()
113112
}
114113
DataColumn.createValueColumn(name, values, to.withNullability(nullsFound))
115114
}
116-
else -> throw TypeConverterNotFoundException(from, to)
115+
else -> throw TypeConverterNotFoundException(from, to, this)
117116
}
118117
} catch (e: TypeConversionException) {
119118
throw CellConversionException(e.value, e.from, e.to, this.name(), currentRow, e)
@@ -146,7 +145,7 @@ internal fun Any.convertTo(type: KType): Any? {
146145
val clazz = javaClass.kotlin
147146
if (clazz.isSubclassOf(type.jvmErasure)) return this
148147
val from = clazz.createStarProjectedType(false)
149-
val converter = getConverter(from, type) ?: throw TypeConverterNotFoundException(from, type)
148+
val converter = getConverter(from, type) ?: throw TypeConverterNotFoundException(from, type, null)
150149
return converter(this)
151150
}
152151

@@ -170,11 +169,11 @@ internal fun createConverter(from: KType, to: KType, options: ParserOptions? = n
170169
toClass.primaryConstructor ?: error("Value type $toClass doesn't have primary constructor")
171170
val underlyingType = constructor.parameters.single().type
172171
val converter = getConverter(from, underlyingType)
173-
?: throw TypeConverterNotFoundException(from, underlyingType)
172+
?: throw TypeConverterNotFoundException(from, underlyingType, null)
174173
return convert<Any> {
175174
val converted = converter(it)
176175
if (converted == null && !underlyingType.isMarkedNullable) {
177-
throw TypeConversionException(it, from, underlyingType)
176+
throw TypeConversionException(it, from, underlyingType, null)
178177
}
179178
constructor.call(converted)
180179
}
@@ -217,14 +216,15 @@ internal fun createConverter(from: KType, to: KType, options: ParserOptions? = n
217216
val constructorParameter = constructor.parameters.single()
218217
val underlyingType = constructorParameter.type
219218
val converter = getConverter(underlyingType, to)
220-
?: throw TypeConverterNotFoundException(underlyingType, to)
219+
?: throw TypeConverterNotFoundException(underlyingType, to, null)
221220
val property =
222221
fromClass.memberProperties.single { it.name == constructorParameter.name } as kotlin.reflect.KProperty1<Any, *>
223222
if (property.visibility != kotlin.reflect.KVisibility.PUBLIC) {
224223
throw TypeConversionException(
225224
"Not public member property in primary constructor of value type",
226225
from,
227-
to
226+
to,
227+
null
228228
)
229229
}
230230

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/api/convertTo.kt

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package org.jetbrains.kotlinx.dataframe.impl.api
22

33
import org.jetbrains.kotlinx.dataframe.AnyCol
44
import org.jetbrains.kotlinx.dataframe.AnyFrame
5+
import org.jetbrains.kotlinx.dataframe.ColumnsSelector
56
import org.jetbrains.kotlinx.dataframe.DataColumn
67
import org.jetbrains.kotlinx.dataframe.DataFrame
78
import org.jetbrains.kotlinx.dataframe.DataRow
9+
import org.jetbrains.kotlinx.dataframe.RowExpression
810
import org.jetbrains.kotlinx.dataframe.api.ConvertSchemaDsl
911
import org.jetbrains.kotlinx.dataframe.api.ConverterScope
1012
import org.jetbrains.kotlinx.dataframe.api.ExcessiveColumns
@@ -14,10 +16,13 @@ import org.jetbrains.kotlinx.dataframe.api.allNulls
1416
import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
1517
import org.jetbrains.kotlinx.dataframe.api.convertTo
1618
import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
19+
import org.jetbrains.kotlinx.dataframe.api.getColumnPaths
1720
import org.jetbrains.kotlinx.dataframe.api.isEmpty
1821
import org.jetbrains.kotlinx.dataframe.api.map
1922
import org.jetbrains.kotlinx.dataframe.api.name
2023
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
24+
import org.jetbrains.kotlinx.dataframe.api.update
25+
import org.jetbrains.kotlinx.dataframe.api.with
2126
import org.jetbrains.kotlinx.dataframe.codeGen.MarkersExtractor
2227
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind
2328
import org.jetbrains.kotlinx.dataframe.columns.ColumnPath
@@ -39,8 +44,16 @@ import kotlin.reflect.jvm.jvmErasure
3944

4045
private open class Converter(val transform: ConverterScope.(Any?) -> Any?, val skipNulls: Boolean)
4146

42-
private class ConvertSchemaDslImpl<T> : ConvertSchemaDsl<T> {
43-
private val converters: MutableMap<Pair<KType, KType>, Converter> = mutableMapOf()
47+
private class Filler(val columns: ColumnsSelector<*, *>, val expr: RowExpression<*, *>)
48+
49+
internal interface ConvertSchemaDslInternal<T> : ConvertSchemaDsl<T> {
50+
public fun <C> fill(columns: ColumnsSelector<*, C>, expr: RowExpression<*, C>)
51+
}
52+
53+
private class ConvertSchemaDslImpl<T> : ConvertSchemaDslInternal<T> {
54+
private val converters: MutableMap<Pair<KType, KType>, Converter> = mutableMapOf<Pair<KType, KType>, Converter>()
55+
56+
val fillers = mutableListOf<Filler>()
4457

4558
private val flexibleConverters: MutableMap<(KType, ColumnSchema) -> Boolean, Converter> = mutableMapOf()
4659

@@ -50,6 +63,10 @@ private class ConvertSchemaDslImpl<T> : ConvertSchemaDsl<T> {
5063
Converter({ converter(it as A) }, !from.isMarkedNullable)
5164
}
5265

66+
override fun <C> fill(columns: ColumnsSelector<*, C>, expr: RowExpression<*, C>) {
67+
fillers.add(Filler(columns, expr))
68+
}
69+
5370
override fun convertIf(
5471
condition: (KType, ColumnSchema) -> Boolean,
5572
converter: ConverterScope.(Any?) -> Any?,
@@ -80,13 +97,15 @@ internal fun AnyFrame.convertToImpl(
8097
val dsl = ConvertSchemaDslImpl<Any>()
8198
dsl.body()
8299

100+
val missingPaths = mutableSetOf<ColumnPath>()
101+
83102
fun AnyFrame.convertToSchema(schema: DataFrameSchema, path: ColumnPath): AnyFrame {
84103
// if current frame is empty
85104
if (this.isEmpty()) {
86105
return schema.createEmptyDataFrame()
87106
}
88107

89-
var visited = 0
108+
val visited = mutableSetOf<String>()
90109
val newColumns = columns().mapNotNull { originalColumn ->
91110
val targetColumn = schema.columns[originalColumn.name()]
92111
if (targetColumn == null) {
@@ -96,8 +115,7 @@ internal fun AnyFrame.convertToImpl(
96115
ExcessiveColumns.Remove -> null
97116
}
98117
} else {
99-
visited++
100-
118+
visited.add(originalColumn.name())
101119
val currentSchema = originalColumn.extractSchema()
102120
when {
103121
targetColumn == currentSchema -> originalColumn
@@ -130,7 +148,7 @@ internal fun AnyFrame.convertToImpl(
130148
it
131149
}
132150

133-
if (!nullsAllowed && result == null) throw TypeConversionException(it, from, to)
151+
if (!nullsAllowed && result == null) throw TypeConversionException(it, from, to, originalColumn)
134152

135153
result
136154
}
@@ -210,7 +228,6 @@ internal fun AnyFrame.convertToImpl(
210228
}.toMutableList()
211229

212230
// when the target is nullable but the source does not contain a column, fill it in with nulls / empty dataframes
213-
val newColumnsNames = newColumns.map { it.name() }
214231
val size = this.size.nrow
215232
schema.columns.forEach { (name, targetColumn) ->
216233
val isNullable =
@@ -219,20 +236,30 @@ internal fun AnyFrame.convertToImpl(
219236
targetColumn.contentType?.isMarkedNullable == true || // like DataRow<Something?> for a group column (all columns in the group will be nullable)
220237
targetColumn.kind == ColumnKind.Frame // frame column can be filled with empty dataframes
221238

222-
if (name !in newColumnsNames && isNullable) {
223-
visited++
224-
newColumns += targetColumn.createEmptyColumn(name, size)
239+
if (name !in visited) {
240+
if (isNullable) {
241+
newColumns += targetColumn.createEmptyColumn(name, size)
242+
} else missingPaths.add(path + name)
225243
}
226244
}
227-
228-
if (visited != schema.columns.size) {
229-
val unvisited = schema.columns.keys - columnNames().toSet()
230-
throw IllegalArgumentException("The following columns were not found in DataFrame: $unvisited, and their type was not nullable")
231-
}
232245
return newColumns.toDataFrame()
233246
}
234247

235248
val clazz = type.jvmErasure
236249
val marker = MarkersExtractor.get(clazz)
237-
return convertToSchema(marker.schema, emptyPath())
250+
var result = convertToSchema(marker.schema, emptyPath())
251+
252+
dsl.fillers.forEach { filler ->
253+
val paths = result.getColumnPaths(filler.columns)
254+
missingPaths.removeAll(paths)
255+
result = result.update(paths).with {
256+
filler.expr(this, this)
257+
}
258+
}
259+
260+
if (missingPaths.isNotEmpty()) {
261+
throw IllegalArgumentException("The following columns were not found in DataFrame: ${missingPaths.map { it.joinToString()}}, and their type was not nullable. Use `fill` to initialize these columns")
262+
}
263+
264+
return result
238265
}

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/api/parse.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal open class DelegatedStringParser<T>(override val type: KType, val handl
5656
return {
5757
val str = it as String
5858
if (str in nulls) null
59-
else handle(str) ?: throw TypeConversionException(it, typeOf<String>(), type)
59+
else handle(str) ?: throw TypeConversionException(it, typeOf<String>(), type, null)
6060
}
6161
}
6262

@@ -71,7 +71,7 @@ internal class StringParserWithFormat<T>(override val type: KType, val getParser
7171
return {
7272
val str = it as String
7373
if (str in nulls) null
74-
else handler(str) ?: throw TypeConversionException(it, typeOf<String>(), type)
74+
else handler(str) ?: throw TypeConversionException(it, typeOf<String>(), type, null)
7575
}
7676
}
7777

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/schema/Utils.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,6 @@ internal fun DataFrameSchema.createEmptyDataFrame(numberOfRows: Int): AnyFrame =
137137
}.toDataFrame()
138138
}
139139

140-
internal fun createEmptyDataFrame(numberOfRows: Int): AnyFrame =
141-
DataFrame.empty(numberOfRows)
142-
143140
@PublishedApi
144141
internal fun createEmptyDataFrameOf(clazz: KClass<*>): AnyFrame =
145142
MarkersExtractor.get(clazz).schema.createEmptyDataFrame()

0 commit comments

Comments
 (0)