Skip to content
Open
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ endif()

if(WITH_AWS)
find_package(AWSSDK REQUIRED COMPONENTS s3 transfer kinesis)
add_definitions(-DUSE_AWS)
add_definitions(-DUSE_AWS -DUSE_CLOUD)
include_directories(${AWS_INCLUDE_DIR})
list(APPEND THIRDPARTY_LIBS ${AWSSDK_LINK_LIBRARIES})
endif()
Expand Down Expand Up @@ -919,6 +919,7 @@ set(SOURCES
cloud/cloud_scheduler.cc
cloud/cloud_storage_provider.cc
cloud/cloud_file_cache.cc
cloud/mock_cloud_storage_provider.cc
db/db_impl/db_impl_remote_compaction.cc
$<TARGET_OBJECTS:build_version>)

Expand Down
8 changes: 8 additions & 0 deletions build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ if [ "${USE_AWS}XXX" = "1XXX" ]; then
S3_LDFLAGS="$S3_LDFLAGS -laws-cpp-sdk-s3 -laws-cpp-sdk-kinesis -laws-cpp-sdk-core -laws-cpp-sdk-transfer"
COMMON_FLAGS="$COMMON_FLAGS $S3_CCFLAGS"
PLATFORM_LDFLAGS="$S3_LDFLAGS $PLATFORM_LDFLAGS"
USE_CLOUD=1
fi
#
# Support the Kafka WAL storing if the env variable named USE_KAFKA
Expand All @@ -647,6 +648,13 @@ fi
if [ "${USE_KAFKA}XXX" = "1XXX" ]; then
COMMON_FLAGS="$COMMON_FLAGS -DUSE_KAFKA"
PLATFORM_LDFLAGS="-lrdkafka++ $PLATFORM_LDFLAGS"
USE_CLOUD=1
fi

if [ "${USE_CLOUD}XXX" = "1XXX" ]; then
COMMON_FLAGS="$COMMON_FLAGS -DUSE_CLOUD"
#TODO: How to only turn on unit tests when appropriate
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS"
fi

if test "0$PORTABLE" -eq 0; then
Expand Down
6 changes: 2 additions & 4 deletions cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

#include "cloud/cloud_env_impl.h"

#ifdef USE_AWS

#include <string.h>

#include <chrono>
Expand Down Expand Up @@ -49,6 +47,7 @@ class AwsEnv : public CloudEnvImpl {
const std::shared_ptr<Logger>& info_log,
CloudEnv** cenv);
static Status NewAwsEnv(Env* env, std::unique_ptr<CloudEnv>* cenv);
#ifdef USE_AWS
virtual ~AwsEnv() {}

static const char* kName() { return kAws(); }
Expand All @@ -73,8 +72,7 @@ class AwsEnv : public CloudEnvImpl {
//
explicit AwsEnv(Env* underlying_env, const CloudEnvOptions& cloud_options,
const std::shared_ptr<Logger>& info_log = nullptr);
#endif // USE_AWS
};

} // namespace ROCKSDB_NAMESPACE

#endif // USE_AWS
142 changes: 118 additions & 24 deletions cloud/cloud_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud/cloud_storage_provider_impl.h"
#include "cloud/db_cloud_impl.h"
#include "cloud/filename.h"
#include "cloud/mock_cloud_storage_provider.h"
#include "options/configurable_helper.h"
#include "options/options_helper.h"
#include "port/likely.h"
Expand Down Expand Up @@ -116,6 +117,21 @@ void BucketOptions::TEST_Initialize(const std::string& bucket,
}
}

static void ParseTestBucket(const std::string& value, std::string* name,
std::string* path, std::string* region) {
*name = value;
auto pos = name->find(":");
if (pos != std::string::npos) {
*path = name->substr(pos + 1);
*name = name->substr(0, pos);
}
pos = path->find("?");
if (pos != std::string::npos) {
*region = path->substr(pos + 1);
*path = path->substr(0, pos);
}
}

