diff --git a/CMakeLists.txt b/CMakeLists.txt index ec149e9be0c..442d9938835 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,7 +185,7 @@ endif() if(WITH_AWS) find_package(AWSSDK REQUIRED COMPONENTS s3 transfer kinesis) - add_definitions(-DUSE_AWS) + add_definitions(-DUSE_AWS -DUSE_CLOUD) include_directories(${AWS_INCLUDE_DIR}) list(APPEND THIRDPARTY_LIBS ${AWSSDK_LINK_LIBRARIES}) endif() @@ -919,6 +919,7 @@ set(SOURCES cloud/cloud_scheduler.cc cloud/cloud_storage_provider.cc cloud/cloud_file_cache.cc + cloud/mock_cloud_storage_provider.cc db/db_impl/db_impl_remote_compaction.cc $) diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index b8e469e2a9c..834a1827d57 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -638,6 +638,7 @@ if [ "${USE_AWS}XXX" = "1XXX" ]; then S3_LDFLAGS="$S3_LDFLAGS -laws-cpp-sdk-s3 -laws-cpp-sdk-kinesis -laws-cpp-sdk-core -laws-cpp-sdk-transfer" COMMON_FLAGS="$COMMON_FLAGS $S3_CCFLAGS" PLATFORM_LDFLAGS="$S3_LDFLAGS $PLATFORM_LDFLAGS" + USE_CLOUD=1 fi # # Support the Kafka WAL storing if the env variable named USE_KAFKA @@ -647,6 +648,13 @@ fi if [ "${USE_KAFKA}XXX" = "1XXX" ]; then COMMON_FLAGS="$COMMON_FLAGS -DUSE_KAFKA" PLATFORM_LDFLAGS="-lrdkafka++ $PLATFORM_LDFLAGS" + USE_CLOUD=1 +fi + +if [ "${USE_CLOUD}XXX" = "1XXX" ]; then + COMMON_FLAGS="$COMMON_FLAGS -DUSE_CLOUD" + #TODO: How to only turn on unit tests when appropriate + COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS" fi if test "0$PORTABLE" -eq 0; then diff --git a/cloud/aws/aws_env.h b/cloud/aws/aws_env.h index a1bd01704f9..ffc4c8ecc2c 100644 --- a/cloud/aws/aws_env.h +++ b/cloud/aws/aws_env.h @@ -10,8 +10,6 @@ #include "cloud/cloud_env_impl.h" -#ifdef USE_AWS - #include #include @@ -49,6 +47,7 @@ class AwsEnv : public CloudEnvImpl { const std::shared_ptr& info_log, CloudEnv** cenv); static Status NewAwsEnv(Env* env, std::unique_ptr* cenv); +#ifdef USE_AWS virtual 
~AwsEnv() {} static const char* kName() { return kAws(); } @@ -73,8 +72,7 @@ class AwsEnv : public CloudEnvImpl { // explicit AwsEnv(Env* underlying_env, const CloudEnvOptions& cloud_options, const std::shared_ptr& info_log = nullptr); +#endif // USE_AWS }; } // namespace ROCKSDB_NAMESPACE - -#endif // USE_AWS diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 633c8728fab..a7d7fef2b0c 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -15,6 +15,7 @@ #include "cloud/cloud_storage_provider_impl.h" #include "cloud/db_cloud_impl.h" #include "cloud/filename.h" +#include "cloud/mock_cloud_storage_provider.h" #include "options/configurable_helper.h" #include "options/options_helper.h" #include "port/likely.h" @@ -116,6 +117,21 @@ void BucketOptions::TEST_Initialize(const std::string& bucket, } } +static void ParseTestBucket(const std::string& value, std::string* name, + std::string* path, std::string* region) { + *name = value; + auto pos = name->find(":"); + if (pos != std::string::npos) { + *path = name->substr(pos + 1); + *name = name->substr(0, pos); + } + pos = path->find("?"); + if (pos != std::string::npos) { + *region = path->substr(pos + 1); + *path = path->substr(0, pos); + } +} + static std::unordered_map bucket_options_type_info = { {"object", @@ -211,16 +227,7 @@ static std::unordered_map std::string name = value; std::string path; std::string region; - auto pos = name.find(":"); - if (pos != std::string::npos) { - path = name.substr(pos + 1); - name = name.substr(0, pos); - } - pos = path.find("?"); - if (pos != std::string::npos) { - region = path.substr(pos + 1); - path = path.substr(0, pos); - } + ParseTestBucket(value, &name, &path, &region); bucket->TEST_Initialize(name, path, region); return Status::OK(); }}}, @@ -308,16 +315,7 @@ static std::unordered_map std::string name; std::string path; std::string region; - auto pos = value.find(":"); - if (pos != std::string::npos) { - name = value.substr(0, pos); - path = value.substr(pos + 1); - 
} - pos = path.find("?"); - if (pos != std::string::npos) { - region = path.substr(pos + 1); - path = path.substr(0, pos); - } + ParseTestBucket(value, &name, &path, &region); copts->src_bucket.TEST_Initialize(name, path, region); copts->dest_bucket.TEST_Initialize(name, path, region); return Status::OK(); @@ -385,6 +383,7 @@ Status CloudEnv::NewAwsEnv( return NewAwsEnv(base_env, options, logger, cenv); } +namespace { int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) { int count = 0; // Register the Env types @@ -396,6 +395,15 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) { return guard->get(); }); count++; + library.Register( + MockCloudStorageProvider::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new MockCloudStorageProvider()); + return guard->get(); + }); + count++; count += CloudEnvImpl::RegisterAwsObjects(library, arg); @@ -416,14 +424,15 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) { return count; } -void CloudEnv::RegisterCloudObjects(const std::string& arg) { +static void RegisterCloudObjects(const std::string& arg = "") { static std::once_flag do_once; std::call_once(do_once, [&]() { auto library = ObjectLibrary::Default(); DoRegisterCloudObjects(*library, arg); }); -} +} +} // namespace Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value, std::unique_ptr* result) { @@ -458,7 +467,6 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std s = cenv->ConfigureFromMap(copy, options); } if (s.ok() && config_options.invoke_prepare_options) { - copy.invoke_prepare_options = config_options.invoke_prepare_options; copy.env = cenv; s = cenv->PrepareOptions(copy); if (s.ok()) { @@ -538,7 +546,7 @@ Status CloudEnv::NewAwsEnv(Env* /*base_env*/, Status CloudEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& options, const std::shared_ptr& 
logger, CloudEnv** cenv) { - CloudEnv::RegisterCloudObjects(); + RegisterCloudObjects(); // Dump out cloud env options options.Dump(logger.get()); @@ -558,4 +566,90 @@ Status CloudEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& options, #endif } // namespace ROCKSDB_NAMESPACE +#ifdef USE_CLOUD +#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +static std::string ToTestBucket(const std::string& name, + const std::string& path, + const std::string& region = "") { + std::string result = name; + for (auto pos = result.find("_"); pos != std::string::npos; + pos = result.find("_", pos)) { + result[pos] = '.'; + } + if (!path.empty()) { + result.append(":"); + result.append(path); + } + if (!region.empty()) { + result.append("?"); + result.append(region); + } + return result; +} + +static std::string ToTestBucket(const std::string& name) { + // Randomize the test path so that multiple tests can run in parallel + srand(static_cast(time(nullptr))); + std::string path = name + "_" + std::to_string(rand()); + return ToTestBucket(name, "/" + path); +} +extern "C" { +void RegisterCustomObjects(int argc, char** argv) { + std::string test_id = (argc > 0) ? argv[0] : "db_test"; + auto slash = test_id.find_last_of("/\\"); + if (slash != std::string::npos) { + test_id = test_id.substr(slash + 1); + } + + //**TODO: When the Env is a Customizable object and can use options/map, this + //code can go away... + // ... 
in which case, the RegisterCloudObjects should take in the test_id to + // register for initialize + auto library = ROCKSDB_NAMESPACE::ObjectLibrary::Default(); + library->Register( + "id=.*", [test_id](const std::string& uri, + std::unique_ptr* guard, + std::string* errmsg) { + ROCKSDB_NAMESPACE::ConfigOptions config_options; + std::unique_ptr cguard; + auto s = ROCKSDB_NAMESPACE::CloudEnv::CreateFromString( + config_options, "TEST=" + ToTestBucket(test_id) + "; " + uri, + &cguard); + if (s.ok()) { + auto* cimpl = + static_cast(cguard.get()); + cimpl->TEST_DisableCloudManifest(); + cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); + guard->reset(cguard.release()); + } else { + *errmsg = s.ToString(); + } + return guard->get(); + }); + library->Register( + "provider=.*", [test_id](const std::string& uri, + std::unique_ptr* guard, + std::string* errmsg) { + ROCKSDB_NAMESPACE::ConfigOptions config_options; + std::unique_ptr cguard; + auto s = ROCKSDB_NAMESPACE::CloudEnv::CreateFromString( + config_options, "TEST=" + ToTestBucket(test_id) + "; " + uri, + &cguard); + if (s.ok()) { + auto* cimpl = + static_cast(cguard.get()); + cimpl->TEST_DisableCloudManifest(); + cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); + guard->reset(cguard.release()); + } else { + *errmsg = s.ToString(); + } + return guard->get(); + }); + + ROCKSDB_NAMESPACE::RegisterCloudObjects(test_id); +} +} +#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS +#endif // USE_CLOUD #endif // ROCKSDB_LITE diff --git a/cloud/cloud_env_impl.cc b/cloud/cloud_env_impl.cc index 8efdd4b3d8a..92b148dea58 100644 --- a/cloud/cloud_env_impl.cc +++ b/cloud/cloud_env_impl.cc @@ -16,6 +16,7 @@ #include "file/writable_file_writer.h" #include "port/likely.h" #include "rocksdb/cloud/cloud_log_controller.h" +#include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/options.h" @@ -46,6 +47,9 @@ CloudEnvImpl::~CloudEnvImpl() { 
files_to_delete_.clear(); } StopPurger(); + // Since scheduled jobs may use CloudEnv members, shutdown the scheduler + // before destruction is complete. + scheduler_.reset(); } Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) { @@ -1951,16 +1955,18 @@ Status CloudEnvImpl::PrepareOptions(const ConfigOptions& options) { if (!base_env_) { base_env_ = Env::Default(); } + ConfigOptions copy = options; + copy.env = this; Status status; if (!cloud_env_options.cloud_log_controller && !cloud_env_options.keep_local_log_files) { if (cloud_env_options.log_type == LogType::kLogKinesis) { status = CloudLogController::CreateFromString( - options, CloudLogControllerImpl::kKinesis(), + copy, CloudLogControllerImpl::kKinesis(), &cloud_env_options.cloud_log_controller); } else if (cloud_env_options.log_type == LogType::kLogKafka) { status = CloudLogController::CreateFromString( - options, CloudLogControllerImpl::kKafka(), + copy, CloudLogControllerImpl::kKafka(), &cloud_env_options.cloud_log_controller); } else { status = Status::NotSupported("Unsupported log controller type"); diff --git a/cloud/db_cloud_test.cc b/cloud/db_cloud_test.cc index 539a61309e1..2b409d1c136 100644 --- a/cloud/db_cloud_test.cc +++ b/cloud/db_cloud_test.cc @@ -2,8 +2,6 @@ #ifndef ROCKSDB_LITE -#ifdef USE_AWS - #include "rocksdb/cloud/db_cloud.h" #include @@ -15,12 +13,15 @@ #include "cloud/manifest_reader.h" #include "file/filename.h" #include "logging/logging.h" +#include "port/stack_trace.h" #include "rocksdb/cloud/cloud_storage_provider.h" +#include "rocksdb/convenience.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "test_util/testharness.h" #include "util/random.h" +#include "util/stderr_logger.h" #include "util/string_util.h" #ifndef OS_WIN #include @@ -28,80 +29,79 @@ namespace ROCKSDB_NAMESPACE { -class CloudTest : public testing::Test { +class CloudTest : public testing::Test, + public ::testing::WithParamInterface { public: 
CloudTest() { Random64 rng(time(nullptr)); test_id_ = std::to_string(rng.Next()); - fprintf(stderr, "Test ID: %s\n", test_id_.c_str()); + cloud_opts_str_ = GetParam(); + + fprintf(stderr, "Test[%s] ID: %s\n", cloud_opts_str_.c_str(), + test_id_.c_str()); - base_env_ = Env::Default(); dbname_ = test::TmpDir() + "/db_cloud-" + test_id_; clone_dir_ = test::TmpDir() + "/ctest-" + test_id_; - cloud_env_options_.TEST_Initialize("dbcloudtest.", dbname_); + cloud_env_options_.TEST_Initialize("dbcloudtest.", "/db_cloud-" + test_id_); options_.create_if_missing = true; persistent_cache_path_ = ""; persistent_cache_size_gb_ = 0; db_ = nullptr; + Env* base = Env::Default(); DestroyDir(dbname_); - base_env_->CreateDirIfMissing(dbname_); - base_env_->NewLogger(test::TmpDir(base_env_) + "/rocksdb-cloud.log", - &options_.info_log); + base->CreateDirIfMissing(dbname_); + base->NewLogger(test::TmpDir(base) + "/rocksdb-cloud.log", + &options_.info_log); options_.info_log->SetInfoLogLevel(InfoLogLevel::DEBUG_LEVEL); Cleanup(); } void Cleanup() { - ASSERT_TRUE(!aenv_); + EXPECT_TRUE(!aenv_); - // check cloud credentials - ASSERT_TRUE(cloud_env_options_.credentials.HasValid().ok()); - - CloudEnv* aenv; - // create a dummy aws env - ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, - options_.info_log, &aenv)); - ASSERT_NE(aenv, nullptr); - aenv_.reset(aenv); + // create a dummy cloud env + EXPECT_OK(CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + cloud_env_options_, &aenv_)); + EXPECT_NE(aenv_, nullptr); // delete all pre-existing contents from the bucket Status st = aenv_->GetStorageProvider()->EmptyBucket( aenv_->GetSrcBucketName(), dbname_); - ASSERT_TRUE(st.ok() || st.IsNotFound()); - aenv_.reset(); + EXPECT_TRUE(st.ok() || st.IsNotFound()); DestroyDir(clone_dir_); - ASSERT_OK(base_env_->CreateDir(clone_dir_)); + ASSERT_OK(aenv_->GetBaseEnv()->CreateDir(clone_dir_)); + aenv_.reset(); } - std::set GetSSTFiles(std::string name) { + // Return total size of all 
sst files available locally + void GetSSTFilesTotalSize(std::string name, uint64_t* total_size) { std::vector files; aenv_->GetBaseEnv()->GetChildren(name, &files); std::set sst_files; + uint64_t local_size = 0; for (auto& f : files) { if (IsSstFile(RemoveEpoch(f))) { sst_files.insert(f); + std::string lpath = dbname_ + "/" + f; + ASSERT_OK(aenv_->GetBaseEnv()->GetFileSize(lpath, &local_size)); + (*total_size) += local_size; } } - return sst_files; } - // Return total size of all sst files available locally - void GetSSTFilesTotalSize(std::string name, uint64_t* total_size) { + std::set GetSSTFiles(std::string name) { std::vector files; aenv_->GetBaseEnv()->GetChildren(name, &files); std::set sst_files; - uint64_t local_size = 0; for (auto& f : files) { if (IsSstFile(RemoveEpoch(f))) { sst_files.insert(f); - std::string lpath = dbname_ + "/" + f; - ASSERT_OK(aenv_->GetBaseEnv()->GetFileSize(lpath, &local_size)); - (*total_size) += local_size; } } + return sst_files; } std::set GetSSTFilesClone(std::string name) { @@ -118,13 +118,12 @@ class CloudTest : public testing::Test { virtual ~CloudTest() { // Cleanup the cloud bucket if (!cloud_env_options_.src_bucket.GetBucketName().empty()) { - CloudEnv* aenv; - Status st = CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, - options_.info_log, &aenv); + std::unique_ptr cenv; + Status st = CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + cloud_env_options_, &cenv); if (st.ok()) { - aenv->GetStorageProvider()->EmptyBucket(aenv->GetSrcBucketName(), + cenv->GetStorageProvider()->EmptyBucket(cenv->GetSrcBucketName(), dbname_); - delete aenv; } } @@ -132,15 +131,13 @@ class CloudTest : public testing::Test { } void CreateCloudEnv() { - CloudEnv* cenv; cloud_env_options_.use_aws_transfer_manager = true; - ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, - options_.info_log, &cenv)); + ASSERT_OK(CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + cloud_env_options_, &aenv_)); // To catch any 
possible file deletion bugs, we set file deletion delay to // smallest possible - CloudEnvImpl* cimpl = static_cast(cenv); + CloudEnvImpl* cimpl = static_cast(aenv_.get()); cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); - aenv_.reset(cenv); } // Open database via the cloud interface @@ -161,9 +158,7 @@ class CloudTest : public testing::Test { void OpenWithColumnFamilies(const std::vector& cfs, std::vector* handles) { - ASSERT_TRUE(cloud_env_options_.credentials.HasValid().ok()); - - // Create new AWS env + // Create new cloud env CreateCloudEnv(); options_.env = aenv_.get(); // Sleep for a second because S3 is eventual consistency. @@ -182,7 +177,7 @@ class CloudTest : public testing::Test { // Try to open and return status Status checkOpen() { - // Create new AWS env + // Create new Cloud env CreateCloudEnv(); options_.env = aenv_.get(); // Sleep for a second because S3 is eventual consistency. @@ -212,7 +207,6 @@ class CloudTest : public testing::Test { // The local directory where the clone resides std::string cname = clone_dir_ + "/" + clone_name; - CloudEnv* cenv; DBCloud* clone_db; // If there is no destination bucket, then the clone needs to copy @@ -228,21 +222,19 @@ class CloudTest : public testing::Test { force_keep_local_on_invalid_dest_bucket) { copt.keep_local_sst_files = true; } - // Create new AWS env - Status st = CloudEnv::NewAwsEnv(base_env_, copt, options_.info_log, &cenv); + // Create new cloud env + Status st = CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + copt, cloud_env); if (!st.ok()) { return st; } // To catch any possible file deletion bugs, we set file deletion delay to // smallest possible - CloudEnvImpl* cimpl = static_cast(cenv); + CloudEnvImpl* cimpl = static_cast(cloud_env->get()); cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); // sets the cloud env to be used by the env wrapper - options_.env = cenv; - - // Returns the cloud env that was created - cloud_env->reset(cenv); + options_.env = 
cimpl; // default column family ColumnFamilyOptions cfopt = options_; @@ -290,8 +282,8 @@ class CloudTest : public testing::Test { } Status GetCloudLiveFilesSrc(std::set* list) { - std::unique_ptr manifest(new ManifestReader( - options_.info_log, aenv_.get(), aenv_->GetSrcBucketName())); + std::unique_ptr manifest( + new ManifestReader(aenv_.get(), aenv_->GetSrcBucketName())); return manifest->GetLiveFiles(aenv_->GetSrcObjectPath(), list); } @@ -330,11 +322,12 @@ class CloudTest : public testing::Test { protected: std::string test_id_; - Env* base_env_; Options options_; std::string dbname_; std::string clone_dir_; CloudEnvOptions cloud_env_options_; + ConfigOptions config_options_; + std::string cloud_opts_str_; std::string dbid_; std::string persistent_cache_path_; uint64_t persistent_cache_size_gb_; @@ -346,7 +339,7 @@ class CloudTest : public testing::Test { // Most basic test. Create DB, write one key, close it and then check to see // that the key exists. // -TEST_F(CloudTest, BasicTest) { +TEST_P(CloudTest, BasicTest) { // Put one key-value OpenDB(); std::string value; @@ -367,7 +360,7 @@ TEST_F(CloudTest, BasicTest) { CloseDB(); } -TEST_F(CloudTest, GetChildrenTest) { +TEST_P(CloudTest, GetChildrenTest) { // Create some objects in S3 OpenDB(); ASSERT_OK(db_->Put(WriteOptions(), "Hello", "World")); @@ -394,7 +387,7 @@ TEST_F(CloudTest, GetChildrenTest) { // // Create and read from a clone. // -TEST_F(CloudTest, Newdb) { +TEST_P(CloudTest, Newdb) { std::string master_dbid; std::string newdb1_dbid; std::string newdb2_dbid; @@ -474,7 +467,7 @@ TEST_F(CloudTest, Newdb) { CloseDB(); } -TEST_F(CloudTest, ColumnFamilies) { +TEST_P(CloudTest, ColumnFamilies) { std::vector handles; // Put one key-value OpenDB(&handles); @@ -524,7 +517,7 @@ TEST_F(CloudTest, ColumnFamilies) { // // Create and read from a clone. 
// -TEST_F(CloudTest, TrueClone) { +TEST_P(CloudTest, TrueClone) { std::string master_dbid; std::string newdb1_dbid; std::string newdb2_dbid; @@ -638,7 +631,7 @@ TEST_F(CloudTest, TrueClone) { // // verify that dbid registry is appropriately handled // -TEST_F(CloudTest, DbidRegistry) { +TEST_P(CloudTest, DbidRegistry) { // Put one key-value OpenDB(); std::string value; @@ -654,7 +647,7 @@ TEST_F(CloudTest, DbidRegistry) { CloseDB(); } -TEST_F(CloudTest, KeepLocalFiles) { +TEST_P(CloudTest, KeepLocalFiles) { cloud_env_options_.keep_local_sst_files = true; for (int iter = 0; iter < 4; ++iter) { cloud_env_options_.use_direct_io_for_cloud_download = @@ -690,13 +683,14 @@ TEST_F(CloudTest, KeepLocalFiles) { } } -TEST_F(CloudTest, CopyToFromS3) { +#ifdef USE_AWS +TEST_P(CloudTest, CopyToFromS3) { std::string fname = dbname_ + "/100000.sst"; // iter 0 -- not using transfer manager // iter 1 -- using transfer manager for (int iter = 0; iter < 2; ++iter) { - // Create aws env + // Create cloud env cloud_env_options_.keep_local_sst_files = true; cloud_env_options_.use_aws_transfer_manager = iter == 1; CreateCloudEnv(); @@ -716,7 +710,7 @@ TEST_F(CloudTest, CopyToFromS3) { } // delete the file manually. - ASSERT_OK(base_env_->DeleteFile(fname)); + ASSERT_OK(aenv_->GetBaseEnv()->DeleteFile(fname)); // reopen file for reading. It should be refetched from cloud storage. 
{ @@ -734,11 +728,12 @@ TEST_F(CloudTest, CopyToFromS3) { } } } +#endif // USE_AWS -TEST_F(CloudTest, DelayFileDeletion) { +TEST_P(CloudTest, DelayFileDeletion) { std::string fname = dbname_ + "/000010.sst"; - // Create aws env + // Create cloud env cloud_env_options_.keep_local_sst_files = true; CreateCloudEnv(); CloudEnvImpl* cimpl = static_cast(aenv_.get()); @@ -780,7 +775,7 @@ TEST_F(CloudTest, DelayFileDeletion) { } // Verify that a savepoint copies all src files to destination -TEST_F(CloudTest, Savepoint) { +TEST_P(CloudTest, Savepoint) { // Put one key-value OpenDB(); std::string value; @@ -854,8 +849,9 @@ TEST_F(CloudTest, Savepoint) { dest_path); } -TEST_F(CloudTest, Encryption) { - // Create aws env +#ifdef USE_AWS +TEST_P(CloudTest, Encryption) { + // Create cloud env cloud_env_options_.server_side_encryption = true; char* key_id = getenv("AWS_KMS_KEY_ID"); if (key_id != nullptr) { @@ -877,8 +873,9 @@ TEST_F(CloudTest, Encryption) { ASSERT_EQ(value, "World"); CloseDB(); } +#endif // USE_AWS -TEST_F(CloudTest, DirectReads) { +TEST_P(CloudTest, DirectReads) { options_.use_direct_reads = true; options_.use_direct_io_for_flush_and_compaction = true; BlockBasedTableOptions bbto; @@ -903,7 +900,7 @@ TEST_F(CloudTest, DirectReads) { } #ifdef USE_KAFKA -TEST_F(CloudTest, KeepLocalLogKafka) { +TEST_P(CloudTest, KeepLocalLogKafka) { cloud_env_options_.keep_local_log_files = false; cloud_env_options_.log_type = LogType::kLogKafka; cloud_env_options_.kafka_log_options @@ -941,9 +938,10 @@ TEST_F(CloudTest, KeepLocalLogKafka) { } #endif /* USE_KAFKA */ +#ifdef USE_AWS // TODO(igor): determine why this fails, // https://github.com/rockset/rocksdb-cloud/issues/35 -TEST_F(CloudTest, DISABLED_KeepLocalLogKinesis) { +TEST_P(CloudTest, DISABLED_KeepLocalLogKinesis) { cloud_env_options_.keep_local_log_files = false; cloud_env_options_.log_type = LogType::kLogKinesis; @@ -978,10 +976,11 @@ TEST_F(CloudTest, DISABLED_KeepLocalLogKinesis) { CloseDB(); } +#endif // USE_AWS // 
Test whether we are able to recover nicely from two different writers to the // same S3 bucket. (The feature that was enabled by CLOUDMANIFEST) -TEST_F(CloudTest, TwoDBsOneBucket) { +TEST_P(CloudTest, TwoDBsOneBucket) { auto firstDB = dbname_; auto secondDB = dbname_ + "-1"; cloud_env_options_.keep_local_sst_files = true; @@ -1064,7 +1063,7 @@ TEST_F(CloudTest, TwoDBsOneBucket) { // -- it runs two databases on exact same S3 bucket. The work on CLOUDMANIFEST // enables us to run in that configuration for extended amount of time (1 hour // by default) without any issues -- the last CLOUDMANIFEST writer wins. -TEST_F(CloudTest, TwoConcurrentWriters) { +TEST_P(CloudTest, TwoConcurrentWriters) { auto firstDB = dbname_; auto secondDB = dbname_ + "-1"; @@ -1153,7 +1152,7 @@ TEST_F(CloudTest, TwoConcurrentWriters) { // Creates a pure RocksDB database and makes sure we can migrate to RocksDB // Cloud -TEST_F(CloudTest, MigrateFromPureRocksDB) { +TEST_P(CloudTest, MigrateFromPureRocksDB) { { // Create local RocksDB Options options; options.create_if_missing = true; @@ -1190,7 +1189,7 @@ TEST_F(CloudTest, MigrateFromPureRocksDB) { // Tests that we can open cloud DB without destination and source bucket set. // This is useful for tests. -TEST_F(CloudTest, NoDestOrSrc) { +TEST_P(CloudTest, NoDestOrSrc) { DestroyDir(dbname_); cloud_env_options_.keep_local_sst_files = true; cloud_env_options_.src_bucket.SetBucketName(""); @@ -1210,7 +1209,7 @@ TEST_F(CloudTest, NoDestOrSrc) { CloseDB(); } -TEST_F(CloudTest, PreloadCloudManifest) { +TEST_P(CloudTest, PreloadCloudManifest) { DestroyDir(dbname_); // Put one key-value OpenDB(); @@ -1234,7 +1233,7 @@ TEST_F(CloudTest, PreloadCloudManifest) { // from a cloud bucket but new writes are not propagated // back to any cloud bucket. Once cloned, all updates are local. 
// -TEST_F(CloudTest, Ephemeral) { +TEST_P(CloudTest, Ephemeral) { cloud_env_options_.keep_local_sst_files = true; options_.level0_file_num_compaction_trigger = 100; // never compact @@ -1331,14 +1330,14 @@ TEST_F(CloudTest, Ephemeral) { // started after durable clone upload its CLOUDMANIFEST but before it uploads // one of the MANIFEST. In this case, we want to verify that ephemeral clone is // able to reinitialize instead of crash looping. -TEST_F(CloudTest, EphemeralOnCorruptedDB) { +TEST_P(CloudTest, EphemeralOnCorruptedDB) { cloud_env_options_.keep_local_sst_files = true; options_.level0_file_num_compaction_trigger = 100; // never compact OpenDB(); std::vector files; - base_env_->GetChildren(dbname_, &files); + aenv_->GetBaseEnv()->GetChildren(dbname_, &files); // Get the MANIFEST file std::string manifest_file_name; @@ -1385,7 +1384,7 @@ TEST_F(CloudTest, EphemeralOnCorruptedDB) { // In this mode, every open of the ephemeral clone db causes its // data to be resynced with the master db. // -TEST_F(CloudTest, EphemeralResync) { +TEST_P(CloudTest, EphemeralResync) { cloud_env_options_.keep_local_sst_files = true; cloud_env_options_.ephemeral_resync_on_open = true; options_.level0_file_num_compaction_trigger = 100; // never compact @@ -1462,7 +1461,7 @@ TEST_F(CloudTest, EphemeralResync) { CreateLoggerFromOptions(clone_dir_ + "/db_ephemeral", options_, &options_.info_log); - CloneDB("db_ephemeral", "", "", &cloud_db, &cloud_env); + Status st = CloneDB("db_ephemeral", "", "", &cloud_db, &cloud_env); // Retrieve the id of this clone. It should be same as before ASSERT_OK(cloud_db->GetDbIdentity(dbid)); @@ -1480,7 +1479,7 @@ TEST_F(CloudTest, EphemeralResync) { } } -TEST_F(CloudTest, CheckpointToCloud) { +TEST_P(CloudTest, CheckpointToCloud) { cloud_env_options_.keep_local_sst_files = true; options_.level0_file_num_compaction_trigger = 100; // never compact @@ -1526,7 +1525,7 @@ TEST_F(CloudTest, CheckpointToCloud) { } // Basic test to copy object within S3. 
-TEST_F(CloudTest, CopyObjectTest) { +TEST_P(CloudTest, CopyObjectTest) { CreateCloudEnv(); // We need to open an empty DB in order for epoch to work. @@ -1534,7 +1533,7 @@ TEST_F(CloudTest, CopyObjectTest) { std::string content = "This is a test file"; std::string fname = dbname_ + "/100000.sst"; - std::string dst_fname = dbname_ + "/200000.sst"; + // std::string dst_fname = dbname_ + "/200000.sst"; { std::unique_ptr writableFile; @@ -1544,14 +1543,15 @@ TEST_F(CloudTest, CopyObjectTest) { } Status st = aenv_->GetStorageProvider()->CopyCloudObject( - aenv_->GetSrcBucketName(), aenv_->RemapFilename(fname), - aenv_->GetSrcBucketName(), dst_fname); + aenv_->GetSrcBucketName(), aenv_->GetSrcObjectPath() + "/100000.sst", + aenv_->GetSrcBucketName(), aenv_->GetSrcObjectPath() + "/200000.sst"); ASSERT_OK(st); { std::unique_ptr readableFile; st = aenv_->GetStorageProvider()->NewCloudReadableFile( - aenv_->GetSrcBucketName(), dst_fname, &readableFile, EnvOptions()); + aenv_->GetSrcBucketName(), aenv_->GetSrcObjectPath() + "/200000.sst", + &readableFile, EnvOptions()); ASSERT_OK(st); char scratch[100]; @@ -1569,7 +1569,7 @@ TEST_F(CloudTest, CopyObjectTest) { // // Verify that we can cache data from S3 in persistent cache. // -TEST_F(CloudTest, PersistentCache) { +TEST_P(CloudTest, PersistentCache) { std::string pcache = test::TmpDir() + "/persistent_cache"; SetPersistentCache(pcache, 1); @@ -1591,7 +1591,7 @@ TEST_F(CloudTest, PersistentCache) { // This test create 2 DBs that shares a block cache. Ensure that reads from one // DB do not get the values from the other DB. -TEST_F(CloudTest, SharedBlockCache) { +TEST_P(CloudTest, SharedBlockCache) { cloud_env_options_.keep_local_sst_files = false; // Share the block cache. 
@@ -1640,14 +1640,14 @@ TEST_F(CloudTest, SharedBlockCache) { } // Verify that sst_file_cache and file_cache cannot be set together -TEST_F(CloudTest, KeepLocalFilesAndFileCache) { +TEST_P(CloudTest, KeepLocalFilesAndFileCache) { cloud_env_options_.sst_file_cache = NewLRUCache(1024); // 1 KB cache cloud_env_options_.keep_local_sst_files = true; ASSERT_TRUE(checkOpen().IsInvalidArgument()); } // Verify that sst_file_cache can be disabled -TEST_F(CloudTest, FileCacheZero) { +TEST_P(CloudTest, FileCacheZero) { cloud_env_options_.sst_file_cache = NewLRUCache(0); // zero size OpenDB(); CloudEnvImpl* cimpl = static_cast(aenv_.get()); @@ -1668,7 +1668,7 @@ TEST_F(CloudTest, FileCacheZero) { } // Verify that sst_file_cache is very small, so no files are local. -TEST_F(CloudTest, FileCacheSmall) { +TEST_P(CloudTest, FileCacheSmall) { cloud_env_options_.sst_file_cache = NewLRUCache(10); // Practically zero size OpenDB(); CloudEnvImpl* cimpl = static_cast(aenv_.get()); @@ -1683,7 +1683,7 @@ TEST_F(CloudTest, FileCacheSmall) { } // Relatively large sst_file cache, so all files are local. -TEST_F(CloudTest, FileCacheLarge) { +TEST_P(CloudTest, FileCacheLarge) { size_t capacity = 10240L; std::shared_ptr cache = NewLRUCache(capacity); cloud_env_options_.sst_file_cache = cache; @@ -1716,7 +1716,7 @@ TEST_F(CloudTest, FileCacheLarge) { } // Cache will have a few files only. 
-TEST_F(CloudTest, FileCacheOnDemand) { +TEST_P(CloudTest, FileCacheOnDemand) { size_t capacity = 3000; int num_shard_bits = 1; bool strict_capacity_limit = false; @@ -1760,25 +1760,20 @@ TEST_F(CloudTest, FileCacheOnDemand) { CloseDB(); } +#ifdef USE_AWS +INSTANTIATE_TEST_CASE_P(AWS, CloudTest, ::testing::Values("id=aws;")); +#endif // USE_AWS + +INSTANTIATE_TEST_CASE_P(Mock, CloudTest, ::testing::Values("provider=mock;")); } // namespace ROCKSDB_NAMESPACE // A black-box test for the cloud wrapper around rocksdb int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } -#else // USE_AWS - -#include - -int main(int, char**) { - fprintf(stderr, - "SKIPPED as DBCloud is supported only when USE_AWS is defined.\n"); - return 0; -} -#endif - #else // ROCKSDB_LITE #include diff --git a/cloud/manifest_reader.cc b/cloud/manifest_reader.cc index 1677e390a78..20fcf0a79cf 100644 --- a/cloud/manifest_reader.cc +++ b/cloud/manifest_reader.cc @@ -3,6 +3,7 @@ #include "cloud/manifest_reader.h" +#include "cloud/aws/aws_env.h" #include "cloud/cloud_manifest.h" #include "cloud/db_cloud_impl.h" #include "cloud/filename.h" @@ -15,9 +16,10 @@ namespace ROCKSDB_NAMESPACE { -ManifestReader::ManifestReader(std::shared_ptr info_log, CloudEnv* cenv, - const std::string& bucket_prefix) - : info_log_(info_log), cenv_(cenv), bucket_prefix_(bucket_prefix) {} +ManifestReader::ManifestReader(CloudEnv* cenv, const std::string& bucket_prefix) + : cenv_(cenv), bucket_prefix_(bucket_prefix) { + info_log_ = cenv_->GetLogger(); +} ManifestReader::~ManifestReader() {} diff --git a/cloud/manifest_reader.h b/cloud/manifest_reader.h index 40a0e3e43bc..15969b611a2 100644 --- a/cloud/manifest_reader.h +++ b/cloud/manifest_reader.h @@ -18,8 +18,7 @@ namespace ROCKSDB_NAMESPACE { // class ManifestReader { public: - ManifestReader(std::shared_ptr info_log, CloudEnv* cenv, - const std::string& bucket_prefix); + 
ManifestReader(CloudEnv* cenv, const std::string& bucket_prefix); virtual ~ManifestReader(); @@ -30,7 +29,7 @@ class ManifestReader { uint64_t* maxFileNumber); private: - std::shared_ptr info_log_; + Logger* info_log_; CloudEnv* cenv_; std::string bucket_prefix_; }; diff --git a/cloud/mock_cloud_storage_provider.cc b/cloud/mock_cloud_storage_provider.cc new file mode 100644 index 00000000000..c93e2cb133d --- /dev/null +++ b/cloud/mock_cloud_storage_provider.cc @@ -0,0 +1,303 @@ +// Copyright (c) 2016-present, Rockset, Inc. All rights reserved. +// + +#include "cloud/mock_cloud_storage_provider.h" + +#include "cloud/cloud_storage_provider_impl.h" +#include "cloud/filename.h" +#include "file/file_util.h" +#include "options/options_helper.h" +#include "rocksdb/cloud/cloud_env_options.h" +#include "rocksdb/convenience.h" +#include "rocksdb/file_system.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +Status MockCloudStorageProvider::PrepareOptions(const ConfigOptions& options) { + cenv_ = static_cast(options.env); + fs_ = cenv_->GetBaseEnv()->GetFileSystem(); + Status s = fs_->GetTestDirectory(IOOptions(), &root_, nullptr); + if (s.ok()) { + s = CloudStorageProvider::PrepareOptions(options); + } + if (s.ok() && cenv_->HasDestBucket()) { + s = CreateLocalPath(cenv_->GetDestBucketName()); + if (s.ok()) { + s = CreateLocalPath(cenv_->GetDestBucketName() + "/" + + cenv_->GetDestObjectPath()); + } + } + return s; +} + +std::string MockCloudStorageProvider::GetLocalPath( + const std::string& bucket) const { + return NormalizePath(root_ + "/" + bucket); +} + +std::string MockCloudStorageProvider::GetLocalPath( + const std::string& bucket, const std::string& prefix) const { + if (prefix.empty()) { + return GetLocalPath(bucket); + } else if (StartsWith(prefix, root_)) { + return GetLocalPath(bucket + "/" + prefix.substr(root_.size())); + } else { + return GetLocalPath(bucket + "/" + prefix); + } +} + +Status MockCloudStorageProvider::CreateLocalPath(const 
std::string& path) { + Status s; + auto dir = NormalizePath(path); + if (paths_.find(dir) == paths_.end()) { + // Build all subdirectories as necessary + for (auto pos = dir.find('/'); pos != std::string::npos; + pos = dir.find('/', pos + 1)) { + auto subdir = dir.substr(0, pos); + if (paths_.find(subdir) == paths_.end()) { + s = fs_->CreateDirIfMissing(GetLocalPath(subdir), IOOptions(), nullptr); + if (s.ok()) { + paths_.insert(subdir); + } else { + break; + } + } + } + if (s.ok() && paths_.find(dir) == paths_.end()) { + s = fs_->CreateDirIfMissing(GetLocalPath(dir), IOOptions(), nullptr); + if (s.ok()) { + paths_.insert(dir); + } + } + } + return s; +} + +Status MockCloudStorageProvider::CreateBucket(const std::string& bucket_name) { + std::string path = GetLocalPath(bucket_name); + Status s = fs_->CreateDir(path, IOOptions(), nullptr); + return s; +} + +Status MockCloudStorageProvider::ExistsBucket(const std::string& bucket_name) { + std::string path = GetLocalPath(bucket_name); + Status s = fs_->FileExists(path, IOOptions(), nullptr); + return s; +} + +Status MockCloudStorageProvider::EmptyBucket(const std::string& bucket_name, + const std::string& object_path) { + std::string path = GetLocalPath(bucket_name, object_path); + Status s = fs_->FileExists(path, IOOptions(), nullptr); + if (s.ok()) { + s = DestroyDir(cenv_->GetBaseEnv(), path); + } + return s; +} + +Status MockCloudStorageProvider::DeleteCloudObject( + const std::string& bucket_name, const std::string& object_path) { + std::string path = GetLocalPath(bucket_name, object_path); + Status s = fs_->DeleteFile(path, IOOptions(), nullptr); + return s; +} + +Status MockCloudStorageProvider::ListCloudObjects( + const std::string& bucket_name, const std::string& object_path, + std::vector* results) { + std::string path = GetLocalPath(bucket_name, object_path); + std::vector children; + Status s = fs_->GetChildren(path, IOOptions(), &children, nullptr); + if (s.ok()) { + for (const auto& c : children) { + if 
(c == "." || c == "..") { + continue; + } else { + results->push_back(c); + } + } + } + return s; +} + +Status MockCloudStorageProvider::ExistsCloudObject( + const std::string& bucket_name, const std::string& object_path) { + std::string path = GetLocalPath(bucket_name, object_path); + Status s = fs_->FileExists(path, IOOptions(), nullptr); + return s; +} + +Status MockCloudStorageProvider::GetCloudObjectSize( + const std::string& bucket_name, const std::string& object_path, + uint64_t* filesize) { + std::string path = GetLocalPath(bucket_name, object_path); + Status s = fs_->GetFileSize(path, IOOptions(), filesize, nullptr); + if (!s.ok()) { + Status st = ExistsCloudObject(bucket_name, object_path); + if (st.IsNotFound()) { + s = st; + } + } + return s; +} + +// Get the modification time of the object in cloud storage +Status MockCloudStorageProvider::GetCloudObjectModificationTime( + const std::string& bucket_name, const std::string& object_path, + uint64_t* time) { + std::string path = GetLocalPath(bucket_name, object_path); + Status s = fs_->GetFileModificationTime(path, IOOptions(), time, nullptr); + if (!s.ok()) { + Status st = ExistsCloudObject(bucket_name, object_path); + if (st.IsNotFound()) { + s = st; + } + } + return s; +} + +Status MockCloudStorageProvider::GetCloudObjectMetadata( + const std::string& bucket_name, const std::string& object_path, + CloudObjectInformation* info) { + std::string path = GetLocalPath(bucket_name, object_path); + + info->content_hash.clear(); // No hash + std::unique_ptr file; + std::string data; + Status s = GetCloudObjectSize(bucket_name, object_path, &info->size); + if (s.ok()) { + s = GetCloudObjectModificationTime(bucket_name, object_path, + &info->modification_time); + } + if (s.ok()) { + s = ReadFileToString(fs_.get(), path, &data); + } + if (s.ok()) { + s = StringToMap(data, &info->metadata); + } + return s; +} + +Status MockCloudStorageProvider::PutCloudObjectMetadata( + const std::string& bucket_name, const 
std::string& object_path, + const std::unordered_map& metadata) { + Status s = CreateLocalPath(dirname(bucket_name + "/" + object_path)); + if (s.ok()) { + std::string data; + for (const auto& iter : metadata) { + data.append(iter.first).append("=").append(iter.second).append("; "); + } + std::string path = GetLocalPath(bucket_name, object_path); + s = WriteStringToFile(fs_.get(), data, path); + } + return s; +} + +Status MockCloudStorageProvider::CopyCloudObject( + const std::string& src_bucket_name, const std::string& src_object_path, + const std::string& dest_bucket_name, const std::string& dest_object_path) { + std::string from_path = GetLocalPath(src_bucket_name, src_object_path); + std::string to_path = GetLocalPath(dest_bucket_name, dest_object_path); + Status s = CopyFile(fs_.get(), from_path, to_path, 0, true); + return s; +} + +// Downloads object from the cloud into a local directory +Status MockCloudStorageProvider::GetCloudObject(const std::string& bucket_name, + const std::string& object_path, + const std::string& local_path) { + std::string from_path = GetLocalPath(bucket_name, object_path); + Status s = fs_->FileExists(from_path, IOOptions(), nullptr); + if (s.ok()) { + s = CopyFile(fs_.get(), from_path, local_path, 0, true); + } + return s; +} + +class MockCloudStorageReadableFile : public CloudStorageReadableFile { + private: + std::unique_ptr target_; + mutable uint64_t offset_; + + public: + MockCloudStorageReadableFile(std::unique_ptr&& t) + : target_(std::move(t)), offset_(0) {} + virtual const char* Type() const { + return MockCloudStorageProvider::kClassName(); + } + Status Skip(uint64_t n) override { + offset_ += n; + return Status::OK(); + } + bool use_direct_io() const override { + return false; + } // target_->use_direct_io(); } + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + Status InvalidateCache(size_t offset, size_t length) override { + return 
target_->InvalidateCache(offset, length); + } + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + return Read(offset, n, result, scratch); + } + Status Read(size_t n, Slice* result, char* scratch) override { + return Read(offset_, n, result, scratch); + } + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + IOOptions io_opts; + IODebugContext dbg; + Status s = target_->Read(offset, n, io_opts, result, scratch, &dbg); + if (s.ok()) { + offset_ = offset + result->size(); + } + return s; + } +}; + +class MockCloudStorageWritableFile : public CloudStorageWritableFileImpl { + public: + MockCloudStorageWritableFile(CloudEnv* env, const std::string& local_fname, + const std::string& bucket, + const std::string& cloud_fname, + const EnvOptions& options) + : CloudStorageWritableFileImpl(env, local_fname, bucket, cloud_fname, + options) {} + virtual const char* Name() const override { + return MockCloudStorageProvider::kClassName(); + } +}; + +Status MockCloudStorageProvider::NewCloudReadableFile( + const std::string& bucket, const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + std::string path = GetLocalPath(bucket, fname); + std::unique_ptr file; + Status s = fs_->NewRandomAccessFile(path, options, &file, nullptr); + if (s.ok()) { + result->reset(new MockCloudStorageReadableFile(std::move(file))); + } + return s; +} + +Status MockCloudStorageProvider::NewCloudWritableFile( + const std::string& local_path, const std::string& bucket_name, + const std::string& object_path, + std::unique_ptr* result, + const EnvOptions& options) { + result->reset(new MockCloudStorageWritableFile(cenv_, local_path, bucket_name, + object_path, options)); + return (*result)->status(); +} + +Status MockCloudStorageProvider::PutCloudObject( + const std::string& local_file, const std::string& bucket_name, + const std::string& object_path) { + std::string to_path = 
GetLocalPath(bucket_name, object_path); + Status s = CopyFile(fs_.get(), local_file, to_path, 0, true); + return s; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/cloud/mock_cloud_storage_provider.h b/cloud/mock_cloud_storage_provider.h new file mode 100644 index 00000000000..7615313f51c --- /dev/null +++ b/cloud/mock_cloud_storage_provider.h @@ -0,0 +1,102 @@ +// Copyright (c) 2017 Rockset + +#pragma once + +#ifndef ROCKSDB_LITE +#include + +#include "rocksdb/cloud/cloud_storage_provider.h" + +namespace ROCKSDB_NAMESPACE { +class CloudEnv; +class FileSystem; +struct ConfigOptions; + +// All writes to this DB can be configured to be persisted +// in cloud storage. +// +class MockCloudStorageProvider : public CloudStorageProvider { + public: + MockCloudStorageProvider() {} + static const char* kClassName() { return "mock"; } + const char* Name() const override { return kClassName(); } + + Status CreateBucket(const std::string& bucket_name) override; + Status ExistsBucket(const std::string& bucket_name) override; + Status EmptyBucket(const std::string& bucket_name, + const std::string& object_path) override; + Status DeleteCloudObject(const std::string& bucket_name, + const std::string& object_path) override; + Status ListCloudObjects(const std::string& bucket_name, + const std::string& object_path, + std::vector* path_names) override; + Status ExistsCloudObject(const std::string& bucket_name, + const std::string& object_path) override; + // Get the size of the object in cloud storage + Status GetCloudObjectSize(const std::string& bucket_name, + const std::string& object_path, + uint64_t* filesize) override; + + // Get the modification time of the object in cloud storage + Status GetCloudObjectModificationTime(const std::string& bucket_name, + const std::string& object_path, + uint64_t* time) override; + + // Get the metadata of the object in cloud storage + Status GetCloudObjectMetadata(const std::string& bucket_name, + const std::string& object_path, + 
CloudObjectInformation* info) override; + + Status CopyCloudObject(const std::string& src_bucket_name, + const std::string& src_object_path, + const std::string& dest_bucket_name, + const std::string& dest_object_path) override; + // Updates/Sets the metadata of the object in cloud storage + Status PutCloudObjectMetadata( + const std::string& bucket_name, const std::string& object_path, + const std::unordered_map& metadata) override; + + // Create a new cloud file in the appropriate location from the input path. + // Updates result with the file handle. + Status NewCloudWritableFile(const std::string& local_path, + const std::string& bucket_name, + const std::string& object_path, + std::unique_ptr* result, + const EnvOptions& options) override; + + // Create a new readable cloud file, returning the file handle in result. + Status NewCloudReadableFile(const std::string& bucket, + const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + // Downloads object from the cloud into a local directory + Status GetCloudObject(const std::string& bucket_name, + const std::string& object_path, + const std::string& local_path) override; + Status PutCloudObject(const std::string& local_file, + const std::string& bucket_name, + const std::string& object_path) override; + + // Prepares/Initializes the storage provider for the input cloud environment + virtual Status PrepareOptions(const ConfigOptions& options) override; + + protected: + private: + std::string GetLocalPath(const std::string& bucket) const; + + std::string GetLocalPath(const std::string& bucket, + const std::string& prefix) const; + Status CreateLocalPath(const std::string& path); + +#ifdef MJR + Status CopyFile(const std::string& from_path, const std::string& to_path); +#endif + std::string root_; + CloudEnv* cenv_; + std::set paths_; + std::shared_ptr fs_; +}; +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/cloud/purge.cc b/cloud/purge.cc index 
1298196baaf..36d811104bb 100644 --- a/cloud/purge.cc +++ b/cloud/purge.cc @@ -94,7 +94,7 @@ Status CloudEnvImpl::FindObsoleteFiles(const std::string& bucket_name_prefix, } std::unique_ptr extractor( - new ManifestReader(info_log_, this, bucket_name_prefix)); + new ManifestReader(this, bucket_name_prefix)); // Step2: from all MANIFEST files in Step 1, compile a list of all live files for (auto iter = dbid_list.begin(); iter != dbid_list.end(); ++iter) { diff --git a/cloud/remote_compaction_test.cc b/cloud/remote_compaction_test.cc index 3dc7e08c695..28cd12d4f1a 100644 --- a/cloud/remote_compaction_test.cc +++ b/cloud/remote_compaction_test.cc @@ -2,8 +2,6 @@ #ifndef ROCKSDB_LITE -#ifdef USE_AWS - #include #include #include @@ -14,8 +12,10 @@ #include "db/db_impl/db_impl.h" #include "file/filename.h" #include "logging/logging.h" +#include "port/stack_trace.h" #include "rocksdb/cloud/cloud_storage_provider.h" #include "rocksdb/cloud/db_cloud.h" +#include "rocksdb/convenience.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -27,13 +27,19 @@ namespace ROCKSDB_NAMESPACE { -class RemoteCompactionTest : public testing::Test { +class RemoteCompactionTest : public testing::Test, + public ::testing::WithParamInterface { public: RemoteCompactionTest() { + Random64 rng(time(nullptr)); + auto test_id = std::to_string(rng.Next()); + base_env_ = Env::Default(); - dbname_ = test::TmpDir() + "/db_cloud"; - clone_dir_ = test::TmpDir() + "/ctest"; - cloud_env_options_.TEST_Initialize("dbcloud.", dbname_); + cloud_opts_str_ = GetParam(); + + dbname_ = test::TmpDir() + "/db_cloud-" + test_id; + clone_dir_ = test::TmpDir() + "/ctest-" + test_id; + cloud_env_options_.TEST_Initialize("dbcloud.", "/db_cloud-" + test_id); cloud_env_options_.keep_local_sst_files = true; options_.create_if_missing = true; @@ -52,25 +58,22 @@ class RemoteCompactionTest : public testing::Test { } void Cleanup() { - ASSERT_TRUE(!aenv_); + EXPECT_TRUE(!aenv_); - // check 
cloud credentials - ASSERT_TRUE(cloud_env_options_.credentials.HasValid().ok()); + // create a dummy cloud env + EXPECT_OK(CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + cloud_env_options_, &aenv_)); + EXPECT_NE(aenv_, nullptr); - CloudEnv* aenv; - // create a dummy aws env - ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, - options_.info_log, &aenv)); - aenv_.reset(aenv); // delete all pre-existing contents from the bucket Status st = aenv_->GetStorageProvider()->EmptyBucket(aenv_->GetSrcBucketName(), ""); ASSERT_TRUE(st.ok() || st.IsNotFound()); - aenv_.reset(); // delete and create directory where clones reside DestroyDir(clone_dir_); - ASSERT_OK(base_env_->CreateDir(clone_dir_)); + ASSERT_OK(aenv_->GetBaseEnv()->CreateDir(clone_dir_)); + aenv_.reset(); } std::set GetSSTFiles(std::string name) { @@ -94,20 +97,16 @@ class RemoteCompactionTest : public testing::Test { virtual ~RemoteCompactionTest() { CloseDB(); } void CreateCloudEnv() { - CloudEnv* cenv; - ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, - options_.info_log, &cenv)); + ASSERT_OK(CloudEnv::CreateFromString(config_options_, cloud_opts_str_, + cloud_env_options_, &aenv_)); // To catch any possible file deletion bugs, we set file deletion delay to // smallest possible - CloudEnvImpl* cimpl = static_cast(cenv); + CloudEnvImpl* cimpl = static_cast(aenv_.get()); cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); - aenv_.reset(cenv); } // Open database via the cloud interface void OpenDB() { - ASSERT_TRUE(cloud_env_options_.credentials.HasValid().ok()); - // Create new cloud env CreateCloudEnv(); options_.env = aenv_.get(); @@ -143,7 +142,6 @@ class RemoteCompactionTest : public testing::Test { const std::string& dest_object_path, std::unique_ptr* cloud_db, std::unique_ptr* cloud_env) { - CloudEnv* cenv; DBCloud* clone_db; // If there is no destination bucket, then the clone needs to copy @@ -159,16 +157,14 @@ class RemoteCompactionTest : public 
testing::Test { copt.keep_local_sst_files = true; } // Create new AWS env - ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, copt, options_.info_log, &cenv)); + ASSERT_OK(CloudEnv::CreateFromString(config_options_, cloud_opts_str_, copt, + cloud_env)); // To catch any possible file deletion bugs, we set file deletion delay to // smallest possible - CloudEnvImpl* cimpl = static_cast(cenv); + CloudEnvImpl* cimpl = static_cast(cloud_env->get()); cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); // sets the cloud env to be used by the env wrapper - options_.env = cenv; - - // Returns the cloud env that was created - cloud_env->reset(cenv); + options_.env = cloud_env->get(); // default column family ColumnFamilyOptions cfopt = options_; @@ -208,8 +204,8 @@ class RemoteCompactionTest : public testing::Test { } Status GetCloudLiveFilesSrc(std::set* list) { - std::unique_ptr manifest(new ManifestReader( - options_.info_log, aenv_.get(), aenv_->GetSrcBucketName())); + std::unique_ptr manifest( + new ManifestReader(aenv_.get(), aenv_->GetSrcBucketName())); return manifest->GetLiveFiles(aenv_->GetSrcObjectPath(), list); } @@ -323,6 +319,8 @@ class RemoteCompactionTest : public testing::Test { std::string dbname_; std::string clone_dir_; CloudEnvOptions cloud_env_options_; + ConfigOptions config_options_; + std::string cloud_opts_str_; std::string dbid_; std::string persistent_cache_path_; uint64_t persistent_cache_size_gb_; @@ -336,7 +334,7 @@ class RemoteCompactionTest : public testing::Test { // Create a clone and setup a compaction service so that compactions // on the DB translates into a compaction request on the clone. 
// -TEST_F(RemoteCompactionTest, BasicTest) { +TEST_P(RemoteCompactionTest, BasicTest) { OpenDB(); std::string value; @@ -409,7 +407,7 @@ TEST_F(RemoteCompactionTest, BasicTest) { CloseDB(); } -TEST_F(RemoteCompactionTest, ColumnFamilyTest) { +TEST_P(RemoteCompactionTest, ColumnFamilyTest) { OpenDB(); std::string value; @@ -494,25 +492,22 @@ TEST_F(RemoteCompactionTest, ColumnFamilyTest) { CloseDB(); } +#ifdef USE_AWS +INSTANTIATE_TEST_CASE_P(AWS, RemoteCompactionTest, + ::testing::Values("id=aws;")); +#endif // USE_AWS + +INSTANTIATE_TEST_CASE_P(Mock, RemoteCompactionTest, + ::testing::Values("provider=mock;")); } // namespace ROCKSDB_NAMESPACE // Run all pluggable compaction tests int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } -#else // USE_AWS - -#include - -int main(int, char**) { - fprintf(stderr, - "SKIPPED as DBCloud is supported only when USE_AWS is defined.\n"); - return 0; -} -#endif - #else // ROCKSDB_LITE #include diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 1d81774815a..b9370e93705 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -14,7 +14,7 @@ #include "rocksdb/convenience.h" #include "rocksdb/env_encryption.h" #include "util/stderr_logger.h" -#ifdef USE_AWS +#ifdef USE_CLOUD #include "cloud/cloud_env_impl.h" #include "rocksdb/cloud/cloud_storage_provider.h" #endif @@ -66,14 +66,24 @@ SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep) table_write_callback_ = nullptr; } DBTestBase::DBTestBase(const std::string path, bool env_do_fsync) - : option_env_(kDefaultEnv), + : is_cloud_env_(false), mem_env_(nullptr), encrypted_env_(nullptr), - option_config_(kDefault), - s3_env_(nullptr) { + option_config_(kDefault) { Env* base_env = Env::Default(); ConfigOptions config_options; EXPECT_OK(test::CreateEnvFromSystem(config_options, &base_env, &env_guard_)); + if (env_guard_ != nullptr) { + ///**TODO: This should 
change/become simpler when Env inherits + /// Customizable + const char* test_env_uri = getenv("TEST_ENV_URI"); + const char* start = strstr(test_env_uri, "id="); + if (start != nullptr) { + is_cloud_env_ = strncmp(start + 3, "aws", 3) == 0; + } else if (strstr(test_env_uri, "provider=") != NULL) { + is_cloud_env_ = true; + } + } EXPECT_NE(nullptr, base_env); if (getenv("MEM_ENV")) { mem_env_ = new MockEnv(base_env); @@ -89,17 +99,6 @@ DBTestBase::DBTestBase(const std::string path, bool env_do_fsync) #endif // !ROCKSDB_LITE env_ = new SpecialEnv(encrypted_env_ ? encrypted_env_ : (mem_env_ ? mem_env_ : base_env)); -#ifndef ROCKSDB_LITE -#ifdef USE_AWS - // Randomize the test path so that multiple tests can run in parallel - srand(static_cast(time(nullptr))); - std::string mypath = path + "_" + std::to_string(rand()); - - env_->NewLogger(test::TmpDir(env_) + "/rocksdb-cloud.log", &info_log_); - info_log_->SetInfoLogLevel(InfoLogLevel::DEBUG_LEVEL); - s3_env_ = CreateNewAwsEnv(mypath, env_); -#endif -#endif // !ROCKSDB_LITE env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::HIGH); env_->skip_fsync_ = !env_do_fsync; @@ -138,13 +137,14 @@ DBTestBase::~DBTestBase() { delete env_; #ifndef ROCKSDB_LITE -#ifdef USE_AWS - auto cenv = static_cast(s3_env_); - cenv->GetStorageProvider()->EmptyBucket(cenv->GetSrcBucketName(), - cenv->GetSrcObjectPath()); +#ifdef USE_CLOUD + if (is_cloud_env_) { + auto cenv = static_cast(env_guard_.get()); + cenv->GetStorageProvider()->EmptyBucket(cenv->GetSrcBucketName(), + cenv->GetSrcObjectPath()); + } #endif #endif // !ROCKSDB_LITE - delete s3_env_; } bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) { @@ -212,26 +212,14 @@ bool DBTestBase::ChangeOptions(int skip_mask) { if (ShouldSkipOptions(option_config_, skip_mask)) { continue; } - if (option_env_ == kAwsEnv && ShouldSkipAwsOptions(option_config_)) { + if (is_cloud_env_ && ShouldSkipAwsOptions(option_config_)) { continue; } break; } if 
(option_config_ >= kEnd) { -#ifndef USE_AWS - // If not built for AWS, skip it - if (option_env_ + 1 == kAwsEnv) { - option_env_++; - } -#endif - if (option_env_ + 1 >= kEndEnv) { - Destroy(last_options_); - return false; - } else { - option_env_++; - option_config_ = kDefault; - continue; - } + Destroy(last_options_); + return false; } else { auto options = CurrentOptions(); options.create_if_missing = true; @@ -617,25 +605,6 @@ Options DBTestBase::GetOptions( break; } - switch (option_env_) { - case kDefaultEnv: { - options.env = env_; - break; - } -#ifdef USE_AWS - case kAwsEnv: { - assert(s3_env_); - options.env = s3_env_; - options.recycle_log_file_num = 0; // do not reuse log files - options.allow_mmap_reads = false; // mmap is incompatible with S3 - break; - } -#endif /* USE_AWS */ - - default: - break; - } - if (options_override.filter_policy) { table_options.filter_policy = options_override.filter_policy; table_options.partition_filters = options_override.partition_filters; @@ -644,36 +613,12 @@ Options DBTestBase::GetOptions( if (set_block_based_table_factory) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); } + options.env = env_; options.create_if_missing = true; options.fail_if_options_file_error = true; return options; } -#ifndef ROCKSDB_LITE -#ifdef USE_AWS -Env* DBTestBase::CreateNewAwsEnv(const std::string& prefix, Env* parent) { - if (!prefix.empty()) { - fprintf(stderr, "Creating new cloud env with prefix %s\n", prefix.c_str()); - } - - // get credentials - CloudEnvOptions coptions; - CloudEnv* cenv = nullptr; - std::string region; - coptions.TEST_Initialize("dbtest.", prefix, region); - Status st = CloudEnv::NewAwsEnv(parent, coptions, info_log_, &cenv); - CloudEnvImpl* cimpl = static_cast(cenv); - cimpl->TEST_DisableCloudManifest(); - cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0)); - ROCKS_LOG_INFO(info_log_, "Created new aws env with path %s", prefix.c_str()); - if (!st.ok()) { - 
Log(InfoLogLevel::DEBUG_LEVEL, info_log_, "%s", st.ToString().c_str()); - } - assert(st.ok() && cenv); - return cenv; -} -#endif // USE_AWS -#endif // ROCKSDB_LITE void DBTestBase::CreateColumnFamilies(const std::vector& cfs, const Options& options) { @@ -789,9 +734,9 @@ void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) { } Close(); ASSERT_OK(DestroyDB(dbname_, options, column_families)); -#ifdef USE_AWS - if (s3_env_) { - CloudEnv* cenv = static_cast(s3_env_); +#ifdef USE_CLOUD + if (is_cloud_env_) { + CloudEnv* cenv = static_cast(env_guard_.get()); Status st = cenv->GetStorageProvider()->EmptyBucket( cenv->GetSrcBucketName(), dbname_); ASSERT_TRUE(st.ok() || st.IsNotFound()); @@ -833,7 +778,11 @@ bool DBTestBase::IsDirectIOSupported() { } bool DBTestBase::IsMemoryMappedAccessSupported() const { - return (!encrypted_env_); + if (is_cloud_env_) { + return false; + } else { + return (!encrypted_env_); + } } Status DBTestBase::Flush(int cf) { diff --git a/db/db_test_util.h b/db/db_test_util.h index bcb93c8055d..75460761e3b 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -21,6 +21,7 @@ #include #include +#include "cloud/aws/aws_env.h" #include "db/db_impl/db_impl.h" #include "db/dbformat.h" #include "file/filename.h" @@ -950,18 +951,11 @@ class DBTestBase : public testing::Test { kEnd, }; - // The types of envs that we want to test with - enum OptionConfigEnv { - kDefaultEnv = 0, // posix env - kAwsEnv = 1, // aws env - kEndEnv = 2, - }; - int option_env_; - public: std::string dbname_; std::string alternative_wal_dir_; std::string alternative_db_log_dir_; + bool is_cloud_env_; MockEnv* mem_env_; Env* encrypted_env_; SpecialEnv* env_; @@ -972,8 +966,6 @@ class DBTestBase : public testing::Test { int option_config_; Options last_options_; - Env* s3_env_; - // Skip some options, as they may not be applicable to a specific test. // To add more skip constants, use values 4, 8, 16, etc. 
enum OptionSkip { diff --git a/include/rocksdb/cloud/cloud_env_options.h b/include/rocksdb/cloud/cloud_env_options.h index eb22ac0dd2b..2f4af4ef1f1 100644 --- a/include/rocksdb/cloud/cloud_env_options.h +++ b/include/rocksdb/cloud/cloud_env_options.h @@ -430,7 +430,6 @@ class CloudEnv : public Env, public Configurable { virtual ~CloudEnv(); - static void RegisterCloudObjects(const std::string& mode = ""); static Status CreateFromString(const ConfigOptions& config_options, const std::string& id, std::unique_ptr* env); static Status CreateFromString(const ConfigOptions& config_options, const std::string& id, diff --git a/src.mk b/src.mk index 70f657de4de..cd4e9f7adfd 100644 --- a/src.mk +++ b/src.mk @@ -21,6 +21,7 @@ LIB_SOURCES = \ cloud/cloud_scheduler.cc \ cloud/cloud_storage_provider.cc \ cloud/cloud_file_cache.cc \ + cloud/mock_cloud_storage_provider.cc \ db/arena_wrapped_db_iter.cc \ db/blob/blob_fetcher.cc \ db/blob/blob_file_addition.cc \