Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit fab3d2f

Browse files
author
jofriedm-msft
authored
Merge pull request #242 from rickle-msft/dev
Added MarkableFileStream to improve memory efficiency of upload and u…
2 parents 5b14642 + 5e43425 commit fab3d2f

File tree

4 files changed

+139
-9
lines changed

4 files changed

+139
-9
lines changed

microsoft-azure-storage-test/src/com/microsoft/azure/storage/blob/CloudBlobClientTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
*/
1515
package com.microsoft.azure.storage.blob;
1616

17+
import java.io.File;
18+
import java.io.FileInputStream;
19+
import java.io.FileOutputStream;
1720
import java.io.IOException;
1821
import java.net.URISyntaxException;
1922
import java.util.ArrayList;
@@ -310,4 +313,55 @@ public void eventOccurred(SendingRequestEvent eventArg) {
310313
container.deleteIfExists();
311314
}
312315
}
316+
317+
@Test
318+
@Category({ DevFabricTests.class, DevStoreTests.class, CloudTests.class })
319+
public void testUploadBlobFromFileSinglePut() throws URISyntaxException, StorageException, IOException {
320+
CloudBlobClient bClient = BlobTestHelper.createCloudBlobClient();
321+
322+
final ArrayList<Boolean> callList = new ArrayList<Boolean>();
323+
OperationContext sendingRequestEventContext = new OperationContext();
324+
sendingRequestEventContext.getSendingRequestEventHandler().addListener(new StorageEvent<SendingRequestEvent>() {
325+
326+
@Override
327+
public void eventOccurred(SendingRequestEvent eventArg) {
328+
assertEquals(eventArg.getRequestResult(), eventArg.getOpContext().getLastResult());
329+
callList.add(true);
330+
}
331+
});
332+
333+
assertEquals(0, callList.size());
334+
335+
CloudBlobContainer container = null;
336+
File sourceFile = File.createTempFile("sourceFile", ".tmp");
337+
File destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");
338+
try {
339+
container = bClient.getContainerReference(BlobTestHelper.generateRandomContainerName());
340+
container.createIfNotExists();
341+
CloudBlockBlob blob = container.getBlockBlobReference(BlobTestHelper
342+
.generateRandomBlobNameWithPrefix("uploadThreshold"));
343+
344+
sourceFile = File.createTempFile("sourceFile", ".tmp");
345+
destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");
346+
347+
int fileSize = 10 * 1024;
348+
byte[] buffer = BlobTestHelper.getRandomBuffer(fileSize);
349+
FileOutputStream fos = new FileOutputStream(sourceFile);
350+
fos.write(buffer);
351+
fos.close();
352+
353+
// This should make a single call even though FileInputStream is not seekable because of the optimizations
354+
// from wrapping it in a MarkableFileInputStream
355+
blob.upload(new FileInputStream(sourceFile), fileSize - 1, null, null, sendingRequestEventContext);
356+
357+
assertEquals(1, callList.size());
358+
}
359+
finally {
360+
container.deleteIfExists();
361+
362+
if (sourceFile.exists()) {
363+
sourceFile.delete();
364+
}
365+
}
366+
}
313367
}

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1765,7 +1765,7 @@ public void uploadFromFile(final String path, final AccessCondition accessCondit
17651765
OperationContext opContext) throws StorageException, IOException {
17661766
File file = new File(path);
17671767
long fileLength = file.length();
1768-
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
1768+
InputStream inputStream = new FileInputStream(file); // The call to upload supports FileInputStream efficiently.
17691769
this.upload(inputStream, fileLength, accessCondition, options, opContext);
17701770
inputStream.close();
17711771
}

microsoft-azure-storage/src/com/microsoft/azure/storage/blob/CloudBlockBlob.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.ByteArrayOutputStream;
2626
import java.io.IOException;
2727
import java.io.InputStream;
28+
import java.io.FileInputStream;
2829
import java.net.HttpURLConnection;
2930
import java.net.URI;
3031
import java.net.URISyntaxException;
@@ -644,8 +645,16 @@ public void upload(final InputStream sourceStream, final long length, final Acce
644645

645646
StreamMd5AndLength descriptor = new StreamMd5AndLength();
646647
descriptor.setLength(length);
647-
648-
InputStream inputDataStream = sourceStream;
648+
649+
// If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
650+
// This allows for single shot upload on FileInputStreams.
651+
InputStream inputDataStream;
652+
if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
653+
inputDataStream = new MarkableFileStream((FileInputStream)sourceStream);
654+
}
655+
else {
656+
inputDataStream = sourceStream;
657+
}
649658

