
Commit 8ec6539

echo567 authored and pan3793 committed
[KYUUBI #7245] Fix arrow batch converter error
### Why are the changes needed?

Control the amount of data per batch to prevent memory overflow and to speed up the initial fetch. When `kyuubi.operation.result.format=arrow`, `spark.connect.grpc.arrow.maxBatchSize` does not work as expected.

Reproduction: debug `KyuubiArrowConverters`, or add the following log statement at line 300 of `KyuubiArrowConverters`:

```
logInfo(s"Total limit: ${limit}, rowCount: ${rowCount}, " +
  s"rowCountInLastBatch: ${rowCountInLastBatch}, " +
  s"estimatedBatchSize: ${estimatedBatchSize}, " +
  s"maxEstimatedBatchSize: ${maxEstimatedBatchSize}, " +
  s"maxRecordsPerBatch: ${maxRecordsPerBatch}")
```

Test data: 1.6 million rows, 30 columns per row. Command executed:

```
bin/beeline \
  -u 'jdbc:hive2://10.168.X.X:XX/default;thrift.client.max.message.size=2000000000' \
  --hiveconf kyuubi.operation.result.format=arrow \
  -n test -p 'testpass' \
  --outputformat=csv2 -e "select * from db.table" > /tmp/test.csv
```

Log output:

```
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000 maxEstimatedBatchSize: 4,maxRecordsPerBatch:10000
25/11/13 13:52:57 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 200000, lastBatchRowCount:200000, estimatedBatchSize: 145600000
```

Original code:

```
while (rowIter.hasNext && (
  rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
    estimatedBatchSize <= 0 ||
    estimatedBatchSize < maxEstimatedBatchSize ||
    maxRecordsPerBatch <= 0 ||
    rowCountInLastBatch < maxRecordsPerBatch ||
    rowCount < limit ||
    limit < 0))
```

When `limit` is not set, i.e. `-1`, all data is retrieved in a single batch. If the row count is too large, three problems follow:

(1) Driver/executor OOM
(2) Array OOM, because the required array length exceeds what a single array can hold
(3) Slow data transfer

After updating the code, the log output is as follows:

```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```

The `estimatedBatchSize` is only slightly larger than `maxEstimatedBatchSize`, and data is written in batches as expected.

Fix #7245.

### How was this patch tested?

Test data: 1.6 million rows, 30 columns per row.

```
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 5762, rowCountInLastBatch:5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch:10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 11524, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
25/11/14 10:57:16 INFO KyuubiArrowConverters: Total limit: -1, rowCount: 17286, rowCountInLastBatch: 5762, estimatedBatchSize: 4194736, maxEstimatedBatchSize: 4194304, maxRecordsPerBatch: 10000
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #7246 from echo567/fix-arrow-converter.
Closes #7245

6ef4ef1 [echo567] Merge branch 'master' into fix-arrow-converter
c9d0d18 [echo567] fix(arrow): repairing arrow based on spark
479d7e4 [echo567] fix(spark): fix arrow batch converter error

Authored-by: echo567 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit acdb6a3)
Signed-off-by: Cheng Pan <[email protected]>
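To make the root cause concrete, here is a minimal, self-contained Scala sketch. It is not the Kyuubi source; the state values are the ones observed in the pre-fix log above. It evaluates the original loop predicate and shows that with `limit = -1` the trailing `limit < 0` clause keeps the whole disjunction true, so the batch never closes while rows remain:

```scala
// Standalone sketch: evaluates the pre-fix loop predicate with the values
// observed in the log (200k rows already written, ~728 bytes each).
object OldPredicateDemo extends App {
  val limit = -1L                     // no global result limit set
  val maxEstimatedBatchSize = 4L      // buggy cap: "4m" parsed in MiB units
  val maxRecordsPerBatch = 10000L
  val rowCount = 200000L
  val rowCountInLastBatch = 200000L
  val estimatedBatchSize = 145600000L

  val keepGoing =
    rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
      estimatedBatchSize <= 0 ||
      estimatedBatchSize < maxEstimatedBatchSize ||
      maxRecordsPerBatch <= 0 ||
      rowCountInLastBatch < maxRecordsPerBatch ||
      rowCount < limit ||
      limit < 0

  // Prints "true": every size and record limit is already exceeded, yet the
  // `limit < 0` clause alone keeps the loop (and the batch) growing.
  println(keepGoing)
}
```

The fix below moves the global-limit check out of the disjunction (`isGlobalLimitNotReached` is AND-ed with the per-batch checks), so an unset limit can no longer disable the batch-size and record-count caps.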
1 parent 3667026 · commit 8ec6539

2 files changed: +23 −13 lines

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala

Lines changed: 20 additions & 11 deletions
```diff
@@ -274,19 +274,28 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
     var estimatedBatchSize = 0L
     Utils.tryWithSafeFinally {
 
+      def isBatchSizeLimitExceeded: Boolean = {
+        // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+        maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+      }
+      def isRecordLimitExceeded: Boolean = {
+        // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+        maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+      }
+      def isGlobalLimitNotReached: Boolean = {
+        // If the limit is negative, it means no restriction
+        // or the current number of rows has not reached the limit.
+        rowCount < limit || limit < 0
+      }
+
       // Always write the first row.
-      while (rowIter.hasNext && (
-        // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
+      while (rowIter.hasNext && isGlobalLimitNotReached && (
         // If the size in bytes is positive (set properly), always write the first row.
-        rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
-          // If the size in bytes of rows are 0 or negative, unlimit it.
-          estimatedBatchSize <= 0 ||
-          estimatedBatchSize < maxEstimatedBatchSize ||
-          // If the size of rows are 0 or negative, unlimit it.
-          maxRecordsPerBatch <= 0 ||
-          rowCountInLastBatch < maxRecordsPerBatch ||
-          rowCount < limit ||
-          limit < 0)) {
+        rowCountInLastBatch == 0 ||
+          // If either limit is hit, create a batch. This implies that the limit that is hit
+          // first triggers the creation of a batch even if the other limit is not yet hit
+          // hence preferring the more restrictive limit.
+          (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
         val row = rowIter.next()
         arrowWriter.write(row)
         estimatedBatchSize += (row match {
```
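For intuition on the new predicates, here is a standalone Scala sketch (again, not the Kyuubi source) that replays the loop with the post-fix numbers: a 4194304-byte cap and roughly 728 bytes per row, the per-row size implied by the logs (145600000 bytes / 200000 rows). The real loop also checks `rowIter.hasNext` and the global limit; here the row source is assumed inexhaustible and the limit unset:

```scala
// Simulates the refactored cut-off: the first row is always written, and the
// batch closes once either the size cap or the record cap is exceeded.
object NewPredicateDemo extends App {
  val maxEstimatedBatchSize = 4L * 1024 * 1024 // 4194304 bytes
  val maxRecordsPerBatch = 10000L
  val bytesPerRow = 728L // estimated per-row size derived from the logs

  var rowCountInLastBatch = 0L
  var estimatedBatchSize = 0L

  def isBatchSizeLimitExceeded: Boolean =
    maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
  def isRecordLimitExceeded: Boolean =
    maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch

  while (rowCountInLastBatch == 0 ||
    (!isBatchSizeLimitExceeded && !isRecordLimitExceeded)) {
    estimatedBatchSize += bytesPerRow // "write" one simulated row
    rowCountInLastBatch += 1
  }

  // Prints "rows=5762, bytes=4194736": the batch boundary seen in the
  // post-fix log lines.
  println(s"rows=$rowCountInLastBatch, bytes=$estimatedBatchSize")
}
```

As the commit message notes, the batch is allowed to overshoot the cap by at most one row, which is why `estimatedBatchSize` ends slightly above `maxEstimatedBatchSize`.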

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -164,8 +164,9 @@ object SparkDatasetHelper extends Logging {
     KyuubiSparkUtil.globalSparkContext
       .getConf
       .getOption("spark.connect.grpc.arrow.maxBatchSize")
-      .orElse(Option("4m"))
-      .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+      // 4m
+      .orElse(Option("4194304b"))
+      .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
       .get
   }
```
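The unit change in this file appears to be the root cause of the `maxEstimatedBatchSize: 4` seen in the pre-fix log: `JavaUtils.byteStringAs` converts a byte string into the requested unit, so parsing `"4m"` as `ByteUnit.MiB` returns 4 (a count of MiB), which the caller then treats as a byte count. A small illustration, assuming Spark's `org.apache.spark.network.util.JavaUtils` (the same API the diff uses) is on the classpath:

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

object ByteStringDemo extends App {
  // Old default: "4m" converted to MiB units yields 4; downstream code reads
  // the result as bytes, shrinking the intended 4 MiB cap to 4 bytes.
  println(JavaUtils.byteStringAs("4m", ByteUnit.MiB)) // 4
  // New default: an explicit byte string converted to BYTE units yields the
  // intended 4 MiB cap expressed in bytes.
  println(JavaUtils.byteStringAs("4194304b", ByteUnit.BYTE)) // 4194304
  // User-supplied values keep working: "4m" in BYTE units is also 4194304.
  println(JavaUtils.byteStringAs("4m", ByteUnit.BYTE)) // 4194304
}
```

Either a byte-suffixed default parsed with `ByteUnit.BYTE`, or converting the MiB result to bytes afterwards, would close the mismatch; the patch chooses the former.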
