Skip to content

Commit 6f78c73

Browse files
committed
[feat](restore): enhance storage medium control
1 parent 06807b6 commit 6f78c73

40 files changed

+2728
-128
lines changed

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ DEFINE_Int32(alter_index_worker_count, "3");
232232
DEFINE_Int32(clone_worker_count, "3");
233233
// the count of thread to clone
234234
DEFINE_Int32(storage_medium_migrate_count, "1");
235+
// Whether BE may fall back to an alternative storage medium at runtime when no usable
// data dir of the requested medium exists. The primary decision is made by FE's medium_allocation_mode.
236+
DEFINE_mBool(enable_storage_medium_fallback, "false");
235237
// the count of thread to check consistency
236238
DEFINE_Int32(check_consistency_worker_count, "1");
237239
// the count of thread to upload

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ DECLARE_Int32(alter_index_worker_count);
276276
DECLARE_Int32(clone_worker_count);
277277
// the count of thread to clone
278278
DECLARE_Int32(storage_medium_migrate_count);
279+
// Whether BE may fall back to an alternative storage medium at runtime when no usable
// data dir of the requested medium exists. The primary decision is made by FE's medium_allocation_mode.
280+
DECLARE_mBool(enable_storage_medium_fallback);
279281
// the count of thread to check consistency
280282
DECLARE_Int32(check_consistency_worker_count);
281283
// the count of thread to upload

