Skip to content

Commit 368e0bb

Browse files
authored
Merge pull request #81 from rockset/CloudStorageProvider
Added CloudStorageProvider class to CloudEnv/Options
2 parents ef83c2f + 79c3057 commit 368e0bb

17 files changed

+1778
-1419
lines changed

cloud/aws/aws_env.cc

Lines changed: 121 additions & 699 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 6 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,6 @@
1414

1515
#ifdef USE_AWS
1616

17-
#include <aws/core/Aws.h>
18-
#include <aws/core/auth/AWSCredentialsProvider.h>
19-
#include <aws/core/utils/Outcome.h>
20-
#include <aws/kinesis/KinesisClient.h>
21-
#include <aws/s3/S3Client.h>
22-
#include <aws/s3/model/BucketLocationConstraint.h>
23-
#include <aws/transfer/TransferManager.h>
24-
2517
#include <chrono>
2618
#include <list>
2719
#include <unordered_map>
@@ -30,44 +22,6 @@ namespace rocksdb {
3022

3123
class S3ReadableFile;
3224

33-
class AwsS3ClientWrapper {
34-
public:
35-
AwsS3ClientWrapper(
36-
std::shared_ptr<Aws::S3::S3Client> client,
37-
std::shared_ptr<CloudRequestCallback> cloud_request_callback);
38-
39-
Aws::S3::Model::ListObjectsOutcome ListObjects(
40-
const Aws::S3::Model::ListObjectsRequest& request);
41-
42-
Aws::S3::Model::CreateBucketOutcome CreateBucket(
43-
const Aws::S3::Model::CreateBucketRequest& request);
44-
45-
Aws::S3::Model::HeadBucketOutcome HeadBucket(
46-
const Aws::S3::Model::HeadBucketRequest& request);
47-
48-
Aws::S3::Model::DeleteObjectOutcome DeleteObject(
49-
const Aws::S3::Model::DeleteObjectRequest& request);
50-
51-
Aws::S3::Model::CopyObjectOutcome CopyObject(
52-
const Aws::S3::Model::CopyObjectRequest& request);
53-
54-
Aws::S3::Model::GetObjectOutcome GetObject(
55-
const Aws::S3::Model::GetObjectRequest& request);
56-
57-
Aws::S3::Model::PutObjectOutcome PutObject(
58-
const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint = 0);
59-
60-
Aws::S3::Model::HeadObjectOutcome HeadObject(
61-
const Aws::S3::Model::HeadObjectRequest& request);
62-
63-
const std::shared_ptr<Aws::S3::S3Client>& GetClient() const {
64-
return client_;
65-
}
66-
67-
private:
68-
std::shared_ptr<Aws::S3::S3Client> client_;
69-
std::shared_ptr<CloudRequestCallback> cloud_request_callback_;
70-
};
7125

7226
namespace detail {
7327
struct JobHandle;
@@ -105,11 +59,13 @@ class AwsEnv : public CloudEnvImpl {
10559

10660
virtual ~AwsEnv();
10761

62+
const char* Name() const override { return "aws"; }
63+
10864
// We cannot invoke Aws::ShutdownAPI from the destructor because there could
10965
// be
11066
// multiple AwsEnv's ceated by a process and Aws::ShutdownAPI should be called
11167
// only once by the entire process when all AwsEnvs are destroyed.
112-
static void Shutdown() { Aws::ShutdownAPI(Aws::SDKOptions()); }
68+
static void Shutdown();
11369

11470
// If you do not specify a region, then S3 buckets are created in the
11571
// standard-region which might not satisfy read-your-own-writes. So,
@@ -237,17 +193,8 @@ class AwsEnv : public CloudEnvImpl {
237193

238194
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
239195

240-
virtual Status EmptyBucket(const std::string& bucket,
241-
const std::string& path) override;
242-
243196
std::string GetWALCacheDir();
244197

245-
// The S3 client
246-
std::shared_ptr<AwsS3ClientWrapper> s3client_;
247-
248-
// AWS's utility to help out with uploading and downloading S3 file
249-
std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;
250-
251198
// Saves and retrieves the dbid->dirname mapping in S3
252199
Status SaveDbid(const std::string& bucket_name, const std::string& dbid,
253200
const std::string& dirname) override;
@@ -257,27 +204,9 @@ class AwsEnv : public CloudEnvImpl {
257204
DbidList* dblist) override;
258205
Status DeleteDbid(const std::string& bucket,
259206
const std::string& dbid) override;
260-
Status ListObjects(const std::string& bucket_name,
261-
const std::string& bucket_object,
262-
BucketObjectMetadata* meta) override;
263-
Status DeleteObject(const std::string& bucket_name,
264-
const std::string& bucket_object_path) override;
265-
Status ExistsObject(const std::string& bucket_name,
266-
const std::string& bucket_object_path) override;
267-
Status GetObjectSize(const std::string& bucket_name,
268-
const std::string& bucket_object_path,
269-
uint64_t* filesize) override;
270-
Status CopyObject(const std::string& bucket_name_src,
271-
const std::string& bucket_object_path_src,
272-
const std::string& bucket_name_dest,
273-
const std::string& bucket_object_path_dest) override;
274-
Status GetObject(const std::string& bucket_name,
275-
const std::string& bucket_object_path,
276-
const std::string& local_path) override;
277-
Status PutObject(const std::string& local_path,
278-
const std::string& bucket_name,
279-
const std::string& bucket_object_path) override;
280207
Status DeleteCloudFileFromDest(const std::string& fname) override;
208+
Status CopyLocalFileToDest(const std::string& local_name,
209+
const std::string& cloud_name) override;
281210

282211
void RemoveFileFromDeletionQueue(const std::string& filename);
283212

@@ -287,9 +216,7 @@ class AwsEnv : public CloudEnvImpl {
287216
}
288217

289218
Status TEST_DeletePathInS3(const std::string& bucket,
290-
const std::string& fname) {
291-
return DeletePathInS3(bucket, fname);
292-
}
219+
const std::string& fname);
293220

294221
private:
295222
//
@@ -300,33 +227,8 @@ class AwsEnv : public CloudEnvImpl {
300227
const CloudEnvOptions& cloud_options,
301228
const std::shared_ptr<Logger> & info_log = nullptr);
302229

303-
struct GetObjectResult {
304-
bool success{false};
305-
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
306-
size_t objectSize{0};
307-
};
308-
309-
GetObjectResult DoGetObject(const Aws::String& bucket, const Aws::String& key,
310-
const std::string& destination);
311-
GetObjectResult DoGetObjectWithTransferManager(
312-
const Aws::String& bucket, const Aws::String& key,
313-
const std::string& destination);
314230

315231

316-
struct PutObjectResult {
317-
bool success{false};
318-
Aws::Client::AWSError<Aws::S3::S3Errors> error; // if success == false
319-
};
320-
321-
PutObjectResult DoPutObject(const std::string& filename,
322-
const Aws::String& bucket, const Aws::String& key,
323-
uint64_t sizeHint);
324-
325-
PutObjectResult DoPutObjectWithTransferManager(const std::string& filename,
326-
const Aws::String& bucket,
327-
const Aws::String& key,
328-
uint64_t sizeHint);
329-
330232
// The pathname that contains a list of all db's inside a bucket.
331233
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
332234

@@ -338,26 +240,11 @@ class AwsEnv : public CloudEnvImpl {
338240
files_to_delete_;
339241
Random64 rng_;
340242

341-
Aws::S3::Model::BucketLocationConstraint bucket_location_;
342-
343243
Status status();
344244

345-
// Delete the specified path from S3
346-
Status DeletePathInS3(const std::string& bucket,
347-
const std::string& fname);
348-
349245
// Validate options
350246
Status CheckOption(const EnvOptions& options);
351247

352-
// Return the list of children of the specified path
353-
Status GetChildrenFromS3(const std::string& path,
354-
const std::string& bucket,
355-
std::vector<std::string>* result);
356-
357-
// If metadata, size or modtime is non-nullptr, returns requested data
358-
Status HeadObject(const std::string& bucket, const std::string& path,
359-
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
360-
uint64_t* size = nullptr, uint64_t* modtime = nullptr);
361248

362249
Status NewS3ReadableFile(const std::string& bucket, const std::string& fname,
363250
std::unique_ptr<S3ReadableFile>* result);

cloud/aws/aws_file.h

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -3,153 +3,13 @@
33
#pragma once
44
#ifdef USE_AWS
55

6-
#include <chrono>
7-
#include <fstream>
8-
#include <iostream>
9-
#include "cloud/aws/aws_env.h"
10-
#include "cloud/filename.h"
11-
#include "file/filename.h"
12-
#include "rocksdb/env.h"
13-
#include "rocksdb/status.h"
14-
156
#include <aws/core/Aws.h>
16-
#include <aws/core/utils/DateTime.h>
17-
#include <aws/core/utils/Outcome.h>
18-
#include <aws/core/utils/crypto/CryptoStream.h>
19-
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
20-
#include <aws/s3/S3Client.h>
21-
#include <aws/s3/S3Errors.h>
22-
#include <aws/s3/model/CopyObjectRequest.h>
23-
#include <aws/s3/model/CopyObjectRequest.h>
24-
#include <aws/s3/model/CopyObjectResult.h>
25-
#include <aws/s3/model/CreateBucketConfiguration.h>
26-
#include <aws/s3/model/CreateBucketRequest.h>
27-
#include <aws/s3/model/CreateBucketResult.h>
28-
#include <aws/s3/model/DeleteBucketRequest.h>
29-
#include <aws/s3/model/DeleteObjectRequest.h>
30-
#include <aws/s3/model/DeleteObjectResult.h>
31-
#include <aws/s3/model/GetBucketVersioningRequest.h>
32-
#include <aws/s3/model/GetBucketVersioningResult.h>
33-
#include <aws/s3/model/GetObjectRequest.h>
34-
#include <aws/s3/model/GetObjectResult.h>
35-
#include <aws/s3/model/HeadBucketRequest.h>
36-
#include <aws/s3/model/HeadObjectRequest.h>
37-
#include <aws/s3/model/HeadObjectResult.h>
38-
#include <aws/s3/model/ListObjectsRequest.h>
39-
#include <aws/s3/model/ListObjectsResult.h>
40-
#include <aws/s3/model/PutObjectRequest.h>
41-
#include <aws/s3/model/PutObjectResult.h>
42-
#include <aws/s3/model/ServerSideEncryption.h>
437

448
namespace rocksdb {
459
inline Aws::String ToAwsString(const std::string& s) {
4610
return Aws::String(s.data(), s.size());
4711
}
4812

49-
class S3ReadableFile : virtual public SequentialFile,
50-
virtual public RandomAccessFile {
51-
public:
52-
S3ReadableFile(AwsEnv* env, const std::string& bucket_prefix,
53-
const std::string& fname, uint64_t size);
54-
55-
// sequential access, read data at current offset in file
56-
virtual Status Read(size_t n, Slice* result, char* scratch) override;
57-
58-
// random access, read data from specified offset in file
59-
virtual Status Read(uint64_t offset, size_t n, Slice* result,
60-
char* scratch) const override;
61-
62-
virtual Status Skip(uint64_t n) override;
63-
64-
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
65-
66-
private:
67-
AwsEnv* env_;
68-
std::string fname_;
69-
Aws::String s3_bucket_;
70-
Aws::String s3_object_;
71-
uint64_t offset_;
72-
uint64_t file_size_;
73-
};
74-
75-
// Appends to a file in S3.
76-
class S3WritableFile : public WritableFile {
77-
private:
78-
AwsEnv* env_;
79-
std::string fname_;
80-
std::string tmp_file_;
81-
Status status_;
82-
std::unique_ptr<WritableFile> local_file_;
83-
std::string bucket_prefix_;
84-
std::string cloud_fname_;
85-
bool is_manifest_;
86-
87-
public:
88-
// create S3 bucket
89-
static Status CreateBucketInS3(
90-
std::shared_ptr<AwsS3ClientWrapper> client,
91-
const std::string& bucket_prefix,
92-
const Aws::S3::Model::BucketLocationConstraint& location);
93-
94-
// bucket exists and we can access it
95-
static Status BucketExistsInS3(
96-
std::shared_ptr<AwsS3ClientWrapper> client,
97-
const std::string& bucket_prefix,
98-
const Aws::S3::Model::BucketLocationConstraint& location);
99-
100-
S3WritableFile(AwsEnv* env, const std::string& local_fname,
101-
const std::string& bucket_prefix,
102-
const std::string& cloud_fname, const EnvOptions& options,
103-
const CloudEnvOptions cloud_env_options);
104-
105-
virtual ~S3WritableFile();
106-
107-
virtual Status Append(const Slice& data) override {
108-
assert(status_.ok());
109-
// write to temporary file
110-
return local_file_->Append(data);
111-
}
112-
113-
Status PositionedAppend(const Slice& data, uint64_t offset) override {
114-
return local_file_->PositionedAppend(data, offset);
115-
}
116-
Status Truncate(uint64_t size) override {
117-
return local_file_->Truncate(size);
118-
}
119-
Status Fsync() override { return local_file_->Fsync(); }
120-
bool IsSyncThreadSafe() const override {
121-
return local_file_->IsSyncThreadSafe();
122-
}
123-
bool use_direct_io() const override { return local_file_->use_direct_io(); }
124-
size_t GetRequiredBufferAlignment() const override {
125-
return local_file_->GetRequiredBufferAlignment();
126-
}
127-
uint64_t GetFileSize() override { return local_file_->GetFileSize(); }
128-
size_t GetUniqueId(char* id, size_t max_size) const override {
129-
return local_file_->GetUniqueId(id, max_size);
130-
}
131-
Status InvalidateCache(size_t offset, size_t length) override {
132-
return local_file_->InvalidateCache(offset, length);
133-
}
134-
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
135-
return local_file_->RangeSync(offset, nbytes);
136-
}
137-
Status Allocate(uint64_t offset, uint64_t len) override {
138-
return local_file_->Allocate(offset, len);
139-
}
140-
141-
virtual Status Flush() override {
142-
assert(status_.ok());
143-
return local_file_->Flush();
144-
}
145-
146-
virtual Status Sync() override;
147-
148-
virtual Status status() { return status_; }
149-
150-
virtual Status Close() override;
151-
};
152-
15313
} // namepace rocksdb
15414

15515
#endif /* USE_AWS */

0 commit comments

Comments
 (0)