Skip to content

Commit 33e92df

Browse files
junmuzymuzammil
authored andcommitted
Changes for supporting Atomic writes with Hadoop 3.4.2+
1 parent 082edcf commit 33e92df

File tree

3 files changed

+65
-2
lines changed

3 files changed

+65
-2
lines changed

paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/AzureFileIO.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020

2121
import org.apache.paimon.catalog.CatalogContext;
2222
import org.apache.paimon.fs.FileIO;
23+
import org.apache.paimon.fs.Path;
2324
import org.apache.paimon.options.Options;
2425

2526
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.FSDataOutputStream;
28+
import org.apache.hadoop.fs.FileAlreadyExistsException;
2629
import org.apache.hadoop.fs.FileSystem;
2730
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
2831
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
@@ -32,6 +35,7 @@
3235
import java.io.IOException;
3336
import java.io.UncheckedIOException;
3437
import java.net.URI;
38+
import java.nio.charset.StandardCharsets;
3539
import java.util.Map;
3640
import java.util.Objects;
3741
import java.util.concurrent.ConcurrentHashMap;
@@ -66,6 +70,34 @@ public void configure(CatalogContext context) {
6670
this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
6771
}
6872

73+
/**
74+
* Write content atomically using Azure conditional writes.
75+
*
76+
* @param path the target file path
77+
* @param content the content to write
78+
* @return true if write succeeded, false if file already exists
79+
* @throws IOException on I/O errors
80+
*/
81+
@Override
82+
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
83+
org.apache.hadoop.fs.Path hadoopPath = path(path);
84+
FileSystem fs = getFileSystem(hadoopPath);
85+
86+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
87+
88+
try (FSDataOutputStream out =
89+
fs.createFile(hadoopPath)
90+
.create()
91+
.overwrite(false) // Fails if file exists
92+
.build()) {
93+
out.write(contentBytes);
94+
return true;
95+
} catch (FileAlreadyExistsException e) {
96+
LOG.debug("Conditional write failed, file already exists: {}", path);
97+
return false;
98+
}
99+
}
100+
69101
// add additional config entries from the IO config to the Hadoop config
70102
private Options loadHadoopConfigFromContext(CatalogContext context) {
71103
Options hadoopConfig = new Options();

paimon-filesystems/paimon-azure-impl/src/main/java/org/apache/paimon/azure/HadoopCompliantFileIO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ public boolean rename(Path src, Path dst) throws IOException {
102102
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
103103
}
104104

105-
private org.apache.hadoop.fs.Path path(Path path) {
105+
protected org.apache.hadoop.fs.Path path(Path path) {
106106
return new org.apache.hadoop.fs.Path(path.toUri());
107107
}
108108

109-
private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
109+
protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
110110
if (fs == null) {
111111
synchronized (this) {
112112
if (fs == null) {

paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.paimon.options.Options;
2626

2727
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.fs.FSDataOutputStream;
29+
import org.apache.hadoop.fs.FileAlreadyExistsException;
2830
import org.apache.hadoop.fs.FileSystem;
2931
import org.apache.hadoop.fs.s3a.S3AFileSystem;
3032
import org.slf4j.Logger;
@@ -33,6 +35,7 @@
3335
import java.io.IOException;
3436
import java.io.UncheckedIOException;
3537
import java.net.URI;
38+
import java.nio.charset.StandardCharsets;
3639
import java.util.Map;
3740
import java.util.Objects;
3841
import java.util.concurrent.ConcurrentHashMap;
@@ -85,6 +88,34 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
8588
new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
8689
}
8790

91+
/**
92+
* Write content atomically using S3 conditional writes via Hadoop 3.4+ native API.
93+
*
94+
* @param path the target file path
95+
* @param content the content to write
96+
* @return true if write succeeded, false if file already exists
97+
* @throws IOException on I/O errors
98+
*/
99+
@Override
100+
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
101+
org.apache.hadoop.fs.Path hadoopPath = path(path);
102+
S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
103+
104+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
105+
106+
try (FSDataOutputStream out =
107+
fs.createFile(hadoopPath)
108+
.create()
109+
.overwrite(false) // Fails if file exists (uses If-None-Match: * on S3)
110+
.build()) {
111+
out.write(contentBytes);
112+
return true;
113+
} catch (FileAlreadyExistsException e) {
114+
LOG.debug("Conditional write failed, file already exists: {}", path);
115+
return false;
116+
}
117+
}
118+
88119
// add additional config entries from the IO config to the Hadoop config
89120
private Options loadHadoopConfigFromContext(CatalogContext context) {
90121
Options hadoopConfig = new Options();

0 commit comments

Comments
 (0)