Skip to content

Commit ceedf55

Browse files
Implement structured message decoder integration with blob download methods
Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 10ae2af commit ceedf55

File tree

7 files changed

+673
-4
lines changed

7 files changed

+673
-4
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobDownloadToFileOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.storage.blob.models.DownloadRetryOptions;
1010
import com.azure.storage.common.ParallelTransferOptions;
1111
import com.azure.storage.common.implementation.StorageImplUtils;
12+
import com.azure.storage.common.implementation.contentvalidation.DownloadContentValidationOptions;
1213

1314
import java.nio.file.OpenOption;
1415
import java.util.Set;
@@ -25,6 +26,7 @@ public class BlobDownloadToFileOptions {
2526
private BlobRequestConditions requestConditions;
2627
private boolean retrieveContentRangeMd5;
2728
private Set<OpenOption> openOptions;
29+
private DownloadContentValidationOptions contentValidationOptions;
2830

2931
/**
3032
* Constructs a {@link BlobDownloadToFileOptions}.
@@ -100,6 +102,15 @@ public Set<OpenOption> getOpenOptions() {
100102
return openOptions;
101103
}
102104

105+
/**
106+
* Gets the {@link DownloadContentValidationOptions}.
107+
*
108+
* @return {@link DownloadContentValidationOptions}
109+
*/
110+
public DownloadContentValidationOptions getContentValidationOptions() {
111+
return contentValidationOptions;
112+
}
113+
103114
/**
104115
* Sets the {@link BlobRange}.
105116
*
@@ -165,4 +176,15 @@ public BlobDownloadToFileOptions setOpenOptions(Set<OpenOption> openOptions) {
165176
this.openOptions = openOptions;
166177
return this;
167178
}
179+
180+
/**
181+
* Sets the {@link DownloadContentValidationOptions}.
182+
*
183+
* @param contentValidationOptions {@link DownloadContentValidationOptions}
184+
* @return The updated options.
185+
*/
186+
public BlobDownloadToFileOptions setContentValidationOptions(DownloadContentValidationOptions contentValidationOptions) {
187+
this.contentValidationOptions = contentValidationOptions;
188+
return this;
189+
}
168190
}

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 168 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
import com.azure.core.annotation.ReturnType;
77
import com.azure.core.annotation.ServiceMethod;
8+
import com.azure.core.http.HttpHeaders;
89
import com.azure.core.http.HttpPipeline;
10+
import com.azure.core.http.HttpRequest;
911
import com.azure.core.http.HttpResponse;
1012
import com.azure.core.http.RequestConditions;
1113
import com.azure.core.http.rest.Response;
@@ -83,6 +85,8 @@
8385
import com.azure.storage.common.Utility;
8486
import com.azure.storage.common.implementation.SasImplUtils;
8587
import com.azure.storage.common.implementation.StorageImplUtils;
88+
import com.azure.storage.common.implementation.contentvalidation.DownloadContentValidationOptions;
89+
import com.azure.storage.common.implementation.contentvalidation.StructuredMessageDecodingStream;
8690
import reactor.core.publisher.Flux;
8791
import reactor.core.publisher.Mono;
8892
import reactor.core.publisher.SignalType;
@@ -1173,6 +1177,51 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang
11731177
}
11741178
}
11751179

