
Conversation

@zhongyujiang
Contributor

Purpose

Linked issue: part of #4816, split out from #5241

Add the Paimon bucket function transform to the Paimon-Spark connector.
It can be used to request data clustering on the Paimon bucket transform from DataSource V2 when integrating the V2 write path: use case
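For orientation, here is a minimal sketch (not code from this PR) of how a DataSource V2 write can ask Spark to cluster incoming rows on a bucket transform through RequiresDistributionAndOrdering; the bucket count (16) and the column name (user_id) are made-up values:

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

// Illustrative only: a V2 write that requests clustering by bucket(16, user_id).
abstract class IllustrativeBucketedWrite implements RequiresDistributionAndOrdering {

    @Override
    public Distribution requiredDistribution() {
        // All rows of one bucket are routed to the same Spark partition before writing.
        Expression bucket =
                Expressions.apply(
                        "bucket", Expressions.literal(16), Expressions.column("user_id"));
        return Distributions.clustered(new Expression[] {bucket});
    }

    @Override
    public SortOrder[] requiredOrdering() {
        return new SortOrder[0];
    }
}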

Tests

BucketFunctionTest

API and Format

Documentation

Contributor Author

@zhongyujiang zhongyujiang left a comment

cc @Zouxxyy @YannByron Can you please review this when you have time? Thanks!

builder.put("BucketLongString", BucketLongString.class);
builder.put("BucketStringInteger", BucketStringInteger.class);
builder.put("BucketStringLong", BucketStringLong.class);
builder.put("BucketStringString", BucketStringString.class);
Contributor Author

For primitive types and common composite types, I added separate bucket function classes to improve performance, while BucketGeneric handles the remaining types.
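Roughly, the difference looks like the sketch below (simplified, not the PR's actual code; the hash is a placeholder rather than Paimon's real bucket hash): a specialized class exposes a typed magic invoke method that Spark can call without boxing, while a generic variant has to read every argument from an InternalRow.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

// Sketch of a specialized (long, string) bucket function.
class BucketLongStringSketch implements ScalarFunction<Integer> {

    // Spark's "magic method" fast path: typed arguments, no boxing, no row access.
    public int invoke(int numBuckets, long key0, UTF8String key1) {
        int hash = Long.hashCode(key0) * 31 + key1.hashCode(); // placeholder hash
        return Math.floorMod(hash, numBuckets);
    }

    @Override
    public DataType[] inputTypes() {
        return new DataType[] {DataTypes.IntegerType, DataTypes.LongType, DataTypes.StringType};
    }

    @Override
    public DataType resultType() {
        return DataTypes.IntegerType;
    }

    @Override
    public String name() {
        return "bucket";
    }

    // Generic fallback path (what a BucketGeneric-style implementation always goes through):
    // every argument is read from an InternalRow by position, which is slower.
    @Override
    public Integer produceResult(InternalRow args) {
        return invoke(args.getInt(0), args.getLong(1), args.getUTF8String(2));
    }
}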

Contributor

Have you tested the performance of this implementation? I think the current approach is too specialized, and the Spark-to-Paimon data conversion doesn't reuse the existing code.

Contributor Author

https://github.com/zhongyujiang/incubator-paimon/tree/gh/spark-v2-write_function-benchmark

Here are the JMH benchmark results:

Benchmark                                            Mode  Cnt  Score   Error  Units
PaimonBucketFunctionBenchmark.bucketFunction           ss    5  0.416 ± 0.033   s/op
PaimonBucketFunctionBenchmark.genericBucketFunction    ss    5  0.685 ± 0.302   s/op

I think this optimization has limited impact on write performance, since a significant amount of time is spent on network and compression during data writing. However, there is indeed a noticeable performance difference between the two implementations.
This part of the code can't be reused since the invoke method takes different input types.
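For reference, the benchmark lives in the branch linked above; the skeleton below only illustrates the JMH setup that produces this kind of table (class name and method bodies are placeholders, not the actual benchmark code):

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SingleShotTime) // the "ss" mode shown in the table
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@Measurement(iterations = 5) // Cnt = 5 in the table
public class PaimonBucketFunctionBenchmarkSketch {

    @Benchmark
    public void bucketFunction() {
        // apply the specialized (typed invoke) bucket function over a batch of rows
    }

    @Benchmark
    public void genericBucketFunction() {
        // apply the generic (InternalRow-based) bucket function over the same batch
    }
}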

Contributor

It looks like the difference is small. I added a benchmark in #5418 and will test the performance of the existing bucket function against this PR's, covering both the function itself and the end-to-end write cost.

Contributor Author

If we only compare the two function implementations, the results above show a clear difference: 0.685 / 0.416 ≈ 1.65×.
In real write scenarios, I agree that the impact of this optimization is very minimal.

Contributor Author

FYI, I’ve already added a benchmark comparison between the v2 write and the previous v1 write here; the results are attached in that PR too. But it didn’t include the newly added v1 bucket function optimization, so I think the JMH benchmark results are more reliable.

Contributor

@zhongyujiang fixed_bucket() and bucket() do the same thing, so it is reasonable to keep only one (keep bucket, because it will also be used in bucket joins and so on).

I tested it and found that the current implementation is indeed faster, but the e2e time is about the same. However, it introduces some duplicate code, such as the conversion between Spark types and Paimon types, as well as some specialized code that I think will be difficult to maintain in the future.

I suggest moving the logic of fixed_bucket into bucket and then removing fixed_bucket. WDYT?

Contributor Author

Sure, let me update this.


if (type instanceof org.apache.paimon.types.TimestampType) {
return ((org.apache.paimon.types.TimestampType) type).getPrecision()
== SPARK_TIMESTAMP_PRECISION;
Contributor Author

The Spark timestamp type is microsecond-precision and does not allow a custom precision. When binding the Paimon bucket function, Paimon’s own type information is not available, so support is only possible when the Paimon timestamp precision equals the Spark timestamp precision (6).

@zhongyujiang
Copy link
Contributor Author

@Zouxxyy @YannByron Hi, can you please help review this when you have time? Thanks.

@Aitozi
Contributor

Aitozi commented Apr 15, 2025

Hi, I'm curious what the difference is between FixedBucketExpression and BucketFunction. I'm just trying to understand why we have to use a BoundFunction rather than an Expression here?

@zhongyujiang
Contributor Author

> Hi, I'm curious what the difference is between FixedBucketExpression and BucketFunction. I'm just trying to understand why we have to use a BoundFunction rather than an Expression here?

@Aitozi Yeah, Zouxxyy also mentioned this. I think when I opened this PR, FixedBucketExpression hadn't been merged yet, so I added BucketFunction. I'll update this PR today.

@zhongyujiang zhongyujiang force-pushed the gh/spark-bucket-functions branch from f468d15 to 0122c24 on April 16, 2025 08:45
Contributor Author

@zhongyujiang zhongyujiang left a comment

I've updated the PR to resolve the comments above; please take a look when you have time. cc @Aitozi @Zouxxyy Thanks!

private transient BinaryRowWriter reuseWriter;

public InternalRowSerializer(RowType rowType, boolean isBucketKeySerializer) {
this(
Contributor Author

Bucket key projection uses different write logic for nulls of non-compact types (ref), so we cannot use the existing InternalRowSerializer constructor for bucket key serialization; this flag was added for that.
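For illustration, a rough sketch of the intended call site: the boolean constructor is the one added in this PR, while the KeyAndBucketExtractor helpers are my assumption of how the bucket is then derived, not code from this change.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.types.RowType;

class BucketComputeSketch {

    private final InternalRowSerializer bucketKeySerializer;

    BucketComputeSketch(RowType bucketKeyType) {
        // 'true' selects the bucket-key null-writing behavior added in this PR, so the
        // serialized BinaryRow hashes the same way as the bucket key projection path.
        this.bucketKeySerializer = new InternalRowSerializer(bucketKeyType, true);
    }

    int bucketOf(InternalRow bucketKey, int numBuckets) {
        BinaryRow binary = bucketKeySerializer.toBinaryRow(bucketKey);
        return KeyAndBucketExtractor.bucket(
                KeyAndBucketExtractor.bucketKeyHashCode(binary), numBuckets);
    }
}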

*
* params arg0: bucket number, arg1...argn bucket keys.
*/
class BucketFunction extends UnboundFunction {
Contributor Author

This is copied from master...Zouxxyy:incubator-paimon:dev/replace-bucket-function, except that the input type passed to SparkInternalRowWrapper was changed from bucketKeyStructType to inputType, since the Spark InternalRow wrapped by the wrapper has type inputType (which still includes the NUM_BUCKETS column).

Contributor

We have removed the NUM_BUCKETS column via ProjectedRow.from(mapping), so just pass bucketKeyStructType here.

Contributor Author

Hi @Zouxxyy, what I meant is that when creating the SparkInternalRowWrapper, we should pass in inputType instead of bucketKeyStructType, because the input row passed to this wrapper includes the NUM_BUCKETS column.
For InternalRowSerializer, we still pass in bucketKeyStructType, since as you mentioned, we’ve already removed NUM_BUCKETS using ProjectedRow.
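A compilable-in-spirit sketch of that data flow (approximated, not the PR's exact code; only the constructor with the boolean flag is from this PR, the rest is how I read the bind() internals). The wrapper is created over inputType elsewhere and arrives here as the full Paimon row view:

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectedRow;

class BindFlowSketch {

    // Projection that drops the NUM_BUCKETS column and keeps only the bucket key fields,
    // i.e. what ProjectedRow.from(mapping) produces in the real bind() code.
    private final ProjectedRow bucketKeyProjection;

    // Serializer built from the bucket key type only (bucketKeyStructType converted to a
    // Paimon RowType), as discussed above.
    private final InternalRowSerializer bucketKeySerializer;

    BindFlowSketch(RowType bucketKeyRowType, int[] bucketKeyPositionsInInput) {
        this.bucketKeyProjection = ProjectedRow.from(bucketKeyPositionsInInput);
        this.bucketKeySerializer = new InternalRowSerializer(bucketKeyRowType, true);
    }

    // 'fullInputRow' is the Paimon view of the whole Spark input row (for example a
    // SparkInternalRowWrapper created over inputType, which still contains NUM_BUCKETS).
    BinaryRow bucketKeyOf(InternalRow fullInputRow) {
        return bucketKeySerializer.toBinaryRow(bucketKeyProjection.replaceRow(fullInputRow));
    }
}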


Contributor

@Zouxxyy Zouxxyy Apr 18, 2025

Got it, makes sense, thanks!

@zhongyujiang zhongyujiang force-pushed the gh/spark-bucket-functions branch from 0122c24 to 8b4ae0c on April 18, 2025 05:43
@zhongyujiang
Contributor Author

@Zouxxyy rebased, please take another look when you have time, thanks!

Contributor

@Zouxxyy Zouxxyy left a comment

+1, thanks

@Zouxxyy Zouxxyy merged commit b448244 into apache:master Apr 18, 2025
20 checks passed
@zhongyujiang zhongyujiang deleted the gh/spark-bucket-functions branch May 6, 2025 13:16