Skip to content

Commit b26a8a0

Browse files
authored
Merge pull request #58 from rockset/bucket-name-path
Change BucketPrefix methods to BucketName
2 parents ab3ce6b + 245d02d commit b26a8a0

File tree

16 files changed

+522
-492
lines changed

16 files changed

+522
-492
lines changed

cloud/aws/aws_env.cc

Lines changed: 155 additions & 199 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,9 @@ struct JobHandle;
9797
class AwsEnv : public CloudEnvImpl {
9898
public:
9999
// A factory method for creating S3 envs
100-
static Status NewAwsEnv(Env* env, const std::string& src_cloud_storage,
101-
const std::string& src_cloud_object_prefix,
102-
const std::string& src_cloud_region,
103-
const std::string& dest_cloud_storage,
104-
const std::string& dest_cloud_object_prefix,
105-
const std::string& dest_cloud_region,
100+
static Status NewAwsEnv(Env* env,
106101
const CloudEnvOptions& env_options,
107-
std::shared_ptr<Logger> info_log, CloudEnv** cenv);
102+
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);
108103

109104
virtual ~AwsEnv();
110105

@@ -123,7 +118,7 @@ class AwsEnv : public CloudEnvImpl {
123118
std::unique_ptr<SequentialFile>* result,
124119
const EnvOptions& options) override;
125120

126-
virtual Status NewSequentialFileCloud(const std::string& bucket_prefix,
121+
virtual Status NewSequentialFileCloud(const std::string& bucket,
127122
const std::string& fname,
128123
std::unique_ptr<SequentialFile>* result,
129124
const EnvOptions& options) override;
@@ -238,27 +233,28 @@ class AwsEnv : public CloudEnvImpl {
238233

239234
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
240235

241-
virtual Status EmptyBucket(const std::string& bucket_prefix,
242-
const std::string& path_prefix) override;
236+
virtual Status EmptyBucket(const std::string& bucket,
237+
const std::string& path) override;
243238

244239
// get the posix env
245240
Env* GetPosixEnv() const { return base_env_; }
246241

247242
bool IsRunning() const { return running_; }
248243

249-
const std::string& GetSrcBucketPrefix() override {
250-
return src_bucket_prefix_;
244+
const std::string& GetSrcBucketName() const override {
245+
return cloud_env_options.src_bucket.GetBucketName();
251246
}
252-
const std::string& GetSrcObjectPrefix() override {
253-
return src_object_prefix_;
247+
const std::string& GetSrcObjectPath() const override {
248+
return cloud_env_options.src_bucket.GetObjectPath();
254249
}
255-
const std::string& GetDestBucketPrefix() override {
256-
return dest_bucket_prefix_;
250+
251+
const std::string& GetDestBucketName() const override {
252+
return cloud_env_options.dest_bucket.GetBucketName();
257253
}
258-
const std::string& GetDestObjectPrefix() override {
259-
return dest_object_prefix_;
254+
const std::string& GetDestObjectPath() const override {
255+
return cloud_env_options.dest_bucket.GetObjectPath();
260256
}
261-
257+
262258
const CloudEnvOptions& GetCloudEnvOptions() override {
263259
return cloud_env_options;
264260
}
@@ -274,48 +270,44 @@ class AwsEnv : public CloudEnvImpl {
274270
std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;
275271

276272
// Configurations for this cloud environment
277-
const CloudEnvOptions cloud_env_options;
273+
CloudEnvOptions cloud_env_options;
278274

279275
//
280276
// Get credentials for running unit tests
281277
//
282278
static Status GetTestCredentials(std::string* aws_access_key_id,
283-
std::string* aws_secret_access_key,
284-
std::string* region);
285-
286-
// Create a specific bucketname suffix so that all unit tests can
287-
// use a single bucket.
288-
static std::string GetTestBucketSuffix();
279+
std::string* aws_secret_access_key,
280+
std::string* region);
289281

290282
Status StartTailingStream();
291283

292284
// Saves and retrieves the dbid->dirname mapping in S3
293285
Status SaveDbid(const std::string& dbid, const std::string& dirname) override;
294-
Status GetPathForDbid(const std::string& bucket_prefix,
286+
Status GetPathForDbid(const std::string& bucket,
295287
const std::string& dbid, std::string* dirname) override;
296-
Status GetDbidList(const std::string& bucket_prefix,
288+
Status GetDbidList(const std::string& bucket,
297289
DbidList* dblist) override;
298-
Status DeleteDbid(const std::string& bucket_prefix,
290+
Status DeleteDbid(const std::string& bucket,
299291
const std::string& dbid) override;
300-
Status ListObjects(const std::string& bucket_name_prefix,
301-
const std::string& bucket_object_prefix,
292+
Status ListObjects(const std::string& bucket_name,
293+
const std::string& bucket_object,
302294
BucketObjectMetadata* meta) override;
303-
Status DeleteObject(const std::string& bucket_name_prefix,
295+
Status DeleteObject(const std::string& bucket_name,
304296
const std::string& bucket_object_path) override;
305-
Status ExistsObject(const std::string& bucket_name_prefix,
297+
Status ExistsObject(const std::string& bucket_name,
306298
const std::string& bucket_object_path) override;
307-
Status GetObjectSize(const std::string& bucket_name_prefix,
299+
Status GetObjectSize(const std::string& bucket_name,
308300
const std::string& bucket_object_path,
309301
uint64_t* filesize) override;
310-
Status CopyObject(const std::string& bucket_name_prefix_src,
302+
Status CopyObject(const std::string& bucket_name_src,
311303
const std::string& bucket_object_path_src,
312-
const std::string& bucket_name_prefix_dest,
304+
const std::string& bucket_name_dest,
313305
const std::string& bucket_object_path_dest) override;
314-
Status GetObject(const std::string& bucket_name_prefix,
306+
Status GetObject(const std::string& bucket_name,
315307
const std::string& bucket_object_path,
316308
const std::string& local_path) override;
317309
Status PutObject(const std::string& local_path,
318-
const std::string& bucket_name_prefix,
310+
const std::string& bucket_name,
319311
const std::string& bucket_object_path) override;
320312
Status DeleteCloudFileFromDest(const std::string& fname) override;
321313

@@ -331,14 +323,9 @@ class AwsEnv : public CloudEnvImpl {
331323
// The AWS credentials are specified to the constructor via
332324
// access_key_id and secret_key.
333325
//
334-
explicit AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
335-
const std::string& src_object_prefix,
336-
const std::string& src_bucket_region,
337-
const std::string& dest_bucket_prefix,
338-
const std::string& dest_object_prefix,
339-
const std::string& dest_bucket_region,
326+
explicit AwsEnv(Env* underlying_env,
340327
const CloudEnvOptions& cloud_options,
341-
std::shared_ptr<Logger> info_log = nullptr);
328+
const std::shared_ptr<Logger> & info_log = nullptr);
342329

343330
struct GetObjectResult {
344331
bool success{false};
@@ -370,13 +357,6 @@ class AwsEnv : public CloudEnvImpl {
370357
// The pathname that contains a list of all db's inside a bucket.
371358
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
372359

373-
std::string src_bucket_prefix_;
374-
std::string src_object_prefix_;
375-
std::string src_bucket_region_;
376-
std::string dest_bucket_prefix_;
377-
std::string dest_object_prefix_;
378-
std::string dest_bucket_region_;
379-
380360
Status create_bucket_status_;
381361

382362
// Background thread to tail stream
@@ -404,23 +384,23 @@ class AwsEnv : public CloudEnvImpl {
404384
Status status();
405385

406386
// Delete the specified path from S3
407-
Status DeletePathInS3(const std::string& bucket_prefix,
387+
Status DeletePathInS3(const std::string& bucket,
408388
const std::string& fname);
409389

410390
// Validate options
411391
Status CheckOption(const EnvOptions& options);
412392

413393
// Return the list of children of the specified path
414394
Status GetChildrenFromS3(const std::string& path,
415-
const std::string& bucket_prefix,
395+
const std::string& bucket,
416396
std::vector<std::string>* result);
417397

418398
// If metadata, size or modtime is non-nullptr, returns requested data
419-
Status HeadObject(const std::string& bucket_prefix, const std::string& path,
399+
Status HeadObject(const std::string& bucket, const std::string& path,
420400
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
421401
uint64_t* size = nullptr, uint64_t* modtime = nullptr);
422402

423-
Status NewS3ReadableFile(const std::string& bucket_prefix,
403+
Status NewS3ReadableFile(const std::string& bucket,
424404
const std::string& fname,
425405
unique_ptr<S3ReadableFile>* result);
426406

cloud/aws/aws_file.h

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,10 @@
4141
#include <aws/s3/model/PutObjectResult.h>
4242
#include <aws/s3/model/ServerSideEncryption.h>
4343

44-
// A few local definitions
45-
namespace {
46-
47-
inline std::string GetBucket(const std::string& bucket_prefix) {
48-
return "rockset." + bucket_prefix;
49-
}
50-
51-
inline std::string GetStreamName(const std::string& bucket_prefix) {
52-
return "rockset." + bucket_prefix;
53-
}
54-
inline Aws::String GetAwsBucket(const std::string& bucket_prefix) {
55-
const std::string dd = GetBucket(bucket_prefix);
56-
return Aws::String(dd.c_str(), dd.size());
57-
}
58-
inline Aws::String GetAwsStreamName(const std::string& bucket_prefix) {
59-
const std::string dd = GetStreamName(bucket_prefix);
60-
return Aws::String(dd.c_str(), dd.size());
61-
}
62-
63-
} // namespace
64-
6544
namespace rocksdb {
45+
inline Aws::String ToAwsString(const std::string& s) {
46+
return Aws::String(s.data(), s.size());
47+
}
6648

6749
class S3ReadableFile : virtual public SequentialFile,
6850
virtual public RandomAccessFile {
@@ -161,7 +143,7 @@ class S3WritableFile : public WritableFile {
161143
return local_file_->Flush();
162144
}
163145

164-
virtual Status Sync() override ;
146+
virtual Status Sync() override;
165147

166148
virtual Status status() { return status_; }
167149

cloud/aws/aws_kafka.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ KafkaController::KafkaController(AwsEnv* env, std::shared_ptr<Logger> info_log,
149149
: CloudLogController(env, info_log),
150150
producer_(std::move(producer)),
151151
consumer_(std::move(consumer)) {
152-
const std::string topic_name = GetStreamName(env_->GetSrcBucketPrefix());
152+
const std::string topic_name = env_->GetSrcBucketName();
153153

154154
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
155155
"[%s] KafkaController opening stream %s using cachedir '%s'",

cloud/aws/aws_kinesis.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ KinesisWritableFile::KinesisWritableFile(
2626

2727
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
2828
"[kinesis] WritableFile opened file %s", fname_.c_str());
29-
topic_ = GetAwsStreamName(env_->GetSrcBucketPrefix());
29+
std::string bucket = env_->GetSrcBucketName();
30+
topic_ = ToAwsString(bucket);
3031
}
3132

3233
KinesisWritableFile::~KinesisWritableFile() {}
@@ -37,7 +38,7 @@ Status KinesisWritableFile::Append(const Slice& data) {
3738
// create write request
3839
PutRecordRequest request;
3940
request.SetStreamName(topic_);
40-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
41+
request.SetPartitionKey(ToAwsString(fname_));
4142

4243
// serialize write record
4344
std::string buffer;
@@ -72,7 +73,7 @@ Status KinesisWritableFile::Close() {
7273
// create write request
7374
PutRecordRequest request;
7475
request.SetStreamName(topic_);
75-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
76+
request.SetPartitionKey(ToAwsString(fname_));
7677

7778
// serialize write record
7879
std::string buffer;
@@ -109,7 +110,7 @@ Status KinesisWritableFile::LogDelete() {
109110
// create write request
110111
PutRecordRequest request;
111112
request.SetStreamName(topic_);
112-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
113+
request.SetPartitionKey(ToAwsString(fname_));
113114

114115
// serialize write record
115116
std::string buffer;
@@ -140,7 +141,8 @@ KinesisController::KinesisController(
140141
kinesis_client_(std::move(kinesis_client)) {
141142

142143
// Initialize stream name.
143-
topic_ = GetAwsStreamName(env_->GetSrcBucketPrefix());
144+
std::string bucket = env_->GetSrcBucketName();
145+
topic_ = ToAwsString(bucket);
144146

145147
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
146148
"[%s] KinesisController opening stream %s using cachedir '%s'",
@@ -236,8 +238,8 @@ Status KinesisController::TailStream() {
236238
return status_;
237239
}
238240

239-
Status KinesisController::CreateStream(const std::string& bucket_prefix) {
240-
Aws::String topic = GetAwsStreamName(bucket_prefix);
241+
Status KinesisController::CreateStream(const std::string& bucket) {
242+
Aws::String topic = ToAwsString(bucket);
241243

242244
// create stream
243245
CreateStreamRequest create_request;
@@ -254,13 +256,13 @@ Status KinesisController::CreateStream(const std::string& bucket_prefix) {
254256
}
255257
}
256258
if (st.ok()) {
257-
st = WaitForStreamReady(bucket_prefix);
259+
st = WaitForStreamReady(bucket);
258260
}
259261
return st;
260262
}
261263

262-
Status KinesisController::WaitForStreamReady(const std::string& bucket_prefix) {
263-
Aws::String topic = GetAwsStreamName(bucket_prefix);
264+
Status KinesisController::WaitForStreamReady(const std::string& bucket) {
265+
Aws::String topic = ToAwsString(bucket);
264266

265267
// Keep looping if the stream is being initialized
266268
const std::chrono::microseconds start(env_->NowMicros());
@@ -303,7 +305,7 @@ Status KinesisController::WaitForStreamReady(const std::string& bucket_prefix) {
303305
Status KinesisController::InitializeShards() {
304306
// Keep looking for about 10 seconds, in case the stream was newly created
305307
// and is being initialized.
306-
Status st = WaitForStreamReady(env_->GetSrcBucketPrefix());
308+
Status st = WaitForStreamReady(env_->GetSrcBucketName());
307309
if (!st.ok()) {
308310
return st;
309311
}

cloud/aws/aws_kinesis.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ class KinesisController : public CloudLogController {
7878

7979
virtual const std::string GetTypeName() { return "kinesis"; }
8080

81-
virtual Status CreateStream(const std::string& bucket_prefix);
82-
virtual Status WaitForStreamReady(const std::string& bucket_prefix);
81+
virtual Status CreateStream(const std::string& bucket);
82+
virtual Status WaitForStreamReady(const std::string& bucket);
8383
virtual Status TailStream();
8484

8585
virtual CloudLogWritableFile* CreateWritableFile(const std::string& fname,

cloud/aws/aws_log.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ CloudLogController::CloudLogController(
3535
const std::string uid = trim(env_->GetPosixEnv()->GenerateUniqueId());
3636

3737
// Temporary directory for cache.
38-
const std::string bucket_dir = kCacheDir + pathsep + env_->GetSrcBucketPrefix();
38+
const std::string bucket_dir = kCacheDir + pathsep + env_->GetSrcBucketName();
3939
cache_dir_ = bucket_dir + pathsep + uid;
4040

4141
// Create temporary directories.

cloud/aws/aws_s3.cc

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ namespace rocksdb {
2727

2828
/******************** Readablefile ******************/
2929

30-
S3ReadableFile::S3ReadableFile(AwsEnv* env, const std::string& bucket_prefix,
30+
S3ReadableFile::S3ReadableFile(AwsEnv* env, const std::string& bucket,
3131
const std::string& fname, uint64_t file_size)
3232
: env_(env), fname_(fname), offset_(0), file_size_(file_size) {
3333
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
3434
"[s3] S3ReadableFile opening file %s", fname_.c_str());
35-
s3_bucket_ = GetAwsBucket(bucket_prefix);
36-
s3_object_ = Aws::String(fname_.c_str(), fname_.size());
35+
s3_bucket_ = ToAwsString(bucket);
36+
s3_object_ = ToAwsString(fname_);
3737
}
3838

3939
// sequential access, read data at current offset in file
@@ -170,11 +170,10 @@ size_t S3ReadableFile::GetUniqueId(char* id, size_t max_size) const {
170170

171171
Status S3WritableFile::BucketExistsInS3(
172172
std::shared_ptr<AwsS3ClientWrapper> client,
173-
const std::string& bucket_prefix,
173+
const std::string& bucket,
174174
const Aws::S3::Model::BucketLocationConstraint& location) {
175-
Aws::String bucket = GetAwsBucket(bucket_prefix);
176175
Aws::S3::Model::HeadBucketRequest request;
177-
request.SetBucket(bucket);
176+
request.SetBucket(Aws::String(bucket.c_str(), bucket.size()));
178177
Aws::S3::Model::HeadBucketOutcome outcome = client->HeadBucket(request);
179178
return outcome.IsSuccess() ? Status::OK() : Status::NotFound();
180179
}
@@ -184,7 +183,7 @@ Status S3WritableFile::BucketExistsInS3(
184183
//
185184
Status S3WritableFile::CreateBucketInS3(
186185
std::shared_ptr<AwsS3ClientWrapper> client,
187-
const std::string& bucket_prefix,
186+
const std::string& bucket,
188187
const Aws::S3::Model::BucketLocationConstraint& location) {
189188
// specify region for the bucket
190189
Aws::S3::Model::CreateBucketConfiguration conf;
@@ -194,9 +193,8 @@ Status S3WritableFile::CreateBucketInS3(
194193
}
195194

196195
// create bucket
197-
Aws::String bucket = GetAwsBucket(bucket_prefix);
198196
Aws::S3::Model::CreateBucketRequest request;
199-
request.SetBucket(bucket);
197+
request.SetBucket(Aws::String(bucket.c_str(), bucket.size()));
200198
request.SetCreateBucketConfiguration(conf);
201199
Aws::S3::Model::CreateBucketOutcome outcome = client->CreateBucket(request);
202200
bool isSuccess = outcome.IsSuccess();

0 commit comments

Comments
 (0)