Skip to content

Commit 148c4d7

Browse files
HADOOP-19604. ABFS: BlockId generation based on blockCount along with full blob md5 computation change (#7777) (#7819)
Contributed by Anmol Asrani
1 parent e9749e9 commit 148c4d7

29 files changed

+2439
-179
lines changed

hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
4949
<suppress checks="ParameterNumber"
5050
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
51+
<suppress checks="ParameterNumber"
52+
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsDfsClient.java"/>
5153
<suppress checks="ParameterNumber|MagicNumber"
5254
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]VersionedFileStatus.java"/>
5355
<suppress checks="ParameterNumber"

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public static ApiVersion getCurrentVersion() {
216216
public static final String XML_TAG_RESOURCE_TYPE = "ResourceType";
217217
public static final String XML_TAG_INVALID_XML = "Invalid XML";
218218
public static final String XML_TAG_HDI_ISFOLDER = "hdi_isfolder";
219+
public static final String XML_TAG_HDI_PERMISSION = "hdi_permission";
219220
public static final String XML_TAG_ETAG = "Etag";
220221
public static final String XML_TAG_LAST_MODIFIED_TIME = "Last-Modified";
221222
public static final String XML_TAG_CREATION_TIME = "Creation-Time";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,36 @@ public final class FileSystemConfigurations {
150150
*/
151151
public static final int BLOCK_ID_LENGTH = 60;
152152

153+
/**
154+
* Format string for generating block IDs.
155+
* Example: "%s-%06d" where %s is the stream ID and %06d is the block index.
156+
*/
157+
public static final String BLOCK_ID_FORMAT = "%s-%06d";
158+
159+
/**
160+
* Prefix of the format string used to pad block IDs; a width and {@link #STRING_SUFFIX} are appended to complete it.
161+
* Example: "%-" specifies left alignment in the format string.
162+
*/
163+
public static final String PADDING_FORMAT = "%-";
164+
165+
/**
166+
* Suffix for string formatting.
167+
* Example: "s" specifies the type as a string in the format string.
168+
*/
169+
public static final String STRING_SUFFIX = "s";
170+
171+
/**
172+
* Character used for padding spaces in block IDs.
173+
* Example: ' ' represents a space character.
174+
*/
175+
public static final char SPACE_CHARACTER = ' ';
176+
177+
/**
178+
* Character used for padding block IDs.
179+
* Example: '_' is used to replace spaces in padded block IDs.
180+
*/
181+
public static final char PADDING_CHARACTER = '_';
182+
153183
/**
154184
* Buffer blocks to disk.
155185
* Capacity is limited to available disk space.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public enum Mode {
3737
private boolean isExpectHeaderEnabled;
3838
private boolean isRetryDueToExpect;
3939
private BlobAppendRequestParameters blobParams;
40+
private final String md5;
4041

4142

4243
/**
@@ -48,14 +49,16 @@ public enum Mode {
4849
* @param isAppendBlob true if the blob is append-blob
4950
* @param leaseId leaseId of the blob to be appended
5051
* @param isExpectHeaderEnabled true if the expect header is enabled
52+
* @param md5 The Base64-encoded MD5 hash of the block for data integrity validation.
5153
*/
5254
public AppendRequestParameters(final long position,
5355
final int offset,
5456
final int length,
5557
final Mode mode,
5658
final boolean isAppendBlob,
5759
final String leaseId,
58-
final boolean isExpectHeaderEnabled) {
60+
final boolean isExpectHeaderEnabled,
61+
final String md5) {
5962
this.position = position;
6063
this.offset = offset;
6164
this.length = length;
@@ -65,6 +68,7 @@ public AppendRequestParameters(final long position,
6568
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
6669
this.isRetryDueToExpect = false;
6770
this.blobParams = null;
71+
this.md5 = md5;
6872
}
6973

7074
/**
@@ -77,6 +81,7 @@ public AppendRequestParameters(final long position,
7781
* @param leaseId leaseId of the blob to be appended
7882
* @param isExpectHeaderEnabled true if the expect header is enabled
7983
* @param blobParams parameters specific to append operation on Blob Endpoint.
84+
* @param md5 The Base64-encoded MD5 hash of the block for data integrity validation.
8085
*/
8186
public AppendRequestParameters(final long position,
8287
final int offset,
@@ -85,7 +90,8 @@ public AppendRequestParameters(final long position,
8590
final boolean isAppendBlob,
8691
final String leaseId,
8792
final boolean isExpectHeaderEnabled,
88-
final BlobAppendRequestParameters blobParams) {
93+
final BlobAppendRequestParameters blobParams,
94+
final String md5) {
8995
this.position = position;
9096
this.offset = offset;
9197
this.length = length;
@@ -95,6 +101,7 @@ public AppendRequestParameters(final long position,
95101
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
96102
this.isRetryDueToExpect = false;
97103
this.blobParams = blobParams;
104+
this.md5 = md5;
98105
}
99106

100107
public long getPosition() {
@@ -146,6 +153,15 @@ public String getBlockId() {
146153
return getBlobParams().getBlockId();
147154
}
148155

156+
/**
157+
* Gets the MD5 hash.
158+
*
159+
* @return the Base64-encoded MD5 hash supplied at construction, or null if none was provided
160+
*/
161+
public String getMd5() {
162+
return md5;
163+
}
164+
149165
/**
150166
* Sets whether the retry is due to the Expect header.
151167
*

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobBlock.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@
2020

2121
import java.io.IOException;
2222
import java.nio.charset.StandardCharsets;
23+
import java.util.UUID;
2324

2425
import org.apache.commons.codec.binary.Base64;
2526

26-
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
27+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_FORMAT;
28+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.PADDING_CHARACTER;
29+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.PADDING_FORMAT;
30+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SPACE_CHARACTER;
31+
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STRING_SUFFIX;
2732

2833
/**
2934
* Represents a block in Azure Blob Storage used by Azure Data Lake Storage (ADLS).
@@ -34,31 +39,50 @@
3439
public class AbfsBlobBlock extends AbfsBlock {
3540

3641
private final String blockId;
42+
private final long blockIndex;
3743

3844
/**
3945
* Gets the activeBlock and the blockId.
4046
*
4147
* @param outputStream AbfsOutputStream Instance.
4248
* @param offset offset of the block, passed to the parent AbfsBlock; it is not used for block ID generation.
49+
* @param blockIdLength the expected length of the generated block ID.
50+
* @param blockIndex the index of the block; used in block ID generation.
4351
* @throws IOException exception is thrown.
4452
*/
45-
AbfsBlobBlock(AbfsOutputStream outputStream, long offset) throws IOException {
53+
AbfsBlobBlock(AbfsOutputStream outputStream, long offset, int blockIdLength, long blockIndex) throws IOException {
4654
super(outputStream, offset);
47-
this.blockId = generateBlockId(offset);
55+
this.blockIndex = blockIndex;
56+
String streamId = outputStream.getStreamID();
57+
UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
58+
this.blockId = generateBlockId(streamIdGuid, blockIdLength);
4859
}
4960

5061
/**
51-
* Helper method that generates blockId.
52-
* @param position The offset needed to generate blockId.
53-
* @return String representing the block ID generated.
62+
* Generates a Base64-encoded block ID string using the given stream UUID and block index.
63+
* The block ID is first created as a raw string using a format with the stream ID and block index.
64+
* If a non-zero rawLength is provided, the raw block ID is padded or trimmed to match the length.
65+
* The final string is then Base64-encoded and returned.
66+
*
67+
* @param streamId the UUID of the stream used to generate the block ID.
68+
* @param rawLength the desired length of the raw block ID string before encoding.
69+
* If 0, no length adjustment is done.
70+
* @return the Base64-encoded block ID string.
5471
*/
55-
private String generateBlockId(long position) {
56-
String streamId = getOutputStream().getStreamID();
57-
String streamIdHash = Integer.toString(streamId.hashCode());
58-
String blockId = String.format("%d_%s", position, streamIdHash);
59-
byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
60-
System.arraycopy(blockId.getBytes(StandardCharsets.UTF_8), 0, blockIdByteArray, 0, Math.min(BLOCK_ID_LENGTH, blockId.length()));
61-
return new String(Base64.encodeBase64(blockIdByteArray), StandardCharsets.UTF_8);
72+
private String generateBlockId(UUID streamId, int rawLength) {
73+
String rawBlockId = String.format(BLOCK_ID_FORMAT, streamId, blockIndex);
74+
75+
if (rawLength != 0) {
76+
// Adjust to match expected decoded length
77+
if (rawBlockId.length() < rawLength) {
78+
rawBlockId = String.format(PADDING_FORMAT + rawLength + STRING_SUFFIX, rawBlockId)
79+
.replace(SPACE_CHARACTER, PADDING_CHARACTER);
80+
} else if (rawBlockId.length() > rawLength) {
81+
rawBlockId = rawBlockId.substring(0, rawLength);
82+
}
83+
}
84+
85+
return Base64.encodeBase64String(rawBlockId.getBytes(StandardCharsets.UTF_8));
6286
}
6387

6488
/**

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@
129129
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME;
130130
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS;
131131
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_ISFOLDER;
132+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_PERMISSION;
132133
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME;
133134
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_VERSION;
134135
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
@@ -898,7 +899,7 @@ public AbfsRestOperation append(final String path,
898899
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
899900
}
900901
if (isChecksumValidationEnabled()) {
901-
addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
902+
addCheckSumHeaderForWrite(requestHeaders, reqParams);
902903
}
903904
if (reqParams.isRetryDueToExpect()) {
904905
String userAgentRetry = getUserAgent();
@@ -982,6 +983,9 @@ public AbfsRestOperation appendBlock(final String path,
982983
if (requestParameters.getLeaseId() != null) {
983984
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId()));
984985
}
986+
if (isChecksumValidationEnabled()) {
987+
addCheckSumHeaderForWrite(requestHeaders, requestParameters);
988+
}
985989
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
986990
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, APPEND_BLOCK);
987991
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder);
@@ -1021,6 +1025,7 @@ public AbfsRestOperation appendBlock(final String path,
10211025
* @param leaseId if there is an active lease on the path.
10221026
* @param contextEncryptionAdapter to provide encryption context.
10231027
* @param tracingContext for tracing the server calls.
1028+
* @param blobMd5 the MD5 hash of the blob for integrity verification.
10241029
* @return never returns normally, as this operation is not supported on Blob Endpoint.
10251030
* @throws UnsupportedOperationException always.
10261031
*/
@@ -1032,7 +1037,7 @@ public AbfsRestOperation flush(final String path,
10321037
final String cachedSasToken,
10331038
final String leaseId,
10341039
final ContextEncryptionAdapter contextEncryptionAdapter,
1035-
final TracingContext tracingContext) throws AzureBlobFileSystemException {
1040+
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
10361041
throw new UnsupportedOperationException(
10371042
"Flush without blockIds not supported on Blob Endpoint");
10381043
}
@@ -1049,6 +1054,7 @@ public AbfsRestOperation flush(final String path,
10491054
* @param eTag The etag of the blob.
10501055
* @param contextEncryptionAdapter to provide encryption context.
10511056
* @param tracingContext for tracing the service call.
1057+
* @param blobMd5 the MD5 hash of the blob for integrity verification.
10521058
* @return executed rest operation containing response from server.
10531059
* @throws AzureBlobFileSystemException if rest operation fails.
10541060
*/
@@ -1060,7 +1066,7 @@ public AbfsRestOperation flush(byte[] buffer,
10601066
final String leaseId,
10611067
final String eTag,
10621068
ContextEncryptionAdapter contextEncryptionAdapter,
1063-
final TracingContext tracingContext) throws AzureBlobFileSystemException {
1069+
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
10641070
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
10651071
addEncryptionKeyRequestHeaders(path, requestHeaders, false,
10661072
contextEncryptionAdapter, tracingContext);
@@ -1070,9 +1076,9 @@ public AbfsRestOperation flush(byte[] buffer,
10701076
if (leaseId != null) {
10711077
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
10721078
}
1073-
String md5Hash = computeMD5Hash(buffer, 0, buffer.length);
1074-
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash));
1075-
1079+
if (blobMd5 != null) {
1080+
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5));
1081+
}
10761082
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
10771083
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
10781084
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
@@ -1097,7 +1103,7 @@ public AbfsRestOperation flush(byte[] buffer,
10971103
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
10981104
contextEncryptionAdapter);
10991105
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
1100-
if (!md5Hash.equals(metadataMd5)) {
1106+
if (blobMd5 != null && !blobMd5.equals(metadataMd5)) {
11011107
throw ex;
11021108
}
11031109
return op;
@@ -1914,7 +1920,11 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
19141920
// AzureBlobFileSystem supports only ASCII Characters in property values.
19151921
if (isPureASCII(value)) {
19161922
try {
1917-
value = encodeMetadataAttribute(value);
1923+
// URL encoding this JSON metadata, set by the WASB Client during file creation, causes compatibility issues.
1924+
// Therefore, we need to avoid encoding this metadata.
1925+
if (!XML_TAG_HDI_PERMISSION.equalsIgnoreCase(entry.getKey())) {
1926+
value = encodeMetadataAttribute(value);
1927+
}
19181928
} catch (UnsupportedEncodingException e) {
19191929
throw new InvalidAbfsRestOperationException(e);
19201930
}
@@ -2057,7 +2067,7 @@ public static String generateBlockListXml(String blockIdString) {
20572067

20582068
// Split the block ID string by commas and generate XML for each block ID
20592069
if (!blockIdString.isEmpty()) {
2060-
String[] blockIds = blockIdString.split(",");
2070+
String[] blockIds = blockIdString.split(COMMA);
20612071
for (String blockId : blockIds) {
20622072
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));
20632073
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -883,27 +883,31 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
883883
* @param leaseId if there is an active lease on the path.
884884
* @param contextEncryptionAdapter to provide encryption context.
885885
* @param tracingContext for tracing the server calls.
886+
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation.
886887
* @return executed rest operation containing response from server.
887888
* @throws AzureBlobFileSystemException if rest operation fails.
888889
*/
889890
public abstract AbfsRestOperation flush(String path, long position,
890891
boolean retainUncommittedData, boolean isClose,
891892
String cachedSasToken, String leaseId,
892-
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext)
893+
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext, String blobMd5)
893894
throws AzureBlobFileSystemException;
894895

895896
/**
896-
* Flush previously uploaded data to a file.
897-
* @param buffer containing blockIds to be flushed.
898-
* @param path on which data has to be flushed.
899-
* @param isClose specify if this is the last flush to the file.
900-
* @param cachedSasToken to be used for the authenticating operation.
901-
* @param leaseId if there is an active lease on the path.
902-
* @param eTag to specify conditional headers.
903-
* @param contextEncryptionAdapter to provide encryption context.
904-
* @param tracingContext for tracing the server calls.
905-
* @return executed rest operation containing response from server.
906-
* @throws AzureBlobFileSystemException if rest operation fails.
897+
* Flushes previously uploaded data to the specified path.
898+
*
899+
* @param buffer The buffer containing block IDs to be flushed.
900+
* @param path The file path to which data should be flushed.
901+
* @param isClose True if this is the final flush (i.e., the file is being closed).
902+
* @param cachedSasToken SAS token used for authentication (if applicable).
903+
* @param leaseId Lease ID, if a lease is active on the file.
904+
* @param eTag ETag used for conditional request headers (e.g., If-Match).
905+
* @param contextEncryptionAdapter Adapter to provide encryption context, if encryption is enabled.
906+
* @param tracingContext Context for tracing the server calls.
907+
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation.
908+
* @return The executed {@link AbfsRestOperation} containing the server response.
909+
*
910+
* @throws AzureBlobFileSystemException if the flush operation fails.
907911
*/
908912
public abstract AbfsRestOperation flush(byte[] buffer,
909913
String path,
@@ -912,7 +916,7 @@ public abstract AbfsRestOperation flush(byte[] buffer,
912916
String leaseId,
913917
String eTag,
914918
ContextEncryptionAdapter contextEncryptionAdapter,
915-
TracingContext tracingContext) throws AzureBlobFileSystemException;
919+
TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException;
916920

917921
/**
918922
* Set the properties of a file or directory.
@@ -1356,17 +1360,15 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
13561360

13571361
/**
13581362
* Add MD5 hash as request header to the append request.
1363+
*
13591364
* @param requestHeaders to be updated with checksum header
13601365
* @param reqParams append request parameters carrying the precomputed MD5 hash; no header is added when it is null
1361-
* @param buffer for getting input data for MD5 computation
1362-
* @throws AbfsRestOperationException if Md5 computation fails
13631366
*/
13641367
protected void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
1365-
final AppendRequestParameters reqParams, final byte[] buffer)
1366-
throws AbfsRestOperationException {
1367-
String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(),
1368-
reqParams.getLength());
1369-
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash));
1368+
final AppendRequestParameters reqParams) {
1369+
if (reqParams.getMd5() != null) {
1370+
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, reqParams.getMd5()));
1371+
}
13701372
}
13711373

13721374
/**

0 commit comments

Comments
 (0)