Skip to content

Commit 8bfdaa5

Browse files
committed
fix
1 parent f7627ce commit 8bfdaa5

File tree

5 files changed

+81
-2
lines changed

5 files changed

+81
-2
lines changed

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.delta.kernel.utils.FileStatus;
3737
import io.delta.storage.LogStore;
3838
import java.io.*;
39+
import java.nio.channels.ClosedByInterruptException;
3940
import java.nio.charset.StandardCharsets;
4041
import java.util.*;
4142

@@ -113,6 +114,9 @@ public boolean hasNext() {
113114
}
114115
return nextLine != null;
115116
} catch (IOException ex) {
117+
if (ex instanceof ClosedByInterruptException || ex instanceof InterruptedIOException) {
118+
throw new UncheckedIOException(ex);
119+
}
116120
throw new KernelEngineException(
117121
format("Error reading JSON file: %s", currentFile.getPath()), ex);
118122
}

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import io.delta.kernel.utils.CloseableIterator;
3232
import io.delta.kernel.utils.FileStatus;
3333
import java.io.IOException;
34+
import java.io.InterruptedIOException;
35+
import java.io.UncheckedIOException;
36+
import java.nio.channels.ClosedByInterruptException;
3437
import java.util.*;
3538
import org.apache.hadoop.conf.Configuration;
3639
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -91,6 +94,9 @@ public boolean hasNext() {
9194
hasNotConsumedNextElement = next != null;
9295
return hasNotConsumedNextElement;
9396
} catch (IOException ex) {
97+
if (ex instanceof ClosedByInterruptException || ex instanceof InterruptedIOException) {
98+
throw new UncheckedIOException(ex);
99+
}
94100
throw new KernelEngineException(
95101
"Error reading Parquet file: " + fileStatus.getPath(), ex);
96102
}
@@ -158,6 +164,9 @@ protected ReadSupport<Object> getReadSupport() {
158164

159165
} catch (IOException e) {
160166
Utils.closeCloseablesSilently(fileReader, reader);
167+
if (e instanceof ClosedByInterruptException || e instanceof InterruptedIOException) {
168+
throw new UncheckedIOException(e);
169+
}
161170
throw new KernelEngineException(
162171
"Error reading Parquet file: " + fileStatus.getPath(), e);
163172
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/engine/DefaultJsonHandlerSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.delta.kernel.defaults.engine
1717

18+
import java.io.UncheckedIOException
1819
import java.math.{BigDecimal => JBigDecimal}
20+
import java.nio.channels.ClosedByInterruptException
1921
import java.nio.file.FileAlreadyExistsException
2022
import java.util.Optional
2123

@@ -24,9 +26,12 @@ import scala.collection.JavaConverters._
2426
import io.delta.kernel.data.ColumnVector
2527
import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO
2628
import io.delta.kernel.defaults.utils.{DefaultVectorTestUtils, TestRow, TestUtils}
29+
import io.delta.kernel.exceptions.KernelEngineException
2730
import io.delta.kernel.internal.actions.CommitInfo
2831
import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector
32+
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
2933
import io.delta.kernel.types._
34+
import io.delta.kernel.utils.FileStatus
3035

3136
import org.apache.hadoop.conf.Configuration
3237
import org.scalatest.funsuite.AnyFunSuite
@@ -554,4 +559,22 @@ class DefaultJsonHandlerSuite extends AnyFunSuite with TestUtils with DefaultVec
554559
assert(commitInfo.getInCommitTimestamp === Optional.empty())
555560
assert(commitInfo.getTimestamp === 1740009523401L)
556561
}
562+
563+
test("ClosedByInterruptException propagates as UncheckedIOException, not KernelEngineException") {
564+
val handler = new DefaultJsonHandler(throwingFileIO(new ClosedByInterruptException()))
565+
val fileIter = singletonCloseableIterator(
566+
FileStatus.of("file:///fake/_delta_log/00000000000000000000.json", 100, 0))
567+
val iter = handler.readJsonFiles(
568+
fileIter,
569+
new StructType().add("value", StringType.STRING),
570+
Optional.empty())
571+
572+
val ex = intercept[UncheckedIOException] {
573+
iter.hasNext()
574+
}
575+
assert(
576+
ex.getCause.isInstanceOf[ClosedByInterruptException],
577+
s"Expected ClosedByInterruptException cause, got: ${ex.getCause.getClass}")
578+
assert(!ex.isInstanceOf[KernelEngineException])
579+
}
557580
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
package io.delta.kernel.defaults.internal.parquet
1717

