diff --git a/cloud/aws/aws_env.cc b/cloud/aws/aws_env.cc index e18d53bfb9b..372be89234c 100644 --- a/cloud/aws/aws_env.cc +++ b/cloud/aws/aws_env.cc @@ -50,6 +50,9 @@ AwsAccessType AwsCloudAccessCredentials::GetAccessType() const { return AwsAccessType::kConfig; } else if (!access_key_id.empty() || !secret_key.empty()) { return AwsAccessType::kSimple; + } else if (getenv("AWS_ACCESS_KEY_ID") != nullptr && + getenv("AWS_SECRET_ACCESS_KEY") != nullptr) { + return AwsAccessType::kEnvironment; } return AwsAccessType::kUndefined; } @@ -161,6 +164,7 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider( } #ifdef USE_AWS +static Aws::SDKOptions sdkOptions; // // The AWS credentials are specified to the constructor via @@ -169,7 +173,8 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider( AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options, const std::shared_ptr& info_log) : CloudEnvImpl(_cloud_env_options, underlying_env, info_log) { - Aws::InitAPI(Aws::SDKOptions()); + Aws::InitAPI(sdkOptions); //**TODO: Move this into PrepareOptions and do it + // conditionally (first time) if (cloud_env_options.src_bucket.GetRegion().empty() || cloud_env_options.dest_bucket.GetRegion().empty()) { std::string region; @@ -187,7 +192,11 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options, base_env_ = underlying_env; } -void AwsEnv::Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); } +AwsEnv::~AwsEnv() { + //**TODO: Conditionally call shutdown (or make shutdown conditional on last... 
+} + +void AwsEnv::Shutdown() { Aws::ShutdownAPI(sdkOptions); } // The factory method for creating an S3 Env diff --git a/cloud/aws/aws_env.h b/cloud/aws/aws_env.h index f96780a4cf4..9d6443ff690 100644 --- a/cloud/aws/aws_env.h +++ b/cloud/aws/aws_env.h @@ -49,7 +49,7 @@ class AwsEnv : public CloudEnvImpl { const std::shared_ptr& info_log, CloudEnv** cenv); - virtual ~AwsEnv() {} + virtual ~AwsEnv(); const char* Name() const override { return "aws"; } diff --git a/cloud/aws/aws_retry.cc b/cloud/aws/aws_retry.cc index a9abff2b81f..c2b8d7a05d1 100644 --- a/cloud/aws/aws_retry.cc +++ b/cloud/aws/aws_retry.cc @@ -125,7 +125,7 @@ Status AwsCloudOptions::GetClientConfiguration( // Setup how retries need to be done config->retryStrategy = std::make_shared(env); if (cloud_env_options.request_timeout_ms != 0) { - config->requestTimeoutMs = cloud_env_options.request_timeout_ms; + config->requestTimeoutMs = static_cast(cloud_env_options.request_timeout_ms); } config->region = ToAwsString(region); diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 5d016a13d55..8353fa82d7a 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -1,7 +1,7 @@ // Copyright (c) 2017 Rockset. 
#ifndef ROCKSDB_LITE -#ifndef _WIN32_WINNT +#ifndef _WIN32 #include #else #include @@ -12,6 +12,7 @@ #include "cloud/cloud_env_wrapper.h" #include "cloud/db_cloud_impl.h" #include "cloud/filename.h" +#include "file/filename.h" #include "port/likely.h" #include "rocksdb/cloud/cloud_log_controller.h" #include "rocksdb/db.h" @@ -59,6 +60,50 @@ void BucketOptions::SetBucketName(const std::string& bucket, } } +static Status IsNormalizedPath(const std::string& path) { + auto pos = path.find_first_of("\\{}[]<>%~#|^\'\""); + if (pos != std::string::npos) { + return Status::InvalidArgument("Illegal character in object path:", path); + } + pos = path.find_first_of("&$@=;:+,?"); + if (pos != std::string::npos) { + return Status::InvalidArgument("Special character in object path: ", path); + } + return Status::OK(); +} + +Status BucketOptions::NormalizeObjectPath(const std::string& path, + std::string* result) { + // Remove the drive if there is one... + auto colon = path.find(':'); + std::string normalized; + if (colon != std::string::npos) { + normalized = path.substr(colon + 1); + } else { + normalized = path; + } + // Replace any "\" with "/" + for (auto pos = normalized.find('\\'); pos != std::string::npos; + pos = normalized.find('\\', pos)) { + normalized[pos] = '/'; + } + // Remove any duplicate markers + normalized = NormalizePath(normalized); + Status s = IsNormalizedPath(normalized); + if (s.ok()) { + *result = normalized; + } + return s; +} + +Status BucketOptions::SetObjectPath(const std::string& object) { + Status s = IsNormalizedPath(object); + if (s.ok()) { + object_ = object; + } + return s; +} + // Initializes the bucket properties void BucketOptions::TEST_Initialize(const std::string& bucket, @@ -70,15 +115,13 @@ void BucketOptions::TEST_Initialize(const std::string& bucket, if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_BUCKET_NAME", "ROCKSDB_CLOUD_BUCKET_NAME", &bucket_)) { -#ifdef _WIN32_WINNT +#ifdef _WIN32 char user_name[257]; // UNLEN 
+ 1 DWORD dwsize = sizeof(user_name); if (!::GetUserName(user_name, &dwsize)) { bucket_ = bucket_ + "unknown"; } else { - bucket_ = - bucket_ + - std::string(user_name, static_cast(dwsize)); + bucket_ = bucket_ + user_name; } #else bucket_ = bucket + std::to_string(geteuid()); @@ -90,11 +133,16 @@ void BucketOptions::TEST_Initialize(const std::string& bucket, prefix_ = prefix; } name_ = prefix_ + bucket_; - if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH", - "ROCKSDB_CLOUD_OBJECT_PATH", - &object_)) { - object_ = object; + std::string value; + if (CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH", + "ROCKSDB_CLOUD_OBJECT_PATH", + &value)) { + NormalizeObjectPath(value, &value); + } else { + NormalizeObjectPath(object, &value); } + SetObjectPath(value); + if (!CloudEnvOptions::GetNameFromEnvironment( "ROCKSDB_CLOUD_TEST_REGION", "ROCKSDB_CLOUD_REGION", &region_)) { region_ = region; @@ -117,15 +165,24 @@ Status CloudEnv::NewAwsEnv( const std::string& dest_cloud_region, const CloudEnvOptions& cloud_options, const std::shared_ptr& logger, CloudEnv** cenv) { CloudEnvOptions options = cloud_options; + Status s; if (!src_cloud_bucket.empty()) options.src_bucket.SetBucketName(src_cloud_bucket); - if (!src_cloud_object.empty()) - options.src_bucket.SetObjectPath(src_cloud_object); + if (!src_cloud_object.empty()) { + s = options.src_bucket.SetObjectPath(src_cloud_object); + if (!s.ok()) { + return s; + } + } if (!src_cloud_region.empty()) options.src_bucket.SetRegion(src_cloud_region); - if (!dest_cloud_bucket.empty()) + if (!dest_cloud_bucket.empty()) options.dest_bucket.SetBucketName(dest_cloud_bucket); - if (!dest_cloud_object.empty()) - options.dest_bucket.SetObjectPath(dest_cloud_object); + if (!dest_cloud_object.empty()) { + s = options.dest_bucket.SetObjectPath(dest_cloud_object); + if (!s.ok()) { + return s; + } + } if (!dest_cloud_region.empty()) options.dest_bucket.SetRegion(dest_cloud_region); return 
NewAwsEnv(base_env, options, logger, cenv); diff --git a/cloud/cloud_env_impl.cc b/cloud/cloud_env_impl.cc index 1bd3729850a..1d0c10f650a 100644 --- a/cloud/cloud_env_impl.cc +++ b/cloud/cloud_env_impl.cc @@ -848,7 +848,7 @@ std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const { return logical_path; } auto file_name = basename(logical_path); - uint64_t fileNumber; + uint64_t fileNumber = 0; FileType type; WalFileType walType; if (file_name == "MANIFEST") { diff --git a/cloud/cloud_env_impl.h b/cloud/cloud_env_impl.h index 9cc2bc22daf..d0743ac33f9 100644 --- a/cloud/cloud_env_impl.h +++ b/cloud/cloud_env_impl.h @@ -11,6 +11,13 @@ #include "rocksdb/env.h" #include "rocksdb/status.h" +#ifdef _WIN32 +// Windows API macro interference +#undef DeleteFile +#undef GetCurrentTime +#undef GetFreeSpace +#endif + namespace ROCKSDB_NAMESPACE { class CloudScheduler; class CloudStorageReadableFile; @@ -336,7 +343,9 @@ class CloudEnvImpl : public CloudEnv { bool test_disable_cloud_manifest_{false}; // scratch space in local dir - static constexpr const char* SCRATCH_LOCAL_DIR = "/tmp"; + std::string GetScratchDirectory() const; + std::string GetScratchFile() const; + std::mutex files_to_delete_mutex_; std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1); std::unordered_map files_to_delete_; diff --git a/cloud/cloud_scheduler_test.cc b/cloud/cloud_scheduler_test.cc index a2acd04f159..2b7c67457d6 100644 --- a/cloud/cloud_scheduler_test.cc +++ b/cloud/cloud_scheduler_test.cc @@ -13,16 +13,22 @@ #include #include +#include "rocksdb/env.h" #include "test_util/testharness.h" namespace ROCKSDB_NAMESPACE { class CloudSchedulerTest : public testing::Test { public: - CloudSchedulerTest() { scheduler_ = CloudScheduler::Get(); } + CloudSchedulerTest() { + scheduler_ = CloudScheduler::Get(); + env_ = Env::Default(); + } ~CloudSchedulerTest() {} std::shared_ptr scheduler_; + Env *env_; + void WaitForJobs(const std::vector &jobs, uint32_t delay) { bool 
running = true; while (running) { @@ -34,7 +40,7 @@ class CloudSchedulerTest : public testing::Test { } } if (running) { - usleep(delay); + env_->SleepForMicroseconds(delay); } } } @@ -89,14 +95,14 @@ TEST_F(CloudSchedulerTest, TestRecurring) { std::chrono::microseconds(100), doJob2, nullptr); while (job2 <= 4) { - usleep(100); + env_->SleepForMicroseconds(100); } ASSERT_GE(job2.load(), 4); ASSERT_GT(job1.load(), job2); ASSERT_TRUE(scheduler_->CancelJob(handle1)); auto old1 = job1.load(); auto old2 = job2.load(); - usleep(200); + env_->SleepForMicroseconds(200); ASSERT_EQ(job1.load(), old1); ASSERT_GT(job2.load(), old2); } @@ -117,7 +123,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) { ASSERT_FALSE(scheduler2->CancelJob(handle1)); ASSERT_TRUE(scheduler2->CancelJob(handle2)); ASSERT_FALSE(scheduler2->CancelJob(handle2)); - usleep(200); + env_->SleepForMicroseconds(200); ASSERT_EQ(job1, 2); ASSERT_EQ(job2, 1); @@ -130,7 +136,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) { scheduler2.reset(); auto old1 = job1.load(); auto old2 = job2.load(); - usleep(200); + env_->SleepForMicroseconds(200); ASSERT_EQ(job2, old2); ASSERT_GT(job1, old1); } diff --git a/cloud/db_cloud_test.cc b/cloud/db_cloud_test.cc index 31147c83cc2..d25f01d2e45 100644 --- a/cloud/db_cloud_test.cc +++ b/cloud/db_cloud_test.cc @@ -13,6 +13,7 @@ #include "cloud/db_cloud_impl.h" #include "cloud/filename.h" #include "cloud/manifest_reader.h" +#include "file/file_util.h" #include "file/filename.h" #include "logging/logging.h" #include "rocksdb/cloud/cloud_storage_provider.h" @@ -45,7 +46,7 @@ class CloudTest : public testing::Test { persistent_cache_size_gb_ = 0; db_ = nullptr; - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); base_env_->CreateDirIfMissing(dbname_); base_env_->NewLogger(test::TmpDir(base_env_) + "/rocksdb-cloud.log", &options_.info_log); @@ -62,17 +63,18 @@ class CloudTest : public testing::Test { CloudEnv* aenv; // create a dummy aws env + 
ASSERT_OK(CloudEnv::NewAwsEnv(base_env_, cloud_env_options_, options_.info_log, &aenv)); ASSERT_NE(aenv, nullptr); aenv_.reset(aenv); // delete all pre-existing contents from the bucket Status st = aenv_->GetStorageProvider()->EmptyBucket( - aenv_->GetSrcBucketName(), dbname_); + aenv_->GetSrcBucketName(), aenv_->GetSrcObjectPath()); ASSERT_TRUE(st.ok() || st.IsNotFound()); aenv_.reset(); - DestroyDir(clone_dir_); + DestroyDir(base_env_, clone_dir_); ASSERT_OK(base_env_->CreateDir(clone_dir_)); } @@ -93,12 +95,6 @@ class CloudTest : public testing::Test { return GetSSTFiles(cname); } - void DestroyDir(const std::string& dir) { - std::string cmd = "rm -rf " + dir; - int rc = system(cmd.c_str()); - ASSERT_EQ(rc, 0); - } - virtual ~CloudTest() { // Cleanup the cloud bucket if (!cloud_env_options_.src_bucket.GetBucketName().empty()) { @@ -107,7 +103,7 @@ class CloudTest : public testing::Test { options_.info_log, &aenv); if (st.ok()) { aenv->GetStorageProvider()->EmptyBucket(aenv->GetSrcBucketName(), - dbname_); + aenv->GetSrcObjectPath()); delete aenv; } } @@ -346,7 +342,7 @@ TEST_F(CloudTest, GetChildrenTest) { ASSERT_OK(db_->Flush(FlushOptions())); CloseDB(); - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); OpenDB(); std::vector children; @@ -476,7 +472,7 @@ TEST_F(CloudTest, ColumnFamilies) { CloseDB(&handles); // destory local state - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); // new cloud env CreateCloudEnv(); @@ -637,12 +633,12 @@ TEST_F(CloudTest, KeepLocalFiles) { ASSERT_OK(db_->Flush(FlushOptions())); CloseDB(); - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); OpenDB(); std::vector files; ASSERT_OK(Env::Default()->GetChildren(dbname_, &files)); - long sst_files = + auto sst_files = std::count_if(files.begin(), files.end(), [](const std::string& file) { return file.find("sst") != std::string::npos; }); @@ -669,14 +665,14 @@ TEST_F(CloudTest, CopyToFromS3) { CreateCloudEnv(); CloudEnvImpl* cimpl = static_cast(aenv_.get()); 
cimpl->TEST_InitEmptyCloudManifest(); - char buffer[1 * 1024 * 1024]; + char buffer[10 * 1024]; // create a 10 MB file and upload it to cloud { std::unique_ptr writer; ASSERT_OK(aenv_->NewWritableFile(fname, &writer, EnvOptions())); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 1024; i++) { ASSERT_OK(writer->Append(Slice(buffer, sizeof(buffer)))); } // sync and close file @@ -691,7 +687,7 @@ TEST_F(CloudTest, CopyToFromS3) { ASSERT_OK(aenv_->NewRandomAccessFile(fname, &reader, EnvOptions())); uint64_t offset = 0; - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 1024; i++) { Slice result; char* scratch = &buffer[0]; ASSERT_OK(reader->Read(offset, sizeof(buffer), &result, scratch)); @@ -884,8 +880,8 @@ TEST_F(CloudTest, KeepLocalLogKafka) { delete db_; db_ = nullptr; aenv_.reset(); - DestroyDir(dbname_); - DestroyDir("/tmp/ROCKSET"); + DestroyDir(base_env_, dbname_); + DestroyDir(base_env_, "/tmp/ROCKSET"); // Create new env. CreateCloudEnv(); @@ -923,8 +919,8 @@ TEST_F(CloudTest, DISABLED_KeepLocalLogKinesis) { delete db_; db_ = nullptr; aenv_.reset(); - DestroyDir(dbname_); - DestroyDir("/tmp/ROCKSET"); + DestroyDir(base_env_, dbname_); + DestroyDir(base_env_, "/tmp/ROCKSET"); // Create new env. CreateCloudEnv(); @@ -1070,7 +1066,7 @@ TEST_F(CloudTest, TwoConcurrentWriters) { for (int i = 0; i < 5; ++i) { closeDB1(); if (i == 2) { - DestroyDir(firstDB); + DestroyDir(base_env_, firstDB); } // opening the database makes me a master (i.e. CLOUDMANIFEST points to my // manifest), my writes are applied to the shared space! @@ -1082,7 +1078,7 @@ TEST_F(CloudTest, TwoConcurrentWriters) { } closeDB2(); if (i == 2) { - DestroyDir(secondDB); + DestroyDir(base_env_, secondDB); } // opening the database makes me a master (i.e. CLOUDMANIFEST points to my // manifest), my writes are applied to the shared space! 
@@ -1116,6 +1112,7 @@ TEST_F(CloudTest, TwoConcurrentWriters) { std::string v; ASSERT_TRUE(db1->Get(ReadOptions(), "ShouldNotBeApplied", &v).IsNotFound()); + closeDB1(); } // Creates a pure RocksDB database and makes sure we can migrate to RocksDB @@ -1158,7 +1155,7 @@ TEST_F(CloudTest, MigrateFromPureRocksDB) { // Tests that we can open cloud DB without destination and source bucket set. // This is useful for tests. TEST_F(CloudTest, NoDestOrSrc) { - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); cloud_env_options_.keep_local_sst_files = true; cloud_env_options_.src_bucket.SetBucketName(""); cloud_env_options_.src_bucket.SetObjectPath(""); @@ -1178,7 +1175,7 @@ TEST_F(CloudTest, NoDestOrSrc) { } TEST_F(CloudTest, PreloadCloudManifest) { - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); // Put one key-value OpenDB(); std::string value; @@ -1476,7 +1473,7 @@ TEST_F(CloudTest, CheckpointToCloud) { ASSERT_EQ(2, GetSSTFiles(dbname_).size()); CloseDB(); - DestroyDir(dbname_); + DestroyDir(base_env_, dbname_); cloud_env_options_.src_bucket = checkpoint_bucket; @@ -1501,7 +1498,7 @@ TEST_F(CloudTest, CopyObjectTest) { std::string content = "This is a test file"; std::string fname = dbname_ + "/100000.sst"; - std::string dst_fname = dbname_ + "/200000.sst"; + std::string dst_fname = aenv_->GetSrcObjectPath() + "/200000.sst"; { std::unique_ptr writableFile; @@ -1509,9 +1506,9 @@ TEST_F(CloudTest, CopyObjectTest) { writableFile->Append(content); writableFile->Fsync(); } - + auto remapped = basename(aenv_->RemapFilename(fname)); Status st = aenv_->GetStorageProvider()->CopyCloudObject( - aenv_->GetSrcBucketName(), aenv_->RemapFilename(fname), + aenv_->GetSrcBucketName(), aenv_->GetSrcObjectPath() + "/" + remapped, aenv_->GetSrcBucketName(), dst_fname); ASSERT_OK(st); @@ -1608,8 +1605,11 @@ TEST_F(CloudTest, SharedBlockCache) { } // namespace ROCKSDB_NAMESPACE +#include "port/stack_trace.h" + // A black-box test for the cloud wrapper around rocksdb int main(int 
argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/cloud/purge.cc b/cloud/purge.cc index 1298196baaf..4390b07b77c 100644 --- a/cloud/purge.cc +++ b/cloud/purge.cc @@ -4,6 +4,7 @@ #include "cloud/purge.h" #include +#include #include #include "cloud/db_cloud_impl.h" @@ -15,6 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/status.h" +#include "util/random.h" namespace ROCKSDB_NAMESPACE { @@ -186,6 +188,38 @@ Status CloudEnvImpl::FindObsoleteDbid( return st; } +std::string CloudEnvImpl::GetScratchDirectory() const { + const char* env = getenv("TMPDIR"); + if (env != nullptr) { + return std::string(env); + } else { +#ifdef _WIN32 + env = getenv("TEMP"); + if (env == nullptr) { + env = getenv("TMP"); + } + if (env != nullptr) { + return std::string(env); + } else { + return "c:\\tmp"; + } +#else + return "/tmp"; +#endif + } +} + +std::string CloudEnvImpl::GetScratchFile() const { + int64_t current_time = 0; + auto s = base_env_->GetCurrentTime(&current_time); + if (!s.ok()) { + current_time = static_cast(std::time(0)); + } + Random rand(static_cast(current_time)); + const std::string scratch = GetScratchDirectory(); + return TempFileName(scratch, rand.Next()); +} + // // For each of the dbids in the list, extract the entire list of // parent dbids. 
@@ -193,15 +227,11 @@ Status CloudEnvImpl::extractParents(const std::string& bucket_name_prefix, const DbidList& dbid_list, DbidParents* parents) { const std::string delimiter(DBID_SEPARATOR); - // use current time as seed for random generator - std::srand(static_cast(std::time(0))); - const std::string random = std::to_string(std::rand()); - const std::string scratch(SCRATCH_LOCAL_DIR); + auto localfile = GetScratchFile(); Status st; for (auto iter = dbid_list.begin(); iter != dbid_list.end(); ++iter) { // download IDENTITY std::string cloudfile = iter->second + "/IDENTITY"; - std::string localfile = scratch + "/.rockset_IDENTITY." + random; st = GetStorageProvider()->GetCloudObject(bucket_name_prefix, cloudfile, localfile); if (!st.ok() && !st.IsNotFound()) { diff --git a/include/rocksdb/cloud/cloud_env_options.h b/include/rocksdb/cloud/cloud_env_options.h index 1bf10580b70..ec7360dd99c 100644 --- a/include/rocksdb/cloud/cloud_env_options.h +++ b/include/rocksdb/cloud/cloud_env_options.h @@ -118,6 +118,11 @@ class BucketOptions { std::string region_; // The region for the bucket std::string name_; // The name of the bucket (prefix_ + bucket_) public: + // Attempts to normalize the input path: strips any drive prefix, converts + // "\" to "/" and collapses duplicate separators. Returns OK if the result + // contains no illegal or special characters, non-OK otherwise. + static Status NormalizeObjectPath(const std::string& path, + std::string* normalized); BucketOptions(); // Sets the name of the bucket to be the new bucket name. 
// If prefix is specified, the new bucket name will be [prefix][bucket] @@ -125,7 +130,7 @@ class BucketOptions { void SetBucketName(const std::string& bucket, const std::string& prefix = ""); const std::string& GetBucketName() const { return name_; } const std::string& GetObjectPath() const { return object_; } - void SetObjectPath(const std::string& object) { object_ = object; } + Status SetObjectPath(const std::string& object); const std::string& GetRegion() const { return region_; } void SetRegion(const std::string& region) { region_ = region; }