be/src/olap/storage_engine.cpp

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -563,19 +563,48 @@ int StorageEngine::_get_and_set_next_disk_index(int64_t partition_id,
563563
void StorageEngine::_get_candidate_stores(TStorageMedium::type storage_medium,
564564
std::vector<DirInfo>& dir_infos) {
565565
std::vector<double> usages;
566+
std::vector<DirInfo> fallback_dir_infos;
567+
// First pass: try to get stores with specified storage medium
566568
for (auto& it : _store_map) {
567569
DataDir* data_dir = it.second.get();
568-
if (data_dir->is_used()) {
569-
if ((_available_storage_medium_type_count == 1 ||
570-
data_dir->storage_medium() == storage_medium) &&
571-
!data_dir->reach_capacity_limit(0)) {
572-
double usage = data_dir->get_usage(0);
573-
DirInfo dir_info;
574-
dir_info.data_dir = data_dir;
575-
dir_info.usage = usage;
576-
dir_info.available_level = 0;
570+
if (data_dir->is_used() && !data_dir->reach_capacity_limit(0)) {
571+
double usage = data_dir->get_usage(0);
572+
DirInfo dir_info;
573+
dir_info.data_dir = data_dir;
574+
dir_info.usage = usage;
575+
dir_info.available_level = 0;
576+
577+
if (data_dir->storage_medium() == storage_medium) {
577578
usages.push_back(usage);
578579
dir_infos.push_back(dir_info);
580+
} else if (_available_storage_medium_type_count == 1 ||
581+
config::enable_storage_medium_fallback) {
582+
fallback_dir_infos.push_back(dir_info);
583+
}
584+
}
585+
}
586+
// Second pass: if no stores found with specified medium, use fallback stores
587+
// - When only one storage medium type available: always use it regardless of config
588+
// - When multiple storage medium types available: use fallback only if enabled
589+
if (dir_infos.empty() && !fallback_dir_infos.empty()) {
590+
bool should_use_fallback = (_available_storage_medium_type_count == 1) ||
591+
(_available_storage_medium_type_count > 1 &&
592+
config::enable_storage_medium_fallback);
593+
594+
if (should_use_fallback) {
595+
dir_infos = std::move(fallback_dir_infos);
596+
// Rebuild usages for fallback directories
597+
usages.clear();
598+
for (const auto& dir_info : dir_infos) {
599+
usages.push_back(dir_info.usage);
600+
}
601+
602+
if (_available_storage_medium_type_count == 1) {
603+
LOG(INFO) << "Only one storage medium type available, using it for storage medium "
604+
<< storage_medium;
605+
} else {
606+
LOG(INFO) << "No available stores found for specified storage medium "
607+
<< storage_medium << ", trying fallback to alternative storage mediums";
579608
}
580609
}
581610
}
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gmock/gmock-actions.h>
19+
#include <gmock/gmock-matchers.h>
20+
#include <gtest/gtest-message.h>
21+
#include <gtest/gtest-test-part.h>
22+
#include <gtest/gtest.h>
23+
24+
#include <filesystem>
25+
#include <memory>
26+
27+
#include "common/config.h"
28+
#include "common/status.h"
29+
#include "gtest/gtest_pred_impl.h"
30+
#include "io/fs/local_file_system.h"
31+
#include "olap/data_dir.h"
32+
#include "olap/storage_engine.h"
33+
#include "olap/tablet_manager.h"
34+
#include "util/threadpool.h"
35+
36+
namespace doris {
37+
38+
class StorageMediumFallbackTest : public testing::Test {
39+
public:
40+
void SetUp() override {
41+
// Create test directories
42+
_test_path = "./be/test/olap/test_data/storage_medium_fallback_test";
43+
_hdd_path = _test_path + "/hdd";
44+
_ssd_path = _test_path + "/ssd";
45+
46+
// Clean up existing test directories
47+
auto st = io::global_local_filesystem()->delete_directory(_test_path);
48+
st = io::global_local_filesystem()->create_directory(_test_path);
49+
ASSERT_TRUE(st.ok()) << st;
50+
st = io::global_local_filesystem()->create_directory(_hdd_path);
51+
ASSERT_TRUE(st.ok()) << st;
52+
st = io::global_local_filesystem()->create_directory(_ssd_path);
53+
ASSERT_TRUE(st.ok()) << st;
54+
55+
// Create meta directories
56+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_hdd_path + "/meta").ok());
57+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(_ssd_path + "/meta").ok());
58+
59+
// Setup storage engine
60+
EngineOptions options;
61+
options.backend_uid = UniqueId::gen_uid();
62+
_storage_engine = std::make_unique<StorageEngine>(options);
63+
64+
// Store original config values
65+
_original_fallback_config = config::enable_storage_medium_fallback;
66+
}
67+
68+
void TearDown() override {
69+
// Restore original config
70+
config::enable_storage_medium_fallback = _original_fallback_config;
71+
72+
// Clean up test directories
73+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_test_path).ok());
74+
ExecEnv::GetInstance()->set_storage_engine(nullptr);
75+
}
76+
77+
protected:
78+
// Helper method to setup storage engine with specific configuration
79+
void setupStorageEngine(bool include_hdd = true, bool include_ssd = true) {
80+
_storage_engine->_store_map.clear();
81+
_storage_engine->_available_storage_medium_type_count = 0;
82+
83+
if (include_hdd) {
84+
auto hdd_dir = std::make_unique<DataDir>(*_storage_engine, _hdd_path, 100000000,
85+
TStorageMedium::HDD);
86+
auto init_status = hdd_dir->init();
87+
EXPECT_TRUE(init_status.ok()) << "HDD DataDir init failed: " << init_status;
88+
_storage_engine->_store_map[_hdd_path] = std::move(hdd_dir);
89+
}
90+
91+
if (include_ssd) {
92+
auto ssd_dir = std::make_unique<DataDir>(*_storage_engine, _ssd_path, 100000000,
93+
TStorageMedium::SSD);
94+
auto init_status = ssd_dir->init();
95+
EXPECT_TRUE(init_status.ok()) << "SSD DataDir init failed: " << init_status;
96+
_storage_engine->_store_map[_ssd_path] = std::move(ssd_dir);
97+
}
98+
99+
// Count unique storage medium types
100+
std::set<TStorageMedium::type> medium_types;
101+
for (const auto& store : _storage_engine->_store_map) {
102+
medium_types.insert(store.second->storage_medium());
103+
}
104+
_storage_engine->_available_storage_medium_type_count = medium_types.size();
105+
}
106+
107+
std::unique_ptr<StorageEngine> _storage_engine;
108+
std::string _test_path;
109+
std::string _hdd_path;
110+
std::string _ssd_path;
111+
bool _original_fallback_config;
112+
};
113+
114+
// Only an HDD store is registered and HDD is requested: that store is the
// single candidate.
TEST_F(StorageMediumFallbackTest, NormalCase_SingleMedium_HDD) {
    setupStorageEngine(true, false); // only HDD
    config::enable_storage_medium_fallback = true;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}
