Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit d8d0477

Browse files
committed
Fixed some overflow bugs in TransferManager and reliable download setup in BlobURL
1 parent c362ce7 commit d8d0477

File tree

9 files changed

+175
-88
lines changed

9 files changed

+175
-88
lines changed

ChangeLog.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ XXXX.XX.XX Version XX.X.X
44
* Added CopyFromURL, which will do a synchronous server-side copy, meaning the service will not return an HTTP response until it has completed the copy.
55
* Added support for IProgressReceiver in TransferManager operations. This parameter was previously ignored but is now supported.
66
* Removed internal dependency on javafx to be compatible with openjdk.
7+
* Fixed a bug that would cause downloading large files with the TransferManager to fail and a bug in BlobURL.download() logic for setting up reliable download.
78

89
2018.09.11 Version 10.1.0
910
* Interfaces for helper types updated to be more consistent throughout the library. All types, with the exception of the options for pipeline factories, use a fluent pattern.

src/main/java/com/microsoft/azure/storage/blob/BlobURL.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -322,26 +322,6 @@ public Single<DownloadResponse> download() {
322322
return this.download(null, null, false, null);
323323
}
324324

325-
/**
326-
* Reads a range of bytes from a blob. The response also includes the blob's properties and metadata. For more
327-
* information, see the <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a>.
328-
* <p>
329-
* Note that the response body has reliable download functionality built in, meaning that a failed download stream
330-
* will be automatically retried. This behavior may be configured with {@link ReliableDownloadOptions}.
331-
*
332-
* @param range
333-
* {@link BlobRange}
334-
*
335-
* @return Emits the successful response.
336-
*
337-
* @apiNote ## Sample Code \n
338-
* [!code-java[Sample_Code](../azure-storage-java/src/test/java/com/microsoft/azure/storage/Samples.java?name=upload_download "Sample code for BlobURL.download")] \n
339-
* For more samples, please see the [Samples file](%https://github.com/Azure/azure-storage-java/blob/master/src/test/java/com/microsoft/azure/storage/Samples.java)
340-
*/
341-
public Single<DownloadResponse> download(BlobRange range) {
342-
return this.download(range, null, false, null);
343-
}
344-
345325
/**
346326
* Reads a range of bytes from a blob. The response also includes the blob's properties and metadata. For more
347327
* information, see the <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a>.
@@ -374,7 +354,7 @@ public Single<DownloadResponse> download(BlobRange range, BlobAccessConditions a
374354
range = range == null ? BlobRange.DEFAULT : range;
375355
accessConditions = accessConditions == null ? BlobAccessConditions.NONE : accessConditions;
376356
HTTPGetterInfo info = new HTTPGetterInfo()
377-
.withCount(range.offset())
357+
.withOffset(range.offset())
378358
.withCount(range.count())
379359
.withETag(accessConditions.modifiedAccessConditions().ifMatch());
380360

src/main/java/com/microsoft/azure/storage/blob/DownloadResponse.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -73,21 +73,12 @@ public Flowable<ByteBuffer> body(ReliableDownloadOptions options) {
7373
return this.rawResponse.body();
7474
}
7575

76-
return this.rawResponse.body()
77-
/*
78-
Update how much data we have received in case we need to retry and propagate to the user the data we
79-
have received.
80-
*/
81-
.doOnNext(buffer -> {
82-
this.info.withOffset(this.info.offset() + buffer.remaining());
83-
if (info.count() != null) {
84-
this.info.withCount(this.info.count() - buffer.remaining());
85-
}
86-
})
87-
.onErrorResumeNext(throwable -> {
88-
// So far we have tried once but retried zero times.
89-
return tryContinueFlowable(throwable, 0, optionsReal);
90-
});
76+
/*
77+
We pass -1 for currentRetryCount because we want tryContinueFlowable to receive a value of 0 for number of
78+
retries as we have not actually retried yet, only made the initial try. Because applyReliableDownload() will
79+
add 1 before calling into tryContinueFlowable, we set the initial value to -1.
80+
*/
81+
return this.applyReliableDownload(this.rawResponse.body(), -1, optionsReal);
9182
}
9283

