Skip to content

Commit 148c729

Browse files
committed
Add a CloudStorageProvider class to CloudEnv
This change moves the functionality for dealing with S3-like bucket objects into a StorageProvider class. The CloudEnvOptions now has a CloudStorageProvider instance. Additionally, the code was "cleaned up" so that it can more easily be written as "Configurable" and "Customizable". This basically means that three steps were separated from one another: the construction of objects, the configuration of objects, and the verification/preparation of objects. By separating these three steps, the objects (AwsEnv, S3StorageProvider, Kinesis and Kafka LogControllers) can be easily adapted to loading from property/INI files.
1 parent 97d69b7 commit 148c729

25 files changed

+2494
-2200
lines changed

cloud/aws/aws_env.cc

Lines changed: 186 additions & 1025 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 15 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -9,66 +9,15 @@
99
#include "cloud/cloud_env_impl.h"
1010
#include "port/sys_time.h"
1111

12-
#ifdef USE_AWS
13-
14-
#include <aws/core/Aws.h>
15-
#include <aws/core/auth/AWSCredentialsProvider.h>
16-
#include <aws/core/utils/Outcome.h>
17-
#include <aws/kinesis/KinesisClient.h>
18-
#include <aws/s3/S3Client.h>
19-
#include <aws/s3/model/BucketLocationConstraint.h>
20-
#include <aws/transfer/TransferManager.h>
2112

2213
#include <chrono>
2314
#include <list>
2415
#include <unordered_map>
2516

