Skip to content

Commit 2372ff0

Browse files
committed
Merge branch 'master' into LogController2
2 parents a0f3726 + 244adf6 commit 2372ff0

File tree

7 files changed

+134
-26
lines changed

7 files changed

+134
-26
lines changed

cloud/aws/aws_env.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,11 @@ class AwsEnv : public CloudEnvImpl {
286286
file_deletion_delay_ = delay;
287287
}
288288

289+
Status TEST_DeletePathInS3(const std::string& bucket,
290+
const std::string& fname) {
291+
return DeletePathInS3(bucket, fname);
292+
}
293+
289294
private:
290295
//
291296
// The AWS credentials are specified to the constructor via

cloud/cloud_env_impl.cc

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,24 @@ Status CloudEnvImpl::LoadLocalCloudManifest(const std::string& dbname) {
4747
if (cloud_manifest_) {
4848
cloud_manifest_.reset();
4949
}
50+
return CloudEnvImpl::LoadLocalCloudManifest(dbname, GetBaseEnv(),
51+
&cloud_manifest_);
52+
}
53+
54+
Status CloudEnvImpl::LoadLocalCloudManifest(
55+
const std::string& dbname, Env* base_env,
56+
std::unique_ptr<CloudManifest>* cloud_manifest) {
5057
std::unique_ptr<SequentialFile> file;
51-
auto cloudManifestFile = CloudManifestFile(dbname);
52-
auto s =
53-
GetBaseEnv()->NewSequentialFile(cloudManifestFile, &file, EnvOptions());
58+
auto cloud_manifest_file_name = CloudManifestFile(dbname);
59+
auto s = base_env->NewSequentialFile(cloud_manifest_file_name, &file,
60+
EnvOptions());
5461
if (!s.ok()) {
5562
return s;
5663
}
5764
return CloudManifest::LoadFromLog(
58-
std::unique_ptr<SequentialFileReader>(
59-
new SequentialFileReader(std::move(file), cloudManifestFile)),
60-
&cloud_manifest_);
65+
std::unique_ptr<SequentialFileReader>(
66+
new SequentialFileReader(std::move(file), cloud_manifest_file_name)),
67+
cloud_manifest);
6168
}
6269

