1
1
package org.jetbrains.kotlinx.dataframe.impl.api
2
2
3
- import org.jetbrains.kotlinx.dataframe.AnyCol
4
3
import org.jetbrains.kotlinx.dataframe.AnyFrame
4
+ import org.jetbrains.kotlinx.dataframe.AnyRow
5
5
import org.jetbrains.kotlinx.dataframe.ColumnsSelector
6
6
import org.jetbrains.kotlinx.dataframe.DataColumn
7
7
import org.jetbrains.kotlinx.dataframe.DataFrame
@@ -14,6 +14,7 @@ import org.jetbrains.kotlinx.dataframe.api.Infer
14
14
import org.jetbrains.kotlinx.dataframe.api.all
15
15
import org.jetbrains.kotlinx.dataframe.api.allNulls
16
16
import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
17
+ import org.jetbrains.kotlinx.dataframe.api.concat
17
18
import org.jetbrains.kotlinx.dataframe.api.convertTo
18
19
import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
19
20
import org.jetbrains.kotlinx.dataframe.api.getColumnPaths
@@ -24,11 +25,12 @@ import org.jetbrains.kotlinx.dataframe.api.toDataFrame
24
25
import org.jetbrains.kotlinx.dataframe.api.update
25
26
import org.jetbrains.kotlinx.dataframe.api.with
26
27
import org.jetbrains.kotlinx.dataframe.codeGen.MarkersExtractor
28
+ import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
27
29
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind
28
30
import org.jetbrains.kotlinx.dataframe.columns.ColumnPath
31
+ import org.jetbrains.kotlinx.dataframe.columns.FrameColumn
29
32
import org.jetbrains.kotlinx.dataframe.exceptions.ExcessiveColumnsException
30
33
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
31
- import org.jetbrains.kotlinx.dataframe.impl.columns.asAnyFrameColumn
32
34
import org.jetbrains.kotlinx.dataframe.impl.emptyPath
33
35
import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyColumn
34
36
import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyDataFrame
@@ -107,8 +109,8 @@ internal fun AnyFrame.convertToImpl(
107
109
108
110
val visited = mutableSetOf<String >()
109
111
val newColumns = columns().mapNotNull { originalColumn ->
110
- val targetColumn = schema.columns[originalColumn.name()]
111
- if (targetColumn == null ) {
112
+ val targetSchema = schema.columns[originalColumn.name()]
113
+ if (targetSchema == null ) {
112
114
when (excessiveColumns) {
113
115
ExcessiveColumns .Fail -> throw ExcessiveColumnsException (listOf (originalColumn.name))
114
116
ExcessiveColumns .Keep -> originalColumn
@@ -118,13 +120,13 @@ internal fun AnyFrame.convertToImpl(
118
120
visited.add(originalColumn.name())
119
121
val currentSchema = originalColumn.extractSchema()
120
122
when {
121
- targetColumn == currentSchema -> originalColumn
123
+ targetSchema == currentSchema -> originalColumn
122
124
123
125
! allowConversion -> {
124
126
val originalSchema = mapOf (originalColumn.name to currentSchema)
125
127
.render(0 , StringBuilder (), " \t " )
126
128
127
- val targetSchema = mapOf (originalColumn.name to targetColumn )
129
+ val targetSchema = mapOf (originalColumn.name to targetSchema )
128
130
.render(0 , StringBuilder (), " \t " )
129
131
130
132
throw IllegalArgumentException (" Column has schema:\n $originalSchema \n that differs from target schema:\n $targetSchema " )
@@ -135,26 +137,31 @@ internal fun AnyFrame.convertToImpl(
135
137
136
138
// try to perform any user-specified conversions first
137
139
val from = originalColumn.type()
138
- val to = targetColumn .type
139
- val converter = dsl.getConverter(from, targetColumn )
140
+ val to = targetSchema .type
141
+ val converter = dsl.getConverter(from, targetSchema )
140
142
141
143
val convertedColumn = if (converter != null ) {
142
144
val nullsAllowed = to.isMarkedNullable
143
145
originalColumn.map(to, Infer .Nulls ) {
144
146
val result =
145
147
if (it != null || ! converter.skipNulls) {
146
- converter.transform(ConverterScope (from, targetColumn ), it)
148
+ converter.transform(ConverterScope (from, targetSchema ), it)
147
149
} else {
148
150
it
149
151
}
150
152
151
- if (! nullsAllowed && result == null ) throw TypeConversionException (it, from, to, originalColumn.path())
153
+ if (! nullsAllowed && result == null ) throw TypeConversionException (
154
+ it,
155
+ from,
156
+ to,
157
+ originalColumn.path()
158
+ )
152
159
153
160
result
154
161
}
155
162
} else null
156
163
157
- when (targetColumn .kind) {
164
+ when (targetSchema .kind) {
158
165
ColumnKind .Value ->
159
166
convertedColumn ? : originalColumn.convertTo(to)
160
167
@@ -187,37 +194,43 @@ internal fun AnyFrame.convertToImpl(
187
194
DataColumn .createColumnGroup(
188
195
name = column.name(),
189
196
df = columnGroup.convertToSchema(
190
- schema = (targetColumn as ColumnSchema .Group ).schema,
197
+ schema = (targetSchema as ColumnSchema .Group ).schema,
191
198
path = columnPath,
192
199
),
193
200
)
194
201
}
195
202
196
203
ColumnKind .Frame -> {
197
204
val column = convertedColumn ? : originalColumn
198
-
199
- // perform any patches if needed to be able to convert a column to a frame column
200
- val patchedOriginalColumn: AnyCol = when {
201
- // a value column of AnyFrame? (or nulls) can be converted to a frame column by making nulls empty dataframes
202
- column.kind == ColumnKind .Value && column.all { it is AnyFrame ? } -> {
203
- column
204
- .map { (it ? : emptyDataFrame<Any ?>()) as AnyFrame }
205
- .convertTo<AnyFrame >()
205
+ val frameSchema = (targetSchema as ColumnSchema .Frame ).schema
206
+
207
+ val frames = when (column.kind) {
208
+ ColumnKind .Frame ->
209
+ (column as FrameColumn <* >).values()
210
+
211
+ ColumnKind .Value -> {
212
+ require(column.all { it == null || it is AnyFrame || (it is List <* > && it.all { it is AnyRow ? }) }) {
213
+ " Column `${column.name} ` is ValueColumn and contains objects that can not be converted into `DataFrame`"
214
+ }
215
+ column.values().map {
216
+ when (it) {
217
+ null -> emptyDataFrame()
218
+ is AnyFrame -> it
219
+ else -> (it as List <AnyRow ?>).concat()
220
+ }
221
+ }
206
222
}
207
223
208
- else -> column
224
+ ColumnKind .Group -> {
225
+ (column as ColumnGroup <* >).values().map { it.toDataFrame() }
226
+ }
209
227
}
210
228
211
- require(patchedOriginalColumn.kind == ColumnKind .Frame ) {
212
- " Column `${patchedOriginalColumn.name} ` is ${patchedOriginalColumn.kind} Column and can not be converted to `FrameColumn`"
213
- }
214
- val frameColumn = patchedOriginalColumn.asAnyFrameColumn()
215
- val frameSchema = (targetColumn as ColumnSchema .Frame ).schema
216
- val frames = frameColumn.values().map { it.convertToSchema(frameSchema, columnPath) }
229
+ val convertedFrames = frames.map { it.convertToSchema(frameSchema, columnPath)}
217
230
218
231
DataColumn .createFrameColumn(
219
- name = patchedOriginalColumn .name(),
220
- groups = frames ,
232
+ name = column .name(),
233
+ groups = convertedFrames ,
221
234
schema = lazy { frameSchema },
222
235
)
223
236
}
@@ -259,7 +272,7 @@ internal fun AnyFrame.convertToImpl(
259
272
}
260
273
261
274
if (missingPaths.isNotEmpty()) {
262
- throw IllegalArgumentException (" The following columns were not found in DataFrame: ${missingPaths.map { it.joinToString()}} , and their type was not nullable. Use `fill` to initialize these columns" )
275
+ throw IllegalArgumentException (" The following columns were not found in DataFrame: ${missingPaths.map { it.joinToString() }} , and their type was not nullable. Use `fill` to initialize these columns" )
263
276
}
264
277
265
278
return result
0 commit comments