Skip to content

Commit 55ac385

Browse files
authored
[core] Introduce endInput in StandardLineReader to avoid close input (#6574)
1 parent b228223 commit 55ac385

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

paimon-format/src/main/java/org/apache/paimon/format/text/StandardLineReader.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ public class StandardLineReader implements TextLineReader {
4343
private int bufferStart;
4444
private int bufferEnd;
4545
private int bufferPosition;
46-
private boolean closed;
46+
private boolean endInput;
4747

4848
public StandardLineReader(InputStream in, long offset, @Nullable Long length)
4949
throws IOException {
5050
this.in = in;
5151
this.buffer = new byte[8192];
5252
this.length = length;
5353
this.lineBuilder = new ByteArrayOutputStream();
54+
this.endInput = false;
5455
if (offset == 0) {
5556
readAtBeginning();
5657
} else {
@@ -80,16 +81,16 @@ private void readAtOffset(long offset) throws IOException {
8081
}
8182

8283
private void skipFirstLine() throws IOException {
83-
while (!closed) {
84+
while (!endInput) {
8485
if (reachLengthLimit()) {
85-
close();
86+
endInput();
8687
return;
8788
}
8889

8990
// fill buffer if necessary
9091
if (bufferPosition >= bufferEnd) {
9192
fillBuffer();
92-
if (closed) {
93+
if (endInput) {
9394
return;
9495
}
9596
}
@@ -107,15 +108,15 @@ public String readLine() throws IOException {
107108
lineBuilder.reset();
108109

109110
if (reachLengthLimit()) {
110-
close();
111+
endInput();
111112
return null;
112113
}
113114

114115
if (bufferPosition >= bufferEnd) {
115116
fillBuffer();
116117
}
117118

118-
while (!closed) {
119+
while (!endInput) {
119120
if (seekToStartOfLineTerminator()) {
120121
copyToLineBuilder();
121122
seekPastLineTerminator();
@@ -158,7 +159,7 @@ private void seekPastLineTerminator() throws IOException {
158159
// fill buffer if necessary
159160
if (bufferPosition >= bufferEnd) {
160161
fillBuffer();
161-
if (closed) {
162+
if (endInput) {
162163
return;
163164
}
164165
}
@@ -172,15 +173,15 @@ private void seekPastLineTerminator() throws IOException {
172173
}
173174

174175
private void fillBuffer() throws IOException {
175-
if (closed) {
176+
if (endInput) {
176177
return;
177178
}
178179
checkArgument(bufferPosition >= bufferEnd, "Buffer is not empty");
179180
bufferStart = 0;
180181
bufferPosition = 0;
181182
bufferEnd = IOUtils.readNBytes(in, buffer, 0, buffer.length);
182183
if (bufferEnd == 0) {
183-
close();
184+
endInput();
184185
}
185186
}
186187

@@ -205,9 +206,12 @@ private String buildLine() throws UnsupportedEncodingException {
205206
return lineBuilder.toString(UTF_8.name());
206207
}
207208

209+
private void endInput() {
210+
endInput = true;
211+
}
212+
208213
@Override
209214
public void close() throws IOException {
210-
closed = true;
211215
in.close();
212216
}
213217
}

paimon-format/src/test/java/org/apache/paimon/format/text/StandardLineReaderTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.io.ByteArrayInputStream;
3232
import java.io.IOException;
3333
import java.nio.charset.StandardCharsets;
34+
import java.util.concurrent.atomic.AtomicBoolean;
3435

3536
import static org.assertj.core.api.Assertions.assertThat;
3637
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,6 +57,33 @@ public void tearDown() throws IOException {
5657
}
5758
}
5859

60+
/**
61+
* After the stream is closed, we cannot use getPos, so we need to ensure that it is manually
62+
* closed.
63+
*/
64+
@Test
65+
public void testCloseShouldNotBeInvoked() throws IOException {
66+
String content = "line1\nline2\nline3";
67+
AtomicBoolean closed = new AtomicBoolean(false);
68+
try (TextLineReader reader =
69+
TextLineReader.create(
70+
new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)) {
71+
@Override
72+
public void close() {
73+
closed.set(true);
74+
}
75+
},
76+
"\n",
77+
0,
78+
null)) {
79+
assertThat(reader.readLine()).isEqualTo("line1");
80+
assertThat(reader.readLine()).isEqualTo("line2");
81+
assertThat(reader.readLine()).isEqualTo("line3");
82+
assertThat(reader.readLine()).isNull();
83+
assertThat(closed.get()).isFalse();
84+
}
85+
}
86+
5987
@Test
6088
public void testReadLineWithLineFeedDelimiter() throws IOException {
6189
// Test basic functionality with \n delimiter

0 commit comments

Comments
 (0)