Skip to content

Commit fd10fed

Browse files
maxxedevppkarwaszgarydgregory
authored
QueueInputStream reads all but the first byte without waiting. (#748)
* QueueInputStream reads all but the first byte without waiting. Fix so that bulk reads avoid getting stuck if a timeout is set and at least one byte is available. * Call BlockingQueue::drainTo for improved performance * Refactor bulk read * Add benchmark for `QueueInputStream/QueueOutputStream` Adds a small benchmark to measure how much time does it take to transfer 1 MiB from a `QueueOutputStream` to a `QueueInputStream`. * More optimizations on bulk read * Improve test coverage * Add missing Javadoc since tag * Use final Reduce vertical whitespace * Use the exact same invariant checks as the JDK superclass Add missing Javadoc `@param` tag * Update src/main/java/org/apache/commons/io/input/QueueInputStream.java Co-authored-by: Piotr P. Karwasz <[email protected]> * Update src/main/java/org/apache/commons/io/input/QueueInputStream.java * improve tests * improve test --------- Co-authored-by: Piotr P. Karwasz <[email protected]> Co-authored-by: Gary Gregory <[email protected]> Co-authored-by: Piotr P. Karwasz <[email protected]>
1 parent 7629b20 commit fd10fed

File tree

4 files changed

+250
-1
lines changed

4 files changed

+250
-1
lines changed

src/changes/changes.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The <action> type attribute can be add,update,fix,remove.
4747
<body>
4848
<release version="2.20.0" date="YYYY-MM-DD" description="Version 2.19.1: Java 8 or later is required.">
4949
<!-- FIX -->
50+
<action dev="maxxedev" type="fix" due-to="maxxedev">QueueInputStream reads all but the first byte without waiting</action>
5051
<action dev="ggregory" type="fix" due-to="Jesse Glick">[javadoc] Rename parameter of ProxyOutputStream.write(int) #740.</action>
5152
<action dev="ggregory" type="fix" issue="IO-875" due-to="Pierre Baumard, Gary Gregory">CopyDirectoryVisitor ignores fileFilter #743.</action>
5253
<action dev="ggregory" type="fix" due-to="Gary Gregory">org.apache.commons.io.build.AbstractOrigin.getReader(Charset) now maps a null Charset to the default Charset.</action>

src/main/java/org/apache/commons/io/input/QueueInputStream.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.io.PipedInputStream;
2424
import java.io.PipedOutputStream;
2525
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.List;
2628
import java.util.Objects;
2729
import java.util.concurrent.BlockingQueue;
2830
import java.util.concurrent.LinkedBlockingQueue;
@@ -224,4 +226,49 @@ public int read() {
224226
}
225227
}
226228

229+
/**
230+
* Reads up to {@code length} bytes of data from the input stream into
231+
* an array of bytes. The first byte is read while honoring the timeout; the rest are read while <i>not</i> honoring
232+
* the timeout. The number of bytes actually read is returned as an integer.
233+
*
234+
* @param b the buffer into which the data is read.
235+
* @param offset the start offset in array {@code b} at which the data is written.
236+
* @param length the maximum number of bytes to read.
237+
* @return the total number of bytes read into the buffer, or {@code -1} if there is no more data because the
238+
* end of the stream has been reached.
239+
* @throws NullPointerException If {@code b} is {@code null}.
240+
* @throws IllegalStateException if thread is interrupted while waiting for the first byte.
241+
* @throws IndexOutOfBoundsException if {@code offset} is negative, {@code length} is negative, or {@code length} is
242+
* greater than {@code b.length - offset}.
243+
* @since 2.20.0
244+
*/
245+
@Override
246+
public int read(final byte[] b, final int offset, final int length) {
247+
if (b == null) {
248+
throw new NullPointerException();
249+
} else if (offset < 0 || length < 0 || length > b.length - offset) {
250+
throw new IndexOutOfBoundsException(
251+
String.format("Range [%d, %<d + %d) out of bounds for length %d", offset, length, b.length));
252+
} else if (length == 0) {
253+
return 0;
254+
}
255+
final List<Integer> drain = new ArrayList<>(Math.min(length, blockingQueue.size()));
256+
blockingQueue.drainTo(drain, length);
257+
if (drain.isEmpty()) {
258+
// no data immediately available. wait for first byte
259+
final int value = read();
260+
if (value == EOF) {
261+
return EOF;
262+
}
263+
drain.add(value);
264+
blockingQueue.drainTo(drain, length - 1);
265+
}
266+
int i = 0;
267+
for (final Integer value : drain) {
268+
b[offset + i] = (byte) (0xFF & value);
269+
i++;
270+
}
271+
return i;
272+
}
273+
227274
}