2617
namespace rocksdb {
18+
class ObjectLibrary;
2719

28-
class S3ReadableFile;
29-
30-
class AwsS3ClientWrapper {
31-
public:
32-
AwsS3ClientWrapper(
33-
std::shared_ptr<Aws::S3::S3Client> client,
34-
std::shared_ptr<CloudRequestCallback> cloud_request_callback);
35-
36-
Aws::S3::Model::ListObjectsOutcome ListObjects(
37-
const Aws::S3::Model::ListObjectsRequest& request);
38-
39-
Aws::S3::Model::CreateBucketOutcome CreateBucket(
40-
const Aws::S3::Model::CreateBucketRequest& request);
41-
42-
Aws::S3::Model::HeadBucketOutcome HeadBucket(
43-
const Aws::S3::Model::HeadBucketRequest& request);
44-
45-
Aws::S3::Model::DeleteObjectOutcome DeleteObject(
46-
const Aws::S3::Model::DeleteObjectRequest& request);
47-
48-
Aws::S3::Model::CopyObjectOutcome CopyObject(
49-
const Aws::S3::Model::CopyObjectRequest& request);
50-
51-
Aws::S3::Model::GetObjectOutcome GetObject(
52-
const Aws::S3::Model::GetObjectRequest& request);
53-
54-
Aws::S3::Model::PutObjectOutcome PutObject(
55-
const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint = 0);
56-
57-
Aws::S3::Model::HeadObjectOutcome HeadObject(
58-
const Aws::S3::Model::HeadObjectRequest& request);
59-
60-
const std::shared_ptr<Aws::S3::S3Client>& GetClient() const {
61-
return client_;
62-
}
63-
64-
private:
65-
std::shared_ptr<Aws::S3::S3Client> client_;
66-
std::shared_ptr<CloudRequestCallback> cloud_request_callback_;
67-
};
68-
69-
namespace detail {
70-
struct JobHandle;
71-
} // namespace detail
20+
#ifdef USE_AWS
7221

7322
//
7423
// The S3 environment for rocksdb. This class overrides all the
@@ -99,14 +48,19 @@ class AwsEnv : public CloudEnvImpl {
9948
static Status NewAwsEnv(Env* env,
10049
const CloudEnvOptions& env_options,
10150
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);
102-
51+
static Status CreateS3StorageProvider(std::unique_ptr<CloudStorageProvider>* provider);
52+
static Status CreateKinesisController(std::unique_ptr<CloudLogController>* controller);
53+
explicit AwsEnv(Env* underlying_env,
54+
const CloudEnvOptions& cloud_options,
55+
const std::shared_ptr<Logger>& logger);
10356
virtual ~AwsEnv();
57+
const char *Name() const override { return "AWS"; }
10458

10559
// We cannot invoke Aws::ShutdownAPI from the destructor because there could
10660
// be
10761
// multiple AwsEnv's created by a process and Aws::ShutdownAPI should be called
10862
// only once by the entire process when all AwsEnvs are destroyed.
109-
static void Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
63+
static void Shutdown();
11064

11165
// If you do not specify a region, then S3 buckets are created in the
11266
// standard-region which might not satisfy read-your-own-writes. So,
@@ -230,16 +184,8 @@ class AwsEnv : public CloudEnvImpl {
230184

231185
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
232186

233-
virtual Status EmptyBucket(const std::string& bucket,
234-
const std::string& path) override;
235-
236187
std::string GetWALCacheDir();
237188

238-
// The S3 client
239-
std::shared_ptr<AwsS3ClientWrapper> s3client_;
240-
241-
// AWS's utility to help out with uploading and downloading S3 file
242-
std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;
243189

244190
// Saves and retrieves the dbid->dirname mapping in S3
245191
Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
@@ -250,105 +196,18 @@ class AwsEnv : public CloudEnvImpl {
250196
DbidList* dblist) override;
251197
Status DeleteDbid(const std::string& bucket,
252198
const std::string& dbid) override;
253-
Status ListObjects(const std::string& bucket_name,
254-
const std::string& bucket_object,
255-
BucketObjectMetadata* meta) override;
256-
Status DeleteObject(const std::string& bucket_name,
257-
const std::string& bucket_object_path) override;
258-
Status ExistsObject(const std::string& bucket_name,
259-
const std::string& bucket_object_path) override;
260-
Status GetObjectSize(const std::string& bucket_name,
261-
const std::string& bucket_object_path,
262-
uint64_t* filesize) override;
263-
Status CopyObject(const std::string& bucket_name_src,
264-
const std::string& bucket_object_path_src,
265-
const std::string& bucket_name_dest,
266-
const std::string& bucket_object_path_dest) override;
267-
Status GetObject(const std::string& bucket_name,
268-
const std::string& bucket_object_path,
269-
const std::string& local_path) override;
270-
Status PutObject(const std::string& local_path,
271-
const std::string& bucket_name,
272-
const std::string& bucket_object_path) override;
273-
Status DeleteCloudFileFromDest(const std::string& fname) override;
274-
275-
void RemoveFileFromDeletionQueue(const std::string& filename);
276-
277-
void TEST_SetFileDeletionDelay(std::chrono::seconds delay) {
278-
std::lock_guard<std::mutex> lk(files_to_delete_mutex_);
279-
file_deletion_delay_ = delay;
280-
}
281-
199+
Status Prepare() override;
200+
protected:
282201
private:
283-
//
284-
// The AWS credentials are specified to the constructor via
285-
// access_key_id and secret_key.
286-
//
287-
explicit AwsEnv(Env* underlying_env,
288-
const CloudEnvOptions& cloud_options,
289-
const std::shared_ptr<Logger> & info_log = nullptr);
290-
291-
struct GetObjectResult {
292-
bool success{false};
293-
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
294-
size_t objectSize{0};
295-
};
296-
297-
GetObjectResult DoGetObject(const Aws::String& bucket, const Aws::String& key,
298-
const std::string& destination);
299-
GetObjectResult DoGetObjectWithTransferManager(
300-
const Aws::String& bucket, const Aws::String& key,
301-
const std::string& destination);
302-
303-
304-
struct PutObjectResult {
305-
bool success{false};
306-
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
307-
};
308-
309-
PutObjectResult DoPutObject(const std::string& filename,
310-
const Aws::String& bucket, const Aws::String& key,
311-
uint64_t sizeHint);
312-
313-
PutObjectResult DoPutObjectWithTransferManager(const std::string& filename,
314-
const Aws::String& bucket,
315-
const Aws::String& key,
316-
uint64_t sizeHint);
317-
318202
// The pathname that contains a list of all db's inside a bucket.
319203
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
320204

321205
Status create_bucket_status_;
322-
323-
std::mutex files_to_delete_mutex_;
324-
std::chrono::seconds file_deletion_delay_ = std::chrono::hours(1);
325-
std::unordered_map<std::string, std::shared_ptr<detail::JobHandle>>
326-
files_to_delete_;
327-
328-
Aws::S3::Model::BucketLocationConstraint bucket_location_;
329-
330206
Status status();
331207

332-
// Delete the specified path from S3
333-
Status DeletePathInS3(const std::string& bucket,
334-
const std::string& fname);
335-
336208
// Validate options
337209
Status CheckOption(const EnvOptions& options);
338210

339-
// Return the list of children of the specified path
340-
Status GetChildrenFromS3(const std::string& path,
341-
const std::string& bucket,
342-
std::vector<std::string>* result);
343-
344-
// If metadata, size or modtime is non-nullptr, returns requested data
345-
Status HeadObject(const std::string& bucket, const std::string& path,
346-
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
347-
uint64_t* size = nullptr, uint64_t* modtime = nullptr);
348-
349-
Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
350-
std::unique_ptr<S3ReadableFile>* result);
351-
352211
// Save IDENTITY file to S3. Update dbid registry.
353212
Status SaveIdentitytoS3(const std::string& localfile,
354213
const std::string& target_idfile);
@@ -359,7 +218,9 @@ class AwsEnv : public CloudEnvImpl {
359218
// Converts a local pathname to an object name in the dest bucket
360219
std::string destname(const std::string& localname);
361220
};
221+
#endif // USE_AWS
362222

223+
extern "C" {
224+
void RegisterAwsObjects(ObjectLibrary& library, const std::string& arg);
225+
} // extern "C"
363226
} // namespace rocksdb
364-
365-
#endif // USE_AWS

cloud/aws/aws_file.h

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -3,153 +3,13 @@
33
#pragma once
44
#ifdef USE_AWS
55

6-
#include <chrono>
7-
#include <fstream>
8-
#include <iostream>
9-
#include "cloud/aws/aws_env.h"
10-
#include "cloud/filename.h"
11-
#include "file/filename.h"
12-
#include "rocksdb/env.h"
13-
#include "rocksdb/status.h"
14-
156
#include <aws/core/Aws.h>
16-
#include <aws/core/utils/DateTime.h>
17-
#include <aws/core/utils/Outcome.h>
18-
#include <aws/core/utils/crypto/CryptoStream.h>
19-
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
20-
#include <aws/s3/S3Client.h>
21-
#include <aws/s3/S3Errors.h>
22-
#include <aws/s3/model/CopyObjectRequest.h>
23-
#include <aws/s3/model/CopyObjectRequest.h>
24-
#include <aws/s3/model/CopyObjectResult.h>
25-
#include <aws/s3/model/CreateBucketConfiguration.h>
26-
#include <aws/s3/model/CreateBucketRequest.h>
27-
#include <aws/s3/model/CreateBucketResult.h>
28-
#include <aws/s3/model/DeleteBucketRequest.h>
29-
#include <aws/s3/model/DeleteObjectRequest.h>
30-
#include <aws/s3/model/DeleteObjectResult.h>
31-
#include <aws/s3/model/GetBucketVersioningRequest.h>
32-
#include <aws/s3/model/GetBucketVersioningResult.h>
33-
#include <aws/s3/model/GetObjectRequest.h>
34-
#include <aws/s3/model/GetObjectResult.h>
35-
#include <aws/s3/model/HeadBucketRequest.h>
36-
#include <aws/s3/model/HeadObjectRequest.h>
37-
#include <aws/s3/model/HeadObjectResult.h>
38-
#include <aws/s3/model/ListObjectsRequest.h>
39-
#include <aws/s3/model/ListObjectsResult.h>
40-
#include <aws/s3/model/PutObjectRequest.h>
41-
#include <aws/s3/model/PutObjectResult.h>
42-
#include <aws/s3/model/ServerSideEncryption.h>
437

448
namespace rocksdb {
459
// Builds an Aws::String from the data pointer and length of the given std::string.
inline Aws::String ToAwsString(const std::string& s) {
4610
return Aws::String(s.data(), s.size());
4711
}
4812

49-
class S3ReadableFile : virtual public SequentialFile,
50-
virtual public RandomAccessFile {
51-
public:
52-
S3ReadableFile(AwsEnv* env, const std::string& bucket_prefix,
53-
const std::string& fname, uint64_t size);
54-
55-
// sequential access, read data at current offset in file
56-
virtual Status Read(size_t n, Slice* result, char* scratch) override;
57-
58-
// random access, read data from specified offset in file
59-
virtual Status Read(uint64_t offset, size_t n, Slice* result,
60-
char* scratch) const override;
61-
62-
virtual Status Skip(uint64_t n) override;
63-
64-
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
65-
66-
private:
67-
AwsEnv* env_;
68-
std::string fname_;
69-
Aws::String s3_bucket_;
70-
Aws::String s3_object_;
71-
uint64_t offset_;
72-
uint64_t file_size_;
73-
};
74-
75-
// Appends to a file in S3.
76-
class S3WritableFile : public WritableFile {
77-
private:
78-
AwsEnv* env_;
79-
std::string fname_;
80-
std::string tmp_file_;
81-
Status status_;
82-
std::unique_ptr<WritableFile> local_file_;
83-
std::string bucket_prefix_;
84-
std::string cloud_fname_;
85-
bool is_manifest_;
86-
87-
public:
88-
// create S3 bucket
89-
static Status CreateBucketInS3(
90-
std::shared_ptr<AwsS3ClientWrapper> client,
91-
const std::string& bucket_prefix,
92-
const Aws::S3::Model::BucketLocationConstraint& location);
93-
94-
// bucket exists and we can access it
95-
static Status BucketExistsInS3(
96-
std::shared_ptr<AwsS3ClientWrapper> client,
97-
const std::string& bucket_prefix,
98-
const Aws::S3::Model::BucketLocationConstraint& location);
99-
100-
S3WritableFile(AwsEnv* env, const std::string& local_fname,
101-
const std::string& bucket_prefix,
102-
const std::string& cloud_fname, const EnvOptions& options,
103-
const CloudEnvOptions cloud_env_options);
104-
105-
virtual ~S3WritableFile();
106-
107-
virtual Status Append(const Slice& data) override {
108-
assert(status_.ok());
109-
// write to temporary file
110-
return local_file_->Append(data);
111-
}
112-
113-
Status PositionedAppend(const Slice& data, uint64_t offset) override {
114-
return local_file_->PositionedAppend(data, offset);
115-
}
116-
Status Truncate(uint64_t size) override {
117-
return local_file_->Truncate(size);
118-
}
119-
Status Fsync() override { return local_file_->Fsync(); }
120-
bool IsSyncThreadSafe() const override {
121-
return local_file_->IsSyncThreadSafe();
122-
}
123-
bool use_direct_io() const override { return local_file_->use_direct_io(); }
124-
size_t GetRequiredBufferAlignment() const override {
125-
return local_file_->GetRequiredBufferAlignment();
126-
}
127-
uint64_t GetFileSize() override { return local_file_->GetFileSize(); }
128-
size_t GetUniqueId(char* id, size_t max_size) const override {
129-
return local_file_->GetUniqueId(id, max_size);
130-
}
131-
Status InvalidateCache(size_t offset, size_t length) override {
132-
return local_file_->InvalidateCache(offset, length);
133-
}
134-
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
135-
return local_file_->RangeSync(offset, nbytes);
136-
}
137-
Status Allocate(uint64_t offset, uint64_t len) override {
138-
return local_file_->Allocate(offset, len);
139-
}
140-
141-
virtual Status Flush() override {
142-
assert(status_.ok());
143-
return local_file_->Flush();
144-
}
145-
146-
virtual Status Sync() override;
147-
148-
virtual Status status() { return status_; }
149-
150-
virtual Status Close() override;
151-
};
152-
15313
} // namespace rocksdb
15414

15515
#endif /* USE_AWS */

0 commit comments

Comments
 (0)