Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ DEFINE_Int32(alter_index_worker_count, "3");
DEFINE_Int32(clone_worker_count, "3");
// the count of thread to clone
DEFINE_Int32(storage_medium_migrate_count, "1");
// Fallback to alternative medium at runtime. Primary decision is made by FE's medium_allocation_mode.
DEFINE_mBool(enable_storage_medium_fallback, "false");
// the count of thread to check consistency
DEFINE_Int32(check_consistency_worker_count, "1");
// the count of thread to upload
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ DECLARE_Int32(alter_index_worker_count);
DECLARE_Int32(clone_worker_count);
// the count of thread to clone
DECLARE_Int32(storage_medium_migrate_count);
// Fallback to alternative medium at runtime. Primary decision is made by FE's medium_allocation_mode.
DECLARE_mBool(enable_storage_medium_fallback);
// the count of thread to check consistency
DECLARE_Int32(check_consistency_worker_count);
// the count of thread to upload
Expand Down
47 changes: 38 additions & 9 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,19 +565,48 @@ int StorageEngine::_get_and_set_next_disk_index(int64_t partition_id,
void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium,
std::vector<DirInfo>& dir_infos) {
std::vector<double> usages;
std::vector<DirInfo> fallback_dir_infos;
// First pass: try to get stores with specified storage medium
for (auto& it : _store_map) {
DataDir* data_dir = it.second.get();
if (data_dir->is_used()) {
if ((_available_storage_medium_type_count == 1 ||
data_dir->storage_medium() == storage_medium) &&
!data_dir->reach_capacity_limit(0)) {
double usage = data_dir->get_usage(0);
DirInfo dir_info;
dir_info.data_dir = data_dir;
dir_info.usage = usage;
dir_info.available_level = 0;
if (data_dir->is_used() && !data_dir->reach_capacity_limit(0)) {
double usage = data_dir->get_usage(0);
DirInfo dir_info;
dir_info.data_dir = data_dir;
dir_info.usage = usage;
dir_info.available_level = 0;

if (data_dir->storage_medium() == storage_medium) {
usages.push_back(usage);
dir_infos.push_back(dir_info);
} else if (_available_storage_medium_type_count == 1 ||
config::enable_storage_medium_fallback) {
fallback_dir_infos.push_back(dir_info);
}
}
}
// Second pass: if no stores found with specified medium, use fallback stores
// - When only one storage medium type available: always use it regardless of config
// - When multiple storage medium types available: use fallback only if enabled
if (dir_infos.empty() && !fallback_dir_infos.empty()) {
bool should_use_fallback = (_available_storage_medium_type_count == 1) ||
(_available_storage_medium_type_count > 1 &&
config::enable_storage_medium_fallback);

if (should_use_fallback) {
dir_infos = std::move(fallback_dir_infos);
// Rebuild usages for fallback directories
usages.clear();
for (const auto& dir_info : dir_infos) {
usages.push_back(dir_info.usage);
}

if (_available_storage_medium_type_count == 1) {
LOG(INFO) << "Only one storage medium type available, using it for storage medium "
<< storage_medium;
} else {
LOG(INFO) << "No available stores found for specified storage medium "
<< storage_medium << ", trying fallback to alternative storage mediums";
}
}
}
Expand Down
272 changes: 272 additions & 0 deletions be/test/olap/storage_medium_fallback_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-actions.h>
#include <gmock/gmock-matchers.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include <filesystem>
#include <memory>

#include "common/config.h"
#include "common/status.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/threadpool.h"

namespace doris {

class StorageMediumFallbackTest : public testing::Test {
public:
void SetUp() override {
// Create test directories
_test_path = "./be/test/olap/test_data/storage_medium_fallback_test";
_hdd_path = _test_path + "/hdd";
_ssd_path = _test_path + "/ssd";

// Clean up existing test directories
auto st = io::global_local_filesystem()->delete_directory(_test_path);
st = io::global_local_filesystem()->create_directory(_test_path);
ASSERT_TRUE(st.ok()) << st;
st = io::global_local_filesystem()->create_directory(_hdd_path);
ASSERT_TRUE(st.ok()) << st;
st = io::global_local_filesystem()->create_directory(_ssd_path);
ASSERT_TRUE(st.ok()) << st;

// Create meta directories
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_hdd_path + "/meta").ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_ssd_path + "/meta").ok());

// Setup storage engine
EngineOptions options;
options.backend_uid = UniqueId::gen_uid();
_storage_engine = std::make_unique<StorageEngine>(options);

// Store original config values
_original_fallback_config = config::enable_storage_medium_fallback;
}

void TearDown() override {
// Restore original config
config::enable_storage_medium_fallback = _original_fallback_config;

// Clean up test directories
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_test_path).ok());
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}

protected:
// Helper method to setup storage engine with specific configuration
void setupStorageEngine(bool include_hdd = true, bool include_ssd = true) {
_storage_engine->_store_map.clear();
_storage_engine->_available_storage_medium_type_count = 0;

if (include_hdd) {
auto hdd_dir = std::make_unique<DataDir>(*_storage_engine, _hdd_path, 100000000,
TStorageMedium::HDD);
auto init_status = hdd_dir->init();
EXPECT_TRUE(init_status.ok()) << "HDD DataDir init failed: " << init_status;
_storage_engine->_store_map[_hdd_path] = std::move(hdd_dir);
}

if (include_ssd) {
auto ssd_dir = std::make_unique<DataDir>(*_storage_engine, _ssd_path, 100000000,
TStorageMedium::SSD);
auto init_status = ssd_dir->init();
EXPECT_TRUE(init_status.ok()) << "SSD DataDir init failed: " << init_status;
_storage_engine->_store_map[_ssd_path] = std::move(ssd_dir);
}

// Count unique storage medium types
std::set<TStorageMedium::type> medium_types;
for (const auto& store : _storage_engine->_store_map) {
medium_types.insert(store.second->storage_medium());
}
_storage_engine->_available_storage_medium_type_count = medium_types.size();
}