1180+
/**
1181+
* Reads a range of bytes from a blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link
1182+
* PageBlobClient}, or {@link AppendBlobClient}.
1183+
*
1184+
* <p><strong>Code Samples</strong></p>
1185+
*
1186+
* <pre>{@code
1187+
* BlobRange range = new BlobRange(1024, 2048L);
1188+
* DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5);
1189+
* DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions()
1190+
* .setStructuredMessageValidationEnabled(true);
1191+
*
1192+
* client.downloadStreamWithResponse(range, options, null, false, validationOptions).subscribe(response -> {
1193+
* ByteArrayOutputStream downloadData = new ByteArrayOutputStream();
1194+
* response.getValue().subscribe(piece -> {
1195+
* try {
1196+
* downloadData.write(piece.array());
1197+
* } catch (IOException ex) {
1198+
* throw new UncheckedIOException(ex);
1199+
* }
1200+
* });
1201+
* });
1202+
* }</pre>
1203+
*
1204+
* <p>For more information, see the
1205+
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p>
1206+
*
1207+
* @param range {@link BlobRange}
1208+
* @param options {@link DownloadRetryOptions}
1209+
* @param requestConditions {@link BlobRequestConditions}
1210+
* @param getRangeContentMd5 Whether the contentMD5 for the specified blob range should be returned.
1211+
* @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation
1212+
* @return A reactive response containing the blob data.
1213+
*/
1214+
@ServiceMethod(returns = ReturnType.SINGLE)
1215+
public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
1216+
BlobRequestConditions requestConditions, boolean getRangeContentMd5, DownloadContentValidationOptions contentValidationOptions) {
1217+
try {
1218+
return withContext(
1219+
context -> downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, contentValidationOptions, context));
1220+
} catch (RuntimeException ex) {
1221+
return monoError(LOGGER, ex);
1222+
}
1223+
}
1224+
11761225
/**
11771226
* Reads a range of bytes from a blob. Uploading data must be done from the {@link BlockBlobClient}, {@link
11781227
* PageBlobClient}, or {@link AppendBlobClient}.
@@ -1281,6 +1330,114 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
12811330
});
12821331
}
12831332

1333+
Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
1334+
BlobRequestConditions requestConditions, boolean getRangeContentMd5, DownloadContentValidationOptions contentValidationOptions, Context context) {
1335+
// For backward compatibility, if no content validation options are provided, use the original method
1336+
if (contentValidationOptions == null || (!contentValidationOptions.isStructuredMessageValidationEnabled() && !contentValidationOptions.isMd5ValidationEnabled())) {
1337+
return downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, context);
1338+
}
1339+
1340+
BlobRange finalRange = range == null ? new BlobRange(0) : range;
1341+
Boolean getMD5 = getRangeContentMd5 || (contentValidationOptions != null && contentValidationOptions.isMd5ValidationEnabled()) ? true : null;
1342+
BlobRequestConditions finalRequestConditions
1343+
= requestConditions == null ? new BlobRequestConditions() : requestConditions;
1344+
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;
1345+
1346+
// The first range should eagerly convert headers as they'll be used to create response types.
1347+
Context firstRangeContext = context == null
1348+
? new Context("azure-eagerly-convert-headers", true)
1349+
: context.addData("azure-eagerly-convert-headers", true);
1350+
1351+
return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5,
1352+
firstRangeContext).map(response -> {
1353+
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
1354+
String eTag = blobsDownloadHeaders.getETag();
1355+
BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders(blobsDownloadHeaders,
1356+
ModelHelper.getErrorCode(response.getHeaders()));
1357+
1358+
/*
1359+
* If the customer did not specify a count, they are reading to the end of the blob. Extract this value
1360+
* from the response for better book-keeping towards the end.
1361+
*/
1362+
long finalCount;
1363+
long initialOffset = finalRange.getOffset();
1364+
if (finalRange.getCount() == null) {
1365+
long blobLength = ModelHelper.getBlobLength(blobDownloadHeaders);
1366+
finalCount = blobLength - initialOffset;
1367+
} else {
1368+
finalCount = finalRange.getCount();
1369+
}
1370+
1371+
// Apply structured message decoding if enabled
1372+
Flux<ByteBuffer> decodedStream = response.getValue();
1373+
if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) {
1374+
// Use the content length from headers to determine expected length for structured message decoding
1375+
Long contentLength = blobDownloadHeaders.getContentLength();
1376+
decodedStream = StructuredMessageDecodingStream.wrapStreamIfNeeded(response.getValue(), contentLength, contentValidationOptions);
1377+
}
1378+
1379+
// The resume function takes throwable and offset at the destination.
1380+
// I.e. offset is relative to the starting point.
1381+
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
1382+
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
1383+
return Mono.error(throwable);
1384+
}
1385+
1386+
long newCount = finalCount - offset;
1387+
1388+
/*
1389+
* It's possible that the network stream will throw an error after emitting all data but before
1390+
* completing. Issuing a retry at this stage would leave the download in a bad state with
1391+
* incorrect count and offset values. Because we have read the intended amount of data, we can
1392+
* ignore the error at the end of the stream.
1393+
*/
1394+
if (newCount == 0) {
1395+
LOGGER.warning("Exception encountered in ReliableDownload after all data read from the network "
1396+
+ "but before stream signaled completion. Returning success as all data was downloaded. "
1397+
+ "Exception message: " + throwable.getMessage());
1398+
return Mono.empty();
1399+
}
1400+
1401+
try {
1402+
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
1403+
eTag, getMD5, context);
1404+
} catch (Exception e) {
1405+
return Mono.error(e);
1406+
}
1407+
};
1408+
1409+
// Create a new response with the decoded stream
1410+
StreamResponse decodedResponse = new StreamResponse() {
1411+
@Override
1412+
public int getStatusCode() {
1413+
return response.getStatusCode();
1414+
}
1415+
1416+
@Override
1417+
public HttpHeaders getHeaders() {
1418+
return response.getHeaders();
1419+
}
1420+
1421+
@Override
1422+
public Flux<ByteBuffer> getValue() {
1423+
return decodedStream;
1424+
}
1425+
1426+
@Override
1427+
public HttpRequest getRequest() {
1428+
return response.getRequest();
1429+
}
1430+
1431+
@Override
1432+
public void close() {
1433+
response.close();
1434+
}
1435+
};
1436+
1437+
return BlobDownloadAsyncResponseConstructorProxy.create(decodedResponse, onDownloadErrorResume, finalOptions);
1438+
});
1439+
}
1440+
12841441
private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions, String eTag,
12851442
Boolean getMD5, Context context) {
12861443
return azureBlobStorage.getBlobs()
@@ -1503,7 +1660,7 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(BlobDownloadToFileOpti
15031660
AsynchronousFileChannel channel = downloadToFileResourceSupplier(options.getFilePath(), openOptions);
15041661
return Mono.just(channel)
15051662
.flatMap(c -> this.downloadToFileImpl(c, finalRange, finalParallelTransferOptions,
1506-
options.getDownloadRetryOptions(), finalConditions, options.isRetrieveContentRangeMd5(), context))
1663+
options.getDownloadRetryOptions(), finalConditions, options.isRetrieveContentRangeMd5(), options.getContentValidationOptions(), context))
15071664
.doFinally(signalType -> this.downloadToFileCleanup(channel, options.getFilePath(), signalType));
15081665
}
15091666

