This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit a1c74f5

Merge dev

2 parents: e800c6f + 7c26405

File tree

8 files changed: +195 −34 lines


ChangeLog.txt

Lines changed: 3 additions & 0 deletions

@@ -1,6 +1,9 @@
 XXXX.XX.xx Version X.X.X
 * Support for 2017-07-29 REST version. Please see our REST api documentation and blogs for information about the related added features.
 * Added support for soft delete feature. If a delete retention policy is enabled through the set service properties API, then blobs or snapshots can be deleted softly and retained for a specified number of days, before being permanently removed by garbage collection.
+* Improved performance of blob uploadFromFile APIs to avoid unnecessary buffering.
+* Improved performance when streaming directly from a FileInputStream to avoid unnecessary buffering.
+* Switched to using fixed length streaming mode in the HTTP client to avoid unnecessary buffering.

 2017.11.01 Version 6.1.0
 * Added support for the last time the tier was modified.
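For reference, a minimal caller-side sketch of the upload path these entries describe. The connection string, container, blob, and file names below are placeholders, not part of this commit; the point is that uploadFromFile and upload(FileInputStream, length) now stream the file contents instead of copying them into memory first.

import java.io.File;
import java.io.FileInputStream;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class UploadFromFileSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder: read a real connection string from the environment or point at the storage emulator.
        CloudStorageAccount account = CloudStorageAccount.parse(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference("mycontainer");
        container.createIfNotExists();

        CloudBlockBlob blob = container.getBlockBlobReference("myblob.bin");

        // Previously this path buffered through a BufferedInputStream; now the file is streamed
        // directly (see the CloudBlob.java and CloudBlockBlob.java diffs below).
        blob.uploadFromFile("/tmp/source.bin");

        // Equivalent explicit form: a FileInputStream with a known length.
        File source = new File("/tmp/source.bin");
        blob.upload(new FileInputStream(source), source.length());
    }
}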

microsoft-azure-storage-test/src/com/microsoft/azure/storage/EventFiringTests.java

Lines changed: 16 additions & 16 deletions

@@ -168,16 +168,6 @@ public void testErrorReceivingResponseEvent() throws URISyntaxException, Storage
         BlobRequestOptions options = new BlobRequestOptions();
         options.setRetryPolicyFactory(new RetryNoRetry());

-        // setting the sending request event handler to trigger an exception.
-        // this is a retryable exception
-        eventContext.getSendingRequestEventHandler().addListener(new StorageEvent<SendingRequestEvent>() {
-            @Override
-            public void eventOccurred(SendingRequestEvent eventArg) {
-                HttpURLConnection connection = (HttpURLConnection) eventArg.getConnectionObject();
-                connection.setFixedLengthStreamingMode(0);
-            }
-        });
-
         eventContext.getErrorReceivingResponseEventHandler().addListener(new StorageEvent<ErrorReceivingResponseEvent>() {
             @Override
             public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
@@ -186,13 +176,15 @@ public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
             }
         });

-        OperationContext.getGlobalErrorReceivingResponseEventHandler().addListener(new StorageEvent<ErrorReceivingResponseEvent>() {
+        StorageEvent<ErrorReceivingResponseEvent> globalEvent = new StorageEvent<ErrorReceivingResponseEvent>() {
            @Override
            public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
                assertEquals(eventArg.getRequestResult(), eventArg.getOpContext().getLastResult());
                globalCallList.add(true);
            }
-        });
+        };
+
+        OperationContext.getGlobalErrorReceivingResponseEventHandler().addListener(globalEvent);

         CloudBlobClient blobClient = TestHelper.createCloudBlobClient();
         CloudBlobContainer container = blobClient.getContainerReference("container1");
@@ -202,10 +194,12 @@ public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
             CloudBlockBlob blob1 = container.getBlockBlobReference("blob1");
             try {
                 String blockID = String.format("%08d", 1);
-                blob1.uploadBlock(blockID, BlobTestHelper.getRandomDataStream(10), 10, null, options, eventContext);
+
+                // Trigger an error receiving the response by sending more bytes than the stream has.
+                blob1.uploadBlock(blockID, BlobTestHelper.getRandomDataStream(10), 11, null, options, eventContext);
             } catch (Exception e) { }

-            // make sure both the local and globab context update
+            // make sure both the local and global context update
            assertEquals(1, callList.size());
            assertEquals(1, globalCallList.size());

@@ -214,7 +208,9 @@ public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
                    .setErrorReceivingResponseEventHandler(new StorageEventMultiCaster<ErrorReceivingResponseEvent, StorageEvent<ErrorReceivingResponseEvent>>());
            try {
                String blockID2 = String.format("%08d", 2);
-                blob1.uploadBlock(blockID2, BlobTestHelper.getRandomDataStream(10), 10, null, options, eventContext);
+
+                // Trigger an error receiving the response by sending more bytes than the stream has.
+                blob1.uploadBlock(blockID2, BlobTestHelper.getRandomDataStream(10), 11, null, options, eventContext);
            } catch (Exception e) { }

            assertEquals(1, callList.size());
@@ -227,13 +223,17 @@ public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
            // make sure neither update
            try {
                String blockID3 = String.format("%08d", 3);
-                blob1.uploadBlock(blockID3, BlobTestHelper.getRandomDataStream(10), 10, null, options, eventContext);
+
+                // Trigger an error receiving the response by sending more bytes than the stream has.
+                blob1.uploadBlock(blockID3, BlobTestHelper.getRandomDataStream(10), 11, null, options, eventContext);
            } catch (Exception e) { }

            assertEquals(1, callList.size());
            assertEquals(2, globalCallList.size());
        }
        finally {
+            // Remove the global listener if it wasn't removed already.
+            OperationContext.getGlobalErrorReceivingResponseEventHandler().removeListener(globalEvent);
            container.deleteIfExists();
        }
    }
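A small sketch of the listener lifecycle pattern the updated test follows: keep a reference to the global ErrorReceivingResponseEvent listener and remove it in a finally block so it does not stay registered for the rest of the process. The handler body here is illustrative only.

import com.microsoft.azure.storage.ErrorReceivingResponseEvent;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageEvent;

public class GlobalListenerSketch {
    public static void main(String[] args) {
        // Keep a reference so the listener can be unregistered later; an inline anonymous
        // instance (as in the removed test code) cannot easily be removed once added.
        StorageEvent<ErrorReceivingResponseEvent> listener = new StorageEvent<ErrorReceivingResponseEvent>() {
            @Override
            public void eventOccurred(ErrorReceivingResponseEvent eventArg) {
                // Illustrative handler: log the failed request's status code.
                System.err.println("Error receiving response: " + eventArg.getRequestResult().getStatusCode());
            }
        };

        OperationContext.getGlobalErrorReceivingResponseEventHandler().addListener(listener);
        try {
            // ... issue storage requests here; any failure while reading a response fires the listener ...
        }
        finally {
            OperationContext.getGlobalErrorReceivingResponseEventHandler().removeListener(listener);
        }
    }
}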

microsoft-azure-storage-test/src/com/microsoft/azure/storage/blob/CloudBlobClientTests.java

Lines changed: 54 additions & 0 deletions

@@ -14,6 +14,9 @@
  */
 package com.microsoft.azure.storage.blob;

+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -310,4 +313,55 @@ public void eventOccurred(SendingRequestEvent eventArg) {
             container.deleteIfExists();
         }
     }
+
+    @Test
+    @Category({ DevFabricTests.class, DevStoreTests.class, CloudTests.class })
+    public void testUploadBlobFromFileSinglePut() throws URISyntaxException, StorageException, IOException {
+        CloudBlobClient bClient = BlobTestHelper.createCloudBlobClient();
+
+        final ArrayList<Boolean> callList = new ArrayList<Boolean>();
+        OperationContext sendingRequestEventContext = new OperationContext();
+        sendingRequestEventContext.getSendingRequestEventHandler().addListener(new StorageEvent<SendingRequestEvent>() {
+
+            @Override
+            public void eventOccurred(SendingRequestEvent eventArg) {
+                assertEquals(eventArg.getRequestResult(), eventArg.getOpContext().getLastResult());
+                callList.add(true);
+            }
+        });
+
+        assertEquals(0, callList.size());
+
+        CloudBlobContainer container = null;
+        File sourceFile = File.createTempFile("sourceFile", ".tmp");
+        File destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");
+        try {
+            container = bClient.getContainerReference(BlobTestHelper.generateRandomContainerName());
+            container.createIfNotExists();
+            CloudBlockBlob blob = container.getBlockBlobReference(BlobTestHelper
+                    .generateRandomBlobNameWithPrefix("uploadThreshold"));
+
+            sourceFile = File.createTempFile("sourceFile", ".tmp");
+            destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");
+
+            int fileSize = 10 * 1024;
+            byte[] buffer = BlobTestHelper.getRandomBuffer(fileSize);
+            FileOutputStream fos = new FileOutputStream(sourceFile);
+            fos.write(buffer);
+            fos.close();
+
+            // This should make a single call even though FileInputStream is not seekable because of the optimizations
+            // from wrapping it in a MarkableFileInputStream
+            blob.upload(new FileInputStream(sourceFile), fileSize - 1, null, null, sendingRequestEventContext);
+
+            assertEquals(1, callList.size());
+        }
+        finally {
+            container.deleteIfExists();
+
+            if (sourceFile.exists()) {
+                sourceFile.delete();
+            }
+        }
+    }
 }
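The new test drives the single PutBlob path for a stream that is not seekable. For reference, a hedged sketch of how a caller controls that path; the method name, threshold value, and null arguments are illustrative. Uploads whose length is known and at or below the single-blob-put threshold go out as one PutBlob request, and with this commit a plain FileInputStream qualifies because upload() wraps it in the new MarkableFileStream.

import java.io.File;
import java.io.FileInputStream;

import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class SinglePutSketch {
    // Assumes the caller already has a CloudBlockBlob reference (see the test above for how one is built).
    static void uploadSmallFile(CloudBlockBlob blob, File source) throws Exception {
        BlobRequestOptions options = new BlobRequestOptions();
        // Anything up to this size is sent as a single PutBlob instead of blocks plus a commit.
        options.setSingleBlobPutThresholdInBytes(1024 * 1024); // 1 MiB, illustrative value

        // Known length plus a markable stream (FileInputStream is wrapped internally) => single request.
        blob.upload(new FileInputStream(source), source.length(), null /* accessCondition */,
                options, null /* operationContext */);
    }
}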

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/BlobOutputStreamInternal.java

Lines changed: 28 additions & 9 deletions

@@ -27,13 +27,8 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;

 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.Constants;
@@ -51,6 +46,29 @@
  */
 final class BlobOutputStreamInternal extends BlobOutputStream {

+    private static class BlobOutputStreamThreadFactory implements ThreadFactory {
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        BlobOutputStreamThreadFactory() {
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null) ? s.getThreadGroup() :
+                    Thread.currentThread().getThreadGroup();
+            namePrefix = "azure-storage-bloboutputstream-thread-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r,
+                    namePrefix + threadNumber.getAndIncrement(),
+                    0);
+            t.setDaemon(true);
+            if (t.getPriority() != Thread.NORM_PRIORITY)
+                t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+    }
+
     /**
      * Holds the {@link AccessCondition} object that represents the access conditions for the blob.
      */
@@ -171,9 +189,10 @@ private BlobOutputStreamInternal(final CloudBlob parentBlob, final AccessConditi
         this.threadExecutor = new ThreadPoolExecutor(
                 this.options.getConcurrentRequestCount(),
                 this.options.getConcurrentRequestCount(),
-                10,
+                10,
                 TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>());
+                new LinkedBlockingQueue<Runnable>(),
+                new BlobOutputStreamThreadFactory());
         this.completionService = new ExecutorCompletionService<Void>(this.threadExecutor);
     }

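The custom factory matters for shutdown behavior: without it, ThreadPoolExecutor uses Executors.defaultThreadFactory(), whose non-daemon threads can keep a JVM alive if a stream is never closed. A JDK-only sketch of the same pattern, separate from this class:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DaemonPoolSketch {
    public static void main(String[] args) throws InterruptedException {
        ThreadFactory daemonFactory = new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "daemon-worker");
                t.setDaemon(true); // daemon threads do not prevent JVM exit
                return t;
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                daemonFactory);

        pool.execute(new Runnable() {
            public void run() {
                System.out.println("work runs on " + Thread.currentThread().getName());
            }
        });

        // With the default (non-daemon) factory, a pool that is never shut down keeps the JVM
        // alive indefinitely, because idle core threads do not time out by default.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}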

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlob.java

Lines changed: 1 addition & 1 deletion

@@ -1868,7 +1868,7 @@ public void uploadFromFile(final String path, final AccessCondition accessCondit
             OperationContext opContext) throws StorageException, IOException {
         File file = new File(path);
         long fileLength = file.length();
-        InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+        InputStream inputStream = new FileInputStream(file); // The call to upload supports FileInputStream efficiently.
         this.upload(inputStream, fileLength, accessCondition, options, opContext);
         inputStream.close();
     }
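Why the BufferedInputStream wrapper is dropped: upload() now detects a raw FileInputStream (see the CloudBlockBlob.java diff below) and wraps it in MarkableFileStream, which marks by recording the file position rather than retaining read bytes, and a BufferedInputStream would hide the FileInputStream from that instanceof check. A tiny sketch of the distinction; MarkableFileStream is marked RESERVED FOR INTERNAL USE and appears here only to illustrate the behavior, and the temp file is a placeholder.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

import com.microsoft.azure.storage.core.MarkableFileStream;

public class MarkSupportSketch {
    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("marksketch", ".tmp"); // placeholder file

        InputStream raw = new FileInputStream(tmp);
        InputStream buffered = new BufferedInputStream(new FileInputStream(tmp));
        InputStream markable = new MarkableFileStream(new FileInputStream(tmp));

        System.out.println(raw.markSupported());      // false: retries would need re-buffering
        System.out.println(buffered.markSupported()); // true, but mark() retains read bytes in memory
        System.out.println(markable.markSupported()); // true, and mark() only records the file position

        raw.close();
        buffered.close();
        markable.close();
        tmp.delete();
    }
}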

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlockBlob.java

Lines changed: 27 additions & 8 deletions

@@ -25,6 +25,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.FileInputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -644,8 +645,16 @@ public void upload(final InputStream sourceStream, final long length, final Acce

         StreamMd5AndLength descriptor = new StreamMd5AndLength();
         descriptor.setLength(length);
-
-        InputStream inputDataStream = sourceStream;
+
+        // If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
+        // This allows for single shot upload on FileInputStreams.
+        InputStream inputDataStream;
+        if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
+            inputDataStream = new MarkableFileStream((FileInputStream)sourceStream);
+        }
+        else {
+            inputDataStream = sourceStream;
+        }

         // Initial check - skip the PutBlob operation if the input stream isn't markable, or if the length is known to
         // be greater than the threshold.
@@ -1018,27 +1027,37 @@ public void uploadBlock(final String blockId, final InputStream sourceStream, fi
             throw new IllegalArgumentException(SR.INVALID_BLOCK_ID);
         }

-        if (sourceStream.markSupported()) {
+        // If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
+        // This prevents buffering the entire block into memory.
+        InputStream bufferedStreamReference;
+        if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
+            bufferedStreamReference = new MarkableFileStream((FileInputStream)sourceStream);
+        }
+        else {
+            bufferedStreamReference = sourceStream;
+        }
+
+        if (bufferedStreamReference.markSupported()) {
             // Mark sourceStream for current position.
-            sourceStream.mark(Constants.MAX_MARK_LENGTH);
+            bufferedStreamReference.mark(Constants.MAX_MARK_LENGTH);
         }

-        InputStream bufferedStreamReference = sourceStream;
         StreamMd5AndLength descriptor = new StreamMd5AndLength();
         descriptor.setLength(length);

-        if (!sourceStream.markSupported()) {
+        if (!bufferedStreamReference.markSupported()) {
             // needs buffering
+            // TODO: Change to a BufferedInputStream to avoid the extra buffering and copying.
             final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-            descriptor = Utility.writeToOutputStream(sourceStream, byteStream, length, false /* rewindSourceStream */,
+            descriptor = Utility.writeToOutputStream(bufferedStreamReference, byteStream, length, false /* rewindSourceStream */,
                     options.getUseTransactionalContentMD5(), opContext, options);

             bufferedStreamReference = new ByteArrayInputStream(byteStream.toByteArray());
         }
         else if (length < 0 || options.getUseTransactionalContentMD5()) {
             // If the stream is of unknown length or we need to calculate the
             // MD5, then we we need to read the stream contents first
-            descriptor = Utility.analyzeStream(sourceStream, length, -1L, true /* rewindSourceStream */,
+            descriptor = Utility.analyzeStream(bufferedStreamReference, length, -1L, true /* rewindSourceStream */,
                     options.getUseTransactionalContentMD5());
         }

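With the wrapping above, uploadBlock can read each block directly from a file instead of copying it into a ByteArrayOutputStream first. A hedged sketch of a block-by-block upload; the %08d block IDs follow the tests above, while the method name and sizes are illustrative, not SDK API.

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;

import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockSearchMode;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class BlockUploadSketch {
    // Assumes the caller already has a CloudBlockBlob reference.
    static void uploadInBlocks(CloudBlockBlob blob, File source, int blockSize) throws Exception {
        List<BlockEntry> blockList = new ArrayList<BlockEntry>();
        FileInputStream stream = new FileInputStream(source);
        try {
            long remaining = source.length();
            int i = 1;
            while (remaining > 0) {
                long current = Math.min(blockSize, remaining);
                String blockId = String.format("%08d", i++);

                // A FileInputStream is wrapped in MarkableFileStream internally, so each block
                // is streamed from the file rather than buffered into a byte array.
                blob.uploadBlock(blockId, stream, current);

                blockList.add(new BlockEntry(blockId, BlockSearchMode.UNCOMMITTED));
                remaining -= current;
            }
            blob.commitBlockList(blockList);
        }
        finally {
            stream.close();
        }
    }
}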

microsoft-azure-storage/src/com/microsoft/azure/storage/core/ExecutionEngine.java

Lines changed: 9 additions & 0 deletions

@@ -87,6 +87,15 @@ public static <CLIENT_TYPE, PARENT_TYPE, RESULT_TYPE> RESULT_TYPE executeWithRet
         try {
             if (task.getSendStream() != null) {
                 Logger.info(opContext, LogConstants.UPLOAD);
+
+                // Always set fixed length streaming mode when we know the length in advance.
+                // This sets the Content-Length of the request and allows the request payload to
+                // be streamed from the source. Otherwise, the HTTP layer will buffer the entire
+                // request and compute the Content-Length itself.
+                if (task.getLength() >= 0) {
+                    request.setFixedLengthStreamingMode(task.getLength());
+                }
+
                 final StreamMd5AndLength descriptor = Utility.writeToOutputStream(task.getSendStream(),
                         request.getOutputStream(), task.getLength(), false /* rewindStream */,
                         false /* calculate MD5 */, opContext, task.getRequestOptions());
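The effect of fixed-length streaming mode, shown against plain HttpURLConnection (the URL and file path are placeholders): without a streaming mode, HttpURLConnection buffers the whole request body in memory to compute Content-Length; with it, the body is written through to the socket as it is produced, and writing more or fewer bytes than declared fails the request, which is what the EventFiringTests change above relies on when it declares 11 bytes for a 10-byte stream.

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class FixedLengthStreamingSketch {
    static void put(URL url, InputStream body, long length) throws Exception {
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setDoOutput(true);
        connection.setRequestMethod("PUT");

        // Content-Length is fixed up front and the body streams to the socket as it is read;
        // omitting this makes HttpURLConnection buffer the entire body in memory first.
        connection.setFixedLengthStreamingMode(length);

        OutputStream out = connection.getOutputStream();
        byte[] chunk = new byte[8 * 1024];
        int read;
        while ((read = body.read(chunk)) != -1) {
            out.write(chunk, 0, read);
        }
        out.close();

        System.out.println("HTTP " + connection.getResponseCode());
        connection.disconnect();
    }

    public static void main(String[] args) throws Exception {
        File file = new File("/tmp/payload.bin"); // placeholder path
        put(new URL("http://example.com/upload"), new FileInputStream(file), file.length());
    }
}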
microsoft-azure-storage/src/com/microsoft/azure/storage/core/MarkableFileStream.java (new file)

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/**
+ * Copyright Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.microsoft.azure.storage.core;
+
+import java.io.FileInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * RESERVED FOR INTERNAL USE. Wraps a FileStream to allow for more memory efficient uploading.
+ */
+public final class MarkableFileStream extends FilterInputStream {
+    private long mark = -1;
+    private FileChannel fileChannel;
+
+    public MarkableFileStream(FileInputStream stream) {
+        super(stream);
+        this.fileChannel = stream.getChannel();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        try {
+            this.mark = this.fileChannel.position();
+        }
+        catch (IOException e) {
+            this.mark = -1;
+        }
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        if(this.mark == -1){
+            throw new IOException("Stream must be marked before calling reset");
+        }
+
+        this.fileChannel.position(this.mark);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+}
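A short usage sketch of the new wrapper: mark() records the current FileChannel position and reset() seeks back to it, so a retried request can replay the same bytes without any in-memory buffering. The temp-file setup is illustrative.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

import com.microsoft.azure.storage.core.MarkableFileStream;

public class MarkableFileStreamSketch {
    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("markable", ".tmp");
        FileOutputStream out = new FileOutputStream(file);
        out.write("hello azure".getBytes("UTF-8"));
        out.close();

        MarkableFileStream stream = new MarkableFileStream(new FileInputStream(file));
        try {
            stream.mark(Integer.MAX_VALUE); // readlimit is ignored; the file position is recorded

            byte[] first = new byte[5];
            stream.read(first);             // consume "hello"

            stream.reset();                 // seek the underlying FileChannel back to the mark

            byte[] again = new byte[5];
            stream.read(again);             // reads "hello" a second time, with no buffering
            System.out.println(new String(again, "UTF-8"));
        }
        finally {
            stream.close();
            file.delete();
        }
    }
}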
