RESTCatalogOptions.java
@@ -103,4 +103,11 @@ public class RESTCatalogOptions {
.stringType()
.noDefaultValue()
.withDescription("REST Catalog DLF OSS endpoint.");

public static final ConfigOption<Boolean> DLF_FILE_IO_CACHE_ENABLED =
ConfigOptions.key("dlf.io-cache-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable cache for visiting files using file io (currently only JindoFileIO supports cache).");
}
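
Enabling the cache is a catalog-level opt-in. For context, a minimal client-side sketch (hedged: the "metastore" and "uri" keys and the CatalogFactory wiring follow the usual Paimon REST catalog setup, which this PR does not touch; the URI value is a placeholder):

    // Hypothetical client-side configuration; of these keys, only
    // DLF_FILE_IO_CACHE_ENABLED is introduced by this PR.
    Options options = new Options();
    options.set("metastore", "rest"); // assumed standard REST catalog selector
    options.set("uri", "<rest-server-uri>"); // placeholder
    options.set(RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED, true); // defaults to false
    Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));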
RESTTokenFileIO.java
@@ -45,11 +45,14 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;

/** A {@link FileIO} that supports getting tokens from the REST server. */
@@ -63,6 +66,13 @@ public class RESTTokenFileIO implements FileIO {
.defaultValue(false)
.withDescription("Whether to support data token provided by the REST server.");

public static final ConfigOption<String> FILE_IO_CACHE_POLICY =
ConfigOptions.key("dlf.io-cache.policy")
.stringType()
.noDefaultValue()
.withDescription(
"The cache policy of a table provided by the REST server, combined with: meta,read,write");

private static final Cache<RESTToken, FileIO> FILE_IO_CACHE =
Caffeine.newBuilder()
.maximumSize(1000)
@@ -239,6 +249,27 @@ private Map<String, String> mergeTokenWithCatalogOptions(Map<String, String> tok
if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) {
newToken.put("fs.oss.endpoint", dlfOssEndpoint);
}

// Process the file IO cache configuration.
if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) {
// File IO cache is disabled: remove the cache policy config.
newToken.remove(FILE_IO_CACHE_POLICY.key());
} else {
// File IO cache is enabled: normalize the cache policy into a fixed order,
// and let the user override the policy provided by the REST server.
String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY);
if (cachePolicy == null) {
cachePolicy = token.get(FILE_IO_CACHE_POLICY.key());
}
if (cachePolicy != null) {
Set<String> cachePolicySet = new TreeSet<>();
for (String policy : cachePolicy.split(",")) {
cachePolicySet.add(policy.trim().toLowerCase());
}
newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet));
}
}

return ImmutableMap.copyOf(newToken);
}
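
This branch keeps the client in control: a policy set in the catalog options wins over the one carried in the server token, and whichever value is used is normalized so that equivalent policies compare equal. A standalone sketch of the normalization (not part of the PR; TreeSet sorts entries alphabetically, which is what gives the fixed order):

    // Mirrors the normalization loop above.
    String cachePolicy = "READ, meta , Write";
    Set<String> cachePolicySet = new TreeSet<>();
    for (String policy : cachePolicy.split(",")) {
        cachePolicySet.add(policy.trim().toLowerCase());
    }
    String normalized = String.join(",", cachePolicySet);
    // normalized == "meta,read,write", regardless of input order or casing

Note that when DLF_FILE_IO_CACHE_ENABLED is false (the default), the policy delivered by the server is stripped from the merged token, so the downstream FileIO never sees it.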

(REST catalog test)
@@ -107,6 +107,7 @@
import static org.apache.paimon.TableType.OBJECT_TABLE;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
@@ -2680,6 +2681,68 @@ void testReadPartitionsTable() throws Exception {
}
}

@Test
void testEnableFileIOCache() throws Exception {
// Enable cache at client-side
Map<String, String> options = new HashMap<>();
options.put(
DLF_FILE_IO_CACHE_ENABLED.key(),
"true"); // DLF_FILE_IO_CACHE_ENABLED MUST be configured to enable cache
this.catalog = newRestCatalogWithDataToken(options);
Identifier identifier =
Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");
String cachePolicy = "meta,read";
RESTToken token =
new RESTToken(
ImmutableMap.of(
"akId",
"akId",
"akSecret",
UUID.randomUUID().toString(),
RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(),
cachePolicy),
System.currentTimeMillis() + 3600_000L);
setDataTokenToRestServerForMock(identifier, token);
createTable(
identifier,
ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy),
Lists.newArrayList("col1"));
FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
RESTToken fileDataToken = fileIO.validToken();
assertEquals(
cachePolicy, fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key()));
}

@Test
void testDisableFileIOCache() throws Exception {
// Disable cache at client-side
Map<String, String> options = new HashMap<>();
this.catalog = newRestCatalogWithDataToken(options);
Identifier identifier =
Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");
String cachePolicy = "meta,read";
RESTToken token =
new RESTToken(
ImmutableMap.of(
"akId",
"akId",
"akSecret",
UUID.randomUUID().toString(),
RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(),
cachePolicy),
System.currentTimeMillis() + 3600_000L);
setDataTokenToRestServerForMock(identifier, token);
createTable(
identifier,
ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy),
Lists.newArrayList("col1"));
FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
RESTToken fileDataToken = fileIO.validToken();
assertNull(fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key()));
}

private TestPagedResponse generateTestPagedResponse(
Map<String, String> queryParams,
List<Integer> testData,
HadoopCompliantFileIO.java
@@ -18,6 +18,7 @@

package org.apache.paimon.jindo;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -27,31 +28,95 @@
import org.apache.paimon.fs.VectoredReadable;
import org.apache.paimon.utils.Pair;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import com.aliyun.jindodata.common.JindoHadoopSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY;

/**
* Hadoop {@link FileIO}.
*
* <p>Important: this class is copied from HadoopFileIO to avoid class loader conflicts.
*/
public abstract class HadoopCompliantFileIO implements FileIO {
private static final Logger LOG = LoggerFactory.getLogger(HadoopCompliantFileIO.class);

private static final long serialVersionUID = 1L;

// Detailed cache strategies are retrieved from the REST server.
private static final String META_CACHE_ENABLED_TAG = "meta";
private static final String READ_CACHE_ENABLED_TAG = "read";
private static final String WRITE_CACHE_ENABLED_TAG = "write";

private boolean metaCacheEnabled = false;
private boolean readCacheEnabled = false;
private boolean writeCacheEnabled = false;

protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> fsMap;
protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> jindoCacheFsMap;

// Only enable cache for paths whose names are generated with a UUID.
private static final List<String> CACHE_WHITELIST_PATH_PATTERN =
Lists.newArrayList("bucket-", "manifest");

private boolean shouldCache(Path path) {
String pathStr = path.toUri().getPath();
for (String pattern : CACHE_WHITELIST_PATH_PATTERN) {
if (pathStr.contains(pattern)) {
return true;
}
}
return false;
}

@Override
public void configure(CatalogContext context) {
if (context.options().get(DLF_FILE_IO_CACHE_ENABLED)
&& context.options().get(FILE_IO_CACHE_POLICY) != null) {
if (context.options().get("fs.jindocache.namespace.rpc.address") == null) {
LOG.info(
"FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache");
} else {
metaCacheEnabled =
context.options()
.get(FILE_IO_CACHE_POLICY)
.contains(META_CACHE_ENABLED_TAG);
readCacheEnabled =
context.options()
.get(FILE_IO_CACHE_POLICY)
.contains(READ_CACHE_ENABLED_TAG);
writeCacheEnabled =
context.options()
.get(FILE_IO_CACHE_POLICY)
.contains(WRITE_CACHE_ENABLED_TAG);
LOG.info(
"Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}",
metaCacheEnabled,
readCacheEnabled,
writeCacheEnabled);
}
}
}

@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
boolean shouldCache = readCacheEnabled && shouldCache(path);
LOG.debug("InputStream should cache {} for path {}", shouldCache, path);
Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath, shouldCache);
JindoHadoopSystem fs = pair.getKey();
String sysType = pair.getValue();
FSDataInputStream fsInput = fs.open(hadoopPath);
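
Taken together, configure() and shouldCache() mean caching only engages when three things line up: the catalog sets dlf.io-cache-enabled, a dlf.io-cache.policy is present, and fs.jindocache.namespace.rpc.address points at a JindoCache cluster; even then, only immutable UUID-named files are cached. A hedged illustration (all values and paths below are made up):

    // Option set under which the cache engages (placeholder values):
    //   dlf.io-cache-enabled = true
    //   dlf.io-cache.policy = meta,read
    //   fs.jindocache.namespace.rpc.address = <jindocache-host>:<port>
    //
    // With readCacheEnabled == true, shouldCache(path) then decides per path:
    //   /warehouse/db.db/tbl/bucket-0/data-<uuid>.orc  -> cached   (contains "bucket-")
    //   /warehouse/db.db/tbl/manifest/manifest-<uuid>  -> cached   (contains "manifest")
    //   /warehouse/db.db/tbl/snapshot/snapshot-3       -> bypassed (mutable, not whitelisted)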
@@ -63,22 +128,27 @@ public SeekableInputStream newInputStream(Path path) throws IOException {
@Override
public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
boolean shouldCache = writeCacheEnabled && shouldCache(path);
LOG.debug("OutputStream should cache {} for path {}", shouldCache, path);
return new HadoopPositionOutputStream(
getFileSystem(hadoopPath).create(hadoopPath, overwrite));
getFileSystem(hadoopPath, shouldCache).create(hadoopPath, overwrite));
}

@Override
public FileStatus getFileStatus(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath));
boolean shouldCache = metaCacheEnabled && shouldCache(path);
LOG.debug("GetFileStatus should cache {} for path {}", shouldCache, path);
return new HadoopFileStatus(
getFileSystem(hadoopPath, shouldCache).getFileStatus(hadoopPath));
}

@Override
public FileStatus[] listStatus(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
FileStatus[] statuses = new FileStatus[0];
org.apache.hadoop.fs.FileStatus[] hadoopStatuses =
getFileSystem(hadoopPath).listStatus(hadoopPath);
getFileSystem(hadoopPath, false).listStatus(hadoopPath);
if (hadoopStatuses != null) {
statuses = new FileStatus[hadoopStatuses.length];
for (int i = 0; i < hadoopStatuses.length; i++) {
@@ -93,7 +163,7 @@ public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursiv
throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
getFileSystem(hadoopPath, false).listFiles(hadoopPath, recursive);
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
@@ -111,26 +181,28 @@ public FileStatus next() throws IOException {
@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
return getFileSystem(hadoopPath).exists(hadoopPath);
boolean shouldCache = metaCacheEnabled && shouldCache(path);
LOG.debug("Exists should cache {} for path {}", shouldCache, path);
return getFileSystem(hadoopPath, shouldCache).exists(hadoopPath);
}

@Override
public boolean delete(Path path, boolean recursive) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
return getFileSystem(hadoopPath).delete(hadoopPath, recursive);
return getFileSystem(hadoopPath, false).delete(hadoopPath, recursive);
}

@Override
public boolean mkdirs(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
return getFileSystem(hadoopPath).mkdirs(hadoopPath);
return getFileSystem(hadoopPath, false).mkdirs(hadoopPath);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
org.apache.hadoop.fs.Path hadoopSrc = path(src);
org.apache.hadoop.fs.Path hadoopDst = path(dst);
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
return getFileSystem(hadoopSrc, false).rename(hadoopSrc, hadoopDst);
}

protected org.apache.hadoop.fs.Path path(Path path) {
@@ -141,22 +213,34 @@ protected org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}

protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
return getFileSystemPair(path).getKey();
protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path, boolean enableCache)
throws IOException {
return getFileSystemPair(path, enableCache).getKey();
}

protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
throws IOException {
if (fsMap == null) {
synchronized (this) {
if (fsMap == null) {
fsMap = new ConcurrentHashMap<>();
protected Pair<JindoHadoopSystem, String> getFileSystemPair(
org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException {
Map<String, Pair<JindoHadoopSystem, String>> map;
if (enableCache) {
if (jindoCacheFsMap == null) {
synchronized (this) {
if (jindoCacheFsMap == null) {
jindoCacheFsMap = new ConcurrentHashMap<>();
}
}
}
map = jindoCacheFsMap;
} else {
if (fsMap == null) {
synchronized (this) {
if (fsMap == null) {
fsMap = new ConcurrentHashMap<>();
}
}
}
map = fsMap;
}

Map<String, Pair<JindoHadoopSystem, String>> map = fsMap;

String authority = path.toUri().getAuthority();
if (authority == null) {
authority = "DEFAULT";
@@ -166,7 +250,7 @@ protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs
authority,
k -> {
try {
return createFileSystem(path);
return createFileSystem(path, enableCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -177,7 +261,7 @@ protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs
}

protected abstract Pair<JindoHadoopSystem, String> createFileSystem(
org.apache.hadoop.fs.Path path) throws IOException;
org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException;
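
A note on the design: cached and non-cached JindoHadoopSystem instances live in two separate, lazily created maps keyed by URI authority, so cached reads and non-cached listings on the same table cannot clobber each other's file system configuration. The lazy initialization is the standard double-checked locking idiom on a transient volatile field; a generic condensed sketch (the lazyMap name is illustrative, not from the PR):

    // Generic form of the lazy-init pattern used by getFileSystemPair above:
    // the volatile field makes the unsynchronized null check safe, and the
    // synchronized block ensures the map is created exactly once per instance.
    private transient volatile Map<String, JindoHadoopSystem> lazyMap;

    private Map<String, JindoHadoopSystem> lazyMap() {
        if (lazyMap == null) {
            synchronized (this) {
                if (lazyMap == null) {
                    lazyMap = new ConcurrentHashMap<>();
                }
            }
        }
        return lazyMap;
    }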

private static class HadoopSeekableInputStream extends SeekableInputStream {
