Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit 450a9ad

Browse files
committed
Added abort and close
1 parent 979799e commit 450a9ad

File tree

10 files changed

+157
-9
lines changed

10 files changed

+157
-9
lines changed

ChangeLog.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
2020.03.25 Version 8.6.3
2+
* Added the commitWriteOnInputStreamException option to BlobRequestOptions, which will allow the user to configure whether any data staged through openWrite when using the upload(InputStream, long) method will be committed upon failures in the InputStream.
3+
14
2020.03.18 Version 8.6.2
25
* Fixed a bug in the pom that disrupted the ability to download from maven central.
36

microsoft-azure-storage-test/src/com/microsoft/azure/storage/blob/CloudBlockBlobTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2755,6 +2755,71 @@ public void testSkipEtagCheck() throws StorageException, IOException, URISyntaxE
27552755
stream.close();
27562756
}
27572757

2758+
private static class ExceptionInputStream extends InputStream {
2759+
private final byte[] data;
2760+
int index = 0;
2761+
boolean firstRead = true;
2762+
2763+
ExceptionInputStream(byte[] data) {
2764+
this.data = data;
2765+
}
2766+
2767+
@Override
2768+
public int read() throws IOException {
2769+
return 0;
2770+
}
2771+
2772+
@Override
2773+
public int read(byte[] arr, int offset, int len) throws IOException {
2774+
if (firstRead) {
2775+
// Fill either half the incoming buffer or use half the data, whichever is smaller.
2776+
// For safe partial write.
2777+
int size = Math.min(data.length, len) / 2;
2778+
if (len >= 0) System.arraycopy(data, index, arr, offset, size);
2779+
firstRead = false;
2780+
return size;
2781+
} else {
2782+
throw new IOException();
2783+
}
2784+
}
2785+
}
2786+
2787+
@Test
2788+
public void testCommitOnInputStreamException() throws StorageException, IOException, URISyntaxException {
2789+
final int blobSize = 2 * Constants.DEFAULT_MINIMUM_READ_SIZE_IN_BYTES; // so BlobInputStream doesn't read entire blob at once.
2790+
2791+
CloudBlobContainer container = BlobTestHelper.getRandomContainerReference();
2792+
container.createIfNotExists();
2793+
CloudBlockBlob blob = container.getBlockBlobReference(BlobTestHelper.generateRandomBlobNameWithPrefix(""));
2794+
2795+
BlobRequestOptions options = new BlobRequestOptions();
2796+
2797+
// Upload with no commit on failure.
2798+
byte[] data = TestHelper.getRandomBuffer(blobSize);
2799+
InputStream is = new ExceptionInputStream(data);
2800+
options.setCommitWriteOnInputStreamException(false);
2801+
2802+
try {
2803+
blob.upload(is, blobSize, null, null, options, null);
2804+
fail();
2805+
} catch(IOException e) {
2806+
assertFalse(blob.exists());
2807+
}
2808+
2809+
// Upload with commit on failure.
2810+
is = new ExceptionInputStream(data);
2811+
options = new BlobRequestOptions(); // Test the default. Should be true.
2812+
2813+
try {
2814+
blob.upload(is, blobSize, null, null, options, null);
2815+
fail();
2816+
} catch (IOException e) {
2817+
assertTrue(blob.exists());
2818+
}
2819+
2820+
container.delete();
2821+
}
2822+
27582823
@Test
27592824
public void testPutGetBlobCPK() throws URISyntaxException, StorageException, IOException {
27602825
// load CPK into options

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobDecryptStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public void close() throws IOException
5959
this.cryptoStream.close();
6060
}
6161

62+
@Override
63+
void abortAndClose() throws IOException {
64+
this.cryptoStream.close();
65+
}
66+
6267
@Override
6368
public void write(byte[] data, int offset, int length) throws IOException {
6469
// Keep buffering until we have 16 bytes of IV.

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobEncryptStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,9 @@ public void close() throws IOException {
159159
this.cipherStream.close();
160160
}
161161

162+
@Override
163+
void abortAndClose() throws IOException {
164+
this.cipherStream.close();
165+
}
166+
162167
}

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobOutputStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,10 @@ public void write(final byte[] data) throws IOException {
111111
@Override
112112
@DoesServiceRequest
113113
public abstract void close() throws IOException;
114+
115+
/**
116+
* Closes the output stream without committing the data to the service, typically to be used in cases of errors or
117+
* exceptions in the data source.
118+
*/
119+
abstract void abortAndClose() throws IOException;
114120
}

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobOutputStreamInternal.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,21 @@ public synchronized void close() throws IOException {
339339
}
340340
}
341341

