Skip to content

Commit 216123e

Browse files
committed
Change BucketPrefix methods to BucketName
The XXXGetBucketPrefix methods were changed to XXXGetBucketName. Whereas the old methods would return a name and the "GetBucketName" methods (of aws_file) would prepend "rockset." to the name, the new methods return the full name. Additionally, environment variables were added/changed to set the PREFIX (to use a value instead of "rockset."). The impetus for this change was that, in our environment, we cannot give the buckets names that are prefixed by "rockset.". Because the GetBucketName methods were not part of a class, it was hard to override the settings. Additionally, this change will make it easier for other cloud environments to use the same methodology in the future.
1 parent 3eb3616 commit 216123e

File tree

16 files changed

+530
-500
lines changed

16 files changed

+530
-500
lines changed

cloud/aws/aws_env.cc

Lines changed: 156 additions & 200 deletions
Large diffs are not rendered by default.

cloud/aws/aws_env.h

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,9 @@ struct JobHandle;
9797
class AwsEnv : public CloudEnvImpl {
9898
public:
9999
// A factory method for creating S3 envs
100-
static Status NewAwsEnv(Env* env, const std::string& src_cloud_storage,
101-
const std::string& src_cloud_object_prefix,
102-
const std::string& src_cloud_region,
103-
const std::string& dest_cloud_storage,
104-
const std::string& dest_cloud_object_prefix,
105-
const std::string& dest_cloud_region,
100+
static Status NewAwsEnv(Env* env,
106101
const CloudEnvOptions& env_options,
107-
std::shared_ptr<Logger> info_log, CloudEnv** cenv);
102+
const std::shared_ptr<Logger> & info_log, CloudEnv** cenv);
108103

109104
virtual ~AwsEnv();
110105

@@ -123,7 +118,7 @@ class AwsEnv : public CloudEnvImpl {
123118
std::unique_ptr<SequentialFile>* result,
124119
const EnvOptions& options) override;
125120

126-
virtual Status NewSequentialFileCloud(const std::string& bucket_prefix,
121+
virtual Status NewSequentialFileCloud(const std::string& bucket,
127122
const std::string& fname,
128123
std::unique_ptr<SequentialFile>* result,
129124
const EnvOptions& options) override;
@@ -238,27 +233,28 @@ class AwsEnv : public CloudEnvImpl {
238233

239234
virtual uint64_t GetThreadID() const override { return AwsEnv::gettid(); }
240235

241-
virtual Status EmptyBucket(const std::string& bucket_prefix,
242-
const std::string& path_prefix) override;
236+
virtual Status EmptyBucket(const std::string& bucket,
237+
const std::string& path) override;
243238

244239
// get the posix env
245240
Env* GetPosixEnv() const { return base_env_; }
246241

247242
bool IsRunning() const { return running_; }
248243

249-
const std::string& GetSrcBucketPrefix() override {
250-
return src_bucket_prefix_;
244+
const std::string& GetSrcBucketName() const override {
245+
return cloud_env_options.src_bucket.GetBucketName();
251246
}
252-
const std::string& GetSrcObjectPrefix() override {
253-
return src_object_prefix_;
247+
const std::string& GetSrcObjectPath() const override {
248+
return cloud_env_options.src_bucket.GetObjectPath();
254249
}
255-
const std::string& GetDestBucketPrefix() override {
256-
return dest_bucket_prefix_;
250+
251+
const std::string& GetDestBucketName() const override {
252+
return cloud_env_options.dest_bucket.GetBucketName();
257253
}
258-
const std::string& GetDestObjectPrefix() override {
259-
return dest_object_prefix_;
254+
const std::string& GetDestObjectPath() const override {
255+
return cloud_env_options.dest_bucket.GetObjectPath();
260256
}
261-
257+
262258
const CloudEnvOptions& GetCloudEnvOptions() override {
263259
return cloud_env_options;
264260
}
@@ -274,48 +270,44 @@ class AwsEnv : public CloudEnvImpl {
274270
std::shared_ptr<Aws::Transfer::TransferManager> awsTransferManager_;
275271

276272
// Configurations for this cloud environment
277-
const CloudEnvOptions cloud_env_options;
273+
CloudEnvOptions cloud_env_options;
278274

279275
//
280276
// Get credentials for running unit tests
281277
//
282278
static Status GetTestCredentials(std::string* aws_access_key_id,
283-
std::string* aws_secret_access_key,
284-
std::string* region);
285-
286-
// Create a specific bucketname suffix so that all unit tests can
287-
// use a single bucket.
288-
static std::string GetTestBucketSuffix();
279+
std::string* aws_secret_access_key,
280+
std::string* region);
289281

290282
Status StartTailingStream();
291283

292284
// Saves and retrieves the dbid->dirname mapping in S3
293285
Status SaveDbid(const std::string& dbid, const std::string& dirname) override;
294-
Status GetPathForDbid(const std::string& bucket_prefix,
286+
Status GetPathForDbid(const std::string& bucket,
295287
const std::string& dbid, std::string* dirname) override;
296-
Status GetDbidList(const std::string& bucket_prefix,
288+
Status GetDbidList(const std::string& bucket,
297289
DbidList* dblist) override;
298-
Status DeleteDbid(const std::string& bucket_prefix,
290+
Status DeleteDbid(const std::string& bucket,
299291
const std::string& dbid) override;
300-
Status ListObjects(const std::string& bucket_name_prefix,
301-
const std::string& bucket_object_prefix,
292+
Status ListObjects(const std::string& bucket_name,
293+
const std::string& bucket_object,
302294
BucketObjectMetadata* meta) override;
303-
Status DeleteObject(const std::string& bucket_name_prefix,
295+
Status DeleteObject(const std::string& bucket_name,
304296
const std::string& bucket_object_path) override;
305-
Status ExistsObject(const std::string& bucket_name_prefix,
297+
Status ExistsObject(const std::string& bucket_name,
306298
const std::string& bucket_object_path) override;
307-
Status GetObjectSize(const std::string& bucket_name_prefix,
299+
Status GetObjectSize(const std::string& bucket_name,
308300
const std::string& bucket_object_path,
309301
uint64_t* filesize) override;
310-
Status CopyObject(const std::string& bucket_name_prefix_src,
302+
Status CopyObject(const std::string& bucket_name_src,
311303
const std::string& bucket_object_path_src,
312-
const std::string& bucket_name_prefix_dest,
304+
const std::string& bucket_name_dest,
313305
const std::string& bucket_object_path_dest) override;
314-
Status GetObject(const std::string& bucket_name_prefix,
306+
Status GetObject(const std::string& bucket_name,
315307
const std::string& bucket_object_path,
316308
const std::string& local_path) override;
317309
Status PutObject(const std::string& local_path,
318-
const std::string& bucket_name_prefix,
310+
const std::string& bucket_name,
319311
const std::string& bucket_object_path) override;
320312
Status DeleteCloudFileFromDest(const std::string& fname) override;
321313

@@ -331,14 +323,9 @@ class AwsEnv : public CloudEnvImpl {
331323
// The AWS credentials are specified to the constructor via
332324
// access_key_id and secret_key.
333325
//
334-
explicit AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
335-
const std::string& src_object_prefix,
336-
const std::string& src_bucket_region,
337-
const std::string& dest_bucket_prefix,
338-
const std::string& dest_object_prefix,
339-
const std::string& dest_bucket_region,
326+
explicit AwsEnv(Env* underlying_env,
340327
const CloudEnvOptions& cloud_options,
341-
std::shared_ptr<Logger> info_log = nullptr);
328+
const std::shared_ptr<Logger> & info_log = nullptr);
342329

343330
struct GetObjectResult {
344331
bool success{false};
@@ -370,13 +357,6 @@ class AwsEnv : public CloudEnvImpl {
370357
// The pathname that contains a list of all db's inside a bucket.
371358
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";
372359

373-
std::string src_bucket_prefix_;
374-
std::string src_object_prefix_;
375-
std::string src_bucket_region_;
376-
std::string dest_bucket_prefix_;
377-
std::string dest_object_prefix_;
378-
std::string dest_bucket_region_;
379-
380360
Status create_bucket_status_;
381361

382362
// Background thread to tail stream
@@ -404,23 +384,23 @@ class AwsEnv : public CloudEnvImpl {
404384
Status status();
405385

406386
// Delete the specified path from S3
407-
Status DeletePathInS3(const std::string& bucket_prefix,
387+
Status DeletePathInS3(const std::string& bucket,
408388
const std::string& fname);
409389

410390
// Validate options
411391
Status CheckOption(const EnvOptions& options);
412392

413393
// Return the list of children of the specified path
414394
Status GetChildrenFromS3(const std::string& path,
415-
const std::string& bucket_prefix,
395+
const std::string& bucket,
416396
std::vector<std::string>* result);
417397

418398
// If metadata, size or modtime is non-nullptr, returns requested data
419-
Status HeadObject(const std::string& bucket_prefix, const std::string& path,
399+
Status HeadObject(const std::string& bucket, const std::string& path,
420400
Aws::Map<Aws::String, Aws::String>* metadata = nullptr,
421401
uint64_t* size = nullptr, uint64_t* modtime = nullptr);
422402

423-
Status NewS3ReadableFile(const std::string& bucket_prefix,
403+
Status NewS3ReadableFile(const std::string& bucket,
424404
const std::string& fname,
425405
unique_ptr<S3ReadableFile>* result);
426406

cloud/aws/aws_file.h

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,10 @@
4141
#include <aws/s3/model/PutObjectResult.h>
4242
#include <aws/s3/model/ServerSideEncryption.h>
4343

44-
// A few local defintions
45-
namespace {
46-
47-
inline std::string GetBucket(const std::string& bucket_prefix) {
48-
return "rockset." + bucket_prefix;
49-
}
50-
51-
inline std::string GetStreamName(const std::string& bucket_prefix) {
52-
return "rockset." + bucket_prefix;
53-
}
54-
inline Aws::String GetAwsBucket(const std::string& bucket_prefix) {
55-
const std::string dd = GetBucket(bucket_prefix);
56-
return Aws::String(dd.c_str(), dd.size());
57-
}
58-
inline Aws::String GetAwsStreamName(const std::string& bucket_prefix) {
59-
const std::string dd = GetStreamName(bucket_prefix);
60-
return Aws::String(dd.c_str(), dd.size());
61-
}
62-
63-
} // namespace
64-
6544
namespace rocksdb {
45+
inline Aws::String ToAwsString(const std::string& s) {
46+
return Aws::String(s.data(), s.size());
47+
}
6648

6749
class S3ReadableFile : virtual public SequentialFile,
6850
virtual public RandomAccessFile {
@@ -122,7 +104,7 @@ class S3WritableFile : public WritableFile {
122104

123105
virtual ~S3WritableFile();
124106

125-
virtual Status Append(const Slice& data) {
107+
virtual Status Append(const Slice& data) override {
126108
assert(status_.ok());
127109
// write to temporary file
128110
return local_file_->Append(data);
@@ -156,16 +138,16 @@ class S3WritableFile : public WritableFile {
156138
return local_file_->Allocate(offset, len);
157139
}
158140

159-
virtual Status Flush() {
141+
virtual Status Flush() override {
160142
assert(status_.ok());
161143
return local_file_->Flush();
162144
}
163145

164-
virtual Status Sync();
146+
virtual Status Sync() override;
165147

166148
virtual Status status() { return status_; }
167149

168-
virtual Status Close();
150+
virtual Status Close() override;
169151
};
170152

171153
} // namespace rocksdb

cloud/aws/aws_kafka.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ KafkaController::KafkaController(AwsEnv* env, std::shared_ptr<Logger> info_log,
149149
: CloudLogController(env, info_log),
150150
producer_(std::move(producer)),
151151
consumer_(std::move(consumer)) {
152-
const std::string topic_name = GetStreamName(env_->GetSrcBucketPrefix());
152+
const std::string topic_name = env_->GetSrcBucketName();
153153

154154
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
155155
"[%s] KafkaController opening stream %s using cachedir '%s'",

cloud/aws/aws_kinesis.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ KinesisWritableFile::KinesisWritableFile(
2626

2727
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
2828
"[kinesis] WritableFile opened file %s", fname_.c_str());
29-
topic_ = GetAwsStreamName(env_->GetSrcBucketPrefix());
29+
std::string bucket = env_->GetSrcBucketName();
30+
topic_ = ToAwsString(bucket);
3031
}
3132

3233
KinesisWritableFile::~KinesisWritableFile() {}
@@ -37,7 +38,7 @@ Status KinesisWritableFile::Append(const Slice& data) {
3738
// create write request
3839
PutRecordRequest request;
3940
request.SetStreamName(topic_);
40-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
41+
request.SetPartitionKey(ToAwsString(fname_));
4142

4243
// serialize write record
4344
std::string buffer;
@@ -72,7 +73,7 @@ Status KinesisWritableFile::Close() {
7273
// create write request
7374
PutRecordRequest request;
7475
request.SetStreamName(topic_);
75-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
76+
request.SetPartitionKey(ToAwsString(fname_));
7677

7778
// serialize write record
7879
std::string buffer;
@@ -109,7 +110,7 @@ Status KinesisWritableFile::LogDelete() {
109110
// create write request
110111
PutRecordRequest request;
111112
request.SetStreamName(topic_);
112-
request.SetPartitionKey(Aws::String(fname_.c_str(), fname_.size()));
113+
request.SetPartitionKey(ToAwsString(fname_));
113114

114115
// serialize write record
115116
std::string buffer;
@@ -140,7 +141,8 @@ KinesisController::KinesisController(
140141
kinesis_client_(std::move(kinesis_client)) {
141142

142143
// Initialize stream name.
143-
topic_ = GetAwsStreamName(env_->GetSrcBucketPrefix());
144+
std::string bucket = env_->GetSrcBucketName();
145+
topic_ = ToAwsString(bucket);
144146

145147
Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
146148
"[%s] KinesisController opening stream %s using cachedir '%s'",
@@ -236,8 +238,8 @@ Status KinesisController::TailStream() {
236238
return status_;
237239
}
238240

239-
Status KinesisController::CreateStream(const std::string& bucket_prefix) {
240-
Aws::String topic = GetAwsStreamName(bucket_prefix);
241+
Status KinesisController::CreateStream(const std::string& bucket) {
242+
Aws::String topic = ToAwsString(bucket);
241243

242244
// create stream
243245
CreateStreamRequest create_request;
@@ -254,13 +256,13 @@ Status KinesisController::CreateStream(const std::string& bucket_prefix) {
254256
}
255257
}
256258
if (st.ok()) {
257-
st = WaitForStreamReady(bucket_prefix);
259+
st = WaitForStreamReady(bucket);
258260
}
259261
return st;
260262
}
261263

262-
Status KinesisController::WaitForStreamReady(const std::string& bucket_prefix) {
263-
Aws::String topic = GetAwsStreamName(bucket_prefix);
264+
Status KinesisController::WaitForStreamReady(const std::string& bucket) {
265+
Aws::String topic = ToAwsString(bucket);
264266

265267
// Keep looping if the stream is being initialized
266268
const std::chrono::microseconds start(env_->NowMicros());
@@ -303,7 +305,7 @@ Status KinesisController::WaitForStreamReady(const std::string& bucket_prefix) {
303305
Status KinesisController::InitializeShards() {
304306
// Keep looking for about 10 seconds, in case the stream was newly created
305307
// and is being initialized.
306-
Status st = WaitForStreamReady(env_->GetSrcBucketPrefix());
308+
Status st = WaitForStreamReady(env_->GetSrcBucketName());
307309
if (!st.ok()) {
308310
return st;
309311
}

cloud/aws/aws_kinesis.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ class KinesisController : public CloudLogController {
7878

7979
virtual const std::string GetTypeName() { return "kinesis"; }
8080

81-
virtual Status CreateStream(const std::string& bucket_prefix);
82-
virtual Status WaitForStreamReady(const std::string& bucket_prefix);
81+
virtual Status CreateStream(const std::string& bucket);
82+
virtual Status WaitForStreamReady(const std::string& bucket);
8383
virtual Status TailStream();
8484

8585
virtual CloudLogWritableFile* CreateWritableFile(const std::string& fname,

cloud/aws/aws_log.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ CloudLogController::CloudLogController(
3535
const std::string uid = trim(env_->GetPosixEnv()->GenerateUniqueId());
3636

3737
// Temporary directory for cache.
38-
const std::string bucket_dir = kCacheDir + pathsep + env_->GetSrcBucketPrefix();
38+
const std::string bucket_dir = kCacheDir + pathsep + env_->GetSrcBucketName();
3939
cache_dir_ = bucket_dir + pathsep + uid;
4040

4141
// Create temporary directories.

0 commit comments

Comments
 (0)