diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index bd08bc20461016..c4e098b5f5c99c 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -67,7 +67,7 @@ io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state, }; if (config::enable_file_cache && state != nullptr && state->query_options().__isset.enable_file_cache && - state->query_options().enable_file_cache) { + state->query_options().enable_file_cache && fd.file_cache_admission) { opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE; } if (state != nullptr && state->query_options().__isset.file_cache_base_path && diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 0ba791bd0a3dc9..a889f991e0e877 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -71,6 +71,7 @@ struct FileDescription { // because for a hive table, differenet partitions may have different // locations(or fs), so different files may have different fs. std::string fs_name; + bool file_cache_admission = true; }; } // namespace io diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 3e7fca5af99498..6fc3afdfe88c57 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -223,6 +223,7 @@ void CsvReader::_init_file_description() { if (_range.__isset.fs_name) { _file_description.fs_name = _range.fs_name; } + _file_description.file_cache_admission = _range.file_cache_admission; } Status CsvReader::init_reader(bool is_load) { diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 3fd33053811a3a..117dc0920d16d0 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -164,6 +164,7 @@ void NewJsonReader::_init_file_description() { if (_range.__isset.fs_name) { _file_description.fs_name = _range.fs_name; } + 
_file_description.file_cache_admission = _range.file_cache_admission; } Status NewJsonReader::init_reader( diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index b46be576ad9bf0..5282b482899dd3 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1437,6 +1437,7 @@ void OrcReader::_init_file_description() { if (_scan_range.__isset.fs_name) { _file_description.fs_name = _scan_range.fs_name; } + _file_description.file_cache_admission = _scan_range.file_cache_admission; } DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 10621f0ca5f5e2..19b67fc7a4a1e8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -321,6 +321,7 @@ void ParquetReader::_init_file_description() { if (_scan_range.__isset.fs_name) { _file_description.fs_name = _scan_range.fs_name; } + _file_description.file_cache_admission = _scan_range.file_cache_admission; } Status ParquetReader::init_reader( diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 58e0562daf37ab..05ee7ff77f3f36 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3750,6 +3750,30 @@ public static int metaServiceRpcRetryTimes() { }) public static int first_error_msg_max_length = 256; + @ConfField(mutable = false, description = { + "是否启用文件缓存准入控制", + "Whether to enable file cache admission control" + }) + public static boolean enable_file_cache_admission_control = false; + + @ConfField(mutable = true, description = { + "存储准入规则的JSON文件路径", + "JSON file path for storing admission rules" + }) + public static String 
file_cache_admission_control_json_file_path = ""; + + @ConfField(mutable = true, description = { + "准入规则自动刷新间隔(秒),设置为0关闭自动刷新", + "Auto-refresh interval for admission policies (seconds). Set to 0 to disable auto refresh" + }) + public static int file_cache_admission_control_fresh_interval_s = 300; + + @ConfField(mutable = true, description = { + "当没有匹配的准入规则时的默认行为:true=允许,false=拒绝", + "Default behavior when no matching admission rule is found: true=allow, false=deny" + }) + public static boolean file_cache_admission_control_default_allow = false; + @ConfField public static String cloud_snapshot_handler_class = "org.apache.doris.cloud.snapshot.CloudSnapshotHandler"; @ConfField diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index a81f259dd5c0b7..1e70fd7f1686bc 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -31,6 +31,7 @@ import org.apache.doris.common.util.JdkUtils; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.FileCacheAdmissionManager; import org.apache.doris.httpv2.HttpServer; import org.apache.doris.journal.bdbje.BDBDebugger; import org.apache.doris.journal.bdbje.BDBTool; @@ -160,6 +161,10 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star serverReady.set(false); gracefulShutdown(); + if (Config.enable_file_cache_admission_control) { + FileCacheAdmissionManager.getInstance().shutdown(); + } + // Shutdown HTTP server after main process graceful shutdown is complete if (httpServer != null) { httpServer.shutdown(); @@ -221,6 +226,10 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star Env.getCurrentEnv().initialize(args); Env.getCurrentEnv().waitForReady(); + if (Config.enable_file_cache_admission_control) { + FileCacheAdmissionManager.getInstance().loadOnStartup(); + 
} + // init and start: // 1. HttpServer for HTTP Server // 2. FeServer for Thrift Server diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java new file mode 100644 index 00000000000000..9734a82e0f9f6e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java @@ -0,0 +1,700 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.Config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class FileCacheAdmissionManager { + private static final Logger LOG = LogManager.getLogger(FileCacheAdmissionManager.class); + + public enum RuleType { + EXCLUDE(0), + INCLUDE(1); + + private final int value; + + RuleType(int value) { + this.value = value; + } + + public static RuleType fromValue(int value) { + if (value == 0) { + return EXCLUDE; + } else if (value == 1) { + return INCLUDE; + } + throw new IllegalArgumentException("Invalid RuleType Value: " + value); + } + } + + public enum RuleLevel { + PARTITION, // 0 + TABLE, // 1 + DATABASE, // 2 + CATALOG, // 3 + GLOBAL, // 4 + INVALID // 5 + } + + public static class RulePattern { + private final long id; + private final String userIdentity; + private final String catalog; + private final String database; + private final String table; + private final String partitionPattern; + private final RuleType ruleType; + + public RulePattern(long id, String userIdentity, String catalog, String database, + String table, String partitionPattern, RuleType ruleType) { + this.id = id; + this.userIdentity = userIdentity; + this.catalog = catalog != 
null ? catalog : ""; + this.database = database != null ? database : ""; + this.table = table != null ? table : ""; + this.partitionPattern = partitionPattern != null ? partitionPattern : ""; + this.ruleType = ruleType; + } + + public long getId() { + return id; + } + + public String getUserIdentity() { + return userIdentity; + } + + public String getCatalog() { + return catalog; + } + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public String getPartitionPattern() { + return partitionPattern; + } + + public RuleType getRuleType() { + return ruleType; + } + } + + public static class AdmissionRule { + private final long id; + private final String userIdentity; + private final String catalog; + private final String database; + private final String table; + private final String partitionPattern; + private final RuleType ruleType; + private final boolean enabled; + private final long createdTime; + private final long updatedTime; + + @JsonCreator + public AdmissionRule( + @JsonProperty("id") long id, + @JsonProperty("user_identity") String userIdentity, + @JsonProperty("catalog_name") String catalog, + @JsonProperty("database_name") String database, + @JsonProperty("table_name") String table, + @JsonProperty("partition_pattern") String partitionPattern, + @JsonProperty("rule_type") int ruleType, + @JsonProperty("enabled") boolean enabled, + @JsonProperty("created_time") long createdTime, + @JsonProperty("updated_time") long updatedTime) { + this.id = id; + this.userIdentity = userIdentity != null ? userIdentity : ""; + this.catalog = catalog != null ? catalog : ""; + this.database = database != null ? database : ""; + this.table = table != null ? table : ""; + this.partitionPattern = partitionPattern != null ? 
partitionPattern : ""; + this.ruleType = RuleType.fromValue(ruleType); + this.enabled = enabled; + this.createdTime = createdTime; + this.updatedTime = updatedTime; + } + + public RulePattern toRulePattern() { + return new RulePattern(id, userIdentity, catalog, database, table, partitionPattern, ruleType); + } + + public long getId() { + return id; + } + + public String getUserIdentity() { + return userIdentity; + } + + public String getTable() { + return table; + } + + public String getDatabase() { + return database; + } + + public String getCatalog() { + return catalog; + } + + public String getPartitionPattern() { + return partitionPattern; + } + + public RuleType getRuleType() { + return ruleType; + } + + public boolean getEnabled() { + return enabled; + } + + public long getCreatedTime() { + return createdTime; + } + + public long getUpdatedTime() { + return updatedTime; + } + } + + public static class RuleLoader { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public static List loadRulesFromFile(String filePath) throws Exception { + File file = new File(filePath); + if (!file.exists()) { + throw new IllegalArgumentException("File cache admission JSON file does not exist: " + filePath); + } + + return MAPPER.readValue(file, new TypeReference>() {}); + } + } + + public static class ConcurrentRuleCollection { + private Boolean excludeGlobal = false; + private final Set excludeCatalogRules = new HashSet<>(); + private final Map> excludeDatabaseRules = new HashMap<>(); + private final Map> excludeTableRules = new HashMap<>(); + + private Boolean includeGlobal = false; + private final Set includeCatalogRules = new HashSet<>(); + private final Map> includeDatabaseRules = new HashMap<>(); + private final Map> includeTableRules = new HashMap<>(); + + static List reasons = new ArrayList<>(Arrays.asList( + "common catalog-level blacklist rule", // 0 + "common catalog-level whitelist rule", // 1 + "common database-level blacklist rule", // 2 + 
"common database-level whitelist rule", // 3 + "common table-level blacklist rule", // 4 + "common table-level whitelist rule", // 5 + "user global-level blacklist rule", // 6 + "user global-level whitelist rule", // 7 + "user catalog-level blacklist rule", // 8 + "user catalog-level whitelist rule", // 9 + "user database-level blacklist rule", // 10 + "user database-level whitelist rule", // 11 + "user table-level blacklist rule", // 12 + "user table-level whitelist rule", // 13 + "default rule" // 14 + )); + + public boolean isAllowed(String userIdentity, String catalog, String database, String table, + AtomicReference reason) { + + String catalogDatabase = catalog + "." + database; + + if (containsKeyValue(excludeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(4)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(includeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(5)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (containsKeyValue(excludeDatabaseRules, database, catalog)) { + reason.set(reasons.get(2)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(includeDatabaseRules, database, catalog)) { + reason.set(reasons.get(3)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (excludeCatalogRules.contains(catalog)) { + reason.set(reasons.get(0)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (includeCatalogRules.contains(catalog)) { + reason.set(reasons.get(1)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + + // TODO: Implementing partition-level rules + + reason.set(reasons.get(14)); + logAdmission(Config.file_cache_admission_control_default_allow, + userIdentity, catalog, database, 
table, reason.get()); + return Config.file_cache_admission_control_default_allow; + } + + public boolean isAllowed(ConcurrentRuleCollection userCollection, String userIdentity, String catalog, + String database, String table, AtomicReference reason) { + + String catalogDatabase = catalog + "." + database; + + if (containsKeyValue(excludeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(4)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(userCollection.excludeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(12)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(includeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(5)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (containsKeyValue(userCollection.includeTableRules, table, catalogDatabase)) { + reason.set(reasons.get(13)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (containsKeyValue(excludeDatabaseRules, database, catalog)) { + reason.set(reasons.get(2)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(userCollection.excludeDatabaseRules, database, catalog)) { + reason.set(reasons.get(10)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (containsKeyValue(includeDatabaseRules, database, catalog)) { + reason.set(reasons.get(3)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (containsKeyValue(userCollection.includeDatabaseRules, database, catalog)) { + reason.set(reasons.get(11)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (excludeCatalogRules.contains(catalog)) { + 
reason.set(reasons.get(0)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (userCollection.excludeCatalogRules.contains(catalog)) { + reason.set(reasons.get(8)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (includeCatalogRules.contains(catalog)) { + reason.set(reasons.get(1)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (userCollection.includeCatalogRules.contains(catalog)) { + reason.set(reasons.get(9)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + if (userCollection.excludeGlobal) { + reason.set(reasons.get(6)); + logAdmission(false, userIdentity, catalog, database, table, reason.get()); + return false; + } + if (userCollection.includeGlobal) { + reason.set(reasons.get(7)); + logAdmission(true, userIdentity, catalog, database, table, reason.get()); + return true; + } + + // TODO: Implementing partition-level rules + + reason.set(reasons.get(14)); + logAdmission(Config.file_cache_admission_control_default_allow, + userIdentity, catalog, database, table, reason.get()); + return Config.file_cache_admission_control_default_allow; + } + + private boolean containsKeyValue(Map> map, String key, String value) { + Set set = map.get(key); + return set != null && set.contains(value); + } + + private void logAdmission(boolean allowed, String userIdentity, + String catalog, String database, + String table, String reason) { + String status = allowed ? 
"allowed" : "denied"; + + String logMessage = String.format( + "File cache request %s by %s, user_identity: %s, " + + "catalog: %s, database: %s, table: %s", + status, reason, userIdentity, catalog, database, table); + + LOG.debug(logMessage); + } + + public RuleLevel getRuleLevel(RulePattern rulePattern) { + int pattern = 0; + if (!rulePattern.getPartitionPattern().isEmpty()) { + pattern |= 1; + } + if (!rulePattern.getTable().isEmpty()) { + pattern |= 1 << 1; + } + if (!rulePattern.getDatabase().isEmpty()) { + pattern |= 1 << 2; + } + if (!rulePattern.getCatalog().isEmpty()) { + pattern |= 1 << 3; + } + + RuleLevel[] levelTable = { + /* 0000 */ RuleLevel.GLOBAL, // 0 + /* 0001 */ RuleLevel.INVALID, // 1 + /* 0010 */ RuleLevel.INVALID, // 2 + /* 0011 */ RuleLevel.INVALID, // 3 + /* 0100 */ RuleLevel.INVALID, // 4 + /* 0101 */ RuleLevel.INVALID, // 5 + /* 0110 */ RuleLevel.INVALID, // 6 + /* 0111 */ RuleLevel.INVALID, // 7 + /* 1000 */ RuleLevel.CATALOG, // 8 + /* 1001 */ RuleLevel.INVALID, // 9 + /* 1010 */ RuleLevel.INVALID, // 10 + /* 1011 */ RuleLevel.INVALID, // 11 + /* 1100 */ RuleLevel.DATABASE, // 12 + /* 1101 */ RuleLevel.INVALID, // 13 + /* 1110 */ RuleLevel.TABLE, // 14 + /* 1111 */ RuleLevel.PARTITION // 15 + }; + + return levelTable[pattern]; + } + + public void add(RulePattern rulePattern) { + RuleLevel ruleLevel = getRuleLevel(rulePattern); + if (ruleLevel == RuleLevel.INVALID) { + return; + } + + Set catalogRules = (rulePattern.getRuleType() == RuleType.EXCLUDE) + ? excludeCatalogRules : includeCatalogRules; + Map> databaseRules = (rulePattern.getRuleType() == RuleType.EXCLUDE) + ? excludeDatabaseRules : includeDatabaseRules; + Map> tableRules = (rulePattern.getRuleType() == RuleType.EXCLUDE) + ? 
excludeTableRules : includeTableRules; + + switch (ruleLevel) { + case GLOBAL: + if (rulePattern.getRuleType() == RuleType.EXCLUDE) { + excludeGlobal = true; + } else { + includeGlobal = true; + } + break; + case CATALOG: + catalogRules.add(rulePattern.getCatalog()); + break; + case DATABASE: + databaseRules.computeIfAbsent(rulePattern.getDatabase(), k -> ConcurrentHashMap.newKeySet()) + .add(rulePattern.getCatalog()); + break; + case TABLE: + String catalogDatabase = rulePattern.getCatalog() + "." + rulePattern.getDatabase(); + tableRules.computeIfAbsent(rulePattern.getTable(), k -> ConcurrentHashMap.newKeySet()) + .add(catalogDatabase); + break; + case PARTITION: + // TODO: Implementing partition-level rules + break; + default: + break; + } + } + } + + public static class ConcurrentRuleManager { + private static final int PARTITION_COUNT = 58; // A-Z + a-z + 其他字符 + private final List> maps; + private final ConcurrentRuleCollection commonCollection; + + static List otherReasons = new ArrayList<>(Arrays.asList( + "empty user_identity", + "invalid user_identity" + )); + + public ConcurrentRuleManager() { + maps = new ArrayList<>(PARTITION_COUNT); + commonCollection = new ConcurrentRuleCollection(); + + for (int i = 0; i < PARTITION_COUNT; i++) { + maps.add(new ConcurrentHashMap<>()); + } + } + + private int getIndex(char firstChar) { + return firstChar - 'A'; + } + + public void initialize(List rules) { + for (AdmissionRule rule : rules) { + if (!rule.getEnabled()) { + continue; + } + + RulePattern rulePattern = rule.toRulePattern(); + + if (rulePattern.getUserIdentity().isEmpty()) { + commonCollection.add(rulePattern); + continue; + } + + char firstChar = rulePattern.getUserIdentity().charAt(0); + if (!Character.isAlphabetic(firstChar)) { + continue; + } + + int index = getIndex(firstChar); + maps.get(index).computeIfAbsent(rulePattern.getUserIdentity(), + k -> new ConcurrentRuleCollection()).add(rulePattern); + } + } + + public boolean isAllowed(String 
userIdentity, String catalog, String database, String table, + AtomicReference<String> reason) { + if (userIdentity.isEmpty()) { + reason.set(otherReasons.get(0)); + logDefaultAdmission(userIdentity, catalog, database, table, reason.get()); + return Config.file_cache_admission_control_default_allow; + } + + char firstChar = userIdentity.charAt(0); + if (!((firstChar >= 'A' && firstChar <= 'Z') || (firstChar >= 'a' && firstChar <= 'z'))) { + reason.set(otherReasons.get(1)); + logDefaultAdmission(userIdentity, catalog, database, table, reason.get()); + return Config.file_cache_admission_control_default_allow; + } + + int index = getIndex(firstChar); + ConcurrentRuleCollection collection = maps.get(index).get(userIdentity); + if (collection == null) { + return commonCollection.isAllowed(userIdentity, catalog, database, table, reason); + } else { + return commonCollection.isAllowed(collection, userIdentity, catalog, database, table, reason); + } + } + + private void logDefaultAdmission(String userIdentity, String catalog, + String database, String table, String reason) { + boolean allowed = Config.file_cache_admission_control_default_allow; + String decision = allowed ? 
"allowed" : "denied"; + + String logMessage = String.format( + "File cache request %s by file_cache_admission_control_default_allow, " + + "user_identity: %s, catalog: %s, database: %s, table: %s, reason: %s", + decision, userIdentity, catalog, database, table, reason); + + LOG.debug(logMessage); + } + } + + private ConcurrentRuleManager ruleManager; + + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock(); + private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock(); + + private static final FileCacheAdmissionManager INSTANCE = new FileCacheAdmissionManager(); + + private ScheduledExecutorService executorService; + + private long lastLoadedTime; + + public FileCacheAdmissionManager() { + this.ruleManager = new ConcurrentRuleManager(); + this.lastLoadedTime = 0; + } + + public static FileCacheAdmissionManager getInstance() { + return INSTANCE; + } + + public void initialize(List rules) { + ruleManager.initialize(rules); + } + + public boolean isAllowed(String userIdentity, String catalog, String database, String table, + AtomicReference reason) { + readLock.lock(); + boolean isAllowed = ruleManager.isAllowed(userIdentity, catalog, database, table, reason); + readLock.unlock(); + + return isAllowed; + } + + public void loadOnStartup() { + LOG.info("Loading file cache admission rules..."); + + loadRules(); + startRefreshTask(); + } + + public void loadRules(String filePath) { + if (filePath == null || filePath.isEmpty()) { + LOG.warn("File cache admission JSON file path is not configured, admission control will be disabled."); + return; + } + + try { + List loadedRules = RuleLoader.loadRulesFromFile(filePath); + LOG.info("{} rules loaded successfully from file: {}", loadedRules.size(), filePath); + + ConcurrentRuleManager newRuleManager = new ConcurrentRuleManager(); + newRuleManager.initialize(loadedRules); + + writeLock.lock(); + ruleManager = 
newRuleManager; + writeLock.unlock(); + } catch (Exception e) { + LOG.error("Failed to load file cache admission rules from file: {}", filePath, e); + } + } + + public void loadRules() { + if (Config.file_cache_admission_control_json_file_path == null + || Config.file_cache_admission_control_json_file_path.isEmpty()) { + LOG.warn("File cache admission JSON file path is not configured, admission control will be disabled."); + return; + } + + try { + File ruleFile = new File(Config.file_cache_admission_control_json_file_path); + + if (!ruleFile.exists()) { + LOG.warn("File cache admission JSON file does not exist: {}", + Config.file_cache_admission_control_json_file_path); + return; + } + + long lastModified = ruleFile.lastModified(); + if (lastModified <= lastLoadedTime) { + LOG.info("File cache admission rules file has not been modified since last load, skip loading."); + return; + } + + List loadedRules = RuleLoader.loadRulesFromFile( + Config.file_cache_admission_control_json_file_path); + LOG.info("{} rules loaded successfully from file: {}", loadedRules.size(), + Config.file_cache_admission_control_json_file_path); + + ConcurrentRuleManager newRuleManager = new ConcurrentRuleManager(); + newRuleManager.initialize(loadedRules); + + writeLock.lock(); + lastLoadedTime = lastModified; + ruleManager = newRuleManager; + writeLock.unlock(); + } catch (Exception e) { + LOG.error("Failed to load file cache admission rules from file: {}", + Config.file_cache_admission_control_json_file_path, e); + } + } + + private void startRefreshTask() { + int interval = Config.file_cache_admission_control_fresh_interval_s; + if (interval <= 0) { + LOG.info("File cache admission control refresh interval is {} (<=0), refresh task will not be started.", + interval); + return; + } + + executorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "file-cache-admission-rule-refresh"); + t.setDaemon(true); + return t; + }); + + 
executorService.scheduleAtFixedRate(() -> { + LOG.info("Refreshing file cache admission rules..."); + loadRules(); + }, interval, interval, TimeUnit.SECONDS); + + LOG.info("Started refreshing task, interval: {} seconds", interval); + } + + public void shutdown() { + if (executorService != null) { + LOG.info("Starting shutdown refreshing executorService"); + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("Refreshing executorService did not terminate"); + } + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + LOG.info("Refreshing executorService shutdown completed"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 2614006b6a1185..fade5b000cf8ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; @@ -71,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; /** * FileQueryScanNode for querying the file access type of catalog, now only support @@ -307,11 +309,42 @@ public void createScanRangeLocations() throws UserException { int numBackends = backendPolicy.numBackends(); List pathPartitionKeys = getPathPartitionKeys(); + + Boolean fileCacheAdmission = true; + if 
(Config.enable_file_cache_admission_control) { + TableIf tableIf = getTargetTable(); + if (tableIf instanceof ExternalTable) { + ExternalTable externalTableIf = (ExternalTable) tableIf; + + String userIdentity = ConnectContext.get().getUserIdentity(); + String catalog = externalTableIf.getCatalog().getName(); + String database = externalTableIf.getDatabase().getFullName(); + String table = externalTableIf.getName(); + + AtomicReference reason = new AtomicReference<>(""); + + long startTime = System.nanoTime(); + + fileCacheAdmission = FileCacheAdmissionManager.getInstance().isAllowed(userIdentity, catalog, + database, table, reason); + + long endTime = System.nanoTime(); + double durationMs = (double) (endTime - startTime) / 1_000_000; + + LOG.debug("File cache admission control cost {} ms", String.format("%.6f", durationMs)); + + addFileCacheAdmissionLog(userIdentity, fileCacheAdmission, reason.get(), durationMs); + } else { + LOG.info("Skip file cache admission control for non-external table: {}.{}", + tableIf.getDatabase().getFullName(), tableIf.getName()); + } + } + if (isBatchMode()) { // File splits are generated lazily, and fetched by backends while scanning. // Only provide the unique ID of split source to backend. 
- splitAssignment = new SplitAssignment( - backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys); + splitAssignment = new SplitAssignment(backendPolicy, this, this::splitToScanRange, + locationProperties, pathPartitionKeys, fileCacheAdmission); splitAssignment.init(); if (executor != null) { executor.getSummaryProfile().setGetSplitsFinishTime(); @@ -365,7 +398,8 @@ public void createScanRangeLocations() throws UserException { for (Backend backend : assignment.keySet()) { Collection splits = assignment.get(backend); for (Split split : splits) { - scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys)); + scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys, + fileCacheAdmission)); totalFileSize += split.getLength(); } scanBackendIds.add(backend.getId()); @@ -390,7 +424,8 @@ private TScanRangeLocations splitToScanRange( Backend backend, Map locationProperties, Split split, - List pathPartitionKeys) throws UserException { + List pathPartitionKeys, + Boolean fileCacheAdmission) throws UserException { FileSplit fileSplit = (FileSplit) split; TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. 
@@ -410,6 +445,7 @@ private TScanRangeLocations splitToScanRange( // set file format type, and the type might fall back to native format in setScanParams rangeDesc.setFormatType(getFileFormatType()); setScanParams(rangeDesc, fileSplit); + rangeDesc.setFileCacheAdmission(fileCacheAdmission); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index a7aa0f607ac504..a737ac2e637aca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -52,6 +52,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -68,9 +69,12 @@ public abstract class FileScanNode extends ExternalScanNode { // For display pushdown agg result protected long tableLevelRowCount = -1; + protected List fileCacheAdmissionLogs; + public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, boolean needCheckColumnPriv) { super(id, desc, planNodeName, needCheckColumnPriv); this.needCheckColumnPriv = needCheckColumnPriv; + this.fileCacheAdmissionLogs = new ArrayList<>(); } @Override @@ -191,6 +195,11 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) { .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); } + + for (String admissionLog : fileCacheAdmissionLogs) { + output.append(prefix).append(admissionLog).append("\n"); + } + return output.toString(); } @@ -261,4 +270,11 @@ protected void setDefaultValueExprs(TableIf tbl, } } } + + protected void addFileCacheAdmissionLog(String userIdentity, 
Boolean admitted, String reason, double durationMs) { + String admissionStatus = admitted ? "ADMITTED" : "DENIED"; + String admissionLog = String.format("file cache request %s: user_identity:%s, reason:%s, cost:%.6f ms", + admissionStatus, userIdentity, reason, durationMs); + fileCacheAdmissionLogs.add(admissionLog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java index cc17818d6b5a82..bb0eeeef8a55e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java @@ -53,6 +53,7 @@ public class SplitAssignment { private final SplitToScanRange splitToScanRange; private final Map locationProperties; private final List pathPartitionKeys; + private final Boolean fileCacheAdmission; private final Object assignLock = new Object(); private Split sampleSplit = null; private final AtomicBoolean isStopped = new AtomicBoolean(false); @@ -66,12 +67,14 @@ public SplitAssignment( SplitGenerator splitGenerator, SplitToScanRange splitToScanRange, Map locationProperties, - List pathPartitionKeys) { + List pathPartitionKeys, + Boolean fileCacheAdmission) { this.backendPolicy = backendPolicy; this.splitGenerator = splitGenerator; this.splitToScanRange = splitToScanRange; this.locationProperties = locationProperties; this.pathPartitionKeys = pathPartitionKeys; + this.fileCacheAdmission = fileCacheAdmission; } public void init() throws UserException { @@ -107,7 +110,8 @@ private void appendBatch(Multimap batch) throws UserException { Collection splits = batch.get(backend); List locations = new ArrayList<>(splits.size()); for (Split split : splits) { - locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys)); + locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys, + 
fileCacheAdmission)); } while (needMoreSplit()) { BlockingQueue> queue = diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java index 0e890252857583..af69566ae37b32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java @@ -30,5 +30,6 @@ TScanRangeLocations getScanRange( Backend backend, Map locationProperties, Split split, - List pathPartitionKeys) throws UserException; + List pathPartitionKeys, + Boolean fileCacheAdmission) throws UserException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 35d64c4455e2e8..b2f006ed86ed66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -57,6 +57,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.datasource.FileCacheAdmissionManager; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.dictionary.LayoutType; import org.apache.doris.info.PartitionNamesInfo; @@ -1089,6 +1090,8 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.antlr.v4.runtime.tree.RuleNode; import org.antlr.v4.runtime.tree.TerminalNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.math.BigDecimal; import java.math.BigInteger; @@ -1104,6 +1107,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ 
-1112,6 +1116,8 @@ */ @SuppressWarnings({"OptionalUsedAsFieldOrParameterType", "OptionalGetWithoutIsPresent"}) public class LogicalPlanBuilder extends DorisParserBaseVisitor { + private static final Logger LOG = LogManager.getLogger(LogicalPlanBuilder.class); + private static String JOB_NAME = "jobName"; private static String TASK_ID = "taskId"; @@ -8901,6 +8907,41 @@ public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) { + " disable_file_cache=false in cloud mode"); } + if (Config.enable_file_cache_admission_control) { + DorisParser.WarmUpSingleTableRefContext tableRef = ctx.warmUpSingleTableRef(); + List identifierParts = visitMultipartIdentifier(tableRef.multipartIdentifier()); + + int partCount = identifierParts.size(); + String table = identifierParts.get(partCount - 1); + String database = (partCount >= 2) + ? identifierParts.get(partCount - 2) : ConnectContext.get().getDatabase(); + String catalog = (partCount == 3) + ? identifierParts.get(partCount - 3) : ConnectContext.get().getCurrentCatalog().getName(); + String userIdentity = ConnectContext.get().getUserIdentity(); + + if (!"internal".equals(catalog)) { + AtomicReference reason = new AtomicReference<>(""); + + long startTime = System.nanoTime(); + + boolean fileCacheAdmission = FileCacheAdmissionManager.getInstance().isAllowed(userIdentity, catalog, + database, table, reason); + + long endTime = System.nanoTime(); + double durationMs = (double) (endTime - startTime) / 1_000_000; + + LOG.debug("File cache admission control cost {} ms", String.format("%.6f", durationMs)); + + if (!fileCacheAdmission) { + throw new AnalysisException("WARM UP SELECT denied by file cache admission control, reason: " + + reason); + } + } else { + LOG.info("Skip file cache admission control for non-external table: {}.{}", + database, table); + } + } + UnboundBlackholeSink sink = new UnboundBlackholeSink<>(project, new UnboundBlackholeSinkContext(true)); LogicalPlan command = new WarmupSelectCommand(sink); 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 03dac4a899dd1c..6aab2e9ff619e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -630,6 +630,11 @@ public String getQualifiedUser() { return currentUserIdentity == null ? null : currentUserIdentity.getQualifiedUser(); } + public String getUserIdentity() { + return currentUserIdentity == null ? null : (currentUserIdentity.getQualifiedUser() + "@" + + currentUserIdentity.getHost()); + } + public boolean getIsTempUser() { return isTempUser; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java new file mode 100644 index 00000000000000..7a7f86bcbe8a43 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.Config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +public class FileCacheAdmissionManagerTest { + + private FileCacheAdmissionManager manager; + + @Before + public void setUp() { + Config.file_cache_admission_control_default_allow = false; + manager = new FileCacheAdmissionManager(); + } + + @Test + public void testEmptyUserIdentity() { + AtomicReference reason = new AtomicReference<>(); + boolean result = manager.isAllowed("", "catalog", "database", "table", reason); + Assert.assertFalse(result); + Assert.assertEquals("empty user_identity", reason.get()); + } + + @Test + public void testInvalidUserIdentity() { + AtomicReference reason = new AtomicReference<>(); + boolean result = manager.isAllowed("123user", "catalog", "database", "table", reason); + Assert.assertFalse(result); + Assert.assertEquals("invalid user_identity", reason.get()); + } + + @Test + public void testDefaultAllow() { + Config.file_cache_admission_control_default_allow = true; + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user", "catalog", "database", "table", reason1); + Assert.assertTrue(result1); + Assert.assertEquals("default rule", reason1.get()); + + Config.file_cache_admission_control_default_allow = false; + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user", "catalog", "database", "table", reason2); + Assert.assertFalse(result2); + Assert.assertEquals("default rule", reason2.get()); + } + + @Test + public void testCommonRule() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long 
updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 1L, "", "catalog_1", "", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 2L, "", "catalog_2", "database_1", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 3L, "", "catalog_3", "database_2", "table_1", "", + 1, true, createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user", "catalog_1", "database", "table", reason1); + Assert.assertTrue(result1); + Assert.assertEquals("common catalog-level whitelist rule", reason1.get()); + + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user", "catalog_2", "database_1", "table", reason2); + Assert.assertTrue(result2); + Assert.assertEquals("common database-level whitelist rule", reason2.get()); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user", "catalog_3", "database_2", "table_1", reason3); + Assert.assertTrue(result3); + Assert.assertEquals("common table-level whitelist rule", reason3.get()); + } + + @Test + public void testRuleEnabled() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 1L, "", "catalog_1", "", "", "", + 1, false, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 2L, "", "catalog_2", "database_1", "", "", + 1, false, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 3L, "", "catalog_3", "database_2", "table_1", "", + 1, false, 
createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user", "catalog_1", "database", "table", reason1); + Assert.assertFalse(result1); + Assert.assertEquals("default rule", reason1.get()); + + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user", "catalog_2", "database_1", "table", reason2); + Assert.assertFalse(result2); + Assert.assertEquals("default rule", reason2.get()); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user", "catalog_3", "database_2", "table_1", reason3); + Assert.assertFalse(result3); + Assert.assertEquals("default rule", reason3.get()); + } + + @Test + public void testUserRule() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 4L, "user_1", "catalog_4", "", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 5L, "user_1", "catalog_5", "database_4", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 6L, "user_1", "catalog_6", "database_5", "table_4", "", + 1, true, createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user_1", "catalog_4", "database", "table", reason1); + Assert.assertTrue(result1); + Assert.assertEquals("user 
catalog-level whitelist rule", reason1.get()); + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user_2", "catalog_4", "database", "table", reason2); + Assert.assertFalse(result2); + Assert.assertEquals("default rule", reason2.get()); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user_1", "catalog_5", "database_4", "table", reason3); + Assert.assertTrue(result3); + Assert.assertEquals("user database-level whitelist rule", reason3.get()); + AtomicReference reason4 = new AtomicReference<>(); + boolean result4 = manager.isAllowed("user_2", "catalog_5", "database_4", "table", reason4); + Assert.assertFalse(result4); + Assert.assertEquals("default rule", reason4.get()); + + AtomicReference reason5 = new AtomicReference<>(); + boolean result5 = manager.isAllowed("user_1", "catalog_6", "database_5", "table_4", reason5); + Assert.assertTrue(result5); + Assert.assertEquals("user table-level whitelist rule", reason5.get()); + AtomicReference reason6 = new AtomicReference<>(); + boolean result6 = manager.isAllowed("user_2", "catalog_6", "database_5", "table_4", reason6); + Assert.assertFalse(result6); + Assert.assertEquals("default rule", reason6.get()); + } + + @Test + public void testRuleLevelPriority() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 7L, "user_3", "", "", "", "", + 1, true, createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user_3", "catalog", "database", "table", reason1); + Assert.assertTrue(result1); + Assert.assertEquals("user global-level whitelist 
rule", reason1.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 8L, "user_3", "catalog", "", "", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user_3", "catalog", "database", "table", reason2); + Assert.assertTrue(result2); + Assert.assertEquals("user catalog-level whitelist rule", reason2.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 9L, "user_3", "catalog", "database", "", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user_3", "catalog", "database", "table", reason3); + Assert.assertTrue(result3); + Assert.assertEquals("user database-level whitelist rule", reason3.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 10L, "user_3", "catalog", "database", "table", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason4 = new AtomicReference<>(); + boolean result4 = manager.isAllowed("user_3", "catalog", "database", "table", reason4); + Assert.assertTrue(result4); + Assert.assertEquals("user table-level whitelist rule", reason4.get()); + } + + @Test + public void testRuleTypePriority() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 11L, "user_4", "", "", "", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 12L, "user_4", "", "", "", "", + 1, true, createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File 
jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user_4", "catalog", "database", "table", reason1); + Assert.assertFalse(result1); + Assert.assertEquals("user global-level blacklist rule", reason1.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 13L, "user_4", "catalog", "", "", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 14L, "user_4", "catalog", "", "", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user_4", "catalog", "database", "table", reason2); + Assert.assertFalse(result2); + Assert.assertEquals("user catalog-level blacklist rule", reason2.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 15L, "user_4", "catalog", "database", "", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 16L, "user_4", "catalog", "database", "", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user_4", "catalog", "database", "table", reason3); + Assert.assertFalse(result3); + Assert.assertEquals("user database-level blacklist rule", reason3.get()); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 17L, "user_4", "catalog", "database", "table", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 18L, "user_4", "catalog", "database", "table", "", + 1, true, createdTime, updatedTime + )); + + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); 
+ + AtomicReference reason4 = new AtomicReference<>(); + boolean result4 = manager.isAllowed("user_4", "catalog", "database", "table", reason4); + Assert.assertFalse(result4); + Assert.assertEquals("user table-level blacklist rule", reason4.get()); + } + + @Test + public void testNestedRulePriorities() throws Exception { + List rules = new ArrayList<>(); + long createdTime = 0; + long updatedTime = 0; + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 19L, "user_5", "catalog", "", "", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 20L, "user_5", "catalog", "database", "", "", + 1, true, createdTime, updatedTime + )); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 21L, "user_6", "catalog", "", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 22L, "user_6", "catalog", "database", "", "", + 0, true, createdTime, updatedTime + )); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 23L, "user_7", "catalog", "database", "", "", + 0, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 24L, "user_7", "catalog", "database", "table", "", + 1, true, createdTime, updatedTime + )); + + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 25L, "user_8", "catalog", "database", "", "", + 1, true, createdTime, updatedTime + )); + rules.add(new FileCacheAdmissionManager.AdmissionRule( + 26L, "user_8", "catalog", "database", "table", "", + 0, true, createdTime, updatedTime + )); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + File jsonFile = new File("rules.json"); + objectMapper.writeValue(jsonFile, rules); + + manager.loadRules("rules.json"); + + AtomicReference reason1 = new AtomicReference<>(); + boolean result1 = manager.isAllowed("user_5", "catalog", "database", "table", reason1); + Assert.assertTrue(result1); + 
Assert.assertEquals("user database-level whitelist rule", reason1.get()); + AtomicReference reason2 = new AtomicReference<>(); + boolean result2 = manager.isAllowed("user_5", "catalog", "otherDatabase", "table", reason2); + Assert.assertFalse(result2); + Assert.assertEquals("user catalog-level blacklist rule", reason2.get()); + + AtomicReference reason3 = new AtomicReference<>(); + boolean result3 = manager.isAllowed("user_6", "catalog", "database", "table", reason3); + Assert.assertFalse(result3); + Assert.assertEquals("user database-level blacklist rule", reason3.get()); + AtomicReference reason4 = new AtomicReference<>(); + boolean result4 = manager.isAllowed("user_6", "catalog", "otherDatabase", "table", reason4); + Assert.assertTrue(result4); + Assert.assertEquals("user catalog-level whitelist rule", reason4.get()); + + AtomicReference reason5 = new AtomicReference<>(); + boolean result5 = manager.isAllowed("user_7", "catalog", "database", "table", reason5); + Assert.assertTrue(result5); + Assert.assertEquals("user table-level whitelist rule", reason5.get()); + AtomicReference reason6 = new AtomicReference<>(); + boolean result6 = manager.isAllowed("user_7", "catalog", "database", "otherTable", reason6); + Assert.assertFalse(result6); + Assert.assertEquals("user database-level blacklist rule", reason6.get()); + + AtomicReference reason7 = new AtomicReference<>(); + boolean result7 = manager.isAllowed("user_8", "catalog", "database", "table", reason7); + Assert.assertFalse(result7); + Assert.assertEquals("user table-level blacklist rule", reason7.get()); + AtomicReference reason8 = new AtomicReference<>(); + boolean result8 = manager.isAllowed("user_8", "catalog", "database", "otherTable", reason8); + Assert.assertTrue(result8); + Assert.assertEquals("user database-level whitelist rule", reason8.get()); + } + + @AfterClass + public static void deleteJsonFile() throws Exception { + File file = new File("rules.json"); + Assert.assertTrue(file.exists()); + if 
(file.exists()) { + Assert.assertTrue(file.delete()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java index ab5205b47a7174..28b2c604fdfc30 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java @@ -76,7 +76,8 @@ void setUp() { mockSplitGenerator, mockSplitToScanRange, locationProperties, - pathPartitionKeys + pathPartitionKeys, + true ); } @@ -92,7 +93,8 @@ void testInitSuccess() throws Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; @@ -127,7 +129,8 @@ void testInitTimeout() throws Exception { mockSplitGenerator, mockSplitToScanRange, locationProperties, - pathPartitionKeys + pathPartitionKeys, + true ); new MockUp() { @@ -196,7 +199,8 @@ void testAddToQueueSuccess() throws Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; @@ -226,7 +230,8 @@ void testAddToQueueSampleSplitAlreadySet() throws Exception { result = batch; minTimes = 0; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; minTimes = 0; } @@ -257,7 +262,8 @@ void testAddToQueueWithQueueBlockingScenario() throws 
Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; @@ -287,7 +293,8 @@ void testAddToQueueConcurrentAccess() throws Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; @@ -339,7 +346,8 @@ void testInitAndAddToQueueIntegration() throws Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; @@ -383,7 +391,8 @@ void testAppendBatchTimeoutBehavior() throws Exception { mockBackendPolicy.computeScanRangeAssignment((List) any); result = batch; - mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys); + mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys, + true); result = mockScanRangeLocations; } }; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f16a1ad1d2d60a..3fcbea2296704f 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -515,6 +515,7 @@ struct TFileRangeDesc { 14: optional i64 self_split_weight // whether the value of columns_from_path is null 15: optional list columns_from_path_is_null; + 16: optional bool file_cache_admission; } struct TSplitSource { diff --git 
a/tools/export_mysql_rule_to_json.sh b/tools/export_mysql_rule_to_json.sh
new file mode 100755
index 00000000000000..0c44ff095242b8
--- /dev/null
+++ b/tools/export_mysql_rule_to_json.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+# Configuration
+DB_HOST="localhost"
+DB_USER="root"
+DB_NAME="file_cache_admission_control"
+DB_PASS=""
+TABLE_NAME="admission_policy"
+OUTPUT_FILE="rule_$(date +%Y%m%d_%H%M%S).json"
+
+echo "=== Database Export Configuration ==="
+echo "Database Host: $DB_HOST"
+echo "Database User: $DB_USER"
+echo "Database Name: $DB_NAME"
+echo "Password: $(if [ -n "$DB_PASS" ]; then echo "Set"; else echo "Not set"; fi)"
+echo "Table Name: $TABLE_NAME"
+echo "Output File: $OUTPUT_FILE"
+echo "====================================="
+echo ""
+
+# Query and convert to JSON (including long type timestamps)
+# NOTE(review): the heredoc below was garbled during extraction and has been
+# reconstructed; verify the column list against the admission_policy schema.
+QUERY=$(cat <<EOF
+SELECT JSON_ARRAYAGG(
+    JSON_OBJECT(
+        'id', id,
+        'userIdentity', user_identity,
+        'catalog', \`catalog\`,
+        'database', \`database\`,
+        'table', \`table\`,
+        'properties', properties,
+        'type', type,
+        'enabled', enabled,
+        'createdTime', UNIX_TIMESTAMP(created_time) * 1000,
+        'updatedTime', UNIX_TIMESTAMP(updated_time) * 1000
+    )
+) FROM ${TABLE_NAME};
+EOF
+)
+
+# Run the query; pass the password only when one is configured.
+if [ -n "$DB_PASS" ]; then
+    JSON_DATA=$(echo "$QUERY" | mysql -h $DB_HOST -u $DB_USER -p"$DB_PASS" $DB_NAME -N 2>/dev/null)
+else
+    JSON_DATA=$(echo "$QUERY" | mysql -h $DB_HOST -u $DB_USER $DB_NAME -N)
+fi
+
+# Handle NULL
+if [ "$JSON_DATA" = "NULL" ] || [ -z "$JSON_DATA" ]; then
+    JSON_DATA="[]"
+fi
+
+# Save to file
+echo "$JSON_DATA" > "$OUTPUT_FILE"
+
+# Format
+if command -v jq &> /dev/null; then
+    jq '.' "$OUTPUT_FILE" | awk '
+    /^ {/ && NR > 3 {print ""}
+    {print}
+    ' > "${OUTPUT_FILE}.tmp" && mv "${OUTPUT_FILE}.tmp" "$OUTPUT_FILE"
+fi
+
+echo "Export completed: $OUTPUT_FILE"