Skip to content

Commit e488884

Browse files
committed
Concurrent load disk when cs starts #880
(#880)
1 parent 09abd6d commit e488884

File tree

4 files changed

+59
-16
lines changed

4 files changed

+59
-16
lines changed

src/chunkserver/block_manager.cc

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,46 @@ int64_t BlockManager::DiskQuota() const {
165165

166166
// TODO: concurrent load
167167
bool BlockManager::LoadStorage() {
168-
bool ret = true;
168+
// return value from Disk::LoadStorage:
169+
// 0: initial state, 1: done, -1: error occured
170+
std::vector<int> ret_vals;
171+
ret_vals.resize(disks_.size());
172+
ThreadPool tp(disks_.size());
173+
int disk_index = 0;
169174
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
170175
Disk* disk = it->second;
171-
ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
176+
std::function<bool (int64_t, Disk*, BlockMeta)> callback = std::bind(&BlockManager::AddBlock,
172177
this, std::placeholders::_1,
173178
std::placeholders::_2,
174-
std::placeholders::_3));
175-
disk_quota_ += disk->GetQuota();
179+
std::placeholders::_3);
180+
tp.AddTask(std::bind(&Disk::LoadStorage, disk, callback, &(ret_vals[disk_index])));
181+
++disk_index;
182+
}
183+
bool ret = true;
184+
while (true) {
185+
sleep(1);
186+
bool done = true;
187+
for (auto it = ret_vals.begin(); it != ret_vals.end(); ++it) {
188+
if (*it < 0) {
189+
ret = false;
190+
break;
191+
} else if (*it == 0) {
192+
done = false;
193+
break;
194+
}
195+
}
196+
if (done || !ret) {
197+
break;
198+
}
199+
}
200+
if (ret) {
201+
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
202+
Disk* disk = it->second;
203+
disk_quota_ += disk->GetQuota();
204+
}
176205
}
206+
tp.Stop(false);
207+
LOG(INFO, "LoadStorage done. Quota = %ld", disk_quota_);
177208
return ret;
178209
}
179210

src/chunkserver/disk.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ Disk::~Disk() {
3535
metadb_ = NULL;
3636
}
3737

38-
bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
38+
void Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val) {
3939
MutexLock lock(&mu_);
4040
int64_t start_load_time = common::timer::get_micros();
4141
leveldb::Options options;
4242
options.create_if_missing = true;
4343
leveldb::Status s = leveldb::DB::Open(options, path_ + "meta/", &metadb_);
4444
if (!s.ok()) {
4545
LOG(WARNING, "Load blocks fail: %s", s.ToString().c_str());
46-
return false;
46+
*ret_val = -1;
47+
return;
4748
}
4849

4950
std::string version_key(8, '\0');
@@ -61,7 +62,8 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
6162
if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
6263
LOG(WARNING, "Unknown key: %s\n", it->key().ToString().c_str());
6364
delete it;
64-
return false;
65+
*ret_val = -1;
66+
return;
6567
}
6668
BlockMeta meta;
6769
if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
@@ -97,13 +99,15 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
9799
}
98100
delete it;
99101
int64_t end_load_time = common::timer::get_micros();
100-
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld",
101-
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, namespace_version_);
102+
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld, size %s",
103+
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000,
104+
namespace_version_, common::HumanReadableString(counters_.data_size.Get()).c_str());
102105
if (namespace_version_ == 0 && block_num > 0) {
103106
LOG(WARNING, "Namespace version lost!");
104107
}
105108
quota_ += counters_.data_size.Get();
106-
return true;
109+
*ret_val = 1;
110+
return;
107111
}
108112

109113
std::string Disk::Path() const {

src/chunkserver/disk.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class Disk {
3636
Disk(const std::string& path, int64_t quota);
3737
~Disk();
3838
std::string Path() const;
39-
bool LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback);
39+
void LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val);
4040
int64_t NamespaceVersion() const;
4141
bool SetNamespaceVersion(int64_t version);
4242
void Seek(int64_t block_id, std::vector<leveldb::Iterator*>* iters);

src/chunkserver/test/data_block_test.cc

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@ class DataBlockTest : public ::testing::Test {
2020
protected:
2121
};
2222

23-
void AddBlock(int64_t block_id, Disk* disk, BlockMeta meta) {
23+
void AdBlock(int64_t block_id, Disk* disk, BlockMeta meta) {
2424
// do nothing
2525
}
2626

2727
TEST_F(DataBlockTest, CreateBlock) {
2828
mkdir("./block123", 0755);
2929
std::string file_path("./block123");
3030
Disk disk(file_path, 1000000);
31-
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
32-
std::placeholders::_2, std::placeholders::_3));
31+
int ret_val;
32+
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AdBlock,
33+
std::placeholders::_1,
34+
std::placeholders::_2,
35+
std::placeholders::_3);
36+
disk.LoadStorage(callback, &ret_val);
3337
BlockMeta meta;
3438
FileCache file_cache(10);
3539
int64_t block_id = 123;
@@ -51,8 +55,12 @@ TEST_F(DataBlockTest, WriteAndReadBlock) {
5155
mkdir("./block123", 0755);
5256
std::string file_path("./block123");
5357
Disk disk(file_path, 1000000);
54-
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
55-
std::placeholders::_2, std::placeholders::_3));
58+
int ret_val;
59+
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AdBlock,
60+
std::placeholders::_1,
61+
std::placeholders::_2,
62+
std::placeholders::_3);
63+
disk.LoadStorage(callback, &ret_val);
5664
FileCache file_cache(10);
5765
int64_t block_id = 123;
5866
meta.set_block_id(block_id);

0 commit comments

Comments
 (0)