Skip to content

Commit 0769892

Browse files
authored
fix: snapshot recover for disk table (#2174)
* fix: snapshot recover for disk table if snapshot dir not exists * hardlink snapshot to data dir rather than rename
1 parent 2b3ffd9 commit 0769892

File tree

3 files changed

+274
-10
lines changed

3 files changed

+274
-10
lines changed

src/base/file_util.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,26 @@ inline static int GetSubDir(const std::string& path,
103103
return 0;
104104
}
105105

106+
inline static int GetSubFiles(const std::string& path, std::vector<std::string>& sub_dir) { // NOLINT
107+
if (path.empty()) {
108+
return -1;
109+
}
110+
DIR* dir = opendir(path.c_str());
111+
if (dir == NULL) {
112+
return -1;
113+
}
114+
struct dirent* ptr;
115+
while ((ptr = readdir(dir)) != NULL) {
116+
if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) {
117+
continue;
118+
} else if (ptr->d_type == DT_REG) {
119+
sub_dir.push_back(ptr->d_name);
120+
}
121+
}
122+
closedir(dir);
123+
return 0;
124+
}
125+
106126
inline static int GetFileName(const std::string& path,
107127
std::vector<std::string>& file_vec) { // NOLINT
108128
if (path.empty()) {
@@ -287,6 +307,27 @@ __attribute__((unused)) static bool CopyFile(const std::string& src_file, const
287307
return has_error == false;
288308
}
289309

310+
inline static int HardLinkDir(const std::string& src, const std::string& dest) {
311+
if (!IsExists(src)) {
312+
return -2;
313+
}
314+
315+
if (IsExists(dest)) {
316+
RemoveDirRecursive(dest);
317+
}
318+
319+
MkdirRecur(dest);
320+
std::vector<std::string> files;
321+
GetSubFiles(src, files);
322+
for (const auto& file : files) {
323+
int ret = link((src + "/" + file).c_str(), (dest + "/" + file).c_str());
324+
if (ret) {
325+
return ret;
326+
}
327+
}
328+
return 0;
329+
}
330+
290331
} // namespace base
291332
} // namespace openmldb
292333

src/replica/snapshot_replica_test.cc

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ using ::openmldb::storage::Ticket;
4646
using ::openmldb::tablet::TabletImpl;
4747

4848
DECLARE_string(db_root_path);
49+
DECLARE_string(ssd_root_path);
4950
DECLARE_string(hdd_root_path);
5051
DECLARE_string(endpoint);
5152
DECLARE_int32(make_snapshot_threshold_offset);
@@ -326,6 +327,226 @@ TEST_P(SnapshotReplicaTest, SendSnapshot) {
326327
sleep(2);
327328
}
328329

