Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cloud/aws/aws_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ AwsAccessType AwsCloudAccessCredentials::GetAccessType() const {
return AwsAccessType::kConfig;
} else if (!access_key_id.empty() || !secret_key.empty()) {
return AwsAccessType::kSimple;
} else if (getenv("AWS_ACCESS_KEY_ID") != nullptr &&
getenv("AWS_SECRET_ACCESS_KEY") != nullptr) {
return AwsAccessType::kEnvironment;
}
return AwsAccessType::kUndefined;
}
Expand Down Expand Up @@ -161,6 +164,7 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
}

#ifdef USE_AWS
static Aws::SDKOptions sdkOptions;

//
// The AWS credentials are specified to the constructor via
Expand All @@ -169,7 +173,8 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
const std::shared_ptr<Logger>& info_log)
: CloudEnvImpl(_cloud_env_options, underlying_env, info_log) {
Aws::InitAPI(Aws::SDKOptions());
Aws::InitAPI(sdkOptions); //**TODO: Move this into PrepareOptions and do it
// conditionally (first time)
if (cloud_env_options.src_bucket.GetRegion().empty() ||
cloud_env_options.dest_bucket.GetRegion().empty()) {
std::string region;
Expand All @@ -187,7 +192,11 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
base_env_ = underlying_env;
}

void AwsEnv::Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
AwsEnv::~AwsEnv() {
//**TODO: Conditionally call shutdown (or make shutdown conditional on last...
}

void AwsEnv::Shutdown() { Aws::ShutdownAPI(sdkOptions); }


// The factory method for creating an S3 Env
Expand Down
2 changes: 1 addition & 1 deletion cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AwsEnv : public CloudEnvImpl {
const std::shared_ptr<Logger>& info_log,
CloudEnv** cenv);

virtual ~AwsEnv() {}
virtual ~AwsEnv();

const char* Name() const override { return "aws"; }

Expand Down
2 changes: 1 addition & 1 deletion cloud/aws/aws_retry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Status AwsCloudOptions::GetClientConfiguration(
// Setup how retries need to be done
config->retryStrategy = std::make_shared<AwsRetryStrategy>(env);
if (cloud_env_options.request_timeout_ms != 0) {
config->requestTimeoutMs = cloud_env_options.request_timeout_ms;
config->requestTimeoutMs = static_cast<long>(cloud_env_options.request_timeout_ms);
}

config->region = ToAwsString(region);
Expand Down
37 changes: 28 additions & 9 deletions cloud/cloud_env.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2017 Rockset.
#ifndef ROCKSDB_LITE

#ifndef _WIN32_WINNT
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
Expand All @@ -12,6 +12,7 @@
#include "cloud/cloud_env_wrapper.h"
#include "cloud/db_cloud_impl.h"
#include "cloud/filename.h"
#include "file/filename.h"
#include "port/likely.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/db.h"
Expand Down Expand Up @@ -59,6 +60,23 @@ void BucketOptions::SetBucketName(const std::string& bucket,
}
}

void BucketOptions::SetObjectPath(const std::string& object) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I ask because we can likely offload this to filesystem library)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The object path is the "directory" under which files are stored in the cloud. Before this change, we stored the input name -- which is typically a directory path -- as-is as the object path, but this fails with Windows-style paths.

Since this object path ends up as part of the URL for the bucket, the path must follow URL syntax. So the object path cannot have "" or ":" in it. This method implements the work to "standardize" the path to something that follows URL syntax.

If there is a means to get something from the filesystem library, I am not sure what it is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just require object path to be URL-compatible? It's pretty clear this method is used to configure the path in the cloud, why are we feeding it local directory path with C:\\? We should either take what the user gives us verbatim or return an error if the path is not supported. Transparently changing the value here is weird.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not transform it, then this method must return a Status to say that it is malformed, which is doable. The places that initialize the object path from dbname will break on Windows. We would need to provide another function potentially to do the transformation.

If that is what you prefer, then it can be done and I will make the corresponding changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The places that initialize the object path from dbname will break on Windows.

Where does that happen? Is it only in tests? If it's only in tests we can have a function that transforms the path before calling SetObjectPath

If that is what you prefer, then it can be done and I will make the corresponding changes.

Yeah that plan sounds good.

// Remove the drive if there is one...
auto colon = object.find(':');
if (colon != std::string::npos) {
object_ = object.substr(colon + 1);
} else {
object_ = object;
}
// Replace any "\" with "/"
for (auto pos = object_.find('\\'); pos != std::string::npos;
pos = object_.find('\\', pos)) {
object_[pos] = '/';
}
// Remove any duplicate markers
object_ = NormalizePath(object_);
}

// Initializes the bucket properties

