diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index d1a88ef4f839..8799ca3b4716 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -103,4 +103,11 @@ public class RESTCatalogOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("REST Catalog DLF OSS endpoint.");
+
+    public static final ConfigOption<Boolean> DLF_FILE_IO_CACHE_ENABLED =
+            ConfigOptions.key("dlf.io-cache-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Enable caching when accessing files through FileIO (currently only JindoFileIO supports caching).");
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index d59ca6dd47c5..6d1464b12082 100644
--- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -45,11 +45,14 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;

 import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
 import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;

 /** A {@link FileIO} to support getting token from REST Server. */
@@ -63,6 +66,13 @@ public class RESTTokenFileIO implements FileIO {
                     .defaultValue(false)
                     .withDescription("Whether to support data token provided by the REST server.");

+    public static final ConfigOption<String> FILE_IO_CACHE_POLICY =
+            ConfigOptions.key("dlf.io-cache.policy")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The cache policy of a table provided by the REST server, a comma-separated combination of: meta, read, write.");
+
     private static final Cache<RESTToken, FileIO> FILE_IO_CACHE =
             Caffeine.newBuilder()
                     .maximumSize(1000)
@@ -239,6 +249,27 @@ private Map<String, String> mergeTokenWithCatalogOptions(Map<String, String> tok
         if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) {
             newToken.put("fs.oss.endpoint", dlfOssEndpoint);
         }
+
+        // Process the file IO cache configuration
+        if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) {
+            // File IO cache is disabled; remove the cache policy config
+            newToken.remove(FILE_IO_CACHE_POLICY.key());
+        } else {
+            // File IO cache is enabled; normalize the cache policy into a fixed order,
+            // and allow the user to override the policy provided by the REST server.
+            String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY);
+            if (cachePolicy == null) {
+                cachePolicy = token.get(FILE_IO_CACHE_POLICY.key());
+            }
+            if (cachePolicy != null) {
+                Set<String> cachePolicySet = new TreeSet<>();
+                for (String policy : cachePolicy.split(",")) {
+                    cachePolicySet.add(policy.trim().toLowerCase());
+                }
+                newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet));
+            }
+        }
+
         return ImmutableMap.copyOf(newToken);
     }
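Note: the TreeSet normalization above makes the policy string case-insensitive and order-insensitive before it is merged into the data token. A minimal standalone sketch of the same logic (the class and method names here are illustrative, not part of the patch):

```java
import java.util.Set;
import java.util.TreeSet;

public class CachePolicyNormalizer {
    // Mirrors the TreeSet-based normalization in mergeTokenWithCatalogOptions:
    // trim each entry, lowercase it, then rely on TreeSet's natural (alphabetical) order.
    static String normalize(String cachePolicy) {
        Set<String> policies = new TreeSet<>();
        for (String policy : cachePolicy.split(",")) {
            policies.add(policy.trim().toLowerCase());
        }
        return String.join(",", policies);
    }

    public static void main(String[] args) {
        System.out.println(normalize("Write, META,read")); // prints "meta,read,write"
    }
}
```

Since TreeSet iterates in natural order, "meta,read,write" is also the canonical order that client and server end up comparing.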
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 4694c4f9c85e..458bfbe48fe6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -107,6 +107,7 @@
 import static org.apache.paimon.TableType.OBJECT_TABLE;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
 import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
 import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
@@ -2680,6 +2681,68 @@ void testReadPartitionsTable() throws Exception {
         }
     }

+    @Test
+    void testEnableFileIOCache() throws Exception {
+        // Enable the cache on the client side
+        Map<String, String> options = new HashMap<>();
+        options.put(
+                DLF_FILE_IO_CACHE_ENABLED.key(),
+                "true"); // DLF_FILE_IO_CACHE_ENABLED must be configured to enable the cache
+        this.catalog = newRestCatalogWithDataToken(options);
+        Identifier identifier =
+                Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");
+        String cachePolicy = "meta,read";
+        RESTToken token =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId",
+                                "akId",
+                                "akSecret",
+                                UUID.randomUUID().toString(),
+                                RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(),
+                                cachePolicy),
+                        System.currentTimeMillis() + 3600_000L);
+        setDataTokenToRestServerForMock(identifier, token);
+        createTable(
+                identifier,
+                ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy),
+                Lists.newArrayList("col1"));
+        FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
+        RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+        RESTToken fileDataToken = fileIO.validToken();
+        assertEquals(
+                cachePolicy, fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key()));
+    }
+
+    @Test
+    void testDisableFileIOCache() throws Exception {
+        // Disable the cache on the client side
+        Map<String, String> options = new HashMap<>();
+        this.catalog = newRestCatalogWithDataToken(options);
+        Identifier identifier =
+                Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");
+        String cachePolicy = "meta,read";
+        RESTToken token =
+                new RESTToken(
+                        ImmutableMap.of(
+                                "akId",
+                                "akId",
+                                "akSecret",
+                                UUID.randomUUID().toString(),
+                                RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(),
+                                cachePolicy),
+                        System.currentTimeMillis() + 3600_000L);
+        setDataTokenToRestServerForMock(identifier, token);
+        createTable(
+                identifier,
+                ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy),
+                Lists.newArrayList("col1"));
+        FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
+        RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+        RESTToken fileDataToken = fileIO.validToken();
+        assertNull(fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key()));
+    }
+
     private TestPagedResponse generateTestPagedResponse(
             Map<String, String> queryParams, List testData,
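Note: the HadoopCompliantFileIO changes below gate each filesystem call on two independent conditions: the per-operation policy flag parsed in configure() and a path whitelist. A standalone sketch of that decision logic (only the whitelist entries and the policy semantics come from the patch; the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class CacheDecision {
    // Same whitelist as CACHE_WHITELIST_PATH_PATTERN in the diff: only data files
    // (under "bucket-" directories) and manifest files are cache candidates.
    static final List<String> WHITELIST = Arrays.asList("bucket-", "manifest");

    static boolean shouldCache(boolean policyEnabled, String path) {
        // An operation is cached only if its policy flag is on AND the path matches.
        return policyEnabled && WHITELIST.stream().anyMatch(path::contains);
    }

    public static void main(String[] args) {
        // e.g. a read with the "read" policy enabled:
        System.out.println(shouldCache(true, "/warehouse/db/tbl/bucket-0/data-uuid.orc")); // true
        System.out.println(shouldCache(true, "/warehouse/db/tbl/snapshot/snapshot-1")); // false
    }
}
```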
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 3ced092f1d68..219de5608dc1 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -18,6 +18,7 @@

 package org.apache.paimon.jindo;

+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -27,31 +28,95 @@
 import org.apache.paimon.fs.VectoredReadable;
 import org.apache.paimon.utils.Pair;

+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import com.aliyun.jindodata.common.JindoHadoopSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.URI;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

+import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
+import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY;
+
 /**
  * Hadoop {@link FileIO}.
  *
  * <p>Important: copy this class from HadoopFileIO here to avoid class loader conflicts.
  */
 public abstract class HadoopCompliantFileIO implements FileIO {

+    private static final Logger LOG = LoggerFactory.getLogger(HadoopCompliantFileIO.class);
+
     private static final long serialVersionUID = 1L;

+    // Detailed cache strategies are retrieved from the REST server.
+    private static final String META_CACHE_ENABLED_TAG = "meta";
+    private static final String READ_CACHE_ENABLED_TAG = "read";
+    private static final String WRITE_CACHE_ENABLED_TAG = "write";
+
+    private boolean metaCacheEnabled = false;
+    private boolean readCacheEnabled = false;
+    private boolean writeCacheEnabled = false;
+
     protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> fsMap;
+    protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> jindoCacheFsMap;
+
+    // Only enable the cache for paths that are generated with a UUID
+    private static final List<String> CACHE_WHITELIST_PATH_PATTERN =
+            Lists.newArrayList("bucket-", "manifest");
+
+    private boolean shouldCache(Path path) {
+        String pathStr = path.toUri().getPath();
+        for (String pattern : CACHE_WHITELIST_PATH_PATTERN) {
+            if (pathStr.contains(pattern)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void configure(CatalogContext context) {
+        if (context.options().get(DLF_FILE_IO_CACHE_ENABLED)
+                && context.options().get(FILE_IO_CACHE_POLICY) != null) {
+            if (context.options().get("fs.jindocache.namespace.rpc.address") == null) {
+                LOG.info(
+                        "FileIO cache is enabled but the JindoCache RPC address is not set, falling back to no-cache");
+            } else {
+                metaCacheEnabled =
+                        context.options()
+                                .get(FILE_IO_CACHE_POLICY)
+                                .contains(META_CACHE_ENABLED_TAG);
+                readCacheEnabled =
+                        context.options()
+                                .get(FILE_IO_CACHE_POLICY)
+                                .contains(READ_CACHE_ENABLED_TAG);
+                writeCacheEnabled =
+                        context.options()
+                                .get(FILE_IO_CACHE_POLICY)
+                                .contains(WRITE_CACHE_ENABLED_TAG);
+                LOG.info(
+                        "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}",
+                        metaCacheEnabled,
+                        readCacheEnabled,
+                        writeCacheEnabled);
+            }
+        }
+    }
+
     @Override
     public SeekableInputStream newInputStream(Path path) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        boolean shouldCache = readCacheEnabled && shouldCache(path);
+        LOG.debug("InputStream should cache {} for path {}", shouldCache, path);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath, shouldCache);
         JindoHadoopSystem fs = pair.getKey();
         String sysType = pair.getValue();
         FSDataInputStream fsInput = fs.open(hadoopPath);
@@ -63,14 +128,19 @@ public SeekableInputStream newInputStream(Path path) throws IOException {

     @Override
     public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
+        boolean shouldCache = writeCacheEnabled && shouldCache(path);
+        LOG.debug("OutputStream should cache {} for path {}", shouldCache, path);
         return new HadoopPositionOutputStream(
-                getFileSystem(hadoopPath).create(hadoopPath, overwrite));
+                getFileSystem(hadoopPath, shouldCache).create(hadoopPath, overwrite));
     }

     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath));
+        boolean shouldCache = metaCacheEnabled && shouldCache(path);
+        LOG.debug("GetFileStatus should cache {} for path {}", shouldCache, path);
+        return new HadoopFileStatus(
+                getFileSystem(hadoopPath, shouldCache).getFileStatus(hadoopPath));
     }

     @Override
@@ -78,7 +148,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
         FileStatus[] statuses = new FileStatus[0];
         org.apache.hadoop.fs.FileStatus[] hadoopStatuses =
-                getFileSystem(hadoopPath).listStatus(hadoopPath);
+                getFileSystem(hadoopPath, false).listStatus(hadoopPath);
         if (hadoopStatuses != null) {
             statuses = new FileStatus[hadoopStatuses.length];
             for (int i = 0; i < hadoopStatuses.length; i++) {
@@ -93,7 +163,7 @@ public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursiv
             throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
         org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
-                getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
+                getFileSystem(hadoopPath, false).listFiles(hadoopPath, recursive);
         return new RemoteIterator<FileStatus>() {
             @Override
             public boolean hasNext() throws IOException {
@@ -111,26 +181,28 @@ public FileStatus next() throws IOException {
     @Override
     public boolean exists(Path path) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        return getFileSystem(hadoopPath).exists(hadoopPath);
+        boolean shouldCache = metaCacheEnabled && shouldCache(path);
+        LOG.debug("Exists should cache {} for path {}", shouldCache, path);
+        return getFileSystem(hadoopPath, shouldCache).exists(hadoopPath);
     }

     @Override
     public boolean delete(Path path, boolean recursive) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        return getFileSystem(hadoopPath).delete(hadoopPath, recursive);
+        return getFileSystem(hadoopPath, false).delete(hadoopPath, recursive);
     }

     @Override
     public boolean mkdirs(Path path) throws IOException {
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        return getFileSystem(hadoopPath).mkdirs(hadoopPath);
+        return getFileSystem(hadoopPath, false).mkdirs(hadoopPath);
     }

     @Override
     public boolean rename(Path src, Path dst) throws IOException {
         org.apache.hadoop.fs.Path hadoopSrc = path(src);
         org.apache.hadoop.fs.Path hadoopDst = path(dst);
-        return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
+        return getFileSystem(hadoopSrc, false).rename(hadoopSrc, hadoopDst);
     }

     protected org.apache.hadoop.fs.Path path(Path path) {
@@ -141,22 +213,34 @@ protected org.apache.hadoop.fs.Path path(Path path) {
         return new org.apache.hadoop.fs.Path(path.toUri());
     }

-    protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
-        return getFileSystemPair(path).getKey();
+    protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path, boolean enableCache)
+            throws IOException {
+        return getFileSystemPair(path, enableCache).getKey();
     }

-    protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
-            throws IOException {
-        if (fsMap == null) {
-            synchronized (this) {
-                if (fsMap == null) {
-                    fsMap = new ConcurrentHashMap<>();
+    protected Pair<JindoHadoopSystem, String> getFileSystemPair(
+            org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException {
+        Map<String, Pair<JindoHadoopSystem, String>> map;
+        if (enableCache) {
+            if (jindoCacheFsMap == null) {
+                synchronized (this) {
+                    if (jindoCacheFsMap == null) {
+                        jindoCacheFsMap = new ConcurrentHashMap<>();
+                    }
+                }
+            }
+            map = jindoCacheFsMap;
+        } else {
+            if (fsMap == null) {
+                synchronized (this) {
+                    if (fsMap == null) {
+                        fsMap = new ConcurrentHashMap<>();
+                    }
                 }
             }
+            map = fsMap;
         }
-        Map<String, Pair<JindoHadoopSystem, String>> map = fsMap;
-
         String authority = path.toUri().getAuthority();
         if (authority == null) {
             authority = "DEFAULT";
@@ -166,7 +250,7 @@ protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs
                 authority,
                 k -> {
                     try {
-                        return createFileSystem(path);
+                        return createFileSystem(path, enableCache);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
@@ -177,7 +261,7 @@ protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs
     }

     protected abstract Pair<JindoHadoopSystem, String> createFileSystem(
-            org.apache.hadoop.fs.Path path) throws IOException;
+            org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException;

     private static class HadoopSeekableInputStream extends SeekableInputStream {
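Note: with caching enabled there are now two lazily initialized filesystem maps (fsMap and jindoCacheFsMap), each guarded by double-checked locking on a transient volatile field, so a deserialized FileIO instance safely rebuilds its map on first use. The same pattern in isolation (field and type names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LazyFsMap {
    // transient + volatile: the map is not serialized and is re-created
    // after deserialization, visible to all threads once assigned.
    private transient volatile Map<String, Object> fsMap;

    Map<String, Object> fsMap() {
        if (fsMap == null) { // first check, no lock
            synchronized (this) {
                if (fsMap == null) { // second check, under lock
                    fsMap = new ConcurrentHashMap<>();
                }
            }
        }
        return fsMap;
    }
}
```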
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 0aa0939d50f7..40d67d4ed9b3 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -82,6 +82,7 @@ public class JindoFileIO extends HadoopCompliantFileIO {
             new ConcurrentHashMap<>();

     private Options hadoopOptions;
+    private Options hadoopOptionsWithCache;
     private boolean allowCache = true;

     @Override
@@ -91,6 +92,7 @@ public boolean isObjectStore() {

     @Override
     public void configure(CatalogContext context) {
+        super.configure(context);
         allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
         hadoopOptions = new Options();
         // read all configuration with prefix 'CONFIG_PREFIXES'
@@ -127,6 +129,14 @@ public void configure(CatalogContext context) {
                     .iterator()
                     .forEachRemaining(entry -> hadoopOptions.set(entry.getKey(), entry.getValue()));
         }
+
+        // additional configuration used when the cache is enabled
+        hadoopOptionsWithCache = new Options(hadoopOptions.toMap());
+        hadoopOptionsWithCache.set("fs.xengine", "jindocache");
+        // Workaround: the following configurations avoid a bug in some JindoSDK versions
+        hadoopOptionsWithCache.set("fs.oss.read.profile.columnar.use-pread", "false");
+        hadoopOptionsWithCache.set(
+                "fs.jindocache.read.profile.columnar.readahead.pread.enable", "false");
     }

     public Options hadoopOptions() {
@@ -140,20 +150,22 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
             throw new IOException("File " + path + " already exists.");
         }
         org.apache.hadoop.fs.Path hadoopPath = path(path);
-        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath, false);
         JindoHadoopSystem fs = pair.getKey();
         return new JindoTwoPhaseOutputStream(
                 new JindoMultiPartUpload(fs, hadoopPath), hadoopPath, path);
     }

     @Override
-    protected Pair<JindoHadoopSystem, String> createFileSystem(org.apache.hadoop.fs.Path path) {
+    protected Pair<JindoHadoopSystem, String> createFileSystem(
+            org.apache.hadoop.fs.Path path, boolean enableCache) {
         final String scheme = path.toUri().getScheme();
         final String authority = path.toUri().getAuthority();
+        Options options = enableCache ? hadoopOptionsWithCache : hadoopOptions;
         Supplier<Pair<JindoHadoopSystem, String>> supplier =
                 () -> {
                     Configuration hadoopConf = new Configuration(false);
-                    hadoopOptions.toMap().forEach(hadoopConf::set);
+                    options.toMap().forEach(hadoopConf::set);
                     URI fsUri = path.toUri();
                     if (scheme == null && authority == null) {
                         fsUri = FileSystem.getDefaultUri(hadoopConf);
@@ -185,7 +197,7 @@ protected Pair<JindoHadoopSystem, String> createFileSystem(org.apache.hadoop.fs.

         if (allowCache) {
             return CACHE.computeIfAbsent(
-                    new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get());
+                    new CacheKey(options, scheme, authority), key -> supplier.get());
         } else {
             return supplier.get();
         }
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
index 435a0bc49a8f..0f8ba549abba 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -47,7 +47,7 @@ protected MultiPartUploadStore multiPartUploa
             FileIO fileIO, Path targetPath) throws IOException {
         JindoFileIO jindoFileIO = (JindoFileIO) fileIO;
         org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath);
-        Pair<JindoHadoopSystem, String> pair = jindoFileIO.getFileSystemPair(hadoopPath);
+        Pair<JindoHadoopSystem, String> pair = jindoFileIO.getFileSystemPair(hadoopPath, false);
         JindoHadoopSystem fs = pair.getKey();
         return new JindoMultiPartUpload(fs, hadoopPath);
     }
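Note: end to end, a client would opt in roughly as follows. This is a hedged sketch: the two option keys and the RPC-address fallback come from this patch, while the endpoint values and the exact catalog bootstrap (CatalogFactory.createCatalog with a REST metastore) are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class EnableJindoCacheExample {
    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("metastore", "rest"); // REST catalog (assumed setup)
        conf.put("uri", "http://rest-server:8080"); // hypothetical endpoint
        // Keys introduced by this patch:
        conf.put("dlf.io-cache-enabled", "true"); // master switch, default false
        conf.put("dlf.io-cache.policy", "meta,read"); // client-side override of the server policy
        // Without this address, HadoopCompliantFileIO#configure falls back to no-cache:
        conf.put("fs.jindocache.namespace.rpc.address", "jindocache-master:8101"); // hypothetical
        try (Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(conf)))) {
            // Reads and metadata lookups on bucket-/manifest paths now go through
            // JindoCache, subject to the meta/read/write flags parsed from the policy.
        }
    }
}
```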