@@ -71,6 +71,8 @@ public final class FileAsyncRequestBody implements AsyncRequestBody {
7171 private final int chunkSizeInBytes ;
7272 private final long position ;
7373 private final long numBytesToRead ;
74+ private final FileTime modifiedTimeAtStart ;
75+ private final long sizeAtStart ;
7476
7577 private FileAsyncRequestBody (DefaultBuilder builder ) {
7678 this .path = builder .path ;
@@ -79,6 +81,22 @@ private FileAsyncRequestBody(DefaultBuilder builder) {
7981 this .position = builder .position == null ? 0 : Validate .isNotNegative (builder .position , "position" );
8082 this .numBytesToRead = builder .numBytesToRead == null ? fileLength - this .position :
8183 Validate .isNotNegative (builder .numBytesToRead , "numBytesToRead" );
84+ try {
85+ if (builder .modifiedTimeAtStart != null ) {
86+ this .modifiedTimeAtStart = builder .modifiedTimeAtStart ;
87+ } else {
88+ this .modifiedTimeAtStart = Files .getLastModifiedTime (path );
89+ }
90+
91+ if (builder .sizeAtStart != null ) {
92+ this .sizeAtStart = builder .sizeAtStart ;
93+ } else {
94+ this .sizeAtStart = Files .size (path );
95+ }
96+ } catch (IOException e ) {
97+ throw new RuntimeException (e );
98+ }
99+
82100 }
83101
84102 @ Override
@@ -112,6 +130,14 @@ public long numBytesToRead() {
112130 return numBytesToRead ;
113131 }
114132
133+ public FileTime modifiedTimeAtStart () {
134+ return modifiedTimeAtStart ;
135+ }
136+
137+ public long sizeAtStart () {
138+ return sizeAtStart ;
139+ }
140+
115141 @ Override
116142 public Optional <Long > contentLength () {
117143 return Optional .of (numBytesToRead );
@@ -131,7 +157,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
131157 // We need to synchronize here because the subscriber could call
132158 // request() from within onSubscribe which would potentially
133159 // trigger onNext before onSubscribe is finished.
134- Subscription subscription = new FileSubscription (channel , s );
160+ Subscription subscription = new FileSubscription (channel , s , modifiedTimeAtStart , sizeAtStart );
135161
136162 synchronized (subscription ) {
137163 s .onSubscribe (subscription );
@@ -203,6 +229,20 @@ public interface Builder extends SdkBuilder<Builder, FileAsyncRequestBody> {
203229 * @return The builder for method chaining.
204230 */
205231 Builder numBytesToRead (Long numBytesToRead );
232+
233+ /**
234+ * Optional - sets the file modified time at the start of the request.
235+ * @param modifiedTimeAtStart initial file modification time
236+ * @return The builder for method chaining.
237+ */
238+ Builder modifiedTimeAtStart (FileTime modifiedTimeAtStart );
239+
240+ /**
241+ * Optional - sets the file size in bytes at the start of the request.
242+ * @param sizeAtStart initial file size at start.
243+ * @return The builder for method chaining.
244+ */
245+ Builder sizeAtStart (Long sizeAtStart );
206246 }
207247
208248 private static final class DefaultBuilder implements Builder {
@@ -211,6 +251,8 @@ private static final class DefaultBuilder implements Builder {
211251 private Path path ;
212252 private Integer chunkSizeInBytes ;
213253 private Long numBytesToRead ;
254+ private FileTime modifiedTimeAtStart ;
255+ private Long sizeAtStart ;
214256
215257 @ Override
216258 public Builder path (Path path ) {
@@ -240,6 +282,18 @@ public Builder numBytesToRead(Long numBytesToRead) {
240282 return this ;
241283 }
242284
285+ @ Override
286+ public Builder modifiedTimeAtStart (FileTime modifiedTimeAtStart ) {
287+ this .modifiedTimeAtStart = modifiedTimeAtStart ;
288+ return this ;
289+ }
290+
291+ @ Override
292+ public Builder sizeAtStart (Long sizeAtStart ) {
293+ this .sizeAtStart = sizeAtStart ;
294+ return this ;
295+ }
296+
243297 public void setChunkSizeInBytes (Integer chunkSizeInBytes ) {
244298 chunkSizeInBytes (chunkSizeInBytes );
245299 }
@@ -267,11 +321,12 @@ private final class FileSubscription implements Subscription {
267321 private final Object lock = new Object ();
268322
269323 private FileSubscription (AsynchronousFileChannel inputChannel ,
270- Subscriber <? super ByteBuffer > subscriber ) throws IOException {
324+ Subscriber <? super ByteBuffer > subscriber ,
325+ FileTime modifiedTimeAtStart , long sizeAtStart ) throws IOException {
271326 this .inputChannel = inputChannel ;
272327 this .subscriber = subscriber ;
273- this .sizeAtStart = inputChannel . size () ;
274- this .modifiedTimeAtStart = Files . getLastModifiedTime ( path ) ;
328+ this .sizeAtStart = sizeAtStart ;
329+ this .modifiedTimeAtStart = modifiedTimeAtStart ;
275330 this .remainingBytes = new AtomicLong (numBytesToRead );
276331 this .currentPosition = new AtomicLong (position );
277332 }
@@ -338,12 +393,19 @@ public void completed(Integer result, ByteBuffer attachment) {
338393
339394 int readBytes = attachment .remaining ();
340395 currentPosition .addAndGet (readBytes );
341- remainingBytes .addAndGet (-readBytes );
396+ long remaining = remainingBytes .addAndGet (-readBytes );
397+
398+ // we need to validate the file is unchanged before providing the last bytes to subscriber
399+ // the subscriber (eg: NettyRequestExecutor) may cancel subscription once all expected bytes have
400+ // been received. Validating here ensures errors are correctly signaled.
401+ if (remaining == 0 ) {
402+ closeFile ();
403+ validateFileUnchanged ();
404+ }
342405
343406 signalOnNext (attachment );
344407
345- if (remainingBytes .get () == 0 ) {
346- closeFile ();
408+ if (remaining == 0 ) {
347409 signalOnComplete ();
348410 }
349411
@@ -391,42 +453,50 @@ private void signalOnNext(ByteBuffer attachment) {
391453 }
392454
393455 private void signalOnComplete () {
456+ if (!validateFileUnchanged ()) {
457+ return ;
458+ }
459+
460+ synchronized (this ) {
461+ if (!done ) {
462+ done = true ;
463+ subscriber .onComplete ();
464+ }
465+ }
466+ }
467+
468+ private boolean validateFileUnchanged () {
394469 try {
395470 long sizeAtEnd = Files .size (path );
396471 if (sizeAtStart != sizeAtEnd ) {
397- signalOnError (new IOException ("File size changed after reading started. Initial size: " + sizeAtStart + ". "
398- + " Current size: " + sizeAtEnd ));
399- return ;
472+ signalOnError (new RuntimeException ("File size changed after reading started. Initial size: "
473+ + sizeAtStart + ". Current size: " + sizeAtEnd ));
474+ return false ;
400475 }
401476
402477 if (remainingBytes .get () > 0 ) {
403- signalOnError (new IOException ("Fewer bytes were read than were expected, was the file modified after "
478+ signalOnError (new RuntimeException ("Fewer bytes were read than were expected, was the file modified after "
404479 + "reading started?" ));
405- return ;
480+ return false ;
406481 }
407482
408483 FileTime modifiedTimeAtEnd = Files .getLastModifiedTime (path );
409484 if (modifiedTimeAtStart .compareTo (modifiedTimeAtEnd ) != 0 ) {
410- signalOnError (new IOException ("File last-modified time changed after reading started. Initial modification "
411- + "time: " + modifiedTimeAtStart + ". Current modification time: " +
412- modifiedTimeAtEnd ));
413- return ;
485+ signalOnError (
486+ new RuntimeException ("File last-modified time changed after reading started. Initial modification "
487+ + "time: " + modifiedTimeAtStart + ". Current modification time: " +
488+ modifiedTimeAtEnd ));
489+ return false ;
414490 }
415491 } catch (NoSuchFileException e ) {
416492 signalOnError (new IOException ("Unable to check file status after read. Was the file deleted or were its "
417493 + "permissions changed?" , e ));
418- return ;
494+ return false ;
419495 } catch (IOException e ) {
420496 signalOnError (new IOException ("Unable to check file status after read." , e ));
421- return ;
422- }
423-
424- synchronized (this ) {
425- if (!done ) {
426- done = true ;
427- subscriber .onComplete ();
428- }
497+ return false ;
429498 }
499+ return true ;
430500 }
431501
432502 private void signalOnError (Throwable t ) {
0 commit comments