void BucketOptions::TEST_Initialize(const std::string& bucket,
Expand All @@ -70,15 +88,13 @@ void BucketOptions::TEST_Initialize(const std::string& bucket,
if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_BUCKET_NAME",
"ROCKSDB_CLOUD_BUCKET_NAME",
&bucket_)) {
#ifdef _WIN32_WINNT
#ifdef _WIN32
char user_name[257]; // UNLEN + 1
DWORD dwsize = sizeof(user_name);
if (!::GetUserName(user_name, &dwsize)) {
bucket_ = bucket_ + "unknown";
} else {
bucket_ =
bucket_ +
std::string(user_name, static_cast<std::string::size_type>(dwsize));
bucket_ = bucket_ + user_name;
}
#else
bucket_ = bucket + std::to_string(geteuid());
Expand All @@ -90,10 +106,13 @@ void BucketOptions::TEST_Initialize(const std::string& bucket,
prefix_ = prefix;
}
name_ = prefix_ + bucket_;
if (!CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH",
"ROCKSDB_CLOUD_OBJECT_PATH",
&object_)) {
object_ = object;
std::string value;
if (CloudEnvOptions::GetNameFromEnvironment("ROCKSDB_CLOUD_TEST_OBECT_PATH",
"ROCKSDB_CLOUD_OBJECT_PATH",
&value)) {
SetObjectPath(value);
} else {
SetObjectPath(object);
}
if (!CloudEnvOptions::GetNameFromEnvironment(
"ROCKSDB_CLOUD_TEST_REGION", "ROCKSDB_CLOUD_REGION", &region_)) {
Expand Down
2 changes: 1 addition & 1 deletion cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const {
return logical_path;
}
auto file_name = basename(logical_path);
uint64_t fileNumber;
uint64_t fileNumber = 0;
FileType type;
WalFileType walType;
if (file_name == "MANIFEST") {
Expand Down
11 changes: 10 additions & 1 deletion cloud/cloud_env_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
#include "rocksdb/env.h"
#include "rocksdb/status.h"

#ifdef _WIN32
// Windows API macro interference
#undef DeleteFile
#undef GetCurrentTime
#undef GetFreeSpace
#endif

namespace ROCKSDB_NAMESPACE {
class CloudScheduler;
class CloudStorageReadableFile;
Expand Down Expand Up @@ -336,7 +343,9 @@ class CloudEnvImpl : public CloudEnv {
bool test_disable_cloud_manifest_{false};

// scratch space in local dir
static constexpr const char* SCRATCH_LOCAL_DIR = "/tmp";
std::string GetScratchDirectory() const;
std::string GetScratchFile() const;

std::mutex files_to_delete_mutex_;
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
std::unordered_map<std::string, int> files_to_delete_;
Expand Down
18 changes: 12 additions & 6 deletions cloud/cloud_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
#include <thread>
#include <unordered_set>

#include "rocksdb/env.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {

class CloudSchedulerTest : public testing::Test {
public:
CloudSchedulerTest() { scheduler_ = CloudScheduler::Get(); }
CloudSchedulerTest() {
scheduler_ = CloudScheduler::Get();
env_ = Env::Default();
}
~CloudSchedulerTest() {}

std::shared_ptr<CloudScheduler> scheduler_;
Env *env_;

void WaitForJobs(const std::vector<long> &jobs, uint32_t delay) {
bool running = true;
while (running) {
Expand All @@ -34,7 +40,7 @@ class CloudSchedulerTest : public testing::Test {
}
}
if (running) {
usleep(delay);
env_->SleepForMicroseconds(delay);
}
}
}
Expand Down Expand Up @@ -89,14 +95,14 @@ TEST_F(CloudSchedulerTest, TestRecurring) {
std::chrono::microseconds(100), doJob2,
nullptr);
while (job2 <= 4) {
usleep(100);
env_->SleepForMicroseconds(100);
}
ASSERT_GE(job2.load(), 4);
ASSERT_GT(job1.load(), job2);
ASSERT_TRUE(scheduler_->CancelJob(handle1));
auto old1 = job1.load();
auto old2 = job2.load();
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job1.load(), old1);
ASSERT_GT(job2.load(), old2);
}
Expand All @@ -117,7 +123,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
ASSERT_FALSE(scheduler2->CancelJob(handle1));
ASSERT_TRUE(scheduler2->CancelJob(handle2));
ASSERT_FALSE(scheduler2->CancelJob(handle2));
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job1, 2);
ASSERT_EQ(job2, 1);

Expand All @@ -130,7 +136,7 @@ TEST_F(CloudSchedulerTest, TestMultipleSchedulers) {
scheduler2.reset();
auto old1 = job1.load();
auto old2 = job2.load();
usleep(200);
env_->SleepForMicroseconds(200);
ASSERT_EQ(job2, old2);
ASSERT_GT(job1, old1);
}
Expand Down
Loading