Skip to content

Commit e54f435

Browse files
Tyler Roachmattcreaser
andauthored
fix(storage): Download Accuracy Fix (#3156)
Co-authored-by: Matt Creaser <[email protected]>
1 parent 8b2ef91 commit e54f435

File tree

3 files changed

+63
-7
lines changed

3 files changed

+63
-7
lines changed

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import com.amplifyframework.storage.s3.transfer.worker.InitiateMultiPartUploadTr
3838
import com.amplifyframework.storage.s3.transfer.worker.PartUploadTransferWorker
3939
import com.amplifyframework.storage.s3.transfer.worker.RouterWorker
4040
import com.amplifyframework.storage.s3.transfer.worker.SinglePartUploadWorker
41+
import java.util.concurrent.TimeUnit
4142

4243
internal object TransferOperations {
4344

@@ -84,7 +85,13 @@ internal object TransferOperations {
8485
): Boolean {
8586
if (TransferState.isStarted(transferRecord.state) && !TransferState.isInTerminalState(transferRecord.state)) {
8687
transferStatusUpdater.updateTransferState(transferRecord.id, TransferState.PENDING_PAUSE)
87-
workManager.cancelUniqueWork(transferRecord.id.toString())
88+
try {
89+
workManager.cancelUniqueWork(
90+
transferRecord.id.toString()
91+
).result.get(1, TimeUnit.SECONDS)
92+
} catch (_: Exception) {
93+
// do nothing
94+
}
8895
return true
8996
}
9097
return false
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.storage.s3.transfer
17+
18+
import java.io.BufferedOutputStream
19+
import java.io.OutputStream
20+
21+
/**
22+
* BufferedOutputStream that allows clearing the buffer without flushing. This may especially be important when
23+
* attempting to close the BufferedOutputStream without flushing the buffer.
24+
*/
25+
internal class ClearableBufferedOutputStream(
26+
out: OutputStream,
27+
size: Int = 8192
28+
) : BufferedOutputStream(out, size) {
29+
30+
fun clear() {
31+
count = 0
32+
buf.fill(0)
33+
}
34+
}

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/DownloadWorker.kt

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,15 @@ import aws.smithy.kotlin.runtime.content.ByteStream
2424
import aws.smithy.kotlin.runtime.content.writeToFile
2525
import aws.smithy.kotlin.runtime.io.SdkSource
2626
import aws.smithy.kotlin.runtime.io.buffer
27+
import com.amplifyframework.storage.s3.transfer.ClearableBufferedOutputStream
2728
import com.amplifyframework.storage.s3.transfer.DownloadProgressListener
2829
import com.amplifyframework.storage.s3.transfer.DownloadProgressListenerInterceptor
2930
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider
3031
import com.amplifyframework.storage.s3.transfer.TransferDB
3132
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater
32-
import java.io.BufferedOutputStream
3333
import java.io.File
3434
import java.io.FileOutputStream
3535
import kotlinx.coroutines.Dispatchers
36-
import kotlinx.coroutines.currentCoroutineContext
3736
import kotlinx.coroutines.isActive
3837
import kotlinx.coroutines.withContext
3938

@@ -101,9 +100,11 @@ internal class DownloadWorker(
101100
val append = file.length() > 0
102101
val fileOutputStream = FileOutputStream(file, append)
103102
var totalRead = 0L
104-
BufferedOutputStream(fileOutputStream).use { fileOutput ->
103+
// use ensures the underlying source is closed. In this case, a BufferedOutputStream. By default,
104+
// a bos flushes on close. We may not want this behavior, so we use ClearableBufferedOutputStream.
105+
ClearableBufferedOutputStream(fileOutputStream).use { fileOutput ->
105106
val copied = 0L
106-
while (currentCoroutineContext().isActive) {
107+
while (isActive) {
107108
val remaining = limit - copied
108109
if (remaining == 0L) break
109110
val readBytes =
@@ -112,10 +113,24 @@ internal class DownloadWorker(
112113
if (readBytes > 0) {
113114
totalRead += readBytes
114115
}
115-
fileOutput.write(buffer, 0, readBytes)
116+
if (isActive) {
117+
// Double check to make sure that we are still active before writing to buffer
118+
fileOutput.write(buffer, 0, readBytes)
119+
} else {
120+
// If we are no longer active, clear the buffer so that no more data is written to the
121+
// file. A resume operation may have already started, and it resumes based on the
122+
// file size at its start. A flush here could result in duplicating file data
123+
fileOutput.clear()
124+
}
116125
}
117-
if (sourceStream.buffer().exhausted()) {
126+
if (sourceStream.buffer().exhausted() && isActive) {
127+
// Double check to make sure that we are still active before flushing to buffer
118128
fileOutput.flush()
129+
} else {
130+
// If we are no longer active, clear the buffer so that no more data is written to the
131+
// file. A resume operation may have already started, and it resumes based on the
132+
// file size at its start. A flush here could result in duplicating file data
133+
fileOutput.clear()
119134
}
120135
}
121136
}

0 commit comments

Comments
 (0)