[spark] Eliminate the De/serialization process when writing to the append bucket table #5159
Conversation
Force-pushed from 7e73de8 to 8d2594d.
Zouxxyy left a comment:
Thanks! Can we remove SparkRow in the future? Actually, I noticed that Spark's DataWriter takes InternalRow as input too, so SparkInternalRowWrapper will be useful for integrating the v2 writer in the future.
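For background, a minimal sketch of what such an InternalRow wrapper does conceptually. This is not the actual SparkInternalRowWrapper; the class name and accessors below are illustrative only:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Hypothetical, simplified sketch: expose row accessors by delegating straight to
// Spark's catalyst InternalRow, so no external Row object is materialized on the write path.
class InternalRowAdapterSketch(row: InternalRow, schema: StructType) {
  def numFields: Int = schema.length
  def isNullAt(pos: Int): Boolean = row.isNullAt(pos)
  def getInt(pos: Int): Int = row.getInt(pos)
  def getLong(pos: Int): Long = row.getLong(pos)
  // Strings stay in Spark's UTF8String form; converting to Paimon's string type
  // (not shown) can reuse the underlying bytes instead of going through java.lang.String.
  def getString(pos: Int): String = row.getUTF8String(pos).toString
}
```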
private def toPaimonRow(row: Row) =
  new SparkRow(rowType, row, SparkRowUtils.getRowKind(row, rowKindColIdx))

private def toPaimonRow(row: InternalRow) =
  new SparkInternalRowWrapper(
We can use something like this to reduce the cost of initialization:

SparkInternalRowWrapper wrap(InternalRow internalRow) {
    this.row = internalRow;
    return this;
}
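A minimal usage sketch of the reuse pattern being suggested here (names are illustrative, not the actual Paimon classes): one wrapper instance per task, re-pointed at each incoming record instead of allocating a new wrapper per row.

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Illustrative mutable adapter: allocate once, re-point per record.
class ReusableWrapperSketch {
  private var row: InternalRow = _

  def replace(internalRow: InternalRow): ReusableWrapperSketch = {
    this.row = internalRow
    this
  }

  def getInt(pos: Int): Int = row.getInt(pos)
}

// Inside a partition: no per-row wrapper allocation, only one object for the whole task.
def writePartitionSketch(iter: Iterator[InternalRow])(write: ReusableWrapperSketch => Unit): Unit = {
  val wrapper = new ReusableWrapperSketch
  iter.foreach(r => write(wrapper.replace(r)))
}
```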
Fixed
private final int length;
private final StructType structType;

public SparkInternalRowWrapper(
We could keep only these fields in SparkInternalRowWrapper:

private org.apache.spark.sql.catalyst.InternalRow internalRow;
private final StructType structType; // or paimon rowType
private final int rowKindColIdx;

and get the field count or row kind via a static method.
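A rough sketch of that suggestion in Scala (the real wrapper may be a Java class, and the field and method names below are assumptions): keep only the wrapped row, the schema, and the row-kind column index, and derive the field count and row kind on demand through helpers.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Companion-style helpers standing in for the proposed static methods.
object WrapperHelpersSketch {
  def fieldCount(structType: StructType): Int = structType.length

  // This sketch assumes the row kind is carried as a byte in a trailing metadata
  // column; a negative index means the column is absent and INSERT is implied.
  def rowKindByte(row: InternalRow, rowKindColIdx: Int): Byte =
    if (rowKindColIdx >= 0) row.getByte(rowKindColIdx) else 0
}

// Only three pieces of state, as suggested above.
class SlimWrapperSketch(structType: StructType, rowKindColIdx: Int) {
  private var row: InternalRow = _
  def replace(r: InternalRow): SlimWrapperSketch = { this.row = r; this }
  def numFields: Int = WrapperHelpersSketch.fieldCount(structType)
  def rowKind: Byte = WrapperHelpersSketch.rowKindByte(row, rowKindColIdx)
}
```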
I have one concern: we can fetch a nested (smaller) row via getRow(int pos, int numFields), and if that is supported, numFields may not equal the structType length. So I kept the length field for now.
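To make the concern concrete, a short sketch (Spark's InternalRow calls this accessor getStruct): the numFields argument belongs to the nested row, so it can differ from the top-level structType length.

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Suppose the top-level row has 3 fields and field 1 is a struct with 2 fields.
// Reading the nested struct needs numFields = 2, not the outer length of 3, which is
// why a single cached length cannot always stand in for the numFields argument.
def readNestedSketch(row: InternalRow): InternalRow =
  row.getStruct(1, 2) // Spark's counterpart of getRow(pos, numFields)
```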
} finally {
  write.close()

val schema = dataFrame.schema
dataFrame.queryExecution.toRdd
Have you tested the performance difference of this modification for the unaware-bucket scenario?
No, I have not tested this difference yet.
}

def callFunction(name: String, args: Seq[Column]): Column = {
  call_udf(name, args: _*)
What is the difference between this and call_function in Spark 3.5+? It seems a new Compatibility helper has been added for it.
Removed; it now uses call_udf.
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

case class FixedBucketExpression(_children: Seq[Expression])
Let's add some comments to the args
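For illustration, a hedged sketch of the kind of argument documentation being requested. The child layout is inferred from the call site later in this PR (Seq(lit(bucketNumber)) ++ bucketKeyCol) and should be treated as an assumption:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Sketch only; the real expression's evaluation logic is omitted.
// Assumed _children layout, based on how the expression is invoked:
//   _children(0)  - literal total bucket number of the table
//   _children(1+) - the bucket key columns used to compute the fixed bucket id
case class FixedBucketExpressionSketch(_children: Seq[Expression])
```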
Fixed
Thanks @Zouxxyy for your comments. I think we can remove SparkRow in the future.
Another question to discuss here: before this PR we did not use the bucket expression, so we did not depend on the PaimonSparkExtension during write. Now the write is forced to go through the bucket expression, so the extension becomes mandatory. I think this may break compatibility; what do you think? cc @Zouxxyy @YannByron @JingsongLi
I remember that Spark's dynamic partition writes also seem to require configuring extensions? cc @Zouxxyy
I appended a commit so the write works as before when the extension is not configured.
The doc including …
Yes, the extension helps to do some type alignment work; +1 to …
But I still think we should keep the way of working without the extension; we currently use the extension optionally (e.g. dynamic overwrite, call procedure). So I lean toward not breaking compatibility.
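For context on this compatibility discussion, the extension is currently an optional session configuration, roughly as below (the class name is taken from Paimon's Spark setup docs; verify it against your version):

```scala
import org.apache.spark.sql.SparkSession

object PaimonSessionSketch {
  // Without this setting, functions and rules registered by the extension (for example
  // a bucket function resolved at write time) are not available to the session.
  val spark: SparkSession = SparkSession
    .builder()
    .appName("paimon-write")
    .config(
      "spark.sql.extensions",
      "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
    .getOrCreate()
}
```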
Zouxxyy left a comment:
Overall LGTM, just need to remove the unnecessary modifications.
  write.finish()
} finally {
  write.close()

dataFrame
remove unnecessary changes
val inputSchema = inputDs.schema
writeWithBucketAssigner(
  partitionByKey(),
  inputDs,
remove unnecessary changes
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)

def newWrite(): SparkTableWrite = new SparkTableWrite(writeBuilder, rowType, rowKindColIdx)
def newWrite(): SparkTableWrite =
remove unnecessary changes
  .map(x => col(data.schema.fieldNames(x)))
  .toSeq
val args = Seq(lit(bucketNumber)) ++ bucketKeyCol
val repartitioned =
What about data.withColumn(BUCKET_COL, call_udf(BucketExpression.FIXED_BUCKET, args: _*)), so that we can use iter.foreach(row => write.write(row, row.getInt(bucketColIdx))) to avoid computing the bucket id twice?
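A hedged sketch of the suggested flow: compute the bucket id once as a column, repartition on it, then read it back with getInt inside the task instead of recomputing it. Identifiers such as BUCKET_COL and the registered function name come from the surrounding discussion; the glue code and names marked below are assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{call_udf, col, lit}

object BucketColumnSketch {
  val BUCKET_COL = "_bucket_" // assumed column name

  def withBucketId(data: DataFrame, bucketNumber: Int, bucketKeyCols: Seq[String]): DataFrame = {
    val args = Seq(lit(bucketNumber)) ++ bucketKeyCols.map(col)
    // The bucket id is computed once here as a column, used for repartitioning,
    // and later read back per record with row.getInt(bucketColIdx) inside the task,
    // so the bucket hash is not evaluated a second time at write time.
    data.withColumn(BUCKET_COL, call_udf("fixed_bucket", args: _*)) // function name assumed
  }
}
```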
Thanks @Zouxxyy for your comments, I addressed all of them.
Purpose
Linked issue: close #5148
Currently, we append the bucket column into the Row to shuffle, which leads to DeserializeToObject and SerializeFromObject nodes, which are CPU costly. In this PR I try to:
- add a fixed_bucket function to calculate the bucket id directly on the original InternalRow
- write with RDD[InternalRow] to avoid converting to Row when performing the write

The new approach's performance is significantly better.
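A small sketch of the distinction this description relies on (standard Spark APIs; the actual write path is elided): df.rdd plans DeserializeToObject/SerializeFromObject to produce external Row objects, while df.queryExecution.toRdd exposes the internal binary rows directly.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow

object RowVsInternalRowSketch {
  // Old style: every record is deserialized into an external Row (extra CPU per row).
  def externalRows(df: DataFrame): RDD[Row] = df.rdd

  // New style: rows stay in Spark's internal binary format; note that the returned
  // InternalRow instances may be reused, so copy them if they are buffered.
  def internalRows(df: DataFrame): RDD[InternalRow] = df.queryExecution.toRdd
}
```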
Tests
API and Format
Documentation