330+
TEST_P(SnapshotReplicaTest, IncompleteSnapshot) {
331+
FLAGS_make_snapshot_threshold_offset = 0;
332+
uint32_t tid = 2;
333+
uint32_t pid = 123;
334+
auto storage_mode = GetParam();
335+
uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000;
336+
{
337+
::openmldb::tablet::TabletImpl* tablet = new ::openmldb::tablet::TabletImpl();
338+
MockClosure closure;
339+
tablet->Init("");
340+
brpc::Server server;
341+
if (server.AddService(tablet, brpc::SERVER_OWNS_SERVICE) != 0) {
342+
PDLOG(WARNING, "fail to register tablet rpc service");
343+
exit(1);
344+
}
345+
brpc::ServerOptions options;
346+
std::string leader_point = "127.0.0.1:18529";
347+
if (server.Start(leader_point.c_str(), &options) != 0) {
348+
PDLOG(WARNING, "fail to start server %s", leader_point.c_str());
349+
exit(1);
350+
}
351+
352+
::openmldb::client::TabletClient client(leader_point, "");
353+
client.Init();
354+
std::vector<std::string> endpoints;
355+
bool ret =
356+
client.CreateTable("table1", tid, pid, 100000, 0, true, endpoints, ::openmldb::type::TTLType::kAbsoluteTime,
357+
16, 0, ::openmldb::type::CompressType::kNoCompress, storage_mode);
358+
ASSERT_TRUE(ret);
359+
ret = client.Put(tid, pid, "testkey", cur_time, ::openmldb::test::EncodeKV("testkey", "value1"));
360+
ASSERT_TRUE(ret);
361+
362+
uint32_t count = 0;
363+
while (count < 10) {
364+
count++;
365+
std::string key = "test";
366+
client.Put(tid, pid, key, cur_time + count, ::openmldb::test::EncodeKV(key, key));
367+
}
368+
::openmldb::api::GeneralRequest grq;
369+
grq.set_tid(tid);
370+
grq.set_pid(pid);
371+
grq.set_storage_mode(storage_mode);
372+
::openmldb::api::GeneralResponse grp;
373+
grp.set_code(-1);
374+
tablet->MakeSnapshot(NULL, &grq, &grp, &closure);
375+
sleep(5);
376+
}
377+
378+
{
379+
::openmldb::tablet::TabletImpl* tablet = new ::openmldb::tablet::TabletImpl();
380+
MockClosure closure;
381+
tablet->Init("");
382+
brpc::Server server;
383+
if (server.AddService(tablet, brpc::SERVER_OWNS_SERVICE) != 0) {
384+
PDLOG(WARNING, "fail to register tablet rpc service");
385+
exit(1);
386+
}
387+
brpc::ServerOptions options;
388+
std::string leader_point = "127.0.0.1:18529";
389+
if (server.Start(leader_point.c_str(), &options) != 0) {
390+
PDLOG(WARNING, "fail to start server %s", leader_point.c_str());
391+
exit(1);
392+
}
393+
394+
::openmldb::client::TabletClient client(leader_point, "");
395+
client.Init();
396+
397+
// load table
398+
::openmldb::api::TableMeta table_meta;
399+
table_meta.set_format_version(1);
400+
table_meta.set_name("table1");
401+
table_meta.set_tid(tid);
402+
table_meta.set_pid(pid);
403+
table_meta.set_storage_mode(storage_mode);
404+
client.LoadTable(table_meta, nullptr);
405+
sleep(5);
406+
407+
::openmldb::api::ScanRequest sr;
408+
sr.set_tid(tid);
409+
sr.set_pid(pid);
410+
sr.set_pk("testkey");
411+
sr.set_st(cur_time + 1);
412+
sr.set_et(cur_time - 1);
413+
sr.set_limit(10);
414+
::openmldb::api::ScanResponse srp;
415+
tablet->Scan(NULL, &sr, &srp, &closure);
416+
ASSERT_EQ(1, (int64_t)srp.count());
417+
ASSERT_EQ(0, srp.code());
418+
419+
sr.set_tid(tid);
420+
sr.set_pid(pid);
421+
sr.set_pk("test");
422+
sr.set_st(cur_time + 20);
423+
sr.set_et(cur_time - 20);
424+
sr.set_limit(20);
425+
tablet->Scan(NULL, &sr, &srp, &closure);
426+
ASSERT_EQ(10, (int64_t)srp.count());
427+
ASSERT_EQ(0, srp.code());
428+
429+
std::string key = "test2";
430+
ASSERT_TRUE(client.Put(tid, pid, key, cur_time, ::openmldb::test::EncodeKV(key, key)));
431+
432+
sr.set_tid(tid);
433+
sr.set_pid(pid);
434+
sr.set_pk("test2");
435+
sr.set_st(cur_time + 20);
436+
sr.set_et(cur_time - 20);
437+
sr.set_limit(20);
438+
tablet->Scan(NULL, &sr, &srp, &closure);
439+
ASSERT_EQ(1, (int64_t)srp.count());
440+
ASSERT_EQ(0, srp.code());
441+
}
442+
443+
// remove the snapshot file, only keeping the MANIFEST
444+
// i.e., corrupt the snapshot
445+
std::string db_root_path;
446+
if (storage_mode == common::kSSD) {
447+
db_root_path = FLAGS_ssd_root_path;
448+
} else if (storage_mode == common::kHDD) {
449+
db_root_path = FLAGS_hdd_root_path;
450+
} else {
451+
db_root_path = FLAGS_db_root_path;
452+
}
453+
std::string snapshot_path = absl::StrCat(db_root_path, "/", tid, "_", pid, "/snapshot/");
454+
std::vector<std::string> sub_dirs;
455+
::openmldb::base::GetSubDir(snapshot_path, sub_dirs);
456+
for (const auto& dir : sub_dirs) {
457+
auto sub_path = absl::StrCat(snapshot_path, dir);
458+
DLOG(INFO) << "remove snapshot path: " << sub_path;
459+
ASSERT_TRUE(::openmldb::base::RemoveDir(sub_path));
460+
}
461+
sleep(2);
462+
463+
{
464+
::openmldb::tablet::TabletImpl* tablet = new ::openmldb::tablet::TabletImpl();
465+
MockClosure closure;
466+
tablet->Init("");
467+
brpc::Server server;
468+
if (server.AddService(tablet, brpc::SERVER_OWNS_SERVICE) != 0) {
469+
PDLOG(WARNING, "fail to register tablet rpc service");
470+
exit(1);
471+
}
472+
brpc::ServerOptions options;
473+
std::string leader_point = "127.0.0.1:18529";
474+
if (server.Start(leader_point.c_str(), &options) != 0) {
475+
PDLOG(WARNING, "fail to start server %s", leader_point.c_str());
476+
exit(1);
477+
}
478+
479+
::openmldb::client::TabletClient client(leader_point, "");
480+
client.Init();
481+
482+
// load table
483+
::openmldb::api::TableMeta table_meta;
484+
table_meta.set_format_version(1);
485+
table_meta.set_name("table1");
486+
table_meta.set_tid(tid);
487+
table_meta.set_pid(pid);
488+
table_meta.set_storage_mode(storage_mode);
489+
client.LoadTable(table_meta, nullptr);
490+
sleep(5);
491+
492+
::openmldb::api::ScanRequest sr;
493+
sr.set_tid(tid);
494+
sr.set_pid(pid);
495+
sr.set_pk("testkey");
496+
sr.set_st(cur_time + 1);
497+
sr.set_et(cur_time - 1);
498+
sr.set_limit(10);
499+
::openmldb::api::ScanResponse srp;
500+
tablet->Scan(NULL, &sr, &srp, &closure);
501+
ASSERT_EQ(1, (int64_t)srp.count());
502+
ASSERT_EQ(0, srp.code());
503+
504+
sr.set_tid(tid);
505+
sr.set_pid(pid);
506+
sr.set_pk("test");
507+
sr.set_st(cur_time + 20);
508+
sr.set_et(cur_time - 20);
509+
sr.set_limit(20);
510+
tablet->Scan(NULL, &sr, &srp, &closure);
511+
ASSERT_EQ(10, (int64_t)srp.count());
512+
ASSERT_EQ(0, srp.code());
513+
514+
sr.set_tid(tid);
515+
sr.set_pid(pid);
516+
sr.set_pk("test2");
517+
sr.set_st(cur_time + 20);
518+
sr.set_et(cur_time - 20);
519+
sr.set_limit(20);
520+
tablet->Scan(NULL, &sr, &srp, &closure);
521+
ASSERT_EQ(1, (int64_t)srp.count());
522+
ASSERT_EQ(0, srp.code());
523+
524+
uint32_t count = 0;
525+
while (count < 10) {
526+
count++;
527+
std::string key = "test3";
528+
client.Put(tid, pid, key, cur_time + count, ::openmldb::test::EncodeKV(key, key));
529+
}
530+
531+
sr.set_tid(tid);
532+
sr.set_pid(pid);
533+
sr.set_pk("test3");
534+
sr.set_st(cur_time + 20);
535+
sr.set_et(cur_time - 20);
536+
sr.set_limit(20);
537+
tablet->Scan(NULL, &sr, &srp, &closure);
538+
ASSERT_EQ(10, (int64_t)srp.count());
539+
ASSERT_EQ(0, srp.code());
540+
541+
::openmldb::api::DropTableRequest dr;
542+
dr.set_tid(tid);
543+
dr.set_pid(pid);
544+
::openmldb::api::DropTableResponse drs;
545+
tablet->DropTable(NULL, &dr, &drs, &closure);
546+
sleep(2);
547+
}
548+
}
549+
329550
TEST_P(SnapshotReplicaTest, LeaderAndFollowerTS) {
330551
auto storage_mode = GetParam();
331552
::openmldb::tablet::TabletImpl* tablet = new ::openmldb::tablet::TabletImpl();

src/tablet/tablet_impl.cc

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3121,16 +3121,17 @@ int TabletImpl::LoadDiskTableInternal(uint32_t tid, uint32_t pid, const ::openml
31213121
std::string manifest_file = snapshot_path + "MANIFEST";
31223122
if (Snapshot::GetLocalManifest(manifest_file, manifest) == 0) {
31233123
std::string snapshot_dir = snapshot_path + manifest.name();
3124-
PDLOG(INFO, "rename dir %s to %s. tid %u pid %u", snapshot_dir.c_str(), data_path.c_str(), tid, pid);
3125-
if (!::openmldb::base::Rename(snapshot_dir, data_path)) {
3126-
PDLOG(WARNING, "rename dir failed. tid %u pid %u path %s", tid, pid, snapshot_dir.c_str());
3127-
break;
3128-
}
3129-
if (unlink(manifest_file.c_str()) < 0) {
3130-
PDLOG(WARNING, "remove manifest failed. tid %u pid %u path %s", tid, pid, manifest_file.c_str());
3131-
break;
3124+
if (::openmldb::base::IsExists(snapshot_dir)) {
3125+
PDLOG(INFO, "hardlink dir %s to %s (tid %u pid %u)", snapshot_dir.c_str(), data_path.c_str(), tid, pid);
3126+
if (::openmldb::base::HardLinkDir(snapshot_dir, data_path)) {
3127+
PDLOG(WARNING, "hardlink snapshot dir %s to data dir failed (tid %u pid %u)", snapshot_dir.c_str(),
3128+
tid, pid);
3129+
break;
3130+
}
3131+
snapshot_offset = manifest.offset();
3132+
} else {
3133+
PDLOG(WARNING, "snapshot_dir %s with tid %u pid %u not exists", snapshot_dir.c_str(), tid, pid);
31323134
}
3133-
snapshot_offset = manifest.offset();
31343135
}
31353136
std::string msg;
31363137
if (CreateTableInternal(&table_meta, msg) < 0) {
@@ -3178,7 +3179,6 @@ int TabletImpl::LoadDiskTableInternal(uint32_t tid, uint32_t pid, const ::openml
31783179
task_pool_.DelayTask(FLAGS_binlog_delete_interval,
31793180
boost::bind(&TabletImpl::SchedDelBinlog, this, tid, pid));
31803181
PDLOG(INFO, "load table success. tid %u pid %u", tid, pid);
3181-
MakeSnapshotInternal(tid, pid, 0, std::shared_ptr<::openmldb::api::TaskInfo>());
31823182
std::string old_data_path = table_path + "/old_data";
31833183
if (::openmldb::base::IsExists(old_data_path)) {
31843184
if (!::openmldb::base::RemoveDir(old_data_path)) {
@@ -3191,6 +3191,8 @@ int TabletImpl::LoadDiskTableInternal(uint32_t tid, uint32_t pid, const ::openml
31913191
task_ptr->set_status(::openmldb::api::TaskStatus::kDone);
31923192
return 0;
31933193
}
3194+
PDLOG(INFO, "Recover table with tid %u and pid %u from binlog offset %u to %u", tid, pid, snapshot_offset,
3195+
latest_offset);
31943196
} else {
31953197
DeleteTableInternal(tid, pid, std::shared_ptr<::openmldb::api::TaskInfo>());
31963198
}

0 commit comments

Comments
 (0)