124+
125+
// Both HDD and SSD stores are registered and HDD is requested: only the HDD
// store is returned.
TEST_F(StorageMediumFallbackTest, NormalCase_MixedMedium_RequestHDD) {
    setupStorageEngine(true, true); // both HDD and SSD
    config::enable_storage_medium_fallback = true;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}
135+
136+
// Both HDD and SSD stores are registered and SSD is requested: only the SSD
// store is returned.
TEST_F(StorageMediumFallbackTest, NormalCase_MixedMedium_RequestSSD) {
    setupStorageEngine(true, true); // both HDD and SSD
    config::enable_storage_medium_fallback = true;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);
}
146+
147+
// Only an HDD store is registered but SSD is requested, with fallback enabled:
// the HDD store is returned as the fallback candidate.
TEST_F(StorageMediumFallbackTest, FallbackEnabled_SingleMediumInconsistent) {
    setupStorageEngine(true, false); // only HDD
    config::enable_storage_medium_fallback = true;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD); // fallback to HDD
}
157+
158+
// Multi-medium environment in which the requested medium (HDD) has no usable
// store, with fallback enabled: the SSD store is returned instead.
TEST_F(StorageMediumFallbackTest, FallbackEnabled_MixedMediumUnavailable) {
    setupStorageEngine(false, true); // only SSD available
    // setupStorageEngine() derives a medium-type count of 1 from the single SSD
    // store, which would take the "only one medium type" branch that ignores the
    // config flag. Force the count to 2 so the config-gated multi-medium branch is
    // the one under test (same technique as FallbackDisabled_MixedMediumUnavailable).
    _storage_engine->_available_storage_medium_type_count = 2;
    config::enable_storage_medium_fallback = true;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD); // fallback to SSD
}
168+
169+
// Only an HDD store is registered but SSD is requested, with fallback disabled:
// a single available medium type forces the fallback regardless of the config.
TEST_F(StorageMediumFallbackTest, FallbackDisabled_SingleMediumInconsistent) {
    // Single medium type always forces fallback regardless of config
    setupStorageEngine(true, false); // only HDD
    config::enable_storage_medium_fallback = false;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1); // forced fallback
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}
180+
181+
// Multi-medium environment with fallback disabled: once the HDD store is gone,
// a request for HDD yields no candidates instead of falling back to SSD.
TEST_F(StorageMediumFallbackTest, FallbackDisabled_MixedMediumUnavailable) {
    // Mixed environment: fallback disabled should be respected
    setupStorageEngine(true, true); // both HDD and SSD
    config::enable_storage_medium_fallback = false;

    // Verify normal operation first
    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);

    // Simulate HDD becoming unavailable but maintain mixed environment count
    _storage_engine->_store_map.erase(_hdd_path);
    _storage_engine->_available_storage_medium_type_count = 2;

    // Request unavailable HDD - should fail without fallback
    dir_infos.clear();
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

    EXPECT_EQ(dir_infos.size(), 0); // no fallback when disabled
}
202+
203+
// With no stores registered at all, no candidates are produced even when
// fallback is enabled.
TEST_F(StorageMediumFallbackTest, EmptyStoreMap) {
    config::enable_storage_medium_fallback = true;
    _storage_engine->_available_storage_medium_type_count = 0;
    _storage_engine->_store_map.clear();

    std::vector<DirInfo> candidates;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, candidates);

    EXPECT_TRUE(candidates.empty());
}
213+
214+
// With the medium-type count pinned to 1, a request for the absent medium falls
// back to the only available store even though the config flag is off.
TEST_F(StorageMediumFallbackTest, SingleMediumType_AlwaysFallback) {
    setupStorageEngine(true, false); // only HDD
    _storage_engine->_available_storage_medium_type_count = 1;
    config::enable_storage_medium_fallback = false;

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1); // forced fallback
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}
225+
226+
// Behaviour with the flag at its default (false): an exact medium match is
// returned in a mixed environment, and a single-medium environment still falls
// back to the only store.
TEST_F(StorageMediumFallbackTest, Config_DefaultValue) {
    config::enable_storage_medium_fallback = false; // default
    setupStorageEngine(true, true);                 // both HDD and SSD

    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);

    // Single medium type forces fallback even with default config
    setupStorageEngine(true, false); // only HDD
    dir_infos.clear();
    _storage_engine->_get_candidate_stores(TStorageMedium::SSD, dir_infos);

    ASSERT_EQ(dir_infos.size(), 1); // forced fallback
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);
}
244+
245+
// In a multi-medium environment the config flag alone decides whether a request
// for an unavailable medium falls back: no candidates while it is off, the SSD
// store once it is turned on.
TEST_F(StorageMediumFallbackTest, Config_MultiMediumFallbackControl) {
    setupStorageEngine(true, true); // both HDD and SSD
    config::enable_storage_medium_fallback = false;

    // Normal operation works fine
    std::vector<DirInfo> dir_infos;
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
    // Fatal on mismatch: dir_infos[0] below would otherwise be out of bounds.
    ASSERT_EQ(dir_infos.size(), 1);
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::HDD);

    // Simulate HDD unavailable but maintain multi-medium count
    _storage_engine->_store_map.erase(_hdd_path);
    _storage_engine->_available_storage_medium_type_count = 2;

    dir_infos.clear();
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);
    EXPECT_EQ(dir_infos.size(), 0); // no fallback when disabled

    // Enable fallback and test again
    config::enable_storage_medium_fallback = true;
    dir_infos.clear();
    _storage_engine->_get_candidate_stores(TStorageMedium::HDD, dir_infos);

    ASSERT_EQ(dir_infos.size(), 1); // fallback to SSD
    EXPECT_EQ(dir_infos[0].data_dir->storage_medium(), TStorageMedium::SSD);
}
271+
272+
} // namespace doris

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,19 @@ public void modifyPartitionsProperty(Database db,
10041004
Map<String, String> modifiedProperties = Maps.newHashMap();
10051005
modifiedProperties.putAll(properties);
10061006

1007+
// 4.2 handle medium_allocation_mode property
1008+
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MEDIUM_ALLOCATION_MODE)) {
1009+
String mediumAllocationModeValue = properties.get(
1010+
PropertyAnalyzer.PROPERTIES_MEDIUM_ALLOCATION_MODE);
1011+
try {
1012+
DataProperty.MediumAllocationMode mediumAllocationMode
1013+
= DataProperty.MediumAllocationMode.fromString(mediumAllocationModeValue);
1014+
dataProperty.setMediumAllocationMode(mediumAllocationMode);
1015+
} catch (IllegalArgumentException e) {
1016+
throw new AnalysisException(e.getMessage());
1017+
}
1018+
}
1019+
10071020
// 4.3 modify partition storage policy
10081021
// can set multi times storage policy
10091022
String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
@@ -1066,7 +1079,7 @@ public void modifyPartitionsProperty(Database db,
10661079
}
10671080
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(),
10681081
newDataProperty, replicaAlloc, hasInMemory ? newInMemory : oldInMemory, currentStoragePolicy,
1069-
Maps.newHashMap());
1082+
Maps.newHashMap(), partitionName, isTempPartition);
10701083
modifyPartitionInfos.add(info);
10711084
}
10721085

0 commit comments

Comments
 (0)