Skip to content

Commit bc5a5b8

Browse files
HADOOP-19658. ABFS:Create and rename idempotency for FNS Blob (#7914)
Contributed by Anmol Asrani
1 parent 158096d commit bc5a5b8

File tree

11 files changed

+508
-135
lines changed

11 files changed

+508
-135
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ public class AbfsConfiguration{
492492
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
493493
private boolean enableClientTransactionId;
494494

495+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY,
496+
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY)
497+
private boolean enableCreateIdempotency;
498+
495499
private String clientProvidedEncryptionKey;
496500
private String clientProvidedEncryptionKeySHA;
497501

@@ -1047,6 +1051,12 @@ public String getAzureAtomicRenameDirs() {
10471051
}
10481052

10491053
public boolean isConditionalCreateOverwriteEnabled() {
1054+
// If either the configured FS service type or the ingress service type is BLOB,
1055+
// conditional create-overwrite is not used.
1056+
if (getIsCreateIdempotencyEnabled() && (getFsConfiguredServiceType() == AbfsServiceType.BLOB
1057+
|| getIngressServiceType() == AbfsServiceType.BLOB)) {
1058+
return false;
1059+
}
10501060
return this.enableConditionalCreateOverwrite;
10511061
}
10521062

@@ -1178,6 +1188,10 @@ public boolean getIsClientTransactionIdEnabled() {
11781188
return enableClientTransactionId;
11791189
}
11801190

1191+
public boolean getIsCreateIdempotencyEnabled() {
1192+
return enableCreateIdempotency;
1193+
}
1194+
11811195
/**
11821196
* Enum config to allow user to pick format of x-ms-client-request-id header
11831197
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,8 @@ public static String containerProperty(String property, String fsName, String ac
430430
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
431431
/**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
432432
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
433+
/**Flag to enable/disable create idempotency during create operation: {@value}*/
434+
public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = "fs.azure.enable.create.blob.idempotency";
433435

434436
private ConfigurationKeys() {}
435437
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,5 +240,7 @@ public final class FileSystemConfigurations {
240240

241241
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
242242

243+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;
244+
243245
private FileSystemConfigurations() {}
244246
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path,
509509
final TracingContext tracingContext) throws AzureBlobFileSystemException {
510510
AbfsRestOperation op;
511511
if (isFileCreation) {
512-
// Create a file with the specified parameters
513-
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
514-
contextEncryptionAdapter, tracingContext);
512+
if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) {
513+
AbfsRestOperation statusOp = null;
514+
try {
515+
// Check if the file already exists by calling GetPathStatus
516+
statusOp = getPathStatus(path, tracingContext, null, false);
517+
} catch (AbfsRestOperationException ex) {
518+
// If the path does not exist, continue with file creation
519+
// For other errors, rethrow the exception
520+
if (ex.getStatusCode() != HTTP_NOT_FOUND) {
521+
throw ex;
522+
}
523+
}
524+
// If the file exists and overwrite is not allowed, throw conflict
525+
if (statusOp != null && statusOp.hasResult() && !overwrite) {
526+
throw new AbfsRestOperationException(
527+
HTTP_CONFLICT,
528+
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
529+
PATH_EXISTS,
530+
null);
531+
} else {
532+
// Proceed with file creation (force overwrite = true)
533+
op = createFile(path, true, permissions, isAppendBlob, eTag,
534+
contextEncryptionAdapter, tracingContext);
535+
}
536+
} else {
537+
op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
538+
contextEncryptionAdapter, tracingContext);
539+
}
515540
} else {
516541
// Create a directory with the specified parameters
517542
op = createDirectory(path, permissions, isAppendBlob, eTag,
@@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path,
584609
if (eTag != null && !eTag.isEmpty()) {
585610
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
586611
}
587-
588612
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
589613
final AbfsRestOperation op = getAbfsRestOperation(
590614
AbfsRestOperationType.PutBlob,

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21+
import java.util.Collections;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.stream.Collectors;
2325

2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
@@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
5456
if (key == null) {
5557
key = "HTTP Response";
5658
}
57-
String values = StringUtils.join(";", entry.getValue());
59+
List<String> valuesList = entry.getValue();
60+
if (valuesList == null) {
61+
valuesList = Collections.emptyList();
62+
} else {
63+
valuesList = valuesList.stream()
64+
.map(v -> v == null ? "" : v) // replace null with empty string
65+
.collect(Collectors.toList());
66+
}
67+
String values = StringUtils.join(";", valuesList);
5868
if (key.contains("Cookie")) {
5969
values = "*cookie info*";
6070
}

0 commit comments

Comments
 (0)