static std::unordered_map<std::string, OptionTypeInfo>
bucket_options_type_info = {
{"object",
Expand Down Expand Up @@ -211,16 +227,7 @@ static std::unordered_map<std::string, OptionTypeInfo>
std::string name = value;
std::string path;
std::string region;
auto pos = name.find(":");
if (pos != std::string::npos) {
path = name.substr(pos + 1);
name = name.substr(0, pos);
}
pos = path.find("?");
if (pos != std::string::npos) {
region = path.substr(pos + 1);
path = path.substr(0, pos);
}
ParseTestBucket(value, &name, &path, &region);
bucket->TEST_Initialize(name, path, region);
return Status::OK();
}}},
Expand Down Expand Up @@ -308,16 +315,7 @@ static std::unordered_map<std::string, OptionTypeInfo>
std::string name;
std::string path;
std::string region;
auto pos = value.find(":");
if (pos != std::string::npos) {
name = value.substr(0, pos);
path = value.substr(pos + 1);
}
pos = path.find("?");
if (pos != std::string::npos) {
region = path.substr(pos + 1);
path = path.substr(0, pos);
}
ParseTestBucket(value, &name, &path, &region);
copts->src_bucket.TEST_Initialize(name, path, region);
copts->dest_bucket.TEST_Initialize(name, path, region);
return Status::OK();
Expand Down Expand Up @@ -385,6 +383,7 @@ Status CloudEnv::NewAwsEnv(
return NewAwsEnv(base_env, options, logger, cenv);
}

namespace {
int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
int count = 0;
// Register the Env types
Expand All @@ -396,6 +395,15 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
return guard->get();
});
count++;
library.Register<CloudStorageProvider>(
MockCloudStorageProvider::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<CloudStorageProvider>* guard,
std::string* /*errmsg*/) {
guard->reset(new MockCloudStorageProvider());
return guard->get();
});
count++;

count += CloudEnvImpl::RegisterAwsObjects(library, arg);

Expand All @@ -416,14 +424,15 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
return count;
}

