|
23 | 23 | import java.io.IOException; |
24 | 24 | import java.io.InputStream; |
25 | 25 | import java.io.OutputStream; |
| 26 | +import java.util.ArrayDeque; |
| 27 | +import java.util.Deque; |
26 | 28 | import java.util.LinkedList; |
27 | 29 | import java.util.Queue; |
28 | 30 | import java.util.concurrent.TimeUnit; |
@@ -252,7 +254,7 @@ public int getLength() { |
252 | 254 |
|
253 | 255 | private final int maxUnconfirmedReads; |
254 | 256 | private final long readAheadLimit; |
255 | | - private final Queue<UnconfirmedRead> unconfirmedReads = new LinkedList<>(); |
| 257 | + private final Deque<UnconfirmedRead> unconfirmedReads = new ArrayDeque<>(); |
256 | 258 |
|
257 | 259 | private long currentOffset; |
258 | 260 | private int maxReadLength = Integer.MAX_VALUE; |
@@ -336,7 +338,14 @@ public int read(byte[] into, int off, int len) throws IOException { |
336 | 338 | // we also need to go here for len <= 0, because pending may be at |
337 | 339 | // EOF in which case it would return -1 instead of 0 |
338 | 340 |
|
339 | | - long requestOffset = currentOffset; |
| 341 | + long requestOffset; |
| 342 | + if (unconfirmedReads.isEmpty()) { |
| 343 | + requestOffset = currentOffset; |
| 344 | + } |
| 345 | + else { |
| 346 | + final UnconfirmedRead lastRequest = unconfirmedReads.getLast(); |
| 347 | + requestOffset = lastRequest.offset + lastRequest.length; |
| 348 | + } |
340 | 349 | while (unconfirmedReads.size() <= maxUnconfirmedReads) { |
341 | 350 | // Send read requests as long as there is no EOF and we have not reached the maximum parallelism |
342 | 351 | int reqLen = Math.min(Math.max(1024, len), maxReadLength); |
|
0 commit comments