Skip to content

Commit 4f81274

Browse files
authored
Verify the file size does not change while AsyncRequestBody.fromFile's publisher is executing. (aws#2796)
This adds validation that the file size at the start of reading the file and at the end of reading the file are equal, as well as that we actually read the amount of data from the file that we expected to read. This will hopefully reduce some instances of checksum mismatched exceptions by replacing it with a more useful message. In the future, we might consider adding functionality for the customer to opt-in to locking the file for even more safety. That's not done as part of this PR, because it requires new public APIs and there hasn't been any customer demand for such a thing.
1 parent 5af29f8 commit 4f81274

File tree

3 files changed

+225
-18
lines changed

3 files changed

+225
-18
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Provide an error message if a AsyncRequestBody.fromFile source file changes length or update time while the SDK is reading from the file."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.nio.channels.AsynchronousFileChannel;
2424
import java.nio.channels.CompletionHandler;
2525
import java.nio.file.Files;
26+
import java.nio.file.NoSuchFileException;
2627
import java.nio.file.Path;
2728
import java.nio.file.StandardOpenOption;
29+
import java.nio.file.attribute.FileTime;
2830
import java.util.Optional;
2931
import java.util.concurrent.atomic.AtomicLong;
3032
import org.reactivestreams.Subscriber;
@@ -90,13 +92,12 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
9092
// We need to synchronize here because the subscriber could call
9193
// request() from within onSubscribe which would potentially
9294
// trigger onNext before onSubscribe is finished.
93-
//
94-
// Note: size() can throw IOE here
95-
Subscription subscription = new FileSubscription(channel, channel.size(), s, chunkSizeInBytes);
95+
Subscription subscription = new FileSubscription(path, channel, s, chunkSizeInBytes);
96+
9697
synchronized (subscription) {
9798
s.onSubscribe(subscription);
9899
}
99-
} catch (IOException e) {
100+
} catch (IOException | RuntimeException e) {
100101
if (channel != null) {
101102
runAndLogError(log.logger(), "Unable to close file channel", channel::close);
102103
}
@@ -176,25 +177,31 @@ public FileAsyncRequestBody build() {
176177
* Reads the file for one subscriber.
177178
*/
178179
private static final class FileSubscription implements Subscription {
180+
private final Path path;
179181
private final AsynchronousFileChannel inputChannel;
180182
private final Subscriber<? super ByteBuffer> subscriber;
181183
private final int chunkSize;
182184

183185
private final AtomicLong position = new AtomicLong(0);
184186
private final AtomicLong remainingBytes = new AtomicLong(0);
187+
private final long sizeAtStart;
188+
private final FileTime modifiedTimeAtStart;
185189
private long outstandingDemand = 0;
186190
private boolean readInProgress = false;
187191
private volatile boolean done = false;
188192
private final Object lock = new Object();
189193

190-
private FileSubscription(AsynchronousFileChannel inputChannel,
191-
long size,
194+
private FileSubscription(Path path,
195+
AsynchronousFileChannel inputChannel,
192196
Subscriber<? super ByteBuffer> subscriber,
193-
int chunkSize) {
197+
int chunkSize) throws IOException {
198+
this.path = path;
194199
this.inputChannel = inputChannel;
195200
this.subscriber = subscriber;
196201
this.chunkSize = chunkSize;
197-
this.remainingBytes.set(Validate.isNotNegative(size, "size"));
202+
this.sizeAtStart = inputChannel.size();
203+
this.modifiedTimeAtStart = Files.getLastModifiedTime(path);
204+
this.remainingBytes.set(Validate.isNotNegative(sizeAtStart, "size"));
198205
}
199206

200207
@Override
@@ -307,6 +314,36 @@ private void signalOnNext(ByteBuffer attachment) {
307314
}
308315

309316
private void signalOnComplete() {
317+
try {
318+
long sizeAtEnd = Files.size(path);
319+
if (sizeAtStart != sizeAtEnd) {
320+
signalOnError(new IOException("File size changed after reading started. Initial size: " + sizeAtStart + ". "
321+
+ "Current size: " + sizeAtEnd));
322+
return;
323+
}
324+
325+
if (remainingBytes.get() > 0) {
326+
signalOnError(new IOException("Fewer bytes were read than were expected, was the file modified after "
327+
+ "reading started?"));
328+
return;
329+
}
330+
331+
FileTime modifiedTimeAtEnd = Files.getLastModifiedTime(path);
332+
if (modifiedTimeAtStart.compareTo(modifiedTimeAtEnd) != 0) {
333+
signalOnError(new IOException("File last-modified time changed after reading started. Initial modification "
334+
+ "time: " + modifiedTimeAtStart + ". Current modification time: " +
335+
modifiedTimeAtEnd));
336+
return;
337+
}
338+
} catch (NoSuchFileException e) {
339+
signalOnError(new IOException("Unable to check file status after read. Was the file deleted or were its "
340+
+ "permissions changed?", e));
341+
return;
342+
} catch (IOException e) {
343+
signalOnError(new IOException("Unable to check file status after read.", e));
344+
return;
345+
}
346+
310347
synchronized (this) {
311348
if (!done) {
312349
done = true;

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java

Lines changed: 174 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,38 +15,52 @@
1515

1616
package software.amazon.awssdk.core.internal.async;
1717

18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.junit.Assert.assertTrue;
20+
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
21+
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.FileNotFoundException;
1824
import java.io.IOException;
1925
import java.nio.ByteBuffer;
20-
import java.nio.channels.AsynchronousFileChannel;
26+
import java.nio.charset.StandardCharsets;
2127
import java.nio.file.Files;
28+
import java.nio.file.NoSuchFileException;
2229
import java.nio.file.Path;
23-
import java.nio.file.StandardOpenOption;
30+
import java.nio.file.attribute.FileTime;
31+
import java.time.Instant;
2432
import java.util.concurrent.CompletableFuture;
2533
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.Semaphore;
35+
import java.util.concurrent.ThreadLocalRandom;
2636
import java.util.concurrent.TimeUnit;
2737
import java.util.concurrent.TimeoutException;
28-
import java.util.concurrent.atomic.AtomicLong;
29-
import org.junit.AfterClass;
30-
import org.junit.BeforeClass;
38+
import org.junit.After;
39+
import org.junit.Before;
3140
import org.junit.Test;
3241
import org.reactivestreams.Subscriber;
3342
import org.reactivestreams.Subscription;
3443
import software.amazon.awssdk.core.async.AsyncRequestBody;
3544
import software.amazon.awssdk.testutils.RandomTempFile;
45+
import software.amazon.awssdk.utils.BinaryUtils;
3646

3747
public class FileAsyncRequestBodyTest {
3848
private static final long MiB = 1024 * 1024;
3949
private static final long TEST_FILE_SIZE = 10 * MiB;
4050
private static Path testFile;
4151

42-
@BeforeClass
43-
public static void setup() throws IOException {
52+
@Before
53+
public void setup() throws IOException {
4454
testFile = new RandomTempFile(TEST_FILE_SIZE).toPath();
4555
}
4656

47-
@AfterClass
48-
public static void teardown() throws IOException {
49-
Files.delete(testFile);
57+
@After
58+
public void teardown() throws IOException {
59+
try {
60+
Files.delete(testFile);
61+
} catch (NoSuchFileException e) {
62+
// ignore
63+
}
5064
}
5165

5266
// If we issue just enough requests to read the file entirely but not more (to go past EOF), we should still receive
@@ -92,4 +106,154 @@ public void onComplete() {
92106

93107
completed.get(5, TimeUnit.SECONDS);
94108
}
109+
110+
@Test
111+
public void changingFile_fileGetsShorterThanAlreadyRead_failsBecauseTooShort() throws Exception {
112+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
113+
.path(testFile)
114+
.build();
115+
116+
ControllableSubscriber subscriber = new ControllableSubscriber();
117+
118+
// Start reading file
119+
asyncRequestBody.subscribe(subscriber);
120+
subscriber.sub.request(1);
121+
assertTrue(subscriber.onNextSemaphore.tryAcquire(5, TimeUnit.SECONDS));
122+
123+
// Change the file to be shorter than the amount read so far
124+
Files.write(testFile, "Hello".getBytes(StandardCharsets.UTF_8));
125+
126+
// Finishing reading the file
127+
subscriber.sub.request(Long.MAX_VALUE);
128+
129+
assertThatThrownBy(() -> subscriber.completed.get(5, TimeUnit.SECONDS))
130+
.hasCauseInstanceOf(IOException.class);
131+
}
132+
133+
@Test
134+
public void changingFile_fileGetsShorterThanExistingLength_failsBecauseTooShort() throws Exception {
135+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
136+
.path(testFile)
137+
.build();
138+
139+
ControllableSubscriber subscriber = new ControllableSubscriber();
140+
141+
// Start reading file
142+
asyncRequestBody.subscribe(subscriber);
143+
subscriber.sub.request(1);
144+
assertTrue(subscriber.onNextSemaphore.tryAcquire(5, TimeUnit.SECONDS));
145+
146+
// Change the file to be 1 byte shorter than when we started
147+
int currentSize = Math.toIntExact(Files.size(testFile));
148+
byte[] slightlyShorterFileContent = new byte[currentSize - 1];
149+
ThreadLocalRandom.current().nextBytes(slightlyShorterFileContent);
150+
Files.write(testFile, slightlyShorterFileContent);
151+
152+
// Finishing reading the file
153+
subscriber.sub.request(Long.MAX_VALUE);
154+
155+
assertThatThrownBy(() -> subscriber.completed.get(5, TimeUnit.SECONDS))
156+
.hasCauseInstanceOf(IOException.class);
157+
}
158+
159+
@Test
160+
public void changingFile_fileGetsLongerThanExistingLength_failsBecauseTooLong() throws Exception {
161+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
162+
.path(testFile)
163+
.build();
164+
165+
ControllableSubscriber subscriber = new ControllableSubscriber();
166+
167+
// Start reading file
168+
asyncRequestBody.subscribe(subscriber);
169+
subscriber.sub.request(1);
170+
assertTrue(subscriber.onNextSemaphore.tryAcquire(5, TimeUnit.SECONDS));
171+
172+
// Change the file to be 1 byte longer than when we started
173+
int currentSize = Math.toIntExact(Files.size(testFile));
174+
byte[] slightlyLongerFileContent = new byte[currentSize + 1];
175+
ThreadLocalRandom.current().nextBytes(slightlyLongerFileContent);
176+
Files.write(testFile, slightlyLongerFileContent);
177+
178+
// Finishing reading the file
179+
subscriber.sub.request(Long.MAX_VALUE);
180+
181+
assertThatThrownBy(() -> subscriber.completed.get(5, TimeUnit.SECONDS))
182+
.hasCauseInstanceOf(IOException.class);
183+
}
184+
185+
@Test
186+
public void changingFile_fileGetsTouched_failsBecauseUpdatedModificationTime() throws Exception {
187+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
188+
.path(testFile)
189+
.build();
190+
191+
ControllableSubscriber subscriber = new ControllableSubscriber();
192+
193+
// Start reading file
194+
asyncRequestBody.subscribe(subscriber);
195+
subscriber.sub.request(1);
196+
assertTrue(subscriber.onNextSemaphore.tryAcquire(5, TimeUnit.SECONDS));
197+
198+
// Change the file to be updated
199+
Thread.sleep(1_000); // Wait for 1 second so that we are definitely in a different second than when the file was created
200+
Files.setLastModifiedTime(testFile, FileTime.from(Instant.now()));
201+
202+
// Finishing reading the file
203+
subscriber.sub.request(Long.MAX_VALUE);
204+
205+
assertThatThrownBy(() -> subscriber.completed.get(5, TimeUnit.SECONDS))
206+
.hasCauseInstanceOf(IOException.class);
207+
}
208+
209+
@Test
210+
public void changingFile_fileGetsDeleted_failsBecauseDeleted() throws Exception {
211+
AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder()
212+
.path(testFile)
213+
.build();
214+
215+
ControllableSubscriber subscriber = new ControllableSubscriber();
216+
217+
// Start reading file
218+
asyncRequestBody.subscribe(subscriber);
219+
subscriber.sub.request(1);
220+
assertTrue(subscriber.onNextSemaphore.tryAcquire(5, TimeUnit.SECONDS));
221+
222+
// Delete the file
223+
Files.delete(testFile);
224+
225+
// Finishing reading the file
226+
subscriber.sub.request(Long.MAX_VALUE);
227+
228+
assertThatThrownBy(() -> subscriber.completed.get(5, TimeUnit.SECONDS))
229+
.hasCauseInstanceOf(IOException.class);
230+
}
231+
232+
private static class ControllableSubscriber implements Subscriber<ByteBuffer> {
233+
private final ByteArrayOutputStream output = new ByteArrayOutputStream();
234+
private final CompletableFuture<Void> completed = new CompletableFuture<>();
235+
private final Semaphore onNextSemaphore = new Semaphore(0);
236+
private Subscription sub;
237+
238+
@Override
239+
public void onSubscribe(Subscription subscription) {
240+
this.sub = subscription;
241+
}
242+
243+
@Override
244+
public void onNext(ByteBuffer byteBuffer) {
245+
invokeSafely(() -> output.write(BinaryUtils.copyBytesFrom(byteBuffer)));
246+
onNextSemaphore.release();
247+
}
248+
249+
@Override
250+
public void onError(Throwable throwable) {
251+
completed.completeExceptionally(throwable);
252+
}
253+
254+
@Override
255+
public void onComplete() {
256+
completed.complete(null);
257+
}
258+
}
95259
}

0 commit comments

Comments
 (0)