6370
std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const {
@@ -438,14 +445,39 @@ Status CloudEnvImpl::NeedsReinitialization(const std::string& local_dir,
438445
}
439446

440447
// We found a local dbid but we did not find this dbid in bucket registry.
448+
// This is an ephemeral clone.
441449
if (src_object_path.empty() && dest_object_path.empty()) {
442450
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
443451
"[cloud_env_impl] NeedsReinitialization: "
444452
"local dbid %s does not have a mapping in cloud registry "
445453
"src bucket %s or dest bucket %s",
446454
local_dbid.c_str(), src_bucket.c_str(), dest_bucket.c_str());
447455

448-
// This is an ephemeral clone. Resync all files from cloud.
456+
// The local CLOUDMANIFEST on ephemeral clone is by definition out-of-sync
457+
// with the CLOUDMANIFEST in the cloud. That means we need to make sure the
458+
// local MANIFEST is compatible with the local CLOUDMANIFEST. Otherwise
459+
// there is no way we can recover since all MANIFEST files on the cloud are
460+
// only compatible with CLOUDMANIFEST on the cloud.
461+
//
462+
// If the local MANIFEST is not compatible with local CLOUDMANIFEST, we will
463+
// need to reinitialize the entire directory.
464+
std::unique_ptr<CloudManifest> cloud_manifest;
465+
Env* base_env = GetBaseEnv();
466+
Status load_status =
467+
LoadLocalCloudManifest(local_dir, base_env, &cloud_manifest);
468+
if (load_status.ok()) {
469+
std::string current_epoch = cloud_manifest->GetCurrentEpoch().ToString();
470+
Status local_manifest_exists =
471+
base_env->FileExists(ManifestFileWithEpoch(local_dir, current_epoch));
472+
if (!local_manifest_exists.ok()) {
473+
Log(InfoLogLevel::WARN_LEVEL, info_log_,
474+
"[cloud_env_impl] NeedsReinitialization: CLOUDMANIFEST exists "
475+
"locally, but no local MANIFEST is compatible");
476+
return Status::OK();
477+
}
478+
}
479+
480+
// Resync all files from cloud.
449481
// If the resync failed, then return success to indicate that
450482
// the local directory needs to be completely removed and recreated.
451483
st = ResyncDir(local_dir);

cloud/cloud_env_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ class CloudEnvImpl : public CloudEnv {
4343
const DbidList& dbid_list, DbidParents* parents);
4444
virtual Status PreloadCloudManifest(const std::string& local_dbname) override;
4545

46+
// Load CLOUDMANIFEST, if it exists on local disk, into the current env.
4647
Status LoadLocalCloudManifest(const std::string& dbname);
48+
49+
// Load CLOUDMANIFEST from `base_env` into `cloud_manifest`.
50+
static Status LoadLocalCloudManifest(
51+
const std::string& dbname, Env* base_env,
52+
std::unique_ptr<CloudManifest>* cloud_manifest);
53+
4754
// Transfers the filename from RocksDB's domain to the physical domain, based
4855
// on information stored in CLOUDMANIFEST.
4956
// For example, it will map 00010.sst to 00010.sst-[epoch] where [epoch] is

cloud/db_cloud_test.cc

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66

77
#include "rocksdb/cloud/db_cloud.h"
88

9-
#include <cinttypes>
109
#include <algorithm>
1110
#include <chrono>
11+
#include <cinttypes>
1212

1313
#include "cloud/aws/aws_env.h"
1414
#include "cloud/aws/aws_file.h"
1515
#include "cloud/db_cloud_impl.h"
1616
#include "cloud/filename.h"
1717
#include "cloud/manifest_reader.h"
18+
#include "logging/logging.h"
1819
#include "rocksdb/options.h"
1920
#include "rocksdb/status.h"
2021
#include "rocksdb/table.h"
@@ -133,11 +134,11 @@ class CloudTest : public testing::Test {
133134
}
134135

135136
// Creates and Opens a clone
136-
void CloneDB(const std::string& clone_name,
137-
const std::string& dest_bucket_name,
138-
const std::string& dest_object_path,
139-
std::unique_ptr<DBCloud>* cloud_db,
140-
std::unique_ptr<CloudEnv>* cloud_env) {
137+
Status CloneDB(const std::string& clone_name,
138+
const std::string& dest_bucket_name,
139+
const std::string& dest_object_path,
140+
std::unique_ptr<DBCloud>* cloud_db,
141+
std::unique_ptr<CloudEnv>* cloud_env) {
141142
// The local directory where the clone resides
142143
std::string cname = clone_dir_ + "/" + clone_name;
143144

@@ -153,11 +154,15 @@ class CloudTest : public testing::Test {
153154
copt.dest_bucket.SetBucketName(dest_bucket_name);
154155
}
155156
copt.dest_bucket.SetObjectPath(dest_object_path);
156-
if (! copt.dest_bucket.IsValid()) {
157+
if (!copt.dest_bucket.IsValid()) {
157158
copt.keep_local_sst_files = true;
158159
}
159160
// Create new AWS env
160-
ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, copt, options_.info_log, &cenv));
161+
Status st = CloudEnv::NewAwsEnv(base_env_, copt, options_.info_log, &cenv);
162+
if (!st.ok()) {
163+
return st;
164+
}
165+
161166
// To catch any possible file deletion bugs, we set file deletion delay to
162167
// smallest possible
163168
((AwsEnv*)cenv)->TEST_SetFileDeletionDelay(std::chrono::seconds(0));
@@ -175,15 +180,20 @@ class CloudTest : public testing::Test {
175180
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cfopt));
176181
std::vector<ColumnFamilyHandle*> handles;
177182

178-
ASSERT_OK(DBCloud::Open(options_, cname, column_families,
179-
persistent_cache_path_, persistent_cache_size_gb_,
180-
&handles, &clone_db));
183+
st = DBCloud::Open(options_, cname, column_families, persistent_cache_path_,
184+
persistent_cache_size_gb_, &handles, &clone_db);
185+
if (!st.ok()) {
186+
return st;
187+
}
188+
181189
cloud_db->reset(clone_db);
182190

183191
// Delete the handle for the default column family because the DBImpl
184192
// always holds a reference to it.
185-
ASSERT_TRUE(handles.size() > 0);
193+
assert(handles.size() > 0);
186194
delete handles[0];
195+
196+
return st;
187197
}
188198

189199
void CloseDB() {
@@ -1165,6 +1175,60 @@ TEST_F(CloudTest, Ephemeral) {
11651175
}
11661176
}
11671177