src/test/java/org/apache/commons/io/input/QueueInputStreamTest.java

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,30 @@
1616
*/
1717
package org.apache.commons.io.input;
1818

19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
21+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1922
import static org.junit.jupiter.api.Assertions.assertEquals;
2023
import static org.junit.jupiter.api.Assertions.assertThrows;
2124
import static org.junit.jupiter.api.Assertions.assertTimeout;
2225
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
27+
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
2328

2429
import java.io.BufferedInputStream;
2530
import java.io.BufferedOutputStream;
31+
import java.io.BufferedReader;
2632
import java.io.ByteArrayOutputStream;
2733
import java.io.IOException;
2834
import java.io.InputStream;
35+
import java.io.InputStreamReader;
36+
import java.io.OutputStream;
2937
import java.nio.charset.StandardCharsets;
38+
import java.nio.file.Files;
39+
import java.nio.file.Path;
3040
import java.time.Duration;
3141
import java.util.concurrent.BlockingQueue;
42+
import java.util.concurrent.CompletableFuture;
3243
import java.util.concurrent.CountDownLatch;
3344
import java.util.concurrent.LinkedBlockingQueue;
3445
import java.util.concurrent.TimeUnit;
@@ -40,7 +51,9 @@
4051
import org.apache.commons.io.output.QueueOutputStreamTest;
4152
import org.apache.commons.lang3.StringUtils;
4253
import org.junit.jupiter.api.DisplayName;
54+
import org.junit.jupiter.api.DynamicTest;
4355
import org.junit.jupiter.api.Test;
56+
import org.junit.jupiter.api.TestFactory;
4457
import org.junit.jupiter.params.ParameterizedTest;
4558
import org.junit.jupiter.params.provider.Arguments;
4659
import org.junit.jupiter.params.provider.MethodSource;
@@ -124,13 +137,125 @@ public void testAvailableAfterOpen(final String inputData) throws IOException {
124137
public void testBufferedReads(final String inputData) throws IOException {
125138
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
126139
try (BufferedInputStream inputStream = new BufferedInputStream(new QueueInputStream(queue));
127-
QueueOutputStream outputStream = new QueueOutputStream(queue)) {
140+
QueueOutputStream outputStream = new QueueOutputStream(queue)) {
128141
outputStream.write(inputData.getBytes(StandardCharsets.UTF_8));
129142
final String actualData = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
130143
assertEquals(inputData, actualData);
131144
}
132145
}
133146

147+
@ParameterizedTest(name = "inputData={0}")
148+
@MethodSource("inputData")
149+
public void testReadLineByLineQueue(final String inputData) throws IOException {
150+
final String[] lines = inputData.split("\n");
151+
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
152+
try (QueueInputStream inputStream = QueueInputStream.builder()
153+
.setBlockingQueue(queue)
154+
.setTimeout(Duration.ofHours(1))
155+
.get();
156+
QueueOutputStream outputStream = inputStream.newQueueOutputStream()) {
157+
158+
doTestReadLineByLine(inputData, inputStream, outputStream);
159+
}
160+
}
161+
162+
@ParameterizedTest(name = "inputData={0}")
163+
@MethodSource("inputData")
164+
public void testReadLineByLineFile(final String inputData) throws IOException {
165+
final Path tempFile = Files.createTempFile(getClass().getSimpleName(), ".txt");
166+
try (InputStream inputStream = Files.newInputStream(tempFile);
167+
OutputStream outputStream = Files.newOutputStream(tempFile)) {
168+
169+
doTestReadLineByLine(inputData, inputStream, outputStream);
170+
} finally {
171+
Files.delete(tempFile);
172+
}
173+
}
174+
175+
private void doTestReadLineByLine(final String inputData, final InputStream inputStream, final OutputStream outputStream) throws IOException {
176+
final String[] lines = inputData.split("\n");
177+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8))) {
178+
for (String line : lines) {
179+
outputStream.write(line.getBytes(UTF_8));
180+
outputStream.write('\n');
181+
182+
final String actualLine = reader.readLine();
183+
assertEquals(line, actualLine);
184+
}
185+
}
186+
}
187+
188+
@TestFactory
189+
public DynamicTest[] bulkReadErrorHandlingTests() {
190+
final QueueInputStream queueInputStream = new QueueInputStream();
191+
return new DynamicTest[] {
192+
dynamicTest("Offset too big", () ->
193+
assertThrows(IndexOutOfBoundsException.class, () ->
194+
queueInputStream.read(EMPTY_BYTE_ARRAY, 1, 0))),
195+
196+
dynamicTest("Offset negative", () ->
197+
assertThrows(IndexOutOfBoundsException.class, () ->
198+
queueInputStream.read(EMPTY_BYTE_ARRAY, -1, 0))),
199+
200+
dynamicTest("Length too big", () ->
201+
assertThrows(IndexOutOfBoundsException.class, () ->
202+
queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 1))),
203+
204+
dynamicTest("Length negative", () ->
205+
assertThrows(IndexOutOfBoundsException.class, () ->
206+
queueInputStream.read(EMPTY_BYTE_ARRAY, 0, -1))),
207+
};
208+
}
209+
210+
@Test
211+
public void testBulkReadZeroLength() {
212+
final QueueInputStream queueInputStream = new QueueInputStream();
213+
final int read = queueInputStream.read(EMPTY_BYTE_ARRAY, 0, 0);
214+
assertEquals(0, read);
215+
}
216+
217+
@ParameterizedTest(name = "inputData={0}")
218+
@MethodSource("inputData")
219+
public void testBulkReadWaiting(final String inputData) throws IOException {
220+
assumeFalse(inputData.isEmpty());
221+
222+
final CountDownLatch onPollLatch = new CountDownLatch(1);
223+
final CountDownLatch afterWriteLatch = new CountDownLatch(1);
224+
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>() {
225+
@Override
226+
public Integer poll(final long timeout, final TimeUnit unit) throws InterruptedException {
227+
onPollLatch.countDown();
228+
afterWriteLatch.await(1, TimeUnit.HOURS);
229+
return super.poll(timeout, unit);
230+
}
231+
};
232+
233+
// Simulate scenario where there is not data immediately available when bulk reading and QueueInputStream has to
234+
// wait.
235+
try (QueueInputStream queueInputStream = QueueInputStream.builder()
236+
.setBlockingQueue(queue)
237+
.setTimeout(Duration.ofHours(1))
238+
.get()) {
239+
final QueueOutputStream queueOutputStream = queueInputStream.newQueueOutputStream();
240+
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
241+
try {
242+
onPollLatch.await(1, TimeUnit.HOURS);
243+
queueOutputStream.write(inputData.getBytes(UTF_8));
244+
afterWriteLatch.countDown();
245+
} catch (final Exception e) {
246+
throw new RuntimeException(e);
247+
}
248+
});
249+
250+
final byte[] data = new byte[inputData.length()];
251+
final int read = queueInputStream.read(data, 0, data.length);
252+
assertEquals(inputData.length(), read);
253+
final String outputData = new String(data, 0, read, StandardCharsets.UTF_8);
254+
assertEquals(inputData, outputData);
255+
assertDoesNotThrow(() -> future.get());
256+
}
257+
}
258+
134259
@ParameterizedTest(name = "inputData={0}")
135260
@MethodSource("inputData")
136261
public void testBufferedReadWrite(final String inputData) throws IOException {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.commons.io.jmh;
18+
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.LinkedBlockingQueue;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.apache.commons.io.input.QueueInputStream;
24+
import org.apache.commons.io.output.QueueOutputStream;
25+
import org.apache.commons.lang3.RandomUtils;
26+
import org.openjdk.jmh.annotations.Benchmark;
27+
import org.openjdk.jmh.annotations.BenchmarkMode;
28+
import org.openjdk.jmh.annotations.Group;
29+
import org.openjdk.jmh.annotations.Mode;
30+
import org.openjdk.jmh.annotations.OutputTimeUnit;
31+
import org.openjdk.jmh.annotations.Scope;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.infra.Blackhole;
34+
35+
/**
36+
* Measures the amount of time to push 1 MiB to a {@link QueueOutputStream} and read it using a {@link QueueInputStream}
37+
*/
38+
@BenchmarkMode(Mode.SampleTime)
39+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
40+
@State(Scope.Group)
41+
public class QueueStreamBenchmark {
42+
43+
private static final int CAPACITY = 1024 * 1024;
44+
private static final int BUFFER_SIZE = 1024;
45+
46+
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
47+
private final QueueInputStream inputStream = QueueInputStream.builder()
48+
.setBlockingQueue(queue)
49+
.get();
50+
private final QueueOutputStream outputStream = inputStream.newQueueOutputStream();
51+
52+
private final byte[] input = RandomUtils.insecure().randomBytes(CAPACITY);
53+
private final byte[] output = new byte[BUFFER_SIZE];
54+
55+
@Benchmark
56+
@Group("streams")
57+
public void output() throws Exception {
58+
int sent = 0;
59+
while (sent < CAPACITY) {
60+
final int len = Math.min(CAPACITY - sent, BUFFER_SIZE);
61+
outputStream.write(input, sent, len);
62+
sent += len;
63+
}
64+
}
65+
66+
@Benchmark
67+
@Group("streams")
68+
public void input(Blackhole bh) throws Exception {
69+
int received = 0;
70+
while (received < CAPACITY) {
71+
final int len = inputStream.read(output, 0, BUFFER_SIZE);
72+
bh.consume(output);
73+
received += len;
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)