[KYUUBI #7245] Fix arrow batch converter error #7246
Conversation
@echo567, please keep the PR template and fill it in seriously, especially "Was this patch authored or co-authored using generative AI tooling?"; it does matter for legal purposes.
Codecov Report

Additional details and impacted files:
```
@@            Coverage Diff             @@
##           master    #7246      +/-   ##
==========================================
  Coverage    0.00%    0.00%
==========================================
  Files         696      696
  Lines       43530    43528       -2
  Branches     5883     5881       -2
==========================================
+ Misses      43530    43528       -2
```
Sorry, I have made the changes.
The code is copied from Spark; it seems it was changed in SPARK-44657. Can we just follow that?

Okay, I made the modifications based on that Spark issue.
Hi, I've merged the code from the latest master branch. Is there anything else I need to change?
@cfmcgrady, do you want to have another look?
Pull request overview
This PR fixes a critical bug where the spark.connect.grpc.arrow.maxBatchSize configuration was not being respected when using kyuubi.operation.result.format=arrow, leading to memory overflow issues and slow data transfer when processing large datasets without explicit limits.
Key changes:
- Fixed an incorrect byte unit conversion in `SparkDatasetHelper` that was treating 4 MiB as 4 bytes (see the sketch after this list)
- Refactored the batch size limit checking logic in `KyuubiArrowConverters` to properly enforce limits even when no global row limit is set
- Introduced clear helper methods to improve code readability and maintainability
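For context, here is a minimal sketch (in Scala) of the kind of size-string handling involved, assuming a hypothetical helper and reading the value via `spark.conf.get`; the actual fix lives in `SparkDatasetHelper` and may differ in detail:
```
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession

object MaxBatchSizeSketch {
  // Hypothetical helper, not the real Kyuubi method.
  def maxBatchSizeBytes(spark: SparkSession): Long = {
    // The config value is a size string such as "4m".
    val sizeStr = spark.conf.get("spark.connect.grpc.arrow.maxBatchSize", "4m")
    // Buggy behavior: treating the leading digits as the byte count yields 4.
    // Intended behavior: interpret the size string as bytes, so "4m" -> 4194304.
    JavaUtils.byteStringAsBytes(sizeStr)
  }
}
```
With this conversion, the `maxEstimatedBatchSize` reported in the logs below becomes 4194304 instead of 4.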
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala | Fixes critical bug in maxBatchSize calculation by correctly converting "4m" to 4194304 bytes instead of 4 |
| externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala | Refactors batch iteration logic to properly enforce batch size and record count limits, replacing complex conditional logic with clear helper methods |
```
// If either limit is hit, create a batch. This implies that the limit that is hit
// first triggers the creation of a batch even if the other limit is not yet hit
// hence preferring the more restrictive limit.
```
Copilot AI, Dec 24, 2025:
The comment states "If either limit is hit, create a batch" but the logic actually continues the loop when neither limit is exceeded. The comment should be clarified to say "Continue adding rows to the batch until either limit is exceeded" or similar wording to accurately reflect the loop continuation condition rather than batch creation trigger.
Suggested change:
```
// Continue adding rows to the current batch until either limit is exceeded.
// The limit that is reached first determines the batch boundary, even if the
// other limit has not yet been reached, thus preferring the more restrictive limit.
```
### Why are the changes needed?
Control the amount of data per batch to prevent memory overflow and speed up the initial data transfer.
When `kyuubi.operation.result.format=arrow`, `spark.connect.grpc.arrow.maxBatchSize` does not work as expected.
Reproduction:
You can debug `KyuubiArrowConverters` or add the following log to line 300 of `KyuubiArrowConverters`:
```
logInfo(s"Total limit: ${limit}, rowCount: ${rowCount}, " +
s"rowCountInLastBatch:${rowCountInLastBatch}," +
s"estimatedBatchSize: ${estimatedBatchSize}," +
s"maxEstimatedBatchSize: ${maxEstimatedBatchSize}," +
s"maxRecordsPerBatch:${maxRecordsPerBatch}")
```
Test data: 1.6 million rows, 30 columns per row. Command executed:
```
bin/beeline \
-u 'jdbc:hive2://10.168.X.X:XX/default;thrift.client.max.message.size=2000000000' \
--hiveconf kyuubi.operation.result.format=arrow \
-n test -p 'testpass' \
--outputformat=csv2 -e "select * from db.table" > /tmp/test.csv
```
Log output
```
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000 maxEstimatedBatchSize: 4,maxRecordsPerBatch:10000
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000
```
Original Code
```
while (rowIter.hasNext && (
rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
estimatedBatchSize <= 0 ||
estimatedBatchSize < maxEstimatedBatchSize ||
maxRecordsPerBatch <= 0 ||
rowCountInLastBatch < maxRecordsPerBatch ||
rowCount < limit ||
limit < 0))
```
When `limit` is not set, i.e. `-1`, all data will be retrieved in a single batch. If the row count is too large, the following three problems occur (a sketch of the corrected condition follows this list):
(1) Driver/executor OOM
(2) Array OOM because the requested array length exceeds the maximum allowed
(3) Slow data transfer
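As a rough illustration of the fix, here is a minimal Scala sketch of a batching predicate with the corrected AND semantics, loosely following SPARK-44657; the names mirror the variables in the log statement above, and this is not necessarily the exact code merged in this PR:
```
// Sketch only: every configured limit is enforced, so `limit == -1`
// no longer bypasses the byte-size and record-count checks.
def shouldContinueBatch(
    hasNext: Boolean,
    rowCount: Long,              // rows emitted so far across all batches
    rowCountInLastBatch: Long,   // rows in the batch currently being built
    estimatedBatchSize: Long,    // estimated bytes in the current batch
    maxEstimatedBatchSize: Long, // e.g. 4194304 for "4m"
    maxRecordsPerBatch: Long,    // e.g. 10000
    limit: Long): Boolean = {    // -1 means no global limit
  hasNext && (
    // always take at least one row per batch
    rowCountInLastBatch == 0 ||
      // stay under the byte-size limit, if one is configured
      ((maxEstimatedBatchSize <= 0 || estimatedBatchSize < maxEstimatedBatchSize) &&
        // stay under the per-batch record limit, if one is configured
        (maxRecordsPerBatch <= 0 || rowCountInLastBatch < maxRecordsPerBatch) &&
        // stay under the global limit, if one is configured
        (limit < 0 || rowCount < limit)))
}
```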
After updating the code, the log output is as follows:
```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```
The `estimatedBatchSize` is slightly larger than the `maxEstimatedBatchSize`, and data is now written out in batches as expected.
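As a sanity check inferred from the logs above (not stated anywhere in the code): the buggy run reports roughly 145600000 bytes for 200000 rows, i.e. about 728 bytes per row, and 5762 rows at 728 bytes is 4194736 bytes, the first value just above the 4194304-byte (4 MiB) limit, which is why each batch closes at 5762 rows.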
Fix #7245.
### How was this patch tested?
Test data: 1.6 million rows, 30 columns per row.
```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7246 from echo567/fix-arrow-converter.
Closes #7245
6ef4ef1 [echo567] Merge branch 'master' into fix-arrow-converter
c9d0d18 [echo567] fix(arrow): repairing arrow based on spark
479d7e4 [echo567] fix(spark): fix arrow batch converter error
Authored-by: echo567 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit acdb6a3)
Signed-off-by: Cheng Pan <[email protected]>
Thanks, merged to master/1.11/1.10.