std::unique_ptr<StorageEngine> _storage_engine;
std::string _test_path;
std::string _hdd_path;
std::string _ssd_path;
bool _original_fallback_config;
};

TEST_F(StorageMediumFallbackTest, NormalCase_SingleMedium_HDD) {
setupStorageEngine(true, false); // only HDD
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}

TEST_F(StorageMediumFallbackTest, NormalCase_MixedMedium_RequestHDD) {
setupStorageEngine(true, true); // both HDD and SSD
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}

TEST_F(StorageMediumFallbackTest, NormalCase_MixedMedium_RequestSSD) {
setupStorageEngine(true, true); // both HDD and SSD
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);
}

TEST_F(StorageMediumFallbackTest, FallbackEnabled_SingleMediumInconsistent) {
setupStorageEngine(true, false); // only HDD
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD); // fallback to HDD
}

TEST_F(StorageMediumFallbackTest, FallbackEnabled_MixedMediumUnavailable) {
setupStorageEngine(false, true); // only SSD available
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD); // fallback to SSD
}

TEST_F(StorageMediumFallbackTest, FallbackDisabled_SingleMediumInconsistent) {
// Single medium type always forces fallback regardless of config
setupStorageEngine(true, false); // only HDD
config::enable_storage_medium_fallback = false;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1); // forced fallback
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}

TEST_F(StorageMediumFallbackTest, FallbackDisabled_MixedMediumUnavailable) {
// Mixed environment: fallback disabled should be respected
setupStorageEngine(true, true); // both HDD and SSD
config::enable_storage_medium_fallback = false;

// Verify normal operation first
std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);

// Simulate HDD becoming unavailable but maintain mixed environment count
_storage_engine->_store_map.erase(_hdd_path);
_storage_engine->_available_storage_medium_type_count = 2;

// Request unavailable HDD - should fail without fallback
dir_infos.clear();
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 0); // no fallback when disabled
}

TEST_F(StorageMediumFallbackTest, EmptyStoreMap) {
_storage_engine->_store_map.clear();
_storage_engine->_available_storage_medium_type_count = 0;
config::enable_storage_medium_fallback = true;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 0);
}

TEST_F(StorageMediumFallbackTest, SingleMediumType_AlwaysFallback) {
setupStorageEngine(true, false); // only HDD
_storage_engine->_available_storage_medium_type_count = 1;
config::enable_storage_medium_fallback = false;

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1); // forced fallback
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}

TEST_F(StorageMediumFallbackTest, Config_DefaultValue) {
config::enable_storage_medium_fallback = false; // default
setupStorageEngine(true, true); // both HDD and SSD

std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);

// Single medium type forces fallback even with default config
setupStorageEngine(true, false); // only HDD
dir_infos.clear();
_storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1); // forced fallback
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}

TEST_F(StorageMediumFallbackTest, Config_MultiMediumFallbackControl) {
setupStorageEngine(true, true); // both HDD and SSD
config::enable_storage_medium_fallback = false;

// Normal operation works fine
std::vector<DirInfo> dir_infos;
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
EXPECT_EQ(dir_infos.size(), 1);
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);

// Simulate HDD unavailable but maintain multi-medium count
_storage_engine->_store_map.erase(_hdd_path);
_storage_engine->_available_storage_medium_type_count = 2;

dir_infos.clear();
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
EXPECT_EQ(dir_infos.size(), 0); // no fallback when disabled

// Enable fallback and test again
config::enable_storage_medium_fallback = true;
dir_infos.clear();
_storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

EXPECT_EQ(dir_infos.size(), 1); // fallback to SSD
EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);
}

} // namespace doris
15 changes: 14 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,19 @@ public void modifyPartitionsProperty(Database db,
Map<String, String> modifiedProperties = Maps.newHashMap();
modifiedProperties.putAll(properties);

// 4.2 handle medium_allocation_mode property
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MEDIUM_ALLOCATION_MODE)) {
String mediumAllocationModeValue = properties.get(
PropertyAnalyzer.PROPERTIES_MEDIUM_ALLOCATION_MODE);
try {
DataProperty.MediumAllocationMode mediumAllocationMode
= DataProperty.MediumAllocationMode.fromString(mediumAllocationModeValue);
dataProperty.setMediumAllocationMode(mediumAllocationMode);
} catch (IllegalArgumentException e) {
throw new AnalysisException(e.getMessage());
}
}

// 4.3 modify partition storage policy
// can set multi times storage policy
String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
Expand Down Expand Up @@ -1066,7 +1079,7 @@ public void modifyPartitionsProperty(Database db,
}
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(),
newDataProperty, replicaAlloc, hasInMemory ? newInMemory : oldInMemory, currentStoragePolicy,
Maps.newHashMap());
Maps.newHashMap(), partitionName, isTempPartition);
modifyPartitionInfos.add(info);
}

Expand Down
Loading
Loading