Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,211 changes: 186 additions & 1,025 deletions cloud/aws/aws_env.cc

Large diffs are not rendered by default.

169 changes: 15 additions & 154 deletions cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,66 +9,15 @@
#include "cloud/cloud_env_impl.h"
#include "port/sys_time.h"

#ifdef USE_AWS

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/utils/Outcome.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/BucketLocationConstraint.h>
#include <aws/transfer/TransferManager.h>

#include <chrono>
#include <list>
#include <unordered_map>

namespace rocksdb {
class ObjectLibrary;

class S3ReadableFile;

class AwsS3ClientWrapper {
public:
AwsS3ClientWrapper(
std::shared_ptr<Aws::S3::S3Client> client,
std::shared_ptr<CloudRequestCallback> cloud_request_callback);

Aws::S3::Model::ListObjectsOutcome ListObjects(
const Aws::S3::Model::ListObjectsRequest& request);

Aws::S3::Model::CreateBucketOutcome CreateBucket(
const Aws::S3::Model::CreateBucketRequest& request);

Aws::S3::Model::HeadBucketOutcome HeadBucket(
const Aws::S3::Model::HeadBucketRequest& request);

Aws::S3::Model::DeleteObjectOutcome DeleteObject(
const Aws::S3::Model::DeleteObjectRequest& request);

Aws::S3::Model::CopyObjectOutcome CopyObject(
const Aws::S3::Model::CopyObjectRequest& request);

Aws::S3::Model::GetObjectOutcome GetObject(
const Aws::S3::Model::GetObjectRequest& request);

Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint = 0);

Aws::S3::Model::HeadObjectOutcome HeadObject(
const Aws::S3::Model::HeadObjectRequest& request);

const std::shared_ptr<Aws::S3::S3Client>& GetClient() const {
return client_;
}

private:
std::shared_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<CloudRequestCallback> cloud_request_callback_;
};

namespace detail {
struct JobHandle;
} // namespace detail
#ifdef USE_AWS

//
// The S3 environment for rocksdb. This class overrides all the
Expand Down Expand Up @@ -99,14 +48,19 @@ class AwsEnv : public CloudEnvImpl {
static Status NewAwsEnv(Env* env,
const CloudEnvOptions& env_options,
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);

static Status CreateS3StorageProvider(std::unique_ptr<CloudStorageProvider>* provider);
static Status CreateKinesisController(std::unique_ptr<CloudLogController>* controller);
explicit AwsEnv(Env* underlying_env,
const CloudEnvOptions& cloud_options,
const std::shared_ptr<Logger>& logger);
virtual ~AwsEnv();
const char *Name() const override { return "AWS"; }

// We cannot invoke Aws::ShutdownAPI from the destructor because there could
// be
// multiple AwsEnv's ceated by a process and Aws::ShutdownAPI should be called
// only once by the entire process when all AwsEnvs are destroyed.
static void Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
static void Shutdown();

// If you do not specify a region, then S3 buckets are created in the
// standard-region which might not satisfy read-your-own-writes. So,
Expand Down Expand Up @@ -230,16 +184,8 @@ class AwsEnv : public CloudEnvImpl {

virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }

virtual Status EmptyBucket(const std::string& bucket,
const std::string& path) override;

std::string GetWALCacheDir();

// The S3 client
std::shared_ptr<AwsS3ClientWrapper> s3client_;

// AWS's utility to help out with uploading and downloading S3 file
std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;

// Saves and retrieves the dbid->dirname mapping in S3
Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
Expand All @@ -250,105 +196,18 @@ class AwsEnv : public CloudEnvImpl {
DbidList* dblist) override;
Status DeleteDbid(const std::string& bucket,
const std::string& dbid) override;
Status ListObjects(const std::string& bucket_name,
const std::string& bucket_object,
BucketObjectMetadata* meta) override;
Status DeleteObject(const std::string& bucket_name,
const std::string& bucket_object_path) override;
Status ExistsObject(const std::string& bucket_name,
const std::string& bucket_object_path) override;
Status GetObjectSize(const std::string& bucket_name,
const std::string& bucket_object_path,
uint64_t* filesize) override;
Status CopyObject(const std::string& bucket_name_src,
const std::string& bucket_object_path_src,
const std::string& bucket_name_dest,
const std::string& bucket_object_path_dest) override;
Status GetObject(const std::string& bucket_name,
const std::string& bucket_object_path,
const std::string& local_path) override;
Status PutObject(const std::string& local_path,
const std::string& bucket_name,
const std::string& bucket_object_path) override;
Status DeleteCloudFileFromDest(const std::string& fname) override;

void RemoveFileFromDeletionQueue(const std::string& filename);

void TEST_SetFileDeletionDelay(std::chrono::seconds delay) {
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
file_deletion_delay_ = delay;
}

Status Prepare() override;
protected:
private:
//
// The AWS credentials are specified to the constructor via
// access_key_id and secret_key.
//
explicit AwsEnv(Env* underlying_env,
const CloudEnvOptions& cloud_options,
const std::shared_ptr<Logger> & info_log = nullptr);

struct GetObjectResult {
bool success{false};
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
size_t objectSize{0};
};

GetObjectResult DoGetObject(const Aws::String& bucket, const Aws::String& key,
const std::string& destination);
GetObjectResult DoGetObjectWithTransferManager(
const Aws::String& bucket, const Aws::String& key,
const std::string& destination);


struct PutObjectResult {
bool success{false};
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
};

PutObjectResult DoPutObject(const std::string& filename,
const Aws::String& bucket, const Aws::String& key,
uint64_t sizeHint);

PutObjectResult DoPutObjectWithTransferManager(const std::string& filename,
const Aws::String& bucket,
const Aws::String& key,
uint64_t sizeHint);

// The pathname that contains a list of all db's inside a bucket.
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";

Status create_bucket_status_;

std::mutex files_to_delete_mutex_;
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
files_to_delete_;

Aws::S3::Model::BucketLocationConstraint bucket_location_;

Status status();

// Delete the specified path from S3
Status DeletePathInS3(const std::string& bucket,
const std::string& fname);

// Validate options
Status CheckOption(const EnvOptions& options);

// Return the list of children of the specified path
Status GetChildrenFromS3(const std::string& path,
const std::string& bucket,
std::vector<std::string>* result);

// If metadata, size or modtime is non-nullptr, returns requested data
Status HeadObject(const std::string& bucket, const std::string& path,
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
uint64_t* size = nullptr, uint64_t* modtime = nullptr);

Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
std::unique_ptr<S3ReadableFile>* result);

