diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 8ccd4bf4a46e..d2c989bd0d4f 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -80,6 +80,12 @@ Integer Configure the size of the connection pool. + +
file-io.allow-cache
+ true + Boolean + Whether to allow static cache in file io implementation. If not allowed, this means that there may be a large number of FileIO instances generated, enabling caching can lead to resource leakage. +
format-table.enabled
true @@ -116,6 +122,12 @@ String Metastore of paimon catalog, supports filesystem, hive and jdbc. + +
resolving-file-io.enabled
+ false + Boolean + Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key. +
sync-all-properties
false @@ -140,11 +152,5 @@ String The warehouse root path of catalog. - -
resolving-fileio.enabled
- false - Boolean - Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, Paimon can read and write to external storage paths, such as OSS or S3. In order to access these external paths correctly, you also need to configure the corresponding access key and secret key. - diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 597df53ce8df..5ecd5b1cb88e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -54,7 +54,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.fs.FileIOUtils.checkAccess; -import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED; +import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -420,7 +420,7 @@ default Optional readOverwrittenFileUtf8(Path path) throws IOException { * by the given path. */ static FileIO get(Path path, CatalogContext config) throws IOException { - if (config.options().get(RESOLVING_FILEIO_ENABLED)) { + if (config.options().get(RESOLVING_FILE_IO_ENABLED)) { FileIO fileIO = new ResolvingFileIO(); fileIO.configure(config); return fileIO; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java index 5dc51132196b..6a46d1a49008 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ResolvingFileIO.java @@ -29,7 +29,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILEIO_ENABLED; +import static org.apache.paimon.options.CatalogOptions.RESOLVING_FILE_IO_ENABLED; /** * An implementation of {@link FileIO} that supports multiple file system schemas. It dynamically @@ -61,7 +61,7 @@ public boolean isObjectStore() { public void configure(CatalogContext context) { Options options = new Options(); context.options().toMap().forEach(options::set); - options.set(RESOLVING_FILEIO_ENABLED, false); + options.set(RESOLVING_FILE_IO_ENABLED, false); this.context = CatalogContext.create( options, context.hadoopConf(), context.preferIO(), context.fallbackIO()); diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index dd0cb6c5b218..29b04cc9d081 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -150,12 +150,21 @@ public class CatalogOptions { + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + " the metastore and need to be manually added as separate partition operations."); - public static final ConfigOption RESOLVING_FILEIO_ENABLED = - ConfigOptions.key("resolving-fileio.enabled") + public static final ConfigOption RESOLVING_FILE_IO_ENABLED = + ConfigOptions.key("resolving-file-io.enabled") .booleanType() .defaultValue(false) .withDescription( "Whether to enable resolving fileio, when this option is enabled, in conjunction with the table's property data-file.external-paths, " + "Paimon can read and write to external storage paths, such as OSS or S3. " + "In order to access these external paths correctly, you also need to configure the corresponding access key and secret key."); + + public static final ConfigOption FILE_IO_ALLOW_CACHE = + ConfigOptions.key("file-io.allow-cache") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to allow static cache in file io implementation. If not allowed, this means that " + + "there may be a large number of FileIO instances generated, enabling caching can " + + "lead to resource leakage."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index 372051e169b4..c5f98286352b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -44,6 +44,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; + /** A {@link FileIO} to support getting token from REST Server. */ public class RESTTokenFileIO implements FileIO { @@ -162,6 +164,7 @@ private FileIO fileIO() throws IOException { CatalogContext context = catalogLoader.context(); Options options = context.options(); options = new Options(RESTUtil.merge(options.toMap(), token.token)); + options.set(FILE_IO_ALLOW_CACHE, false); context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); try { fileIO = FileIO.get(path, context); diff --git a/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java index 4cfa26fd0f5a..1887e89ac6a2 100644 --- a/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-cosn-impl/src/main/java/org/apache/paimon/cosn/HadoopCompliantFileIO.java @@ -38,7 +38,9 @@ *

Important: copy this class from HadoopFileIO here to avoid class loader conflicts. */ public abstract class HadoopCompliantFileIO implements FileIO { + private static final long serialVersionUID = 1L; + protected transient volatile FileSystem fs; @Override diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java index 8faa73d694c5..29ec82c9e75f 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -145,12 +146,19 @@ private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOExcept if (authority == null) { authority = "DEFAULT"; } - FileSystem fs = map.get(authority); - if (fs == null) { - fs = createFileSystem(path); - map.put(authority, fs); + try { + return map.computeIfAbsent( + authority, + k -> { + try { + return createFileSystem(path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException e) { + throw e.getCause(); } - return fs; } protected abstract FileSystem createFileSystem(org.apache.hadoop.fs.Path path) diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 6ecc76da3f76..3b3936c9763e 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,11 +36,14 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; /** OSS {@link FileIO}. */ public class OSSFileIO extends HadoopCompliantFileIO { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private static final Logger LOG = LoggerFactory.getLogger(OSSFileIO.class); @@ -68,7 +72,11 @@ public class OSSFileIO extends HadoopCompliantFileIO { */ private static final Map CACHE = new ConcurrentHashMap<>(); + // create a shared config to avoid load properties everytime + private static final Configuration SHARED_CONFIG = new Configuration(); + private Options hadoopOptions; + private boolean allowCache = true; @Override public boolean isObjectStore() { @@ -77,6 +85,7 @@ public boolean isObjectStore() { @Override public void configure(CatalogContext context) { + allowCache = context.options().get(FILE_IO_ALLOW_CACHE); hadoopOptions = new Options(); // read all configuration with prefix 'CONFIG_PREFIXES' for (String key : context.options().keySet()) { @@ -101,11 +110,12 @@ public void configure(CatalogContext context) { protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { final String scheme = path.toUri().getScheme(); final String authority = path.toUri().getAuthority(); - return CACHE.computeIfAbsent( - new CacheKey(hadoopOptions, scheme, authority), - key -> { - Configuration hadoopConf = new Configuration(); - key.options.toMap().forEach(hadoopConf::set); + Supplier supplier = + () -> { + // create config from base config, if initializing a new config, it will + // retrieve props from the file, which comes at a high cost + Configuration hadoopConf = new Configuration(SHARED_CONFIG); + hadoopOptions.toMap().forEach(hadoopConf::set); URI fsUri = path.toUri(); if (scheme == null && authority == null) { fsUri = FileSystem.getDefaultUri(hadoopConf); @@ -124,7 +134,22 @@ protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) { throw new UncheckedIOException(e); } return fs; - }); + }; + + if (allowCache) { + return CACHE.computeIfAbsent( + new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get()); + } else { + return supplier.get(); + } + } + + @Override + public void close() { + if (!allowCache) { + fsMap.values().forEach(IOUtils::closeQuietly); + fsMap.clear(); + } } private static class CacheKey { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java index 701c81a843ad..784095f5d9fb 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java @@ -66,7 +66,7 @@ public class OrcWriterFactory implements FormatWriterFactory { */ @VisibleForTesting OrcWriterFactory(Vectorizer vectorizer) { - this(vectorizer, new Properties(), new Configuration(), 1024); + this(vectorizer, new Properties(), new Configuration(false), 1024); } /** diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java index e3c741cb8e36..c6772c484942 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcSimpleStatsExtractor.java @@ -75,7 +75,7 @@ public SimpleColStats[] extract(FileIO fileIO, Path path) throws IOException { public Pair extractWithFileInfo(FileIO fileIO, Path path) throws IOException { try (Reader reader = - OrcReaderFactory.createReader(new Configuration(), fileIO, path, null)) { + OrcReaderFactory.createReader(new Configuration(false), fileIO, path, null)) { long rowCount = reader.getNumberOfRows(); ColumnStatistics[] columnStatistics = reader.getStatistics(); TypeDescription schema = reader.getSchema(); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 657745a392cb..c6a0eb621281 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -137,8 +137,10 @@ public long getDataSize() { * @param The type of this builder that is returned by builder methods */ public abstract static class Builder> { - private OutputFile file = null; - private Configuration conf = new Configuration(); + + private final OutputFile file; + + private Configuration conf = new Configuration(false); private ParquetFileWriter.Mode mode; private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME; private long rowGroupSize = DEFAULT_BLOCK_SIZE;