Skip to content

Commit 33cbf8b

Browse files
authored
Merge pull request #89 from rockset/CloudEnv
Clean up use of StorageProvider
2 parents d82aa33 + 002a4eb commit 33cbf8b

File tree

3 files changed, with 138 additions and 83 deletions.

cloud/cloud_env.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base,
8989
const std::shared_ptr<Logger>& logger)
9090
: cloud_env_options(options), base_env_(base), info_log_(logger) {}
9191

92-
CloudEnv::~CloudEnv() {}
92+
// Destructor. Explicitly resets the cloud log controller and then the
// storage provider held in cloud_env_options, in that order.
// NOTE(review): the reason these are released explicitly (and in this order)
// rather than by implicit member destruction is not visible here -- confirm
// before reordering.
CloudEnv::~CloudEnv() {
93+
// Drop the log controller first.
cloud_env_options.cloud_log_controller.reset();
94+
// Then drop the storage provider.
cloud_env_options.storage_provider.reset();
95+
}
9396

9497
Status CloudEnv::NewAwsEnv(
9598
Env* base_env, const std::string& src_cloud_bucket,

cloud/cloud_env_impl.cc

Lines changed: 110 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,104 @@ CloudEnvImpl::~CloudEnvImpl() {
3333
StopPurger();
3434
}
3535

36+
// Asks the storage provider whether the cloud object for fname exists.
//
// The dest bucket is consulted first when HasDestBucket() is true. The src
// bucket is consulted only if the status so far is NotFound, HasSrcBucket()
// is true and SrcMatchesDest() is false. When no bucket is consulted the
// result is Status::NotFound(); otherwise it is the status of the last
// provider call made.
Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) {
  const auto& provider = cloud_env_options.storage_provider;

  Status outcome = Status::NotFound();
  if (HasDestBucket()) {
    outcome = provider->ExistsCloudObject(GetDestBucketName(), destname(fname));
  }

  // Anything other than NotFound (success or a real error) is final.
  if (!outcome.IsNotFound()) {
    return outcome;
  }
  // No separate src bucket to fall back on: report the NotFound we have.
  if (!HasSrcBucket() || SrcMatchesDest()) {
    return outcome;
  }
  return provider->ExistsCloudObject(GetSrcBucketName(), srcname(fname));
}
48+
49+
// Fetches the cloud object for fname through the storage provider, passing
// fname itself as the final argument of the provider call.
// NOTE(review): callers in this commit treat that argument as the local copy
// destination -- confirm against the provider interface.
//
// The dest bucket is tried first when HasDestBucket() is true. The src bucket
// is tried only if the status so far is NotFound, HasSrcBucket() is true and
// SrcMatchesDest() is false. When no bucket is tried the result is
// Status::NotFound(); otherwise it is the status of the last provider call.
Status CloudEnvImpl::GetCloudObject(const std::string& fname) {
  const auto& provider = cloud_env_options.storage_provider;

  Status outcome = Status::NotFound();
  if (HasDestBucket()) {
    outcome =
        provider->GetCloudObject(GetDestBucketName(), destname(fname), fname);
  }

  // Anything other than NotFound (success or a real error) is final.
  if (!outcome.IsNotFound()) {
    return outcome;
  }
  // No separate src bucket to fall back on: report the NotFound we have.
  if (!HasSrcBucket() || SrcMatchesDest()) {
    return outcome;
  }
  return provider->GetCloudObject(GetSrcBucketName(), srcname(fname), fname);
}
61+
62+
// Retrieves the size of the cloud object for fname via the storage provider,
// handing remote_size to the provider as the output location.
//
// The dest bucket is queried first when HasDestBucket() is true. The src
// bucket is queried only if the status so far is NotFound, HasSrcBucket() is
// true and SrcMatchesDest() is false. When no bucket is queried the result is
// Status::NotFound() and remote_size is not touched by this function.
Status CloudEnvImpl::GetCloudObjectSize(const std::string& fname,
                                        uint64_t* remote_size) {
  const auto& provider = cloud_env_options.storage_provider;

  Status outcome = Status::NotFound();
  if (HasDestBucket()) {
    outcome = provider->GetCloudObjectSize(GetDestBucketName(),
                                           destname(fname), remote_size);
  }

  // Stop here unless the object is still missing and a distinct src bucket
  // is available to ask.
  if (!outcome.IsNotFound() || !HasSrcBucket() || SrcMatchesDest()) {
    return outcome;
  }
  return provider->GetCloudObjectSize(GetSrcBucketName(), srcname(fname),
                                      remote_size);
}
75+
76+
// Retrieves the modification time of the cloud object for fname via the
// storage provider, handing `time` to the provider as the output location.
//
// The dest bucket is queried first when HasDestBucket() is true. The src
// bucket is queried only if the status so far is NotFound, HasSrcBucket() is
// true and SrcMatchesDest() is false. When no bucket is queried the result is
// Status::NotFound() and `time` is not touched by this function.
Status CloudEnvImpl::GetCloudObjectModificationTime(const std::string& fname,
                                                    uint64_t* time) {
  const auto& provider = cloud_env_options.storage_provider;

  Status outcome = Status::NotFound();
  if (HasDestBucket()) {
    outcome = provider->GetCloudObjectModificationTime(GetDestBucketName(),
                                                       destname(fname), time);
  }

  // Stop here unless the object is still missing and a distinct src bucket
  // is available to ask.
  if (!outcome.IsNotFound() || !HasSrcBucket() || SrcMatchesDest()) {
    return outcome;
  }
  return provider->GetCloudObjectModificationTime(GetSrcBucketName(),
                                                  srcname(fname), time);
}
89+
90+
// Lists cloud objects through the storage provider, passing `result` to each
// provider call.
//
// The src bucket is listed first (under GetSrcObjectPath()) when
// HasSrcBucket() is true; a failure there is logged and returned immediately.
// The dest bucket is then listed (under GetDestObjectPath()) when
// HasDestBucket() is true and SrcMatchesDest() is false; a failure there is
// logged and returned as well. `path` is used only in the log messages.
//
// The log text still says "GetChildren"; GetChildren() is the caller of this
// helper visible in this commit.
Status CloudEnvImpl::ListCloudObjects(const std::string& path,
                                      std::vector<std::string>* result) {
  const auto& provider = cloud_env_options.storage_provider;
  Status outcome;

  // Source bucket first; bail out on the first failure.
  if (HasSrcBucket()) {
    outcome = provider->ListCloudObjects(GetSrcBucketName(),
                                         GetSrcObjectPath(), result);
    if (!outcome.ok()) {
      Log(InfoLogLevel::ERROR_LEVEL, info_log_,
          "[%s] GetChildren src bucket %s %s error from S3 %s", Name(),
          GetSrcBucketName().c_str(), path.c_str(),
          outcome.ToString().c_str());
      return outcome;
    }
  }

  // Nothing more to list when there is no dest bucket or it is the same
  // bucket as src.
  if (!HasDestBucket() || SrcMatchesDest()) {
    return outcome;
  }

  outcome = provider->ListCloudObjects(GetDestBucketName(),
                                       GetDestObjectPath(), result);
  if (!outcome.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, info_log_,
        "[%s] GetChildren dest bucket %s %s error from S3 %s", Name(),
        GetDestBucketName().c_str(), path.c_str(),
        outcome.ToString().c_str());
  }
  return outcome;
}
115+
116+
// Opens a CloudStorageReadableFile for fname through the storage provider,
// handing `result` and `options` to the provider call.
//
// The dest bucket is tried first when HasDestBucket() is true, and a
// successful open is returned right away. On any non-ok status (or when there
// is no dest bucket) the src bucket is tried, provided HasSrcBucket() is true
// and SrcMatchesDest() is false. When no bucket is tried the result is
// Status::NotFound(); otherwise it is the status of the last provider call.
Status CloudEnvImpl::NewCloudReadableFile(
    const std::string& fname, std::unique_ptr<CloudStorageReadableFile>* result,
    const EnvOptions& options) {
  const auto& provider = cloud_env_options.storage_provider;

  Status outcome = Status::NotFound();
  if (HasDestBucket()) {  // read from destination
    outcome = provider->NewCloudReadableFile(GetDestBucketName(),
                                             destname(fname), result, options);
  }
  // The initial NotFound is never ok, so this can only fire after a
  // successful open from the dest bucket.
  if (outcome.ok()) {
    return outcome;
  }

  if (HasSrcBucket() && !SrcMatchesDest()) {  // read from src bucket
    return provider->NewCloudReadableFile(GetSrcBucketName(), srcname(fname),
                                          result, options);
  }
  return outcome;
}
133+
36134
// open a file for sequential reading
37135
Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname,
38136
std::unique_ptr<SequentialFile>* result,
@@ -58,28 +156,14 @@ Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname,
58156
if (!st.ok()) {
59157
if (cloud_env_options.keep_local_sst_files || !sstfile) {
60158
// copy the file to the local storage if keep_local_sst_files is true
61-
if (HasDestBucket()) {
62-
st = cloud_env_options.storage_provider->GetCloudObject(
63-
GetDestBucketName(), destname(fname), fname);
64-
}
65-
if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) {
66-
st = cloud_env_options.storage_provider->GetCloudObject(
67-
GetSrcBucketName(), srcname(fname), fname);
68-
}
159+
st = GetCloudObject(fname);
69160
if (st.ok()) {
70161
// we successfully copied the file, try opening it locally now
71162
st = base_env_->NewSequentialFile(fname, result, options);
72163
}
73164
} else {
74165
std::unique_ptr<CloudStorageReadableFile> file;
75-
if (!st.ok() && HasDestBucket()) { // read from destination
76-
st = cloud_env_options.storage_provider->NewCloudReadableFile(
77-
GetDestBucketName(), destname(fname), &file, options);
78-
}
79-
if (!st.ok() && HasSrcBucket()) { // read from src bucket
80-
st = cloud_env_options.storage_provider->NewCloudReadableFile(
81-
GetSrcBucketName(), srcname(fname), &file, options);
82-
}
166+
st = NewCloudReadableFile(fname, &file, options);
83167
if (st.ok()) {
84168
result->reset(file.release());
85169
}
@@ -145,14 +229,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
145229
if (cloud_env_options.keep_local_sst_files || !sstfile) {
146230
if (!st.ok()) {
147231
// copy the file to the local storage if keep_local_sst_files is true
148-
if (HasDestBucket()) {
149-
st = cloud_env_options.storage_provider->GetCloudObject(
150-
GetDestBucketName(), destname(fname), fname);
151-
}
152-
if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) {
153-
st = cloud_env_options.storage_provider->GetCloudObject(
154-
GetSrcBucketName(), srcname(fname), fname);
155-
}
232+
st = GetCloudObject(fname);
156233
if (st.ok()) {
157234
// we successfully copied the file, try opening it locally now
158235
st = base_env_->NewRandomAccessFile(fname, result, options);
@@ -169,12 +246,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
169246
}
170247
stax = Status::NotFound();
171248
if (HasDestBucket()) {
172-
stax = cloud_env_options.storage_provider->GetCloudObjectSize(
173-
GetDestBucketName(), destname(fname), &remote_size);
174-
}
175-
if (stax.IsNotFound() && HasSrcBucket()) {
176-
stax = cloud_env_options.storage_provider->GetCloudObjectSize(
177-
GetSrcBucketName(), srcname(fname), &remote_size);
249+
stax = GetCloudObjectSize(fname, &remote_size);
178250
}
179251
if (stax.IsNotFound() && !HasDestBucket()) {
180252
// It is legal for file to not be present in S3 if destination bucket
@@ -193,14 +265,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
193265
// true, we will never use CloudReadableFile to read; we copy the file
194266
// locally and read using base_env.
195267
std::unique_ptr<CloudStorageReadableFile> file;
196-
if (!st.ok() && HasDestBucket()) {
197-
st = cloud_env_options.storage_provider->NewCloudReadableFile(
198-
GetDestBucketName(), destname(fname), &file, options);
199-
}
200-
if (!st.ok() && HasSrcBucket()) {
201-
st = cloud_env_options.storage_provider->NewCloudReadableFile(
202-
GetSrcBucketName(), srcname(fname), &file, options);
203-
}
268+
st = NewCloudReadableFile(fname, &file, options);
204269
if (st.ok()) {
205270
result->reset(file.release());
206271
}
@@ -294,13 +359,8 @@ Status CloudEnvImpl::FileExists(const std::string& logical_fname) {
294359
if (sstfile || manifest || identity) {
295360
// We read first from local storage and then from cloud storage.
296361
st = base_env_->FileExists(fname);
297-
if (st.IsNotFound() && HasDestBucket()) {
298-
st = cloud_env_options.storage_provider->ExistsCloudObject(
299-
GetDestBucketName(), destname(fname));
300-
}
301-
if (!st.ok() && HasSrcBucket()) {
302-
st = cloud_env_options.storage_provider->ExistsCloudObject(
303-
GetSrcBucketName(), srcname(fname));
362+
if (st.IsNotFound()) {
363+
st = ExistsCloudObject(fname);
304364
}
305365
} else if (logfile && !cloud_env_options.keep_local_log_files) {
306366
// read from Kinesis
@@ -319,26 +379,11 @@ Status CloudEnvImpl::GetChildren(const std::string& path,
319379
Name(), path.c_str());
320380
result->clear();
321381

322-
// Fetch the list of children from both buckets in S3
323382
Status st;
324-
if (HasSrcBucket() && !cloud_env_options.skip_cloud_files_in_getchildren) {
325-
st = cloud_env_options.storage_provider->ListCloudObjects(
326-
GetSrcBucketName(), GetSrcObjectPath(), result);
383+
if (!cloud_env_options.skip_cloud_files_in_getchildren) {
384+
// Fetch the list of children from the cloud
385+
st = ListCloudObjects(path, result);
327386
if (!st.ok()) {
328-
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
329-
"[%s] GetChildren src bucket %s %s error from S3 %s", Name(),
330-
GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str());
331-
return st;
332-
}
333-
}
334-
if (HasDestBucket() && !SrcMatchesDest() &&
335-
!cloud_env_options.skip_cloud_files_in_getchildren) {
336-
st = cloud_env_options.storage_provider->ListCloudObjects(
337-
GetDestBucketName(), GetDestObjectPath(), result);
338-
if (!st.ok()) {
339-
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
340-
"[%s] GetChildren dest bucket %s %s error from S3 %s", Name(),
341-
GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str());
342387
return st;
343388
}
344389
}
@@ -399,16 +444,7 @@ Status CloudEnvImpl::GetFileSize(const std::string& logical_fname,
399444
if (base_env_->FileExists(fname).ok()) {
400445
st = base_env_->GetFileSize(fname, size);
401446
} else {
402-
st = Status::NotFound();
403-
// Get file length from CloudStorage
404-
if (HasDestBucket()) {
405-
st = cloud_env_options.storage_provider->GetCloudObjectSize(
406-
GetDestBucketName(), destname(fname), size);
407-
}
408-
if (st.IsNotFound() && HasSrcBucket()) {
409-
st = cloud_env_options.storage_provider->GetCloudObjectSize(
410-
GetSrcBucketName(), srcname(fname), size);
411-
}
447+
st = GetCloudObjectSize(fname, size);
412448
}
413449
} else if (logfile && !cloud_env_options.keep_local_log_files) {
414450
st = cloud_env_options.cloud_log_controller->GetFileSize(fname, size);
@@ -435,15 +471,7 @@ Status CloudEnvImpl::GetFileModificationTime(const std::string& logical_fname,
435471
if (base_env_->FileExists(fname).ok()) {
436472
st = base_env_->GetFileModificationTime(fname, time);
437473
} else {
438-
st = Status::NotFound();
439-
if (HasDestBucket()) {
440-
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
441-
GetDestBucketName(), destname(fname), time);
442-
}
443-
if (st.IsNotFound() && HasSrcBucket()) {
444-
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
445-
GetSrcBucketName(), srcname(fname), time);
446-
}
474+
st = GetCloudObjectModificationTime(fname, time);
447475
}
448476
} else if (logfile && !cloud_env_options.keep_local_log_files) {
449477
st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname,

cloud/cloud_env_impl.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "rocksdb/status.h"
1313

1414
namespace rocksdb {
15+
class CloudStorageReadableFile;
1516

1617
//
1718
// The Cloud environment
@@ -218,6 +219,29 @@ class CloudEnvImpl : public CloudEnv {
218219
}
219220

220221
protected:
222+
// Checks to see if the input fname exists in the dest or src bucket
223+
Status ExistsCloudObject(const std::string& fname);
224+
225+
// Gets the cloud object fname from the dest or src bucket
226+
Status GetCloudObject(const std::string& fname);
227+
228+
// Gets the size of the named cloud object from the dest or src bucket
229+
Status GetCloudObjectSize(const std::string& fname, uint64_t* remote_size);
230+
231+
// Gets the modification time of the named cloud object from the dest or src
232+
// bucket
233+
Status GetCloudObjectModificationTime(const std::string& fname,
234+
uint64_t* time);
235+
236+
// Returns the list of cloud objects from the src and dest buckets.
237+
Status ListCloudObjects(const std::string& path,
238+
std::vector<std::string>* result);
239+
240+
// Returns a CloudStorageReadableFile from the dest or src bucket
241+
Status NewCloudReadableFile(const std::string& fname,
242+
std::unique_ptr<CloudStorageReadableFile>* result,
243+
const EnvOptions& options);
244+
221245
// Copy IDENTITY file to cloud storage. Update dbid registry.
222246
Status SaveIdentityToCloud(const std::string& localfile,
223247
const std::string& idfile);

0 commit comments

Comments
 (0)