// Save IDENTITY file to S3. Update dbid registry.
Status SaveIdentitytoS3(const std::string& localfile,
const std::string& target_idfile);
Expand All @@ -359,7 +218,9 @@ class AwsEnv : public CloudEnvImpl {
// Converts a local pathname to an object name in the dest bucket
std::string destname(const std::string& localname);
};
#endif // USE_AWS

extern "C" {
void RegisterAwsObjects(ObjectLibrary& library, const std::string& arg);
} // extern "C"
} // namespace rocksdb

#endif // USE_AWS
140 changes: 0 additions & 140 deletions cloud/aws/aws_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,153 +3,13 @@
#pragma once
#ifdef USE_AWS

#include <chrono>
#include <fstream>
#include <iostream>
#include "cloud/aws/aws_env.h"
#include "cloud/filename.h"
#include "file/filename.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"

#include <aws/core/Aws.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/crypto/CryptoStream.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CopyObjectResult.h>
#include <aws/s3/model/CreateBucketConfiguration.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/CreateBucketResult.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectResult.h>
#include <aws/s3/model/GetBucketVersioningRequest.h>
#include <aws/s3/model/GetBucketVersioningResult.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/GetObjectResult.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/ListObjectsResult.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/PutObjectResult.h>
#include <aws/s3/model/ServerSideEncryption.h>

namespace rocksdb {
inline Aws::String ToAwsString(const std::string& s) {
return Aws::String(s.data(), s.size());
}

class S3ReadableFile : virtual public SequentialFile,
virtual public RandomAccessFile {
public:
S3ReadableFile(AwsEnv* env, const std::string& bucket_prefix,
const std::string& fname, uint64_t size);

// sequential access, read data at current offset in file
virtual Status Read(size_t n, Slice* result, char* scratch) override;

// random access, read data from specified offset in file
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;

virtual Status Skip(uint64_t n) override;

virtual size_t GetUniqueId(char* id, size_t max_size) const override;

private:
AwsEnv* env_;
std::string fname_;
Aws::String s3_bucket_;
Aws::String s3_object_;
uint64_t offset_;
uint64_t file_size_;
};

// Appends to a file in S3.
class S3WritableFile : public WritableFile {
private:
AwsEnv* env_;
std::string fname_;
std::string tmp_file_;
Status status_;
std::unique_ptr<WritableFile> local_file_;
std::string bucket_prefix_;
std::string cloud_fname_;
bool is_manifest_;

public:
// create S3 bucket
static Status CreateBucketInS3(
std::shared_ptr<AwsS3ClientWrapper> client,
const std::string& bucket_prefix,
const Aws::S3::Model::BucketLocationConstraint& location);

// bucket exists and we can access it
static Status BucketExistsInS3(
std::shared_ptr<AwsS3ClientWrapper> client,
const std::string& bucket_prefix,
const Aws::S3::Model::BucketLocationConstraint& location);

S3WritableFile(AwsEnv* env, const std::string& local_fname,
const std::string& bucket_prefix,
const std::string& cloud_fname, const EnvOptions& options,
const CloudEnvOptions cloud_env_options);

virtual ~S3WritableFile();

virtual Status Append(const Slice& data) override {
assert(status_.ok());
// write to temporary file
return local_file_->Append(data);
}

Status PositionedAppend(const Slice& data, uint64_t offset) override {
return local_file_->PositionedAppend(data, offset);
}
Status Truncate(uint64_t size) override {
return local_file_->Truncate(size);
}
Status Fsync() override { return local_file_->Fsync(); }
bool IsSyncThreadSafe() const override {
return local_file_->IsSyncThreadSafe();
}
bool use_direct_io() const override { return local_file_->use_direct_io(); }
size_t GetRequiredBufferAlignment() const override {
return local_file_->GetRequiredBufferAlignment();
}
uint64_t GetFileSize() override { return local_file_->GetFileSize(); }
size_t GetUniqueId(char* id, size_t max_size) const override {
return local_file_->GetUniqueId(id, max_size);
}
Status InvalidateCache(size_t offset, size_t length) override {
return local_file_->InvalidateCache(offset, length);
}
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
return local_file_->RangeSync(offset, nbytes);
}
Status Allocate(uint64_t offset, uint64_t len) override {
return local_file_->Allocate(offset, len);
}

virtual Status Flush() override {
assert(status_.ok());
return local_file_->Flush();
}

virtual Status Sync() override;

virtual Status status() { return status_; }

virtual Status Close() override;
};

} // namepace rocksdb

#endif /* USE_AWS */
Loading