Skip to content

Commit dd386ba

Browse files
authored
[avro] Unwrap AvroRuntimeException and throw the real IOException (#4179)
1 parent 2fa598d commit dd386ba

File tree

4 files changed

+127
-20
lines changed

4 files changed

+127
-20
lines changed

paimon-common/src/main/java/org/apache/paimon/utils/IteratorResultIterator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@
2525

2626
import javax.annotation.Nullable;
2727

28-
import java.util.Iterator;
28+
import java.io.IOException;
2929

3030
/** A simple {@link RecordReader.RecordIterator} that returns the elements of an iterator. */
3131
public final class IteratorResultIterator extends RecyclableIterator<InternalRow>
3232
implements FileRecordIterator<InternalRow> {
3333

34-
private final Iterator<InternalRow> records;
34+
private final IteratorWithException<InternalRow, IOException> records;
3535
private final Path filePath;
3636
private long nextFilePos;
3737

3838
public IteratorResultIterator(
39-
final Iterator<InternalRow> records,
39+
final IteratorWithException<InternalRow, IOException> records,
4040
final @Nullable Runnable recycler,
4141
final Path filePath,
4242
long pos) {
@@ -48,7 +48,7 @@ public IteratorResultIterator(
4848

4949
@Nullable
5050
@Override
51-
public InternalRow next() {
51+
public InternalRow next() throws IOException {
5252
if (records.hasNext()) {
5353
nextFilePos++;
5454
return records.next();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
/** An iterator which might throw an exception. */
22+
public interface IteratorWithException<V, E extends Exception> {
23+
24+
boolean hasNext() throws E;
25+
26+
V next() throws E;
27+
}

paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,18 @@
2626
import org.apache.paimon.types.RowType;
2727
import org.apache.paimon.utils.IOUtils;
2828
import org.apache.paimon.utils.IteratorResultIterator;
29+
import org.apache.paimon.utils.IteratorWithException;
2930
import org.apache.paimon.utils.Pool;
3031

32+
import org.apache.avro.AvroRuntimeException;
3133
import org.apache.avro.file.DataFileReader;
3234
import org.apache.avro.file.SeekableInput;
3335
import org.apache.avro.io.DatumReader;
3436

3537
import javax.annotation.Nullable;
3638

3739
import java.io.IOException;
38-
import java.util.Iterator;
40+
import java.util.function.Supplier;
3941

4042
/** Provides a {@link FormatReaderFactory} for Avro records. */
4143
public class AvroBulkFormat implements FormatReaderFactory {
@@ -105,15 +107,16 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
105107

106108
long rowPosition = currentRowPosition;
107109
currentRowPosition += reader.getBlockCount();
108-
Iterator<InternalRow> iterator = new AvroBlockIterator(reader.getBlockCount(), reader);
110+
IteratorWithException<InternalRow, IOException> iterator =
111+
new AvroBlockIterator(reader.getBlockCount(), reader);
109112
return new IteratorResultIterator(
110113
iterator, () -> pool.recycler().recycle(ticket), filePath, rowPosition);
111114
}
112115

113116
private boolean readNextBlock() throws IOException {
114117
// read the next block with reader,
115118
// returns true if a block is read and false if we reach the end of this split
116-
return reader.hasNext() && !reader.pastSync(end);
119+
return replaceAvroRuntimeException(reader::hasNext) && !reader.pastSync(end);
117120
}
118121

119122
@Override
@@ -122,7 +125,8 @@ public void close() throws IOException {
122125
}
123126
}
124127

125-
private static class AvroBlockIterator implements Iterator<InternalRow> {
128+
private static class AvroBlockIterator
129+
implements IteratorWithException<InternalRow, IOException> {
126130

127131
private long numRecordsRemaining;
128132
private final DataFileReader<InternalRow> reader;
@@ -138,17 +142,23 @@ public boolean hasNext() {
138142
}
139143

140144
@Override
141-
public InternalRow next() {
142-
try {
143-
numRecordsRemaining--;
144-
// reader.next merely deserialize bytes in memory to java objects
145-
// and will not read from file
146-
// Do not reuse object, manifest file assumes no object reuse
147-
return reader.next(null);
148-
} catch (IOException e) {
149-
throw new RuntimeException(
150-
"Encountered exception when reading from avro format file", e);
145+
public InternalRow next() throws IOException {
146+
numRecordsRemaining--;
147+
// reader.next merely deserializes bytes in memory to java objects
148+
// and will not read from file
149+
// Do not reuse object, manifest file assumes no object reuse
150+
return replaceAvroRuntimeException(reader::next);
151+
}
152+
}
153+
154+
private static <T> T replaceAvroRuntimeException(Supplier<T> supplier) throws IOException {
155+
try {
156+
return supplier.get();
157+
} catch (AvroRuntimeException e) {
158+
if (e.getCause() != null && e.getCause() instanceof IOException) {
159+
throw (IOException) e.getCause();
151160
}
161+
throw e;
152162
}
153163
}
154164
}

paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.apache.paimon.format.FileFormatFactory.FormatContext;
2525
import org.apache.paimon.format.FormatReaderContext;
2626
import org.apache.paimon.format.FormatWriter;
27+
import org.apache.paimon.fs.FileIO;
2728
import org.apache.paimon.fs.Path;
2829
import org.apache.paimon.fs.PositionOutputStream;
30+
import org.apache.paimon.fs.SeekableInputStream;
2931
import org.apache.paimon.fs.local.LocalFileIO;
3032
import org.apache.paimon.options.Options;
3133
import org.apache.paimon.reader.RecordReader;
@@ -37,11 +39,15 @@
3739
import org.junit.jupiter.api.Test;
3840
import org.junit.jupiter.api.io.TempDir;
3941

42+
import java.io.File;
43+
import java.io.FileNotFoundException;
4044
import java.io.IOException;
4145
import java.util.ArrayList;
4246
import java.util.UUID;
47+
import java.util.concurrent.ThreadLocalRandom;
4348

4449
import static org.assertj.core.api.Assertions.assertThat;
50+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4551

4652
/** Test for avro file format. */
4753
public class AvroFileFormatTest {
@@ -122,10 +128,74 @@ void testReadRowPosition() throws IOException {
122128
try (RecordReader<InternalRow> reader =
123129
format.createReaderFactory(rowType)
124130
.createReader(
125-
new FormatReaderContext(
126-
fileIO, file, fileIO.getFileSize(file))); ) {
131+
new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)))) {
127132
reader.forEachRemainingWithPosition(
128133
(rowPosition, row) -> assertThat(row.getInt(0) == rowPosition).isTrue());
129134
}
130135
}
136+
137+
@Test
138+
void testGetRealIOException() throws IOException {
139+
RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
140+
FileFormat format = new AvroFileFormat(new FormatContext(new Options(), 16, 16));
141+
142+
LocalFileIO localFileIO = LocalFileIO.create();
143+
Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString());
144+
try (PositionOutputStream out = localFileIO.newOutputStream(file, false)) {
145+
FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
146+
ThreadLocalRandom random = ThreadLocalRandom.current();
147+
// magic number tested by hand
148+
for (int i = 0; i < 100000; i++) {
149+
writer.addElement(GenericRow.of(random.nextInt()));
150+
}
151+
writer.close();
152+
}
153+
154+
FileIO failingFileIO =
155+
new LocalFileIO() {
156+
157+
@Override
158+
public SeekableInputStream newInputStream(Path path) throws IOException {
159+
return new FailingInputStream(toFile(path));
160+
}
161+
162+
class FailingInputStream extends LocalFileIO.LocalSeekableInputStream {
163+
164+
private int cnt;
165+
166+
public FailingInputStream(File file) throws FileNotFoundException {
167+
super(file);
168+
cnt = 0;
169+
}
170+
171+
@Override
172+
public int read() throws IOException {
173+
checkException();
174+
return super.read();
175+
}
176+
177+
@Override
178+
public int read(byte[] b, int off, int len) throws IOException {
179+
checkException();
180+
return super.read(b, off, len);
181+
}
182+
183+
private void checkException() throws IOException {
184+
cnt++;
185+
// magic number tested by hand
186+
if (cnt == 200) {
187+
throw new IOException("Artificial exception");
188+
}
189+
}
190+
}
191+
};
192+
RecordReader<InternalRow> reader =
193+
format.createReaderFactory(rowType)
194+
.createReader(
195+
new FormatReaderContext(
196+
failingFileIO, file, failingFileIO.getFileSize(file)));
197+
assertThatThrownBy(() -> reader.forEachRemaining(row -> {}))
198+
.isInstanceOf(IOException.class)
199+
.hasMessageContaining("Artificial exception");
200+
}
131201
}

0 commit comments

Comments
 (0)