@@ -220,16 +220,42 @@ public int read(byte[] into, int off, int len) throws IOException {
220220
221221 public class ReadAheadRemoteFileInputStream
222222 extends InputStream {
223+ private class UnconfirmedRead {
224+ private final long offset ;
225+ private final Promise <Response , SFTPException > promise ;
226+ private final int length ;
227+
228+ private UnconfirmedRead (long offset , int length , Promise <Response , SFTPException > promise ) {
229+ this .offset = offset ;
230+ this .length = length ;
231+ this .promise = promise ;
232+ }
233+
234+ UnconfirmedRead (long offset , int length ) throws IOException {
235+ this (offset , length , RemoteFile .this .asyncRead (offset , length ));
236+ }
237+
238+ public long getOffset () {
239+ return offset ;
240+ }
241+
242+ public Promise <Response , SFTPException > getPromise () {
243+ return promise ;
244+ }
245+
246+ public int getLength () {
247+ return length ;
248+ }
249+ }
223250
224251 private final byte [] b = new byte [1 ];
225252
226253 private final int maxUnconfirmedReads ;
227254 private final long readAheadLimit ;
228- private final Queue <Promise <Response , SFTPException >> unconfirmedReads = new LinkedList <Promise <Response , SFTPException >>();
229- private final Queue <Long > unconfirmedReadOffsets = new LinkedList <Long >();
255+ private final Queue <UnconfirmedRead > unconfirmedReads = new LinkedList <>();
230256
231- private long requestOffset ;
232- private long responseOffset ;
257+ private long currentOffset ;
258+ private int maxReadLength = Integer . MAX_VALUE ;
233259 private boolean eof ;
234260
235261 public ReadAheadRemoteFileInputStream (int maxUnconfirmedReads ) {
@@ -247,28 +273,42 @@ public ReadAheadRemoteFileInputStream(int maxUnconfirmedReads, long fileOffset,
247273 assert 0 <= fileOffset ;
248274
249275 this .maxUnconfirmedReads = maxUnconfirmedReads ;
250- this .requestOffset = this . responseOffset = fileOffset ;
276+ this .currentOffset = fileOffset ;
251277 this .readAheadLimit = readAheadLimit > 0 ? fileOffset + readAheadLimit : Long .MAX_VALUE ;
252278 }
253279
254280 private ByteArrayInputStream pending = new ByteArrayInputStream (new byte [0 ]);
255281
256282 private boolean retrieveUnconfirmedRead (boolean blocking ) throws IOException {
257- if (unconfirmedReads .size () <= 0 ) {
283+ final UnconfirmedRead unconfirmedRead = unconfirmedReads .peek ();
284+ if (unconfirmedRead == null || !blocking && !unconfirmedRead .getPromise ().isDelivered ()) {
258285 return false ;
259286 }
287+ unconfirmedReads .remove (unconfirmedRead );
260288
261- if (!blocking && !unconfirmedReads .peek ().isDelivered ()) {
262- return false ;
263- }
264-
265- unconfirmedReadOffsets .remove ();
266- final Response res = unconfirmedReads .remove ().retrieve (requester .getTimeoutMs (), TimeUnit .MILLISECONDS );
289+ final Response res = unconfirmedRead .promise .retrieve (requester .getTimeoutMs (), TimeUnit .MILLISECONDS );
267290 switch (res .getType ()) {
268291 case DATA :
269292 int recvLen = res .readUInt32AsInt ();
270- responseOffset += recvLen ;
271- pending = new ByteArrayInputStream (res .array (), res .rpos (), recvLen );
293+ if (unconfirmedRead .offset == currentOffset ) {
294+ currentOffset += recvLen ;
295+ pending = new ByteArrayInputStream (res .array (), res .rpos (), recvLen );
296+
297+ if (recvLen < unconfirmedRead .length ) {
298+ // The server returned a packet smaller than the client had requested.
299+ // It can be caused by at least one of the following:
300+ // * The file has been read fully. Then, few futile read requests can be sent during
301+ // the next read(), but the file will be downloaded correctly anyway.
302+ // * The server shapes the request length. Then, the read window will be adjusted,
303+ // and all further read-ahead requests won't be shaped.
304+ // * The file on the server is not a regular file, it is something like fifo.
305+ // Then, the window will shrink, and the client will start reading the file slower than it
306+ // hypothetically can. It must be a rare case, and it is not worth implementing a sort of
307+ // congestion control algorithm here.
308+ maxReadLength = recvLen ;
309+ unconfirmedReads .clear ();
310+ }
311+ }
272312 break ;
273313
274314 case STATUS :
@@ -296,49 +336,24 @@ public int read(byte[] into, int off, int len) throws IOException {
296336 // we also need to go here for len <= 0, because pending may be at
297337 // EOF in which case it would return -1 instead of 0
298338
339+ long requestOffset = currentOffset ;
299340 while (unconfirmedReads .size () <= maxUnconfirmedReads ) {
300341 // Send read requests as long as there is no EOF and we have not reached the maximum parallelism
301- int reqLen = Math .max (1024 , len ); // don't be shy!
342+ int reqLen = Math .min ( Math . max (1024 , len ), maxReadLength );
302343 if (readAheadLimit > requestOffset ) {
303344 long remaining = readAheadLimit - requestOffset ;
304345 if (reqLen > remaining ) {
305346 reqLen = (int ) remaining ;
306347 }
307348 }
308- unconfirmedReads .add (RemoteFile .this .asyncRead (requestOffset , reqLen ));
309- unconfirmedReadOffsets .add (requestOffset );
349+ unconfirmedReads .add (new UnconfirmedRead (requestOffset , reqLen ));
310350 requestOffset += reqLen ;
311351 if (requestOffset >= readAheadLimit ) {
312352 break ;
313353 }
314354 }
315355
316- long nextOffset = unconfirmedReadOffsets .peek ();
317- if (responseOffset != nextOffset ) {
318-
319- // the server could not give us all the data we needed, so
320- // we try to fill the gap synchronously
321-
322- assert responseOffset < nextOffset ;
323- assert 0 < (nextOffset - responseOffset );
324- assert (nextOffset - responseOffset ) <= Integer .MAX_VALUE ;
325-
326- byte [] buf = new byte [(int ) (nextOffset - responseOffset )];
327- int recvLen = RemoteFile .this .read (responseOffset , buf , 0 , buf .length );
328-
329- if (recvLen < 0 ) {
330- eof = true ;
331- return -1 ;
332- }
333-
334- if (0 == recvLen ) {
335- // avoid infinite loops
336- throw new SFTPException ("Unexpected response size (0), bailing out" );
337- }
338-
339- responseOffset += recvLen ;
340- pending = new ByteArrayInputStream (buf , 0 , recvLen );
341- } else if (!retrieveUnconfirmedRead (true /*blocking*/ )) {
356+ if (!retrieveUnconfirmedRead (true /*blocking*/ )) {
342357
343358 // this may happen if we change prefetch strategy
344359 // currently, we should never get here...
0 commit comments