feat(spark): support Struct type and literal, and include names for struct fields (#342)
Conversation
```scala
ImmutableRoot
  .builder()
  .input(rel)
  .addAllNames(ToSubstraitType.toNamedStruct(p.schema).names())
```
This changes the produced plans to include DFS names in the root
```diff
 private def buildNamedScan(schema: StructType, tableNames: List[String]): relation.NamedScan = {
-  val namedStruct = toNamedStruct(schema)
+  val namedStruct = ToSubstraitType.toNamedStruct(schema)
```
Just being more explicit here
```scala
  NamedStruct.of(names, struct)
}

def toStructType(namedStruct: NamedStruct): StructType = {
```
These were previously inside ToSubstraitType, which is confusing given they return Spark types. They're now moved into the ToSparkType object above, and also rewritten to use the newly-available struct type code.
(We should also split this file, but I'd rather do that as a follow-up to keep the diff clean.)
```scala
  typeExpression.accept(new ToSparkType(Seq.empty))
}

def toStructType(namedStruct: NamedStruct): StructType = {
```
moved from below, see comment below
```scala
    .map { case ((t, d), name) => StructField(name, d, t.nullable()) }
  )

def toNamedStruct(schema: StructType): NamedStruct = {
  val dfsNames = JavaConverters.seqAsJavaList(fieldNamesDfs(schema))
```
This changes anything using toNamedStruct to produce the full, depth-first list of names rather than only the top-level field names.
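For context, a minimal self-contained sketch of depth-first name flattening. It uses hypothetical stand-in types rather than Spark's StructType (so the behavior can be shown in isolation); the names and shapes here are illustrative, not the PR's actual code:

```scala
// Hypothetical stand-in for Spark's data types, to keep the sketch
// self-contained; the real code operates on StructType/StructField.
sealed trait DType
case object Primitive extends DType
final case class Struct(fields: Seq[(String, DType)]) extends DType

// Depth-first flattening: each struct field's name is followed
// immediately by the names of its nested fields.
def fieldNamesDfs(struct: Struct): Seq[String] =
  struct.fields.flatMap {
    case (name, nested: Struct) => name +: fieldNamesDfs(nested)
    case (name, _)              => Seq(name)
  }

val schema = Struct(Seq(
  "id"    -> Primitive,
  "point" -> Struct(Seq("x" -> Primitive, "y" -> Primitive))))

fieldNamesDfs(schema) // Seq("id", "point", "x", "y")
```

With the old behavior, callers saw only the top-level names (here, "id" and "point"); the DFS version inserts the nested names right after their parent.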
```diff
 def seqToOptionHelper(s: Seq[Option[T]], accum: Seq[T] = Seq[T]()): Option[Seq[T]] = {
   s match {
-    case Some(head) :: Nil =>
+    case Seq(Some(head)) =>
```
I'm not sure why, but the earlier code just wasn't working.
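A plausible explanation (my guess, not confirmed in the thread): the cons pattern `x :: xs` only matches when the runtime collection is a scala.collection.immutable.List, while a value typed as Seq may be backed by another implementation, in which case the old pattern silently falls through. The Seq extractor matches any sequence:

```scala
// Cons pattern: matches only when the runtime class is a List.
def matchesCons(s: Seq[Option[Int]]): Boolean = s match {
  case Some(_) :: Nil => true
  case _              => false
}

// Seq extractor: matches any one-element Seq, regardless of implementation.
def matchesSeq(s: Seq[Option[Int]]): Boolean = s match {
  case Seq(Some(_)) => true
  case _            => false
}

matchesCons(List(Some(1)))   // true: backed by a List
matchesCons(Vector(Some(1))) // false: a Vector never matches ::
matchesSeq(Vector(Some(1)))  // true
```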
@andrew-coleman @vbarua looking for your review for this :) Note that this is a breaking change: the currently produced plans aren't valid Substrait, since they lack the inner names. This adds those, but it changes the meaning of the names fields. For Andrew: I saw you had added a bit of struct handling in your PR; this is compatible with that, but adds the rest of the handling as well.
```scala
require(nameIdx < dfsNames.size)
val n = dfsNames(nameIdx)
nameIdx += 1
n
```

Suggested change:

```diff
-require(nameIdx < dfsNames.size)
-val n = dfsNames(nameIdx)
-nameIdx += 1
-n
+dfsNames(i)
```
Won't nameIdx always be the same as i? If so, there's no need to increment a separate counter.
Incrementing an object-scoped counter assumes that the .map behaves like a foreach, i.e. with a guaranteed order of execution. In theory, at least, the map could execute its functions concurrently (I'm not sure it does in practice), just assembling the results back into the original sequence.
Looking at this further, I guess it does this because it's performing a depth-first traversal of a potentially nested struct, so nameIdx runs ahead of i whenever a field has nested children.
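To sketch why some sequential cursor is needed here: reconstructing a nested struct from a flat DFS name list means consuming names in order, each parent name followed by its children's names, so a per-element index like i can't address the right name. A hypothetical alternative (stand-in types, not the PR's code) that makes the sequential consumption explicit with an Iterator instead of an object-scoped counter:

```scala
// Hypothetical stand-in shapes: a type tree without names (as decoded
// from Substrait) and the same tree with names attached.
sealed trait Shape
case object Leaf extends Shape
final case class Node(children: Seq[Shape]) extends Shape

sealed trait Named
final case class NamedLeaf(name: String) extends Named
final case class NamedNode(name: String, children: Seq[Named]) extends Named

// Consume the DFS-ordered names with an Iterator: Seq.map runs left to
// right on standard sequential collections, so names.next() is called in
// field order, parents before their children.
def assignNames(fields: Seq[Shape], names: Iterator[String]): Seq[Named] =
  fields.map {
    case Leaf => NamedLeaf(names.next())
    case Node(children) =>
      val parent = names.next()
      NamedNode(parent, assignNames(children, names))
  }

assignNames(Seq(Leaf, Node(Seq(Leaf, Leaf))), Iterator("id", "point", "x", "y"))
// Seq(NamedLeaf("id"), NamedNode("point", Seq(NamedLeaf("x"), NamedLeaf("y"))))
```

The same ordering assumption applies as with the counter; the Iterator just makes the dependence on sequential evaluation visible at the call site.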
spark/src/test/scala/io/substrait/spark/SubstraitPlanTestBase.scala (outdated; resolved)
vbarua left a comment
Made a brief skim, and overall this looks reasonable to me.
Thanks for looking at this as well @andrew-coleman, I think you're more familiar with the Spark side.
BREAKING CHANGE: The plan root's "names" field now includes nested names in addition to the top-level field names. The new behavior is how the Substrait spec defines the names, but this change breaks compatibility with existing plans containing struct fields, since the names list will no longer match.