Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit 0375367

Browse files
committed
Range options now available for BlobInputStream
CloudBlob::openInputStream now has an overload that takes offset and optional length. BlobInputStream constructor has a similar overload which will only download and stream the specified range.
1 parent ea15a71 commit 0375367

File tree

4 files changed

+234
-15
lines changed

4 files changed

+234
-15
lines changed

microsoft-azure-storage-test/src/com/microsoft/azure/storage/TestHelper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.microsoft.aad.adal4j.AuthenticationContext;
5050
import com.microsoft.aad.adal4j.AuthenticationResult;
5151
import com.microsoft.aad.adal4j.ClientCredential;
52+
import org.junit.Assert;
5253
import org.junit.AssumptionViolatedException;
5354
import org.w3c.dom.DOMException;
5455
import org.w3c.dom.Document;
@@ -322,6 +323,27 @@ public static void assertURIsEqual(URI expected, URI actual, boolean ignoreQuery
322323
assertTrue(actualQueries.isEmpty());
323324
}
324325

326+
public static void expectedException(Runnable method, Class<? extends Exception> exceptionType) {
327+
Throwable caughtException = null;
328+
try {
329+
method.run();
330+
} catch (Exception e) {
331+
Throwable th = e;
332+
while (th != null) {
333+
if (th.getClass().equals(exceptionType)) {
334+
caughtException = th;
335+
break;
336+
}
337+
th = th.getCause();
338+
}
339+
} finally {
340+
if (caughtException == null)
341+
Assert.fail(String.format(
342+
"Expected exception of type %s not encountered",
343+
exceptionType.getCanonicalName()));
344+
}
345+
}
346+
325347
public static URI defiddler(URI uri) throws URISyntaxException {
326348
String fiddlerString = "ipv4.fiddler";
327349
String replacementString = "127.0.0.1";

microsoft-azure-storage-test/src/com/microsoft/azure/storage/blob/CloudBlockBlobTests.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.microsoft.azure.storage.file.SharedAccessFilePolicy;
2626

2727
import org.junit.After;
28+
import org.junit.Assert;
2829
import org.junit.Before;
2930
import org.junit.Test;
3031
import org.junit.experimental.categories.Category;
@@ -1989,6 +1990,118 @@ public void testBlobInputStream() throws URISyntaxException, StorageException, I
19891990
blobRef.delete();
19901991
}
19911992

