fix: add proto roundtrips for Spark tests and fix issues it surfaces #315
vbarua merged 17 commits into substrait-io:main
Conversation
-   // count only needs to be set when it is not -1
-   builder.count(rel.getCount());
- }
+ var builder = Fetch.builder().input(input).offset(rel.getOffset()).count(rel.getCount());
While the idea of not setting count when it's -1 is fine, this makes the roundtrip tests fail if count is set in the POJO. An alternative fix is to ensure that in the POJO it's never set when it's -1.
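For illustration, a minimal sketch of that alternative, reusing the names from the diff above (whether count is modeled as optional in the Fetch POJO is an assumption here):

```java
// Sketch: only push count into the POJO when the proto carries a real value;
// -1 is the proto's "not set" sentinel per the comment in the diff above.
var builder = Fetch.builder().input(input).offset(rel.getOffset());
if (rel.getCount() != -1) {
  builder.count(rel.getCount());
}
return builder.build();
```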
  val aggOutputMap = aggregates.zipWithIndex.map {
    case (e, i) =>
-     AttributeReference(s"agg_func_$i", e.dataType)() -> e
+     AttributeReference(s"agg_func_$i", e.dataType, nullable = e.nullable)() -> e
These were causing wrong nullability in the type field of the created POJOs. I don't think that type field is used anywhere, so it didn't cause harm, but it still failed the roundtrip tests: the type isn't written to proto, and on read it gets correctly derived from the other fields.
spark/src/main/scala/io/substrait/spark/logical/ToSubstraitRel.scala (outdated review thread, resolved)
@vbarua @andrew-coleman this has been open for a while, but now finally ready for review! The testing change collides a bit with Andrew's #333, but either should be trivial to rebase once the other is in.
  protected Type.Struct deriveRecordType() {
-   return getInputs().get(0).getRecordType();
+   // The different inputs may have schemas that differ in nullability, but not in type.
+   // In that case we should return a schema that is nullable where any of the inputs is nullable.
Looking at the docs for this (https://substrait.io/relations/logical_relations/#set-operation-types), the output nullability depends on which set operation is being performed.
Yep, I realized that as well but forgot to fix it 😅 I'll try to get to it tomorrow.
vbarua left a comment:
Found some time to actually look at this. I have one comment about the nullability of scalar subqueries, and one request for tests for the Set output type derivation logic.
// As defined in https://substrait.io/relations/logical_relations/#set-operation-types
return switch (getSetOp()) {
  case UNKNOWN -> first; // alternative would be to throw an exception
Meta: out of scope for this PR, but given that there is no default operation for when this is not specified, it might make sense to not allow UNKNOWN in the POJOs. That is, we should force the user to set the operation field.
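To make the spec mapping concrete, here is a hedged sketch of what the full derivation could look like; intersectionNullability and unionNullability are hypothetical helpers, not necessarily what this PR names them, and the enum cases are assumed to match the proto's SetOp:

```java
// Hedged sketch of spec-driven output type derivation, following
// https://substrait.io/relations/logical_relations/#set-operation-types.
// Assumes getInputs() returns List<Rel>, as the snippets above suggest.
protected Type.Struct deriveRecordType() {
  Type.Struct first = getInputs().get(0).getRecordType();
  java.util.List<Type.Struct> rest =
      getInputs().subList(1, getInputs().size()).stream()
          .map(Rel::getRecordType)
          .collect(java.util.stream.Collectors.toList());
  return switch (getSetOp()) {
    case UNKNOWN -> first; // alternative would be to throw an exception
    case MINUS_PRIMARY, MINUS_MULTISET -> first; // output follows the primary input
    case INTERSECTION_PRIMARY, INTERSECTION_MULTISET ->
        intersectionNullability(first, rest); // nullable only if nullable in every input
    case UNION_ALL, UNION_DISTINCT ->
        unionNullability(first, rest); // nullable if nullable in any input
  };
}
```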
| "Scalar subquery must have exactly one field"); | ||
| } | ||
| // Result can be null if the query returns no rows | ||
| return TypeCreator.asNullable(type.fields().get(0)); |
// Result can be null if the query returns no rows
Is this actually the case? If you have a non-nullable column, but there are no values, that column is still non-nullable as far as I understand it.
In any case, the spec indicates:
// A subquery with one row and one column. This is often an aggregate
// though not required to be.
So I think it would be safe to just use the nullability of the field as is. If there is no row returned by the subquery, that's a violation of the spec as written.
The issue was that Spark was always reporting nullable=true, which meant the from-proto plan didn't match the from-spark version. However, this "fix" of mine was indeed wrong; I made a more correct one here: 4f7f274. Thanks!
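For reference, the shape of fix being suggested would look roughly like this (a sketch under the assumption that the surrounding method exposes the subquery input; 4f7f274 is the authoritative change):

```java
// Sketch: keep the single output field's declared nullability instead of
// forcing it nullable with TypeCreator.asNullable. A subquery returning zero
// rows would already violate the spec's "one row and one column" contract.
Type.Struct type = getInput().getRecordType(); // getInput() is assumed from context
if (type.fields().size() != 1) {
  throw new IllegalArgumentException("Scalar subquery must have exactly one field");
}
return type.fields().get(0);
```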
val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
val substraitPlan2 =
  new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)
substraitPlan2.shouldEqualPlainly(substraitPlan)
This is a good check to have generally ✨
case s: ScalarSubquery if s.outerAttrs.isEmpty && s.joinCond.isEmpty =>
  val rel = toSubstraitRel.visit(s.plan)
  val t =
    s.plan.schema.fields.head // Using this instead of s.dataType/s.nullable to get correct nullability
There are two ScalarSubquery classes in Spark. The one we're using here is https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala#L411, which always sets nullability to true. However, the other one picks the nullability based on the plan's schema: https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L68. I'm not sure why there's a difference, but picking it from the plan seems to align better with what we expect here (and makes the tests pass).
They both have override def nullable: Boolean = true (line 69 in your second link)?
Whoops indeed, I can't read 🤦. Still, I think the change here is fine nonetheless.
int finalI = i;
boolean anyOtherIsRequired = rest.stream().anyMatch(t -> !t.fields().get(finalI).nullable());
fields.add(anyOtherIsRequired ? TypeCreator.asNotNullable(typeA) : typeA);
}
Looks good. Tiny nit - both of these functions share a lot of common code (only 2 lines differ) - could they be refactored somehow? Not important though :)
I had it as a single function before, but that ends up being harder to read: the difference between the two is small but non-trivial :/
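For readers skimming the thread, the two variants differ roughly like this; the union half is inferred from the spec and the intersection snippet above, so treat it as a sketch:

```java
// Intersection: the output field stays nullable only if every input allows nulls,
// i.e. it becomes required as soon as any other input is required.
boolean anyOtherIsRequired = rest.stream().anyMatch(t -> !t.fields().get(finalI).nullable());
fields.add(anyOtherIsRequired ? TypeCreator.asNotNullable(typeA) : typeA);

// Union: the output field becomes nullable as soon as any other input allows nulls.
boolean anyOtherIsNullable = rest.stream().anyMatch(t -> t.fields().get(finalI).nullable());
fields.add(anyOtherIsNullable ? TypeCreator.asNullable(typeA) : typeA);
```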
vbarua left a comment:
Changes look good. Will merge them shortly.
Adds testing for substrait-spark that going from POJO (i.e. a substrait-java plan) -> proto -> POJO results in the same POJO.
The test surfaced a bunch of cases where that assertion fails, mainly due to the Java POJOs containing a derived outputType that was in many cases incorrect when created from the proto.
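As a rough Java-side illustration of that roundtrip (the converter entry points mirror the Scala test quoted earlier in the thread; treat exact signatures, particularly RelProtoConverter.toProto, as assumptions to verify against substrait-java):

```java
import io.substrait.extension.ExtensionCollector;
import io.substrait.extension.SimpleExtension;
import io.substrait.relation.ProtoRelConverter;
import io.substrait.relation.Rel;
import io.substrait.relation.RelProtoConverter;

// Sketch of a POJO -> proto -> POJO roundtrip assertion.
void assertRoundtrips(Rel pojoRel, SimpleExtension.ExtensionCollection extensions)
    throws Exception {
  ExtensionCollector extensionCollector = new ExtensionCollector();
  io.substrait.proto.Rel proto = new RelProtoConverter(extensionCollector).toProto(pojoRel);

  // Go through bytes so the wire format itself is exercised,
  // not just the in-memory proto tree.
  io.substrait.proto.Rel reparsed = io.substrait.proto.Rel.parseFrom(proto.toByteArray());

  Rel roundtripped = new ProtoRelConverter(extensionCollector, extensions).from(reparsed);
  if (!pojoRel.equals(roundtripped)) {
    throw new AssertionError("POJO -> proto -> POJO roundtrip changed the plan");
  }
}
```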