18+
import java.io.UncheckedIOException
1819
import java.math.BigDecimal
19-
import java.util.TimeZone
20+
import java.nio.channels.ClosedByInterruptException
21+
import java.util.{Optional, TimeZone}
2022

2123
import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
2224
import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
25+
import io.delta.kernel.exceptions.KernelEngineException
2326
import io.delta.kernel.test.VectorTestUtils
2427
import io.delta.kernel.types._
25-
import io.delta.kernel.utils.MetadataColumnTestUtils
28+
import io.delta.kernel.utils.{FileStatus, MetadataColumnTestUtils}
2629

2730
import org.apache.spark.sql.internal.SQLConf
2831
import org.scalatest.funsuite.AnyFunSuite
@@ -385,4 +388,21 @@ class ParquetFileReaderSuite extends AnyFunSuite
385388
readParquetFilesUsingKernel(parquetFilePath, readSchema), /* actual */
386389
readParquetFilesUsingSpark(parquetFilePath, readSchema) /* expected */ )
387390
}
391+
392+
test("ClosedByInterruptException propagates as UncheckedIOException, not KernelEngineException") {
393+
val reader = new ParquetFileReader(throwingFileIO(new ClosedByInterruptException()))
394+
val fileStatus = FileStatus.of("file:///fake/test.parquet", 100, 0)
395+
val iter = reader.read(
396+
fileStatus,
397+
new StructType().add("value", StringType.STRING),
398+
Optional.empty())
399+
400+
val ex = intercept[UncheckedIOException] {
401+
iter.hasNext()
402+
}
403+
assert(
404+
ex.getCause.isInstanceOf[ClosedByInterruptException],
405+
s"Expected ClosedByInterruptException cause, got: ${ex.getCause.getClass}")
406+
assert(!ex.isInstanceOf[KernelEngineException])
407+
}
388408
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/TestUtils.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import io.delta.golden.GoldenTableUtils
2828
import io.delta.kernel.{Scan, Snapshot, Table, TransactionCommitResult}
2929
import io.delta.kernel.data._
3030
import io.delta.kernel.defaults.engine.DefaultEngine
31+
import io.delta.kernel.defaults.engine.fileio.{FileIO, InputFile, OutputFile, SeekableInputStream}
3132
import io.delta.kernel.defaults.internal.data.vector.{DefaultGenericVector, DefaultStructVector}
3233
import io.delta.kernel.defaults.test.{AbstractTableManagerAdapter, LegacyTableManagerAdapter, TableManagerAdapter}
3334
import io.delta.kernel.engine.Engine
@@ -988,4 +989,26 @@ trait AbstractTestUtils
988989
case None => Optional.empty()
989990
}
990991
}
992+
993+
/**
994+
* Creates a FileIO whose newInputFile().newStream() always throws the given exception.
995+
* Useful for testing error handling in handlers.
996+
*/
997+
def throwingFileIO(exception: => Exception): FileIO = {
998+
new FileIO {
999+
override def newInputFile(path: String, fileSize: Long): InputFile = new InputFile {
1000+
override def length(): Long = fileSize
1001+
override def path(): String = path
1002+
override def newStream(): SeekableInputStream = throw exception
1003+
}
1004+
override def listFrom(filePath: String): CloseableIterator[FileStatus] = null
1005+
override def getFileStatus(path: String): FileStatus = null
1006+
override def resolvePath(path: String): String = null
1007+
override def mkdirs(path: String): Boolean = false
1008+
override def newOutputFile(path: String): OutputFile = null
1009+
override def delete(path: String): Boolean = false
1010+
override def getConf(confKey: String): Optional[String] = Optional.empty()
1011+
override def copyFileAtomically(src: String, dst: String, overwrite: Boolean): Unit = {}
1012+
}
1013+
}
9911014
}

0 commit comments

Comments
 (0)