9384
private Flowable<ByteBuffer> tryContinueFlowable(Throwable t, int retryCount, ReliableDownloadOptions options) {
@@ -98,30 +89,38 @@ private Flowable<ByteBuffer> tryContinueFlowable(Throwable t, int retryCount, Re
9889
try {
9990
// Get a new response and try reading from it.
10091
return getter.apply(this.info)
101-
.flatMapPublisher(response -> {
102-
// Do not compound the number of retries; just get the raw body.
103-
ReliableDownloadOptions newOptions = new ReliableDownloadOptions()
104-
.withMaxRetryRequests(0);
105-
106-
return response.body(newOptions)
107-
.doOnNext(buffer -> {
108-
this.info.withOffset(this.info.offset() + buffer.remaining());
109-
if (info.count() != null) {
110-
this.info.withCount(this.info.count() - buffer.remaining());
111-
}
112-
})
113-
.onErrorResumeNext(t2 -> {
114-
// Increment the retry count and try again with the new exception.
115-
return tryContinueFlowable(t2, retryCount + 1, options);
116-
});
117-
});
92+
.flatMapPublisher(response ->
93+
/*
94+
Do not compound the number of retries by passing in another set of downloadOptions; just get
95+
the raw body.
96+
*/
97+
this.applyReliableDownload(this.rawResponse.body(), retryCount, options));
11898
} catch (Exception e) {
11999
// If the getter fails, return the getter failure to the user.
120100
return Flowable.error(e);
121101
}
122102
}
123103
}
124104

105+
private Flowable<ByteBuffer> applyReliableDownload(Flowable<ByteBuffer> data,
106+
int currentRetryCount, ReliableDownloadOptions options) {
107+
return data
108+
.doOnNext(buffer -> {
109+
/*
110+
Update how much data we have received in case we need to retry and propagate to the user the data we
111+
have received.
112+
*/
113+
this.info.withOffset(this.info.offset() + buffer.remaining());
114+
if (this.info.count() != null) {
115+
this.info.withCount(this.info.count() - buffer.remaining());
116+
}
117+
})
118+
.onErrorResumeNext(t2 -> {
119+
// Increment the retry count and try again with the new exception.
120+
return tryContinueFlowable(t2, currentRetryCount + 1, options);
121+
});
122+
}
123+
125124
public int statusCode() {
126125
return this.rawResponse.statusCode();
127126
}

