
Commit 5ac7d92

fix: translate missing or corrupt file exceptions, fall back if asked to ignore (#1765)
* CometNativeException handling in NativeUtil. Passes Spark SQL test "SPARK-16337 temporary view refresh" now.
* Matches data corruption error.
* Fixes when ignore is set in SQLConf.
* Checkpoint with falling back for ignoreCorruptFiles or ignoreMissingFiles.
1 parent 908ae9a commit 5ac7d92
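
The last two commit-message bullets describe a planning-time guard: when Spark is configured to ignore missing or corrupt files, the native_datafusion scan cannot honor those semantics, so Comet falls back to a compatible scan. A minimal sketch of that condition, assuming a hypothetical helper name (not the commit's actual API; the two getters are real SQLConf accessors):

    import org.apache.spark.sql.internal.SQLConf

    object NativeScanGuard {
      // Hypothetical helper illustrating the fallback condition. The getters
      // read spark.sql.files.ignoreMissingFiles and
      // spark.sql.files.ignoreCorruptFiles respectively.
      def canUseNativeDataFusionScan(conf: SQLConf): Boolean =
        !conf.ignoreMissingFiles && !conf.ignoreCorruptFiles
    }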

8 files changed (+146, -38 lines)


dev/diffs/3.4.3.diff

Lines changed: 5 additions & 4 deletions
@@ -452,14 +452,14 @@ index a6b295578d6..91acca4306f 100644
 
  test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 2796b1cf154..be7078b38f4 100644
+index 2796b1cf154..4816349d690 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 @@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
@@ -487,11 +487,12 @@ index 2796b1cf154..be7078b38f4 100644
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1100,6 +1104,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1100,6 +1104,9 @@ class FileBasedDataSourceSuite extends QueryTest
 val filters = df.queryExecution.executedPlan.collect {
 case f: FileSourceScanLike => f.dataFilters
 case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 + case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
 + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 }.flatten
 assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
@@ -940,7 +941,7 @@ index 18123a4d6ec..fbe4c766eee 100644
 
  test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 75eabcb96f2..36e3318ad7e 100644
+index 75eabcb96f2..7c0bbd71551 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 @@ -21,10 +21,11 @@ import scala.collection.mutable.ArrayBuffer
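
The test-suite tweaks in this patch all follow one pattern: walk the executed physical plan and collect filters or operators from whichever node Comet substituted for Spark's. A minimal standalone sketch of that traversal (only the stock Spark operator is matched here; the Comet variants are added exactly as the diff above shows):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.execution.FileSourceScanLike

    object PlanInspection {
      // Collect the pushed-down data filters from every file scan in the plan.
      // With Comet enabled, a test must also match the substituted operators
      // (CometScanExec, CometNativeScanExec, CometBatchScanExec) as above.
      def collectDataFilters(df: DataFrame): Seq[Expression] =
        df.queryExecution.executedPlan.collect {
          case f: FileSourceScanLike => f.dataFilters
        }.flatten
    }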

dev/diffs/3.5.4.diff

Lines changed: 26 additions & 8 deletions
@@ -570,46 +570,64 @@ index a206e97c353..fea1149b67d 100644
 
  test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..d18ab7b20c0 100644
+index 93275487f29..01e5c601763 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
+ 
+ import scala.collection.mutable
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{LocalFileSystem, Path}
+ 
+@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -955,6 +956,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ case "" => "_LEGACY_ERROR_TEMP_2062"
+ case _ => "_LEGACY_ERROR_TEMP_2055"
+ }
++ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
++ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+ checkErrorMatchPVals(
+ exception = intercept[SparkException] {
+ testIgnoreMissingFiles(options)
+@@ -955,6 +959,7 @@ class FileBasedDataSourceSuite extends QueryTest
 assert(bJoinExec.isEmpty)
 val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
 case smJoin: SortMergeJoinExec => smJoin
 + case smJoin: CometSortMergeJoinExec => smJoin
 }
 assert(smJoinExec.nonEmpty)
 }
-@@ -1015,6 +1017,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1020,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1059,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1062,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1244,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1247,9 @@ class FileBasedDataSourceSuite extends QueryTest
 val filters = df.queryExecution.executedPlan.collect {
 case f: FileSourceScanLike => f.dataFilters
 case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 + case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
 + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 }.flatten
 assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
@@ -1071,7 +1089,7 @@ index fa1a64460fc..1d2e215d6a3 100644
 
  test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 260c992f1ae..b9d8e22337c 100644
+index 260c992f1ae..6297c071b19 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 @@ -22,10 +22,11 @@ import scala.collection.mutable.ArrayBuffer
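
The assume(...) guard added in this hunk relies on ScalaTest semantics: a false assumption cancels the test instead of failing it, which is how the suite skips the missing-file error check under the native scan. A tiny illustration, with the Comet config lookup replaced by a hypothetical environment variable:

    import org.scalatest.funsuite.AnyFunSuite

    class AssumeExampleSuite extends AnyFunSuite {
      test("error-message check, skipped for native_datafusion") {
        // assume() throws TestCanceledException when the condition is false,
        // so the test is reported as canceled rather than failed.
        assume(sys.env.get("SCAN_IMPL").forall(_ != "native_datafusion"))
        // ... the actual error-matching assertions would run here ...
      }
    }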

dev/diffs/3.5.5.diff

Lines changed: 25 additions & 7 deletions
@@ -441,46 +441,64 @@ index a206e97c353..fea1149b67d 100644
 
  test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 93275487f29..d18ab7b20c0 100644
+index 93275487f29..01e5c601763 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
+ 
+ import scala.collection.mutable
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{LocalFileSystem, Path}
+ 
+@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
 import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -955,6 +956,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ case "" => "_LEGACY_ERROR_TEMP_2062"
+ case _ => "_LEGACY_ERROR_TEMP_2055"
+ }
++ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
++ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+ checkErrorMatchPVals(
+ exception = intercept[SparkException] {
+ testIgnoreMissingFiles(options)
+@@ -955,6 +959,7 @@ class FileBasedDataSourceSuite extends QueryTest
 assert(bJoinExec.isEmpty)
 val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
 case smJoin: SortMergeJoinExec => smJoin
 + case smJoin: CometSortMergeJoinExec => smJoin
 }
 assert(smJoinExec.nonEmpty)
 }
-@@ -1015,6 +1017,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1015,6 +1020,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1056,6 +1059,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1056,6 +1062,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1240,6 +1244,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1240,6 +1247,9 @@ class FileBasedDataSourceSuite extends QueryTest
 val filters = df.queryExecution.executedPlan.collect {
 case f: FileSourceScanLike => f.dataFilters
 case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 + case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
 + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 }.flatten
 assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))

dev/diffs/4.0.0-preview1.diff

Lines changed: 26 additions & 8 deletions
@@ -473,46 +473,64 @@ index b2aaaceb26a..625522f36ae 100644
 
  test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 49a33d1c925..9a540abd0c2 100644
+index 49a33d1c925..197c93d62b3 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
+@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
+ 
+ import scala.collection.mutable
+ 
++import org.apache.comet.CometConf
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{LocalFileSystem, Path}
+ 
+@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec, CometSortMergeJoinExec}
++import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec, CometSortMergeJoinExec}
 import org.apache.spark.sql.execution.{ExplainMode, FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -951,6 +952,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -246,6 +248,8 @@ class FileBasedDataSourceSuite extends QueryTest
+ if (ignore.toBoolean) {
+ testIgnoreMissingFiles(options)
+ } else {
++ // native_datafusion Parquet scan throws a different error message for 3.x compat
++ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
+ checkErrorMatchPVals(
+ exception = intercept[SparkException] {
+ testIgnoreMissingFiles(options)
+@@ -951,6 +955,7 @@ class FileBasedDataSourceSuite extends QueryTest
 assert(bJoinExec.isEmpty)
 val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
 case smJoin: SortMergeJoinExec => smJoin
 + case smJoin: CometSortMergeJoinExec => smJoin
 }
 assert(smJoinExec.nonEmpty)
 }
-@@ -1011,6 +1013,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1011,6 +1016,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -1052,6 +1055,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1052,6 +1058,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
 val fileScan = df.queryExecution.executedPlan collectFirst {
 case BatchScanExec(_, f: FileScan, _, _, _, _) => f
 + case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _) => f
 }
 assert(fileScan.nonEmpty)
 assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1236,6 +1240,8 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1236,6 +1243,9 @@ class FileBasedDataSourceSuite extends QueryTest
 val filters = df.queryExecution.executedPlan.collect {
 case f: FileSourceScanLike => f.dataFilters
 case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 + case b: CometScanExec => b.dataFilters
++ case b: CometNativeScanExec => b.dataFilters
 + case b: CometBatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
 }.flatten
 assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
@@ -1014,7 +1032,7 @@ index 3fc0b572d80..0d87150d446 100644
 
  test("non-matching optional group") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
-index 68f14f13bbd..174636cefb5 100644
+index 68f14f13bbd..5cb3166f875 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 @@ -22,10 +22,11 @@ import scala.collection.mutable.ArrayBuffer

docs/source/user-guide/compatibility.md

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ types (regardless of the logical type). This behavior can be disabled by setting
 information.
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
+- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.
 
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542
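
For reference, the two settings this new limitation refers to are ordinary Spark SQL configs; a user would hit the incompatibility after something like the following sketch (assuming a local session):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // Standard Spark config keys behind ignoreMissingFiles/ignoreCorruptFiles;
    // with either set to true, the native_datafusion scan cannot preserve
    // Spark's ignore-and-continue semantics.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")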

docs/templates/compatibility-template.md

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ The new scans currently have the following limitations:
 information.
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
+- Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.
 
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 44 additions & 11 deletions
@@ -19,8 +19,11 @@
 
 package org.apache.comet
 
+import java.io.FileNotFoundException
 import java.lang.management.ManagementFactory
 
+import scala.util.matching.Regex
+
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
@@ -133,23 +136,53 @@ class CometExecIterator(
     }
   }
 
-  def getNextBatch(): Option[ColumnarBatch] = {
+  private def getNextBatch: Option[ColumnarBatch] = {
     assert(partitionIndex >= 0 && partitionIndex < numParts)
 
     if (tracingEnabled) {
       traceMemoryUsage()
     }
 
     val ctx = TaskContext.get()
-    withTrace(
-      s"getNextBatch[JVM] stage=${ctx.stageId()}",
-      tracingEnabled, {
-        nativeUtil.getNextBatch(
-          numOutputCols,
-          (arrayAddrs, schemaAddrs) => {
-            nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs)
-          })
-      })
+
+    try {
+      withTrace(
+        s"getNextBatch[JVM] stage=${ctx.stageId()}",
+        tracingEnabled, {
+          nativeUtil.getNextBatch(
+            numOutputCols,
+            (arrayAddrs, schemaAddrs) => {
+              nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs)
+            })
+        })
+    } catch {
+      case e: CometNativeException =>
+        val fileNotFoundPattern: Regex =
+          ("""^External: Object at location (.+?) not found: No such file or directory """ +
+            """\(os error \d+\)$""").r
+        val parquetError: Regex =
+          """^Parquet error: (?:.*)$""".r
+        e.getMessage match {
+          case fileNotFoundPattern(filePath) =>
+            // See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError
+            throw new SparkException(
+              errorClass = "_LEGACY_ERROR_TEMP_2055",
+              messageParameters = Map("message" -> e.getMessage),
+              cause = new FileNotFoundException(filePath)
+            ) // Can't use SparkFileNotFoundException because it's private.
+          case parquetError() =>
+            // See org.apache.spark.sql.errors.QueryExecutionErrors.failedToReadDataError
+            // See org.apache.parquet.hadoop.ParquetFileReader for error message.
+            throw new SparkException(
+              errorClass = "_LEGACY_ERROR_TEMP_2254",
+              messageParameters = Map("message" -> e.getMessage),
+              cause = new SparkException("File is not a Parquet file.", e))
+          case _ =>
+            throw e
+        }
+      case e: Throwable =>
+        throw e
+    }
   }
 
   override def hasNext: Boolean = {
@@ -167,7 +200,7 @@ class CometExecIterator(
       prevBatch = null
     }
 
-    nextBatch = getNextBatch()
+    nextBatch = getNextBatch
 
     if (nextBatch.isEmpty) {
       close()
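
The heart of the change above is plain string matching on the native error message before it crosses back into Spark, where FileScanRDD's ignoreMissingFiles/ignoreCorruptFiles handling expects FileNotFoundException-style causes. A self-contained sketch of that translation step (exception types simplified; the real code wraps them in SparkException with the legacy error classes shown in the diff):

    import java.io.FileNotFoundException
    import scala.util.matching.Regex

    // Map an object_store/DataFusion error message to the exception family
    // Spark's error handling expects, mirroring the match in the diff above.
    object NativeErrorTranslator {
      private val fileNotFound: Regex =
        ("""^External: Object at location (.+?) not found: No such file or directory """ +
          """\(os error \d+\)$""").r
      private val parquetError: Regex = """^Parquet error: (?:.*)$""".r

      def translate(message: String): Throwable = message match {
        case fileNotFound(path) => new FileNotFoundException(path)
        case parquetError()     => new RuntimeException(s"Corrupt Parquet file: $message")
        case other              => new RuntimeException(other)
      }
    }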

0 commit comments