@@ -1518,7 +1675,7 @@ private AsynchronousFileChannel downloadToFileResourceSupplier(String filePath,
15181675
private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChannel file, BlobRange finalRange,
15191676
com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions,
15201677
DownloadRetryOptions downloadRetryOptions, BlobRequestConditions requestConditions, boolean rangeGetContentMd5,
1521-
Context context) {
1678+
DownloadContentValidationOptions contentValidationOptions, Context context) {
15221679
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
15231680
ProgressListener progressReceiver = finalParallelTransferOptions.getProgressListener();
15241681
ProgressReporter progressReporter
@@ -1528,8 +1685,15 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne
15281685
* Downloads the first chunk and gets the size of the data and etag if not specified by the user.
15291686
*/
15301687
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc
1531-
= (range, conditions) -> this.downloadStreamWithResponse(range, downloadRetryOptions, conditions,
1532-
rangeGetContentMd5, context);
1688+
= (range, conditions) -> {
1689+
if (contentValidationOptions != null && (contentValidationOptions.isStructuredMessageValidationEnabled() || contentValidationOptions.isMd5ValidationEnabled())) {
1690+
return this.downloadStreamWithResponse(range, downloadRetryOptions, conditions,
1691+
rangeGetContentMd5, contentValidationOptions, context);
1692+
} else {
1693+
return this.downloadStreamWithResponse(range, downloadRetryOptions, conditions,
1694+
rangeGetContentMd5, context);
1695+
}
1696+
};
15331697

15341698
return ChunkedDownloadUtils
15351699
.downloadFirstChunk(finalRange, finalParallelTransferOptions, requestConditions, downloadFunc, true,

0 commit comments

Comments
 (0)