1178+
// This test exercises a rare race condition where an ephemeral clone is
1179+
// started after durable clone uploads its CLOUDMANIFEST but before it uploads
1180+
// one of the MANIFEST. In this case, we want to verify that ephemeral clone is
1181+
// able to reinitialize instead of crash looping.
1182+
TEST_F(CloudTest, EphemeralOnCorruptedDB) {
1183+
cloud_env_options_.keep_local_sst_files = true;
1184+
options_.level0_file_num_compaction_trigger = 100; // never compact
1185+
1186+
OpenDB();
1187+
1188+
std::vector<std::string> files;
1189+
base_env_->GetChildren(dbname_, &files);
1190+
1191+
// Get the MANIFEST file
1192+
std::string manifest_file_name;
1193+
for (const auto& file_name : files) {
1194+
if (file_name.rfind("MANIFEST", 0) == 0) {
1195+
manifest_file_name = file_name;
1196+
break;
1197+
}
1198+
}
1199+
1200+
ASSERT_FALSE(manifest_file_name.empty());
1201+
1202+
// Delete MANIFEST file from S3 bucket.
1203+
// This is to simulate the scenario where CLOUDMANIFEST is uploaded, but
1204+
// MANIFEST is not yet uploaded from the durable shard.
1205+
auto aws_env = dynamic_cast<AwsEnv*>(aenv_.get());
1206+
ASSERT_TRUE(aws_env != nullptr);
1207+
aws_env->TEST_DeletePathInS3(
1208+
aws_env->GetSrcBucketName(),
1209+
aws_env->GetSrcObjectPath() + "/" + manifest_file_name);
1210+
1211+
// Ephemeral clone should fail.
1212+
std::unique_ptr<DBCloud> clone_db;
1213+
std::unique_ptr<CloudEnv> cenv;
1214+
Status st = CloneDB("clone1", "", "", &clone_db, &cenv);
1215+
ASSERT_NOK(st);
1216+
1217+
// Put the MANIFEST file back
1218+
aws_env->PutObject(dbname_ + "/" + manifest_file_name,
1219+
aws_env->GetSrcBucketName(),
1220+
aws_env->GetSrcObjectPath() + "/" + manifest_file_name);
1221+
1222+
// Try one more time. This time it should succeed.
1223+
clone_db.reset();
1224+
cenv.reset();
1225+
st = CloneDB("clone1", "", "", &clone_db, &cenv);
1226+
ASSERT_OK(st);
1227+
1228+
clone_db->Close();
1229+
CloseDB();
1230+
}
1231+
11681232
//
11691233
// Test Ephemeral clones with resyncOnOpen mode.
11701234
// In this mode, every open of the ephemeral clone db causes its

db/compaction/compaction_job.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1705,7 +1705,7 @@ void CompactionJob::RunRemote(PluggableCompactionService* service) {
17051705
files_in_one_level.level = f.level;
17061706
for (size_t i = 0; i < f.size(); i++) {
17071707
uint64_t fileno = f[i]->fd.GetNumber();
1708-
uint64_t pathid = f[i]->fd.GetPathId();
1708+
uint32_t pathid = f[i]->fd.GetPathId();
17091709
files_in_one_level.files.push_back(
17101710
TableFileName(c->immutable_cf_options()->cf_paths, fileno, pathid));
17111711
}

db/db_impl/db_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -825,16 +825,16 @@ class DBImpl : public DB {
825825
virtual Status ExecuteRemoteCompactionRequest(
826826
const PluggableCompactionParam& inputParams,
827827
PluggableCompactionResult* result,
828-
bool sanitize);
828+
bool sanitize) override;
829829

830830
// This registered service will be called to do a remote compaction
831-
virtual Status RegisterPluggableCompactionService(std::unique_ptr<PluggableCompactionService> rservice) {
831+
virtual Status RegisterPluggableCompactionService(std::unique_ptr<PluggableCompactionService> rservice) override {
832832
remote_compaction_service_ = std::move(rservice);
833833
return Status::OK();
834834
}
835835

836836
// Clear any registered pluggable compaction service
837-
virtual void UnRegisterPluggableCompactionService() {
837+
virtual void UnRegisterPluggableCompactionService() override {
838838
remote_compaction_service_.reset(nullptr);
839839
}
840840

include/rocksdb/utilities/stackable_db.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,16 +437,16 @@ class StackableDB : public DB {
437437
virtual Status ExecuteRemoteCompactionRequest(
438438
const PluggableCompactionParam& inputParams,
439439
PluggableCompactionResult* result,
440-
bool sanitize) {
440+
bool sanitize) override {
441441
return db_->ExecuteRemoteCompactionRequest(inputParams, result, sanitize);
442442
}
443443

444444
virtual Status RegisterPluggableCompactionService(
445-
std::unique_ptr<PluggableCompactionService> rservice) {
445+
std::unique_ptr<PluggableCompactionService> rservice) override {
446446
return db_->RegisterPluggableCompactionService(std::move(rservice));
447447
}
448448

449-
virtual void UnRegisterPluggableCompactionService() {
449+
virtual void UnRegisterPluggableCompactionService() override {
450450
db_->UnRegisterPluggableCompactionService();
451451
}
452452

0 commit comments

Comments
 (0)