void CloudEnv::RegisterCloudObjects(const std::string& arg) {
static void RegisterCloudObjects(const std::string& arg = "") {
static std::once_flag do_once;
std::call_once(do_once,
[&]() {
auto library = ObjectLibrary::Default();
DoRegisterCloudObjects(*library, arg);
});
}
}
} // namespace

Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value,
std::unique_ptr<CloudEnv>* result) {
Expand Down Expand Up @@ -458,7 +467,6 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std
s = cenv->ConfigureFromMap(copy, options);
}
if (s.ok() && config_options.invoke_prepare_options) {
copy.invoke_prepare_options = config_options.invoke_prepare_options;
copy.env = cenv;
s = cenv->PrepareOptions(copy);
if (s.ok()) {
Expand Down Expand Up @@ -538,7 +546,7 @@ Status CloudEnv::NewAwsEnv(Env* /*base_env*/,
Status CloudEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& options,
const std::shared_ptr<Logger>& logger,
CloudEnv** cenv) {
CloudEnv::RegisterCloudObjects();
RegisterCloudObjects();
// Dump out cloud env options
options.Dump(logger.get());

Expand All @@ -558,4 +566,90 @@ Status CloudEnv::NewAwsEnv(Env* base_env, const CloudEnvOptions& options,
#endif

} // namespace ROCKSDB_NAMESPACE
#ifdef USE_CLOUD
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
static std::string ToTestBucket(const std::string& name,
const std::string& path,
const std::string& region = "") {
std::string result = name;
for (auto pos = result.find("_"); pos != std::string::npos;
pos = result.find("_", pos)) {
result[pos] = '.';
}
if (!path.empty()) {
result.append(":");
result.append(path);
}
if (!region.empty()) {
result.append("?");
result.append(region);
}
return result;
}

static std::string ToTestBucket(const std::string& name) {
// Randomize the test path so that multiple tests can run in parallel
srand(static_cast<unsigned int>(time(nullptr)));
std::string path = name + "_" + std::to_string(rand());
return ToTestBucket(name, "/" + path);
}
extern "C" {
void RegisterCustomObjects(int argc, char** argv) {
std::string test_id = (argc > 0) ? argv[0] : "db_test";
auto slash = test_id.find_last_of("/\\");
if (slash != std::string::npos) {
test_id = test_id.substr(slash + 1);
}

//**TODO: When the Env is a Customizable object and can use options/map, this
//code can go away...
// ... in which case, the RegisterCloudObjects should take in the test_id to
// register for initialize
auto library = ROCKSDB_NAMESPACE::ObjectLibrary::Default();
library->Register<ROCKSDB_NAMESPACE::Env>(
"id=.*", [test_id](const std::string& uri,
std::unique_ptr<ROCKSDB_NAMESPACE::Env>* guard,
std::string* errmsg) {
ROCKSDB_NAMESPACE::ConfigOptions config_options;
std::unique_ptr<ROCKSDB_NAMESPACE::CloudEnv> cguard;
auto s = ROCKSDB_NAMESPACE::CloudEnv::CreateFromString(
config_options, "TEST=" + ToTestBucket(test_id) + "; " + uri,
&cguard);
if (s.ok()) {
auto* cimpl =
static_cast<ROCKSDB_NAMESPACE::CloudEnvImpl*>(cguard.get());
cimpl->TEST_DisableCloudManifest();
cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0));
guard->reset(cguard.release());
} else {
*errmsg = s.ToString();
}
return guard->get();
});
library->Register<ROCKSDB_NAMESPACE::Env>(
"provider=.*", [test_id](const std::string& uri,
std::unique_ptr<ROCKSDB_NAMESPACE::Env>* guard,
std::string* errmsg) {
ROCKSDB_NAMESPACE::ConfigOptions config_options;
std::unique_ptr<ROCKSDB_NAMESPACE::CloudEnv> cguard;
auto s = ROCKSDB_NAMESPACE::CloudEnv::CreateFromString(
config_options, "TEST=" + ToTestBucket(test_id) + "; " + uri,
&cguard);
if (s.ok()) {
auto* cimpl =
static_cast<ROCKSDB_NAMESPACE::CloudEnvImpl*>(cguard.get());
cimpl->TEST_DisableCloudManifest();
cimpl->TEST_SetFileDeletionDelay(std::chrono::seconds(0));
guard->reset(cguard.release());
} else {
*errmsg = s.ToString();
}
return guard->get();
});

ROCKSDB_NAMESPACE::RegisterCloudObjects(test_id);
}
}
#endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
#endif // USE_CLOUD
#endif // ROCKSDB_LITE
10 changes: 8 additions & 2 deletions cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "file/writable_file_writer.h"
#include "port/likely.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
Expand Down Expand Up @@ -46,6 +47,9 @@ CloudEnvImpl::~CloudEnvImpl() {
files_to_delete_.clear();
}
StopPurger();
// Since scheduled jobs may use CloudEnv members, shutdown the scheduler
// before destruction is complete.
scheduler_.reset();
}

Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) {
Expand Down Expand Up @@ -1951,16 +1955,18 @@ Status CloudEnvImpl::PrepareOptions(const ConfigOptions& options) {
if (!base_env_) {
base_env_ = Env::Default();
}
ConfigOptions copy = options;
copy.env = this;
Status status;
if (!cloud_env_options.cloud_log_controller &&
!cloud_env_options.keep_local_log_files) {
if (cloud_env_options.log_type == LogType::kLogKinesis) {
status = CloudLogController::CreateFromString(
options, CloudLogControllerImpl::kKinesis(),
copy, CloudLogControllerImpl::kKinesis(),
&cloud_env_options.cloud_log_controller);
} else if (cloud_env_options.log_type == LogType::kLogKafka) {
status = CloudLogController::CreateFromString(
options, CloudLogControllerImpl::kKafka(),
copy, CloudLogControllerImpl::kKafka(),
&cloud_env_options.cloud_log_controller);
} else {
status = Status::NotSupported("Unsupported log controller type");
Expand Down
Loading