342+
@Override
343+
public void abortAndClose() throws IOException {
344+
try {
345+
this.checkStreamState();
346+
347+
this.threadExecutor.shutdown();
348+
} finally {
349+
this.lastError = new IOException(SR.STREAM_CLOSED);
350+
351+
if (!this.threadExecutor.isShutdown()) {
352+
this.threadExecutor.shutdown();
353+
}
354+
}
355+
}
356+
342357
/**
343358
* Commits the blob, for block blob this uploads the block list.
344359
*

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobRequestOptions.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,15 @@ public final class BlobRequestOptions extends RequestOptions {
8989
* Default is false.
9090
*/
9191
private boolean skipEtagLocking = false;
92-
92+
93+
/**
94+
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
95+
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
96+
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
97+
* Otherwise, the upload will be aborted and no data will be committed.
98+
*/
99+
private boolean commitWriteOnInputStreamException = true;
100+
93101
/**
94102
* Creates an instance of the <code>BlobRequestOptions</code> class.
95103
*/
@@ -119,6 +127,7 @@ public BlobRequestOptions(final BlobRequestOptions other) {
119127
//this.setSourceCustomerProvidedKey(other.getSourceCustomerProvidedKey());
120128
this.setValidateEncryptionPolicy(other.getValidateEncryptionPolicy());
121129
this.setSkipEtagLocking(other.getSkipEtagLocking());
130+
this.setCommitWriteOnInputStreamException(other.getCommitWriteOnInputStreamException());
122131
}
123132
}
124133

@@ -362,6 +371,18 @@ public boolean getSkipEtagLocking() {
362371
return this.skipEtagLocking;
363372
}
364373

374+
/**
375+
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
376+
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
377+
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
378+
* Otherwise, the upload will be aborted and no data will be committed.
379+
*
380+
* @return <code>true</code> if data will be committed upon an exception; otherwise, <code>false</code>.
381+
*/
382+
public boolean getCommitWriteOnInputStreamException() {
383+
return this.commitWriteOnInputStreamException;
384+
}
385+
365386
/**
366387
* Sets whether a conditional failure should be absorbed on a retry attempt for the request. This option
367388
* is only used by {@link CloudAppendBlob} in upload and openWrite methods. By default, it is set to
@@ -528,6 +549,19 @@ public void setSkipEtagLocking(boolean skipEtagLocking) {
528549
this.skipEtagLocking = skipEtagLocking;
529550
}
530551

552+
/**
553+
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
554+
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
555+
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
556+
* Otherwise, the upload will be aborted and no data will be committed.
557+
*
558+
* @param commitWriteOnInputStreamException
559+
* Use <code>true</code> if data will be committed upon an exception; otherwise, <code>false</code>.
560+
*/
561+
public void setCommitWriteOnInputStreamException(boolean commitWriteOnInputStreamException) {
562+
this.commitWriteOnInputStreamException = commitWriteOnInputStreamException;
563+
}
564+
531565
/**
532566
* Assert that if validation is on, an encryption policy is not specified.
533567
*/

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlockBlob.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -769,8 +769,9 @@ public void upload(final InputStream sourceStream, final long length, final Acce
769769
* If a storage service error occurred.
770770
*/
771771
@DoesServiceRequest
772-
public void upload(final InputStream sourceStream, final long length, final StandardBlobTier standardBlobTier, final AccessCondition accessCondition,
773-
BlobRequestOptions options, OperationContext opContext) throws StorageException, IOException {
772+
public void upload(final InputStream sourceStream, final long length, final StandardBlobTier standardBlobTier,
773+
final AccessCondition accessCondition, BlobRequestOptions options,
774+
OperationContext opContext) throws StorageException, IOException {
774775
if (length < -1) {
775776
throw new IllegalArgumentException(SR.STREAM_LENGTH_NEGATIVE);
776777
}
@@ -892,11 +893,20 @@ public byte[] getByteArray() {
892893
useOpenWrite = true;
893894
if (useOpenWrite) {
894895
final BlobOutputStream writeStream = this.openOutputStream(accessCondition, options, opContext);
895-
try {
896-
writeStream.write(inputDataStream, length);
897-
}
898-
finally {
899-
writeStream.close();
896+
if (options.getCommitWriteOnInputStreamException()) {
897+
try {
898+
writeStream.write(inputDataStream, length);
899+
} finally {
900+
writeStream.close();
901+
}
902+
} else {
903+
try {
904+
writeStream.write(inputDataStream, length);
905+
writeStream.close();
906+
} catch (Exception e) {
907+
writeStream.abortAndClose();
908+
throw e;
909+
}
900910
}
901911
}
902912
else {

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/LengthLimitingStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,9 @@ public void close() throws IOException {
8181
// no op
8282
}
8383

84+
@Override
85+
void abortAndClose() throws IOException {
86+
// no op
87+
}
88+
8489
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<modelVersion>4.0.0</modelVersion>
1111
<groupId>com.microsoft.azure</groupId>
1212
<artifactId>azure-storage</artifactId>
13-
<version>8.6.2</version>
13+
<version>8.6.3-commitOnFail</version>
1414
<packaging>jar</packaging>
1515

1616
<name>Microsoft Azure Storage Client SDK</name>

0 commit comments

Comments
 (0)