Skip to content

Commit 158096d

Browse files
HADOOP-19604. ABFS: Full blob md5 computation during flush change to be config driven (#7853)
Contributed by Anmol Asrani
1 parent 2bdd9e2 commit 158096d

16 files changed

+1287
-1184
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ public class AbfsConfiguration{
438438
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
439439
private boolean isChecksumValidationEnabled;
440440

441+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
442+
FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION)
443+
private boolean isFullBlobChecksumValidationEnabled;
444+
441445
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
442446
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
443447
private boolean isPaginatedDeleteEnabled;
@@ -1705,6 +1709,10 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
17051709
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
17061710
}
17071711

1712+
public boolean isFullBlobChecksumValidationEnabled() {
1713+
return isFullBlobChecksumValidationEnabled;
1714+
}
1715+
17081716
public long getBlobCopyProgressPollWaitMillis() {
17091717
return blobCopyProgressPollWaitMillis;
17101718
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ public final class ConfigurationKeys {
356356
/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
357357
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";
358358

359+
/** Add extra layer of verification of the integrity of the full blob request content during transport: {@value}. */
360+
public static final String FS_AZURE_ENABLE_FULL_BLOB_CHECKSUM_VALIDATION = "fs.azure.enable.full.blob.checksum.validation";
361+
359362
public static String accountProperty(String property, String account) {
360363
return property + DOT + account;
361364
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public final class FileSystemConfigurations {
147147
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
148148
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
149149
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;
150+
public static final boolean DEFAULT_ENABLE_FULL_BLOB_ABFS_CHECKSUM_VALIDATION = false;
150151

151152
/**
152153
* Limit of queued block upload operations before writes

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,9 +1076,10 @@ public AbfsRestOperation flush(byte[] buffer,
10761076
if (leaseId != null) {
10771077
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
10781078
}
1079-
if (blobMd5 != null) {
1080-
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
1081-
}
1079+
String md5Value = (isFullBlobChecksumValidationEnabled() && blobMd5 != null)
1080+
? blobMd5
1081+
: computeMD5Hash(buffer, 0, buffer.length);
1082+
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Value));
10821083
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
10831084
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
10841085
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
@@ -1103,7 +1104,12 @@ public AbfsRestOperation flush(byte[] buffer,
11031104
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
11041105
contextEncryptionAdapter);
11051106
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
1106-
if (blobMd5 != null && !blobMd5.equals(metadataMd5)) {
1107+
/*
1108+
* Validate the response by comparing the server's MD5 metadata against either:
1109+
* 1. The full blob content MD5 (if full blob checksum validation is enabled), or
1110+
* 2. The full block ID list buffer MD5 (fallback if blob checksum validation is disabled)
1111+
*/
1112+
if (md5Value != null && !md5Value.equals(metadataMd5)) {
11071113
throw ex;
11081114
}
11091115
return op;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1415,14 +1415,25 @@ protected boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeader
14151415
/**
14161416
* Conditions check for allowing checksum support for write operation.
14171417
* Server will support this if client sends the MD5 Hash as a request header.
1418-
* For azure stoage service documentation and more details refer to
1418+
* For azure storage service documentation and more details refer to
14191419
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
14201420
* @return true if checksum validation enabled.
14211421
*/
14221422
protected boolean isChecksumValidationEnabled() {
14231423
return getAbfsConfiguration().getIsChecksumValidationEnabled();
14241424
}
14251425

1426+
/**
1427+
* Conditions check for allowing checksum support for write operation.
1428+
* Server will support this if client sends the MD5 Hash as a request header.
1429+
* For azure storage service documentation and more details refer to
1430+
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">Path - Update Azure Rest API</a>.
1431+
* @return true if full blob checksum validation enabled.
1432+
*/
1433+
protected boolean isFullBlobChecksumValidationEnabled() {
1434+
return getAbfsConfiguration().isFullBlobChecksumValidationEnabled();
1435+
}
1436+
14261437
/**
14271438
* Compute MD5Hash of the given byte array starting from given offset up to given length.
14281439
* @param data byte array from which data is fetched to compute MD5 Hash.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -867,10 +867,9 @@ public AbfsRestOperation flush(final String path,
867867
if (leaseId != null) {
868868
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
869869
}
870-
if (isChecksumValidationEnabled() && blobMd5 != null) {
870+
if (isFullBlobChecksumValidationEnabled() && blobMd5 != null) {
871871
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
872872
}
873-
874873
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
875874
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
876875
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
223223
md5 = MessageDigest.getInstance(MD5);
224224
fullBlobContentMd5 = MessageDigest.getInstance(MD5);
225225
} catch (NoSuchAlgorithmException e) {
226-
if (client.isChecksumValidationEnabled()) {
226+
if (isChecksumValidationEnabled()) {
227227
throw new IOException("MD5 algorithm not available", e);
228228
}
229229
}
@@ -464,10 +464,13 @@ public synchronized void write(final byte[] data, final int off, final int lengt
464464
AbfsBlock block = createBlockIfNeeded(position);
465465
int written = bufferData(block, data, off, length);
466466
// Update the incremental MD5 hash with the written data.
467-
getMessageDigest().update(data, off, written);
468-
467+
if (isChecksumValidationEnabled()) {
468+
getMessageDigest().update(data, off, written);
469+
}
469470
// Update the full blob MD5 hash with the written data.
470-
getFullBlobContentMd5().update(data, off, written);
471+
if (isFullBlobChecksumValidationEnabled()) {
472+
getFullBlobContentMd5().update(data, off, written);
473+
}
471474
int remainingCapacity = block.remainingCapacity();
472475

473476
if (written < length) {
@@ -544,7 +547,7 @@ private void uploadBlockAsync(AbfsBlock blockToUpload,
544547
outputStreamStatistics.bytesToUpload(bytesLength);
545548
outputStreamStatistics.writeCurrentBuffer();
546549
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
547-
String md5Hash = getMd5();
550+
String md5Hash = getClient().isChecksumValidationEnabled() ? getMd5() : null;
548551
final Future<Void> job =
549552
executorService.submit(() -> {
550553
AbfsPerfTracker tracker =
@@ -1222,6 +1225,20 @@ public MessageDigest getFullBlobContentMd5() {
12221225
return fullBlobContentMd5;
12231226
}
12241227

1228+
/**
1229+
* @return true if checksum validation is enabled.
1230+
*/
1231+
public boolean isChecksumValidationEnabled() {
1232+
return getClient().isChecksumValidationEnabled();
1233+
}
1234+
1235+
/**
1236+
* @return true if full blob checksum validation is enabled.
1237+
*/
1238+
public boolean isFullBlobChecksumValidationEnabled() {
1239+
return getClient().isFullBlobChecksumValidationEnabled();
1240+
}
1241+
12251242
/**
12261243
* Returns the Base64-encoded MD5 checksum based on the current digest state.
12271244
* This finalizes the digest calculation. Returns null if the digest is empty.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
9595
setBlockCount(getBlockCount() + 1);
9696
AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position, getBlockIdLength(), getBlockCount());
9797
activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
98-
getAbfsOutputStream().getMessageDigest().reset();
98+
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
99+
getAbfsOutputStream().getMessageDigest().reset();
100+
}
99101
setActiveBlock(activeBlock);
100102
}
101103
return getActiveBlock();

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
180180
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
181181
tracingContextFlush.setPosition(String.valueOf(offset));
182182
LOG.trace("Flushing data at offset {} for path {}", offset, getAbfsOutputStream().getPath());
183-
String fullBlobMd5 = computeFullBlobMd5();
183+
String fullBlobMd5 = null;
184+
if (getClient().isFullBlobChecksumValidationEnabled()) {
185+
fullBlobMd5 = computeFullBlobMd5();
186+
}
184187
op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
185188
getAbfsOutputStream().getPath(),
186189
isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
@@ -194,7 +197,9 @@ isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
194197
LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex);
195198
throw ex;
196199
} finally {
197-
getAbfsOutputStream().getFullBlobContentMd5().reset();
200+
if (getClient().isFullBlobChecksumValidationEnabled()) {
201+
getAbfsOutputStream().getFullBlobContentMd5().reset();
202+
}
198203
}
199204
return op;
200205
}
@@ -221,7 +226,7 @@ protected AbfsRestOperation remoteAppendBlobWrite(String path,
221226
AppendRequestParameters reqParams,
222227
TracingContext tracingContext) throws IOException {
223228
// Perform the remote append operation using the blob client.
224-
AbfsRestOperation op = null;
229+
AbfsRestOperation op;
225230
try {
226231
op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), tracingContext);
227232
} catch (AbfsRestOperationException ex) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ protected synchronized AbfsBlock createBlockInternal(long position)
6262
if (getActiveBlock() == null) {
6363
setBlockCount(getBlockCount() + 1);
6464
AbfsBlock activeBlock = new AbfsBlock(getAbfsOutputStream(), position);
65-
getAbfsOutputStream().getMessageDigest().reset();
65+
if (getAbfsOutputStream().isChecksumValidationEnabled()) {
66+
getAbfsOutputStream().getMessageDigest().reset();
67+
}
6668
setActiveBlock(activeBlock);
6769
}
6870
return getActiveBlock();

0 commit comments

Comments
 (0)