src/main/java/com/microsoft/azure/storage/blob/TransferManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ public static Single<CommonRestResponse> uploadFileToBlockBlob(
106106
the list of Ids later. Eager ensures parallelism but may require some internal buffering.
107107
*/
108108
.concatMapEager(i -> {
109-
int count = Math.min(blockLength, (int) (file.size() - i * blockLength));
110-
Flowable<ByteBuffer> data = FlowableUtil.readFile(file, i * blockLength, count);
109+
// The maximum size of a block is currently 100MB, so the final result is guaranteed to fit in an int.
110+
int count = (int) Math.min((long)blockLength, (file.size() - i * (long)blockLength));
111+
// i * blockLength can exceed Integer.MAX_VALUE, so we cast to long before multiplying to prevent int overflow.
112+
Flowable<ByteBuffer> data = FlowableUtil.readFile(file, i * (long)blockLength, count);
111113

112114
// Report progress as necessary.
113115
data = ProgressReporter.addParallelProgressReporting(data, optionsReal.progressReceiver(),

src/test/java/com/microsoft/azure/storage/APISpec.groovy

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ class APISpec extends Specification {
229229
}
230230
}
231231

232-
static ByteBuffer getRandomData(long size) {
232+
static ByteBuffer getRandomData(int size) {
233233
Random rand = new Random(getRandomSeed())
234234
byte[] data = new byte[size]
235235
rand.nextBytes(data)
@@ -240,7 +240,14 @@ class APISpec extends Specification {
240240
File file = File.createTempFile(UUID.randomUUID().toString(), ".txt")
241241
file.deleteOnExit()
242242
FileOutputStream fos = new FileOutputStream(file)
243-
fos.write(getRandomData(size).array())
243+
Random rand = new Random(getRandomSeed())
244+
while (size > 0) {
245+
int sizeToWrite = (int) Math.min((long)(Integer.MAX_VALUE.longValue()/10L), size)
246+
byte[] data = new byte[sizeToWrite]
247+
rand.nextBytes(data)
248+
fos.write(data)
249+
size -= sizeToWrite
250+
}
244251
fos.close()
245252
return file
246253
}
@@ -431,7 +438,6 @@ class APISpec extends Specification {
431438
to play too nicely with mocked objects and the complex reflection stuff on both ends made it more difficult to work
432439
with than was worth it.
433440
*/
434-
435441
def getStubResponse(int code, Class responseHeadersType) {
436442
return new HttpResponse() {
437443

@@ -477,6 +483,58 @@ class APISpec extends Specification {
477483
}
478484
}
479485

486+
/*
487+
This is for stubbing responses that will actually go through the pipeline and autorest code. Autorest does not seem
488+
to play too nicely with mocked objects and the complex reflection stuff on both ends made it more difficult to work
489+
with than was worth it. Because this type is just for BlobDownload, we don't need to accept a header type.
490+
*/
491+
def getStubResponseForBlobDownload(int code, Flowable<ByteBuffer> body, String etag) {
492+
return new HttpResponse() {
493+
494+
@Override
495+
int statusCode() {
496+
return code
497+
}
498+
499+
@Override
500+
String headerValue(String s) {
501+
return null
502+
}
503+
504+
@Override
505+
HttpHeaders headers() {
506+
return new HttpHeaders()
507+
}
508+
509+
@Override
510+
Flowable<ByteBuffer> body() {
511+
return body
512+
}
513+
514+
@Override
515+
Single<byte[]> bodyAsByteArray() {
516+
return null
517+
}
518+
519+
@Override
520+
Single<String> bodyAsString() {
521+
return null
522+
}
523+
524+
@Override
525+
Object deserializedHeaders() {
526+
def headers = new BlobDownloadHeaders()
527+
headers.withETag(etag)
528+
return headers
529+
}
530+
531+
@Override
532+
boolean isDecoded() {
533+
return true
534+
}
535+
}
536+
}
537+
480538
def getContextStubPolicy(int successCode, Class responseHeadersType) {
481539
return Mock(RequestPolicy) {
482540
sendAsync(_) >> { HttpRequest request ->

src/test/java/com/microsoft/azure/storage/BlobAPITest.groovy

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ package com.microsoft.azure.storage
1818
import com.microsoft.azure.storage.blob.*
1919
import com.microsoft.azure.storage.blob.models.*
2020
import com.microsoft.rest.v2.http.HttpPipeline
21+
import com.microsoft.rest.v2.http.HttpRequest
22+
import com.microsoft.rest.v2.policy.RequestPolicy
2123
import com.microsoft.rest.v2.util.FlowableUtil
2224
import io.reactivex.Flowable
25+
import io.reactivex.Single
2326
import spock.lang.Unroll
2427

2528
import java.nio.ByteBuffer
@@ -70,6 +73,48 @@ class BlobAPITest extends APISpec {
7073
headers.blobContentMD5() != null
7174
}
7275

76+
/*
77+
This is to test the appropriate integration of DownloadResponse, including setting the correct range values on
78+
HTTPGetterInfo.
79+
*/
80+
def "Download with retry range"() {
81+
/*
82+
We are going to make a request for some range on a blob. The Flowable returned will throw an exception, forcing
83+
a retry per the ReliableDownloadOptions. The next request should have the same range header, which was generated
84+
from the count and offset values in HTTPGetterInfo that was constructed on the initial call to download. We
85+
don't need to check the data here, but we want to ensure that the correct range is set each time. This will
86+
test the correction of a bug that was found which caused HTTPGetterInfo to have an incorrect offset when it was
87+
constructed in BlobURL.download().
88+
*/
89+
setup:
90+
def mockPolicy = Mock(RequestPolicy) {
91+
sendAsync(_) >> { HttpRequest request ->
92+
if (request.headers().value("x-ms-range") != "bytes=2-") {
93+
return Single.error(new IllegalArgumentException("The range header was not set correctly on retry."))
94+
}
95+
else {
96+
// ETag can be a dummy value. It's not validated, but DownloadResponse requires one
97+
return Single.just(getStubResponseForBlobDownload(206, Flowable.error(new IOException()), "etag"))
98+
}
99+
}
100+
}
101+
def pipeline = HttpPipeline.build(getStubFactory(mockPolicy))
102+
bu = bu.withPipeline(pipeline)
103+
104+
when:
105+
def range = new BlobRange().withOffset(2)
106+
bu.download(range, null, false, null).blockingGet().body(new ReliableDownloadOptions().withMaxRetryRequests(3))
107+
.blockingSubscribe()
108+
109+
then:
110+
/*
111+
The dummy Flowable always throws an error, so the download is expected to fail. This will also validate that an IllegalArgumentException is
112+
NOT thrown because the types would not match.
113+
*/
114+
def e = thrown(RuntimeException)
115+
e.getCause() instanceof IOException
116+
}
117+
73118
def "Download min"() {
74119
expect:
75120
FlowableUtil.collectBytesInBuffer(bu.download().blockingGet().body(null)).blockingGet() == defaultData

src/test/java/com/microsoft/azure/storage/TransferManagerTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ class TransferManagerTest extends APISpec {
426426
getRandomFile(16 * 1024 * 1024) | _ // medium file in several chunks
427427
getRandomFile(8L * 1026 * 1024 + 10) | _ // medium file not aligned to block
428428
getRandomFile(0) | _ // empty file
429-
getRandomFile(5L * 1024 * 1024 * 1024) | _ // file size exceeds max int
429+
// Tests with files larger than 2GB (to check for integer overflow) are left to stress/perf tests to keep test passes short.
430430
}
431431

432432
def compareFiles(AsynchronousFileChannel channel1, long offset, long count, AsynchronousFileChannel channel2) {

0 commit comments

Comments
 (0)