1993+
@Test
1994+
@Category({ DevFabricTests.class, DevStoreTests.class })
1995+
public void testBlobInputStreamWithRange() throws StorageException, IOException, URISyntaxException {
1996+
1997+
final int blobLength = 4 * Constants.KB;
1998+
final String blobName = "testBlobInputStreamWithOffset" + UUID.randomUUID();
1999+
2000+
// setup
2001+
final CloudBlockBlob blob = this.container.getBlockBlobReference(blobName);
2002+
final byte[] blobData = BlobTestHelper.getRandomBuffer(blobLength);
2003+
blob.upload(new ByteArrayInputStream(blobData), blobData.length);
2004+
2005+
blob.downloadAttributes();
2006+
2007+
// test
2008+
doOpenInputStreamWithRangeTest(blob, blobData, 0, null);
2009+
2010+
doOpenInputStreamWithRangeTest(blob, blobData, Constants.KB, null);
2011+
doOpenInputStreamWithRangeTest(blob, blobData, Constants.KB, 2 * Constants.KB);
2012+
doOpenInputStreamWithRangeTest(blob, blobData, Constants.KB, 3 * Constants.KB);
2013+
2014+
doOpenInputStreamWithRangeTest(blob, blobData, Constants.KB, 4 * Constants.KB);
2015+
2016+
TestHelper.expectedException(
2017+
customRunnableWrapper(blob, blobData, -1 * Constants.KB, null),
2018+
IndexOutOfBoundsException.class);
2019+
TestHelper.expectedException(
2020+
customRunnableWrapper(blob, blobData, -1 * Constants.KB, 4 * Constants.KB),
2021+
IndexOutOfBoundsException.class);
2022+
}
2023+
2024+
/**
2025+
* Creates a Runnable wrapper for the above test with the below method. (Java 7 doesn't support lambdas).
2026+
* @param blob
2027+
* @param originalData
2028+
* @param offset
2029+
* @param length
2030+
* @return
2031+
*/
2032+
private Runnable customRunnableWrapper(final CloudBlob blob, final byte[] originalData, final int offset, final Integer length) {
2033+
return new Runnable() {
2034+
@Override
2035+
public void run() {
2036+
try {
2037+
doOpenInputStreamWithRangeTest(blob, originalData, offset, length);
2038+
} catch (Exception e) {
2039+
throw new RuntimeException(e);
2040+
}
2041+
}
2042+
};
2043+
}
2044+
2045+
/**
2046+
* Creates and reads from a BlobInputStream using offset and length parameters.
2047+
* Marks at the start and uses a reset halfway through to ensure mark and reset still work.
2048+
* @param blob
2049+
* @param originalData
2050+
* @param offset
2051+
* @param length
2052+
* @throws StorageException
2053+
* @throws IOException
2054+
*/
2055+
private void doOpenInputStreamWithRangeTest(CloudBlob blob, byte[] originalData, int offset, Integer length)
2056+
throws StorageException, IOException {
2057+
2058+
// setup
2059+
final int readLength = length == null
2060+
? (int) (blob.getProperties().getLength() - offset)
2061+
: Math.min(length, (int) (blob.getProperties().getLength() - offset));
2062+
2063+
BlobInputStream stream = blob.openInputStream(
2064+
offset,
2065+
length == null ? null : length.longValue(),
2066+
null, null, null);
2067+
2068+
// test
2069+
boolean hasResetToMark = false;
2070+
boolean hasMarked = false;
2071+
for (int i = 0; i < readLength; i++) {
2072+
2073+
if (!hasMarked && i == readLength / 4) { // if haven't marked, mark at 1/4 through 3/4
2074+
stream.mark(readLength / 2);
2075+
hasMarked = true;
2076+
}
2077+
if (!hasResetToMark && i == readLength / 2) { // call reset at halfway point ONCE
2078+
stream.reset();
2079+
i = readLength / 4;
2080+
hasResetToMark = true;
2081+
}
2082+
if (i == readLength * 4 / 5) { // call reset after mark expiry
2083+
boolean threw = false;
2084+
try {
2085+
stream.reset();
2086+
} catch (IOException e) {
2087+
threw = true;
2088+
} finally {
2089+
if (!threw) {
2090+
Assert.fail("Allowed reset after mark expired.");
2091+
}
2092+
}
2093+
}
2094+
2095+
int data = stream.read();
2096+
assertTrue(String.format("%d is not greater than zero. i = %d", data, i), data >= 0);
2097+
assertEquals(originalData[i + offset], (byte) data);
2098+
}
2099+
assertEquals(-1, stream.read());
2100+
2101+
// cleanup
2102+
stream.close();
2103+
}
2104+
19922105
@Test
19932106
@Category({ DevFabricTests.class, DevStoreTests.class })
19942107
public void testUploadFromByteArray() throws Exception {

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobInputStream.java

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class BlobInputStream extends InputStream {
6969
/**
7070
* Holds the stream length.
7171
*/
72-
private long streamLength = -1;
72+
private long streamLength;
7373

7474
/**
7575
* Holds the stream read size for both block and page blobs.
@@ -121,9 +121,14 @@ public final class BlobInputStream extends InputStream {
121121
*/
122122
private AccessCondition accessCondition = null;
123123

124+
/**
125+
* Offset of the source blob this class is configured to stream from.
126+
*/
127+
private final long blobRangeOffset;
128+
124129
/**
125130
* Initializes a new instance of the BlobInputStream class.
126-
*
131+
*
127132
* @param parentBlob
128133
* A {@link CloudBlob} object which represents the blob that this stream is associated with.
129134
* @param accessCondition
@@ -133,19 +138,50 @@ public final class BlobInputStream extends InputStream {
133138
* request.
134139
* @param opContext
135140
* An {@link OperationContext} object which is used to track the execution of the operation.
136-
*
141+
*
137142
* @throws StorageException
138143
* An exception representing any error which occurred during the operation.
139144
*/
140145
@DoesServiceRequest
141146
protected BlobInputStream(final CloudBlob parentBlob, final AccessCondition accessCondition,
142-
final BlobRequestOptions options, final OperationContext opContext) throws StorageException {
147+
final BlobRequestOptions options, final OperationContext opContext) throws StorageException {
148+
this(0, null, parentBlob, accessCondition, options, opContext);
149+
}
150+
151+
/**
152+
* Initializes a new instance of the BlobInputStream class.
153+
* Note that if {@code blobRangeOffset} is not {@code 0} or {@code blobRangeLength} is not {@code null}, there will
154+
* be no content MD5 verification.
155+
*
156+
* @param blobRangeOffset
157+
* The offset within the blob at which the stream begins reading.
158+
* @param blobRangeLength
159+
* The maximum number of bytes the stream returns starting at blobRangeOffset, or {@code null} to read through the end of the blob.
160+
* @param parentBlob
161+
* A {@link CloudBlob} object which represents the blob that this stream is associated with.
162+
* @param accessCondition
163+
* An {@link AccessCondition} object which represents the access conditions for the blob.
164+
* @param options
165+
* A {@link BlobRequestOptions} object which specifies any additional options for the
166+
* request.
167+
* @param opContext
168+
* An {@link OperationContext} object which is used to track the execution of the operation.
169+
*
170+
* @throws StorageException
171+
* An exception representing any error which occurred during the operation.
172+
*/
173+
@DoesServiceRequest
174+
protected BlobInputStream(long blobRangeOffset, Long blobRangeLength, final CloudBlob parentBlob,
175+
final AccessCondition accessCondition, final BlobRequestOptions options, final OperationContext opContext)
176+
throws StorageException {
177+
178+
this.blobRangeOffset = blobRangeOffset;
143179
this.parentBlobRef = parentBlob;
144180
this.parentBlobRef.assertCorrectBlobType();
145181
this.options = new BlobRequestOptions(options);
146182
this.opContext = opContext;
147183
this.streamFaulted = false;
148-
this.currentAbsoluteReadPosition = 0;
184+
this.currentAbsoluteReadPosition = blobRangeOffset;
149185
this.readSize = parentBlob.getStreamMinimumReadSizeInBytes();
150186

151187
if (options.getUseTransactionalContentMD5() && this.readSize > 4 * Constants.MB) {
@@ -154,12 +190,22 @@ protected BlobInputStream(final CloudBlob parentBlob, final AccessCondition acce
154190

155191
parentBlob.downloadAttributes(accessCondition, this.options, this.opContext);
156192

193+
Utility.assertInBounds("blobRangeOffset", blobRangeOffset, 0, parentBlob.getProperties().getLength() - 1);
194+
if (blobRangeLength != null) {
195+
Utility.assertGreaterThanOrEqual("blobRangeLength", blobRangeLength, 0);
196+
}
197+
157198
this.retrievedContentMD5Value = parentBlob.getProperties().getContentMD5();
158199

159200
// Will validate it if it was returned
160201
this.validateBlobMd5 = !options.getDisableContentMD5Validation()
161202
&& !Utility.isNullOrEmpty(this.retrievedContentMD5Value);
162203

204+
// Need the whole blob to validate MD5. If we download a range, don't bother trying.
205+
if (blobRangeOffset != 0 || blobRangeLength != null) {
206+
this.validateBlobMd5 = false;
207+
}
208+
163209
// Validates the first option, and sets future requests to use if match
164210
// request option.
165211

@@ -179,7 +225,9 @@ protected BlobInputStream(final CloudBlob parentBlob, final AccessCondition acce
179225
this.accessCondition = AccessCondition.generateIfMatchCondition(this.parentBlobRef.getProperties().getEtag());
180226
this.accessCondition.setLeaseID(previousLeaseId);
181227

182-
this.streamLength = parentBlob.getProperties().getLength();
228+
this.streamLength = blobRangeLength == null
229+
? this.parentBlobRef.getProperties().getLength() - this.blobRangeOffset
230+
: Math.min(this.parentBlobRef.getProperties().getLength() - this.blobRangeOffset, blobRangeLength);
183231

184232
if (this.validateBlobMd5) {
185233
try {
@@ -191,7 +239,7 @@ protected BlobInputStream(final CloudBlob parentBlob, final AccessCondition acce
191239
}
192240
}
193241

194-
this.reposition(0);
242+
this.reposition(blobRangeOffset);
195243
}
196244

197245
/**
@@ -238,7 +286,7 @@ public synchronized void close() throws IOException {
238286
}
239287

240288
/**
241-
* Dispatches a read operation of N bytes. When using sparspe page blobs the page ranges are evaluated and zero
289+
* Dispatches a read operation of N bytes. When using sparse page blobs, the page ranges are evaluated and zero
242290
* bytes may be generated on the client side for some ranges that do not exist.
243291
*
244292
* @param readLength
@@ -444,8 +492,8 @@ private synchronized int readInternal(final byte[] b, final int off, int len) th
444492

445493
// if buffer is empty do next get operation
446494
if ((this.currentBuffer == null || this.currentBuffer.available() == 0)
447-
&& this.currentAbsoluteReadPosition < this.streamLength) {
448-
this.dispatchRead((int) Math.min(this.readSize, this.streamLength - this.currentAbsoluteReadPosition));
495+
&& this.currentAbsoluteReadPosition < this.streamLength + this.blobRangeOffset) {
496+
this.dispatchRead((int) Math.min(this.readSize, this.streamLength + this.blobRangeOffset - this.currentAbsoluteReadPosition));
449497
}
450498

451499
len = Math.min(len, this.readSize);
@@ -459,7 +507,7 @@ private synchronized int readInternal(final byte[] b, final int off, int len) th
459507
if (this.validateBlobMd5) {
460508
this.md5Digest.update(b, off, numberOfBytesRead);
461509

462-
if (this.currentAbsoluteReadPosition == this.streamLength) {
510+
if (this.currentAbsoluteReadPosition == this.streamLength + this.blobRangeOffset) {
463511
// Reached end of stream, validate md5.
464512
final String calculatedMd5 = Base64.encode(this.md5Digest.digest());
465513
if (!calculatedMd5.equals(this.retrievedContentMD5Value)) {
@@ -479,7 +527,7 @@ private synchronized int readInternal(final byte[] b, final int off, int len) th
479527

480528
// update markers
481529
if (this.markExpiry > 0 && this.markedPosition + this.markExpiry < this.currentAbsoluteReadPosition) {
482-
this.markedPosition = 0;
530+
this.markedPosition = this.blobRangeOffset;
483531
this.markExpiry = 0;
484532
}
485533

@@ -532,7 +580,7 @@ public synchronized long skip(final long n) throws IOException {
532580
return 0;
533581
}
534582

535-
if (n < 0 || this.currentAbsoluteReadPosition + n > this.streamLength) {
583+
if (n < 0 || this.currentAbsoluteReadPosition + n > this.streamLength + this.blobRangeOffset) {
536584
throw new IndexOutOfBoundsException();
537585
}
538586

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlob.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,7 @@ public final boolean isSnapshot() {
24642464
*/
24652465
@DoesServiceRequest
24662466
public final BlobInputStream openInputStream() throws StorageException {
2467-
return this.openInputStream(null /* accessCondition */, null /* options */, null /* opContext */);
2467+
return this.openInputStream(0 /* offset */, null /* range */, null /* accessCondition */, null /* options */, null /* opContext */);
24682468
}
24692469

24702470
/**
@@ -2491,14 +2491,50 @@ public final BlobInputStream openInputStream() throws StorageException {
24912491
@DoesServiceRequest
24922492
public final BlobInputStream openInputStream(final AccessCondition accessCondition, BlobRequestOptions options,
24932493
OperationContext opContext) throws StorageException {
2494+
return this.openInputStream(0, null, accessCondition, options, opContext);
2495+
}
2496+
2497+
/**
2498+
* Opens a blob input stream to download the blob using the specified request options and operation context.
2499+
* <p>
2500+
* Use {@link #setStreamMinimumReadSizeInBytes(int)} to configure the read size.
2501+
*
2502+
* @param offset
2503+
* A <code>long</code> which represents the offset to use as the starting point for the source.
2504+
* @param length
2505+
* A {@link Long} which represents the number of bytes to read or <code>null</code>.
2506+
* @param accessCondition
2507+
* An {@link AccessCondition} object that represents the access conditions for the blob.
2508+
* @param options
2509+
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
2510+
* <code>null</code> will use the default request options from the associated service client (
2511+
* {@link CloudBlobClient}).
2512+
* @param opContext
2513+
* An {@link OperationContext} object that represents the context for the current operation. This object
2514+
* is used to track requests to the storage service, and to provide additional runtime information about
2515+
* the operation.
2516+
*
2517+
* @return An <code>InputStream</code> object that represents the stream to use for reading from the blob.
2518+
*
2519+
* @throws StorageException
2520+
* If a storage service error occurred.
2521+
*/
2522+
@DoesServiceRequest
2523+
public final BlobInputStream openInputStream(final long offset, final Long length, final AccessCondition accessCondition, BlobRequestOptions options,
2524+
OperationContext opContext) throws StorageException {
2525+
2526+
if (offset < 0 || (length != null && length <= 0)) {
2527+
throw new IndexOutOfBoundsException();
2528+
}
2529+
24942530
if (opContext == null) {
24952531
opContext = new OperationContext();
24962532
}
24972533

24982534
options = BlobRequestOptions.populateAndApplyDefaults(options, this.properties.getBlobType(), this.blobServiceClient,
24992535
false /* setStartTime */);
25002536

2501-
return new BlobInputStream(this, accessCondition, options, opContext);
2537+
return new BlobInputStream(offset, length, this, accessCondition, options, opContext);
25022538
}
25032539

25042540
/**

0 commit comments

Comments
 (0)