650659
// Initial check - skip the PutBlob operation if the input stream isn't markable, or if the length is known to
651660
// be greater than the threshold.
@@ -1018,27 +1027,37 @@ public void uploadBlock(final String blockId, final InputStream sourceStream, fi
10181027
throw new IllegalArgumentException(SR.INVALID_BLOCK_ID);
10191028
}
10201029

1021-
if (sourceStream.markSupported()) {
1030+
// If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
1031+
// This prevents buffering the entire block into memory.
1032+
InputStream bufferedStreamReference;
1033+
if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
1034+
bufferedStreamReference = new MarkableFileStream((FileInputStream)sourceStream);
1035+
}
1036+
else {
1037+
bufferedStreamReference = sourceStream;
1038+
}
1039+
1040+
if (bufferedStreamReference.markSupported()) {
10221041
// Mark sourceStream for current position.
1023-
sourceStream.mark(Constants.MAX_MARK_LENGTH);
1042+
bufferedStreamReference.mark(Constants.MAX_MARK_LENGTH);
10241043
}
10251044

1026-
InputStream bufferedStreamReference = sourceStream;
10271045
StreamMd5AndLength descriptor = new StreamMd5AndLength();
10281046
descriptor.setLength(length);
10291047

1030-
if (!sourceStream.markSupported()) {
1048+
if (!bufferedStreamReference.markSupported()) {
10311049
// needs buffering
1050+
// TODO: Change to a BufferedInputStream to avoid the extra buffering and copying.
10321051
final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
1033-
descriptor = Utility.writeToOutputStream(sourceStream, byteStream, length, false /* rewindSourceStream */,
1052+
descriptor = Utility.writeToOutputStream(bufferedStreamReference, byteStream, length, false /* rewindSourceStream */,
10341053
options.getUseTransactionalContentMD5(), opContext, options);
10351054

10361055
bufferedStreamReference = new ByteArrayInputStream(byteStream.toByteArray());
10371056
}
10381057
else if (length < 0 || options.getUseTransactionalContentMD5()) {
10391058
// If the stream is of unknown length or we need to calculate the
10401059
// MD5, then we we need to read the stream contents first
1041-
descriptor = Utility.analyzeStream(sourceStream, length, -1L, true /* rewindSourceStream */,
1060+
descriptor = Utility.analyzeStream(bufferedStreamReference, length, -1L, true /* rewindSourceStream */,
10421061
options.getUseTransactionalContentMD5());
10431062
}
10441063

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright Microsoft Corporation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package com.microsoft.azure.storage.core;
16+
17+
import java.io.FileInputStream;
18+
import java.io.FilterInputStream;
19+
import java.io.IOException;
20+
import java.nio.channels.FileChannel;
21+
22+
/**
23+
* RESERVED FOR INTERNAL USE. Wraps a FileStream to allow for more memory efficient uploading.
24+
*/
25+
public final class MarkableFileStream extends FilterInputStream {
26+
private long mark = -1;
27+
private FileChannel fileChannel;
28+
29+
public MarkableFileStream(FileInputStream stream) {
30+
super(stream);
31+
this.fileChannel = stream.getChannel();
32+
}
33+
34+
@Override
35+
public synchronized void mark(int readlimit) {
36+
try {
37+
this.mark = this.fileChannel.position();
38+
}
39+
catch (IOException e) {
40+
this.mark = -1;
41+
}
42+
}
43+
44+
@Override
45+
public synchronized void reset() throws IOException {
46+
if(this.mark == -1){
47+
throw new IOException("Stream must be marked before calling reset");
48+
}
49+
50+
this.fileChannel.position(this.mark);
51+
}
52+
53+
@Override
54+
public boolean markSupported() {
55+
return true;
56+
}
57+
}

0 commit comments

Comments
 (0)