
Commit 61f8adc

Address PR Comments
1 parent 2372ff0 commit 61f8adc

8 files changed: 100 additions & 84 deletions


cloud/aws/aws_env.cc

Lines changed: 17 additions & 9 deletions
@@ -10,6 +10,7 @@
 #include <iostream>
 #include <memory>
 
+#include "cloud/cloud_log_controller_impl.h"
 #include "rocksdb/cloud/cloud_log_controller.h"
 #include "rocksdb/env.h"
 #include "rocksdb/status.h"
@@ -516,10 +517,12 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
   // create cloud log client for storing/reading logs
   if (create_bucket_status_.ok() && !cloud_env_options.keep_local_log_files) {
     if (cloud_env_options.log_type == kLogKinesis) {
-      create_bucket_status_ = CreateKinesisController(this, &cloud_env_options.cloud_log_controller);
+      create_bucket_status_ = CloudLogControllerImpl::CreateKinesisController(
+          this, &cloud_env_options.cloud_log_controller);
     } else if (cloud_env_options.log_type == kLogKafka) {
 #ifdef USE_KAFKA
-      create_bucket_status_ = CreateKafkaController(this, &cloud_env_options.cloud_log_controller);
+      create_bucket_status_ = CloudLogControllerImpl::CreateKafkaController(
+          this, &cloud_env_options.cloud_log_controller);
 #else
       create_bucket_status_ = Status::NotSupported(
           "In order to use Kafka, make sure you're compiling with "
@@ -542,7 +545,8 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
     // Create Kinesis stream and wait for it to be ready
     if (create_bucket_status_.ok()) {
       create_bucket_status_ =
-          cloud_env_options.cloud_log_controller->StartTailingStream(GetSrcBucketName());
+          cloud_env_options.cloud_log_controller->StartTailingStream(
+              GetSrcBucketName());
       if (!create_bucket_status_.ok()) {
         Log(InfoLogLevel::ERROR_LEVEL, info_log,
             "[aws] NewAwsEnv Unable to create stream %s",
@@ -656,7 +660,8 @@ Status AwsEnv::NewSequentialFile(const std::string& logical_fname,
     return st;
 
   } else if (logfile && !cloud_env_options.keep_local_log_files) {
-    return cloud_env_options.cloud_log_controller->NewSequentialFile(fname, result, options);
+    return cloud_env_options.cloud_log_controller->NewSequentialFile(
+        fname, result, options);
   }
 
   // This is neither a sst file or a log file. Read from default env.
@@ -757,8 +762,8 @@ Status AwsEnv::NewRandomAccessFile(const std::string& logical_fname,
 
   } else if (logfile && !cloud_env_options.keep_local_log_files) {
     // read from Kinesis
-    st = cloud_env_options.cloud_log_controller->NewRandomAccessFile(fname, result, options);
-    return st;
+    return cloud_env_options.cloud_log_controller->NewRandomAccessFile(
+        fname, result, options);
   }
 
   // This is neither a sst file or a log file. Read from default env.
@@ -795,7 +800,8 @@ Status AwsEnv::NewWritableFile(const std::string& logical_fname,
     result->reset(dynamic_cast<WritableFile*>(f.release()));
   } else if (logfile && !cloud_env_options.keep_local_log_files) {
     std::unique_ptr<CloudLogWritableFile> f(
-        cloud_env_options.cloud_log_controller->CreateWritableFile(fname, options));
+        cloud_env_options.cloud_log_controller->CreateWritableFile(fname,
+                                                                   options));
     if (!f || !f->status().ok()) {
       s = Status::IOError("[aws] NewWritableFile", fname.c_str());
       Log(InfoLogLevel::ERROR_LEVEL, info_log_,
@@ -1194,7 +1200,8 @@ Status AwsEnv::DeleteFile(const std::string& logical_fname) {
   if (st.ok()) {
     // Log a Delete record to kinesis stream
     std::unique_ptr<CloudLogWritableFile> f(
-        cloud_env_options.cloud_log_controller->CreateWritableFile(fname, EnvOptions()));
+        cloud_env_options.cloud_log_controller->CreateWritableFile(
+            fname, EnvOptions()));
     if (!f || !f->status().ok()) {
       st = Status::IOError("[Kinesis] DeleteFile", fname.c_str());
     } else {
@@ -1394,7 +1401,8 @@ Status AwsEnv::GetFileModificationTime(const std::string& logical_fname,
       }
     }
   } else if (logfile && !cloud_env_options.keep_local_log_files) {
-    st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname, time);
+    st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname,
+                                                                         time);
   } else {
     st = base_env_->GetFileModificationTime(fname, time);
   }
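Note on the factory calls above: the constructor now routes through static factory methods on CloudLogControllerImpl, declared in the newly included cloud/cloud_log_controller_impl.h (that header is not part of this excerpt). Based only on the definitions visible in the aws_kafka.cc and aws_kinesis.cc hunks below, a minimal sketch of what those declarations presumably look like is:

// Sketch only; the surrounding class body and the base class are assumptions,
// while the two signatures are taken from the definitions in this diff.
class CloudLogControllerImpl : public CloudLogController {
 public:
  static Status CreateKafkaController(
      CloudEnv* env, std::shared_ptr<CloudLogController>* output);
  static Status CreateKinesisController(
      CloudEnv* env, std::shared_ptr<CloudLogController>* output);
  // ...
};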

cloud/aws/aws_kafka.cc

Lines changed: 10 additions & 12 deletions
@@ -99,8 +99,8 @@ Status KafkaWritableFile::ProduceRaw(const std::string& operation_name,
 
 Status KafkaWritableFile::Append(const Slice& data) {
   std::string serialized_data;
-  CloudLogControllerImpl::SerializeLogRecordAppend(fname_, data, current_offset_,
-                                                   &serialized_data);
+  CloudLogControllerImpl::SerializeLogRecordAppend(
+      fname_, data, current_offset_, &serialized_data);
 
   return ProduceRaw("Append", serialized_data);
 }
@@ -175,13 +175,11 @@ Status KafkaWritableFile::LogDelete() {
 //
 class KafkaController : public CloudLogControllerImpl {
  public:
-
-  KafkaController(CloudEnv* env,
-                  std::unique_ptr<RdKafka::Producer> producer,
+  KafkaController(CloudEnv* env, std::unique_ptr<RdKafka::Producer> producer,
                   std::unique_ptr<RdKafka::Consumer> consumer)
-    : CloudLogControllerImpl(env),
-      producer_(std::move(producer)),
-      consumer_(std::move(consumer)) {
+      : CloudLogControllerImpl(env),
+        producer_(std::move(producer)),
+        consumer_(std::move(consumer)) {
     const std::string topic_name = env_->GetSrcBucketName();
 
     Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
@@ -394,14 +392,14 @@ CloudLogWritableFile* KafkaController::CreateWritableFile(
 
 namespace rocksdb {
 #ifndef USE_KAFKA
-Status CreateKafkaController(CloudEnv *,
-                             std::shared_ptr<CloudLogController> *) {
+Status CloudLogControllerImpl::CreateKafkaController(
+    CloudEnv*, std::shared_ptr<CloudLogController>*) {
   return Status::NotSupported("In order to use Kafka, make sure you're compiling with "
                               "USE_KAFKA=1");
 }
 #else
-Status CreateKafkaController(CloudEnv *env,
-                             std::shared_ptr<CloudLogController> *output) {
+Status CloudLogControllerImpl::CreateKafkaController(
+    CloudEnv* env, std::shared_ptr<CloudLogController>* output) {
   Status st = Status::OK();
   std::string conf_errstr, producer_errstr, consumer_errstr;
   const auto& kconf = env->GetCloudEnvOptions().kafka_log_options.client_config_params;

cloud/aws/aws_kinesis.cc

Lines changed: 13 additions & 11 deletions
@@ -74,8 +74,8 @@ Status KinesisWritableFile::Append(const Slice& data) {
 
   // serialize write record
   std::string buffer;
-  CloudLogControllerImpl::SerializeLogRecordAppend(fname_, data, current_offset_,
-                                                   &buffer);
+  CloudLogControllerImpl::SerializeLogRecordAppend(fname_, data,
+                                                   current_offset_, &buffer);
   request.SetData(Aws::Utils::ByteBuffer((const unsigned char*)buffer.c_str(),
                                          buffer.size()));
 
@@ -109,7 +109,8 @@ Status KinesisWritableFile::Close() {
 
   // serialize write record
   std::string buffer;
-  CloudLogControllerImpl::SerializeLogRecordClosed(fname_, current_offset_, &buffer);
+  CloudLogControllerImpl::SerializeLogRecordClosed(fname_, current_offset_,
+                                                   &buffer);
   request.SetData(Aws::Utils::ByteBuffer((const unsigned char*)buffer.c_str(),
                                          buffer.size()));
 
@@ -175,10 +176,11 @@ Status KinesisWritableFile::LogDelete() {
 //
 class KinesisController : public CloudLogControllerImpl {
  public:
-  KinesisController(CloudEnv* env,
-      const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & provider,
-      const Aws::Client::ClientConfiguration & config)
-    : CloudLogControllerImpl(env) {
+  KinesisController(
+      CloudEnv* env,
+      const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& provider,
+      const Aws::Client::ClientConfiguration& config)
+      : CloudLogControllerImpl(env) {
     kinesis_client_.reset(provider
                               ? new Aws::Kinesis::KinesisClient(provider, config)
                               : new Aws::Kinesis::KinesisClient(config));
@@ -452,14 +454,14 @@ CloudLogWritableFile* KinesisController::CreateWritableFile(
 
 namespace rocksdb {
 #ifndef USE_AWS
-Status CreateKinesisController(CloudEnv *,
-                               std::shared_ptr<CloudLogController> *) {
+Status CloudLogControllerImpl::CreateKinesisController(
+    CloudEnv*, std::shared_ptr<CloudLogController>*) {
   return Status::NotSupported("In order to use Kinesis, make sure you're compiling with "
                               "USE_AWS=1");
 }
 #else
-Status CreateKinesisController(CloudEnv *env,
-                               std::shared_ptr<CloudLogController> *output) {
+Status CloudLogControllerImpl::CreateKinesisController(
+    CloudEnv *env, std::shared_ptr<CloudLogController> *output) {
   Aws::Client::ClientConfiguration config;
   const auto & options = env->GetCloudEnvOptions();
   std::shared_ptr<Aws::Auth::AWSCredentialsProvider> provider;

cloud/cloud_env_impl.cc

Lines changed: 3 additions & 2 deletions
@@ -1,14 +1,15 @@
 // Copyright (c) 2017 Rockset.
 #ifndef ROCKSDB_LITE
 
+#include "cloud/cloud_env_impl.h"
+
 #include <cinttypes>
 
-#include "cloud/cloud_env_impl.h"
 #include "cloud/cloud_env_wrapper.h"
 #include "cloud/filename.h"
 #include "cloud/manifest_reader.h"
-#include "file/filename.h"
 #include "file/file_util.h"
+#include "file/filename.h"
 #include "port/likely.h"
 #include "rocksdb/cloud/cloud_log_controller.h"
 #include "rocksdb/db.h"

cloud/cloud_log_controller.cc

Lines changed: 34 additions & 32 deletions
@@ -26,14 +26,12 @@ CloudLogWritableFile::CloudLogWritableFile(
 CloudLogWritableFile::~CloudLogWritableFile() {}
 
 const std::chrono::microseconds CloudLogControllerImpl::kRetryPeriod =
-                                                       std::chrono::seconds(30);
+    std::chrono::seconds(30);
 
-CloudLogController::~CloudLogController() {
-}
+CloudLogController::~CloudLogController() {}
 
 CloudLogControllerImpl::CloudLogControllerImpl(CloudEnv* env)
-  : env_(env), running_(false) {
-
+    : env_(env), running_(false) {
   // Create a random number for the cache directory.
   const std::string uid = trim(env_->GetBaseEnv()->GenerateUniqueId());
 
@@ -63,15 +61,15 @@ CloudLogControllerImpl::~CloudLogControllerImpl() {
       "[%s] CloudLogController closed.", Name());
 }
 
-std::string CloudLogControllerImpl::GetCachePath(const Slice& original_pathname) const {
+std::string CloudLogControllerImpl::GetCachePath(
+    const Slice& original_pathname) const {
   const std::string & cache_dir = GetCacheDir();
   return cache_dir + pathsep + basename(original_pathname.ToString());
 }
 
-bool CloudLogControllerImpl::ExtractLogRecord(const Slice& input, uint32_t* operation, Slice* filename,
-                                              uint64_t* offset_in_file,
-                                              uint64_t* file_size, Slice* data) {
-
+bool CloudLogControllerImpl::ExtractLogRecord(
+    const Slice& input, uint32_t* operation, Slice* filename,
+    uint64_t* offset_in_file, uint64_t* file_size, Slice* data) {
   Slice in = input;
   if (in.size() < 1) {
     return false;
@@ -206,7 +204,9 @@ Status CloudLogControllerImpl::Apply(const Slice& in) {
 }
 
 void CloudLogControllerImpl::SerializeLogRecordAppend(const Slice& filename,
-    const Slice& data, uint64_t offset, std::string* out) {
+                                                      const Slice& data,
+                                                      uint64_t offset,
+                                                      std::string* out) {
   // write the operation type
   PutVarint32(out, kAppend);
 
@@ -220,8 +220,9 @@ void CloudLogControllerImpl::SerializeLogRecordAppend(const Slice& filename,
   PutLengthPrefixedSlice(out, data);
 }
 
-void CloudLogControllerImpl::SerializeLogRecordClosed(
-    const Slice& filename, uint64_t file_size, std::string* out) {
+void CloudLogControllerImpl::SerializeLogRecordClosed(const Slice& filename,
+                                                      uint64_t file_size,
+                                                      std::string* out) {
   // write the operation type
   PutVarint32(out, kClosed);
 
@@ -288,7 +289,7 @@ Status CloudLogControllerImpl::Retry(RetryType func) {
   }
   return stat;
 }
-
+
 Status CloudLogControllerImpl::GetFileModificationTime(const std::string& fname,
                                                        uint64_t* time) {
   Status st = status();
@@ -298,18 +299,18 @@ Status CloudLogControllerImpl::GetFileModificationTime(const std::string& fname,
     Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
         "[kinesis] GetFileModificationTime logfile %s %s", pathname.c_str(),
         "ok");
-
+
     auto lambda = [this, pathname, time]() -> Status {
-        return env_->GetBaseEnv()->GetFileModificationTime(pathname, time);
+      return env_->GetBaseEnv()->GetFileModificationTime(pathname, time);
     };
     st = Retry(lambda);
   }
   return st;
 }
-
-Status CloudLogControllerImpl::NewSequentialFile(const std::string& fname,
-    std::unique_ptr<SequentialFile>* result,
-    const EnvOptions& options) {
+
+Status CloudLogControllerImpl::NewSequentialFile(
+    const std::string& fname, std::unique_ptr<SequentialFile>* result,
+    const EnvOptions& options) {
   // read from Kinesis
   Status st = status();
   if (st.ok()) {
@@ -319,39 +320,39 @@ Status CloudLogControllerImpl::NewSequentialFile(const std::string& fname,
         "[%s] NewSequentialFile logfile %s %s", Name(), pathname.c_str(), "ok");
 
     auto lambda = [this, pathname, &result, options]() -> Status {
-        return env_->GetBaseEnv()->NewSequentialFile(pathname, result, options);
+      return env_->GetBaseEnv()->NewSequentialFile(pathname, result, options);
     };
     st = Retry(lambda);
   }
   return st;
 }
 
-Status CloudLogControllerImpl::NewRandomAccessFile(const std::string& fname,
-    std::unique_ptr<RandomAccessFile>* result,
-    const EnvOptions& options) {
+Status CloudLogControllerImpl::NewRandomAccessFile(
+    const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
+    const EnvOptions& options) {
   Status st = status();
   if (st.ok()) {
     // map pathname to cache dir
     std::string pathname = GetCachePath(Slice(fname));
     Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
         "[%s] NewRandomAccessFile logfile %s %s", Name(), pathname.c_str(),
         "ok");
-
+
    auto lambda = [this, pathname, &result, options]() -> Status {
-        return env_->GetBaseEnv()->NewRandomAccessFile(pathname, result, options);
-      };
+      return env_->GetBaseEnv()->NewRandomAccessFile(pathname, result, options);
+    };
    st = Retry(lambda);
  }
  return st;
 }
-
+
 Status CloudLogControllerImpl::FileExists(const std::string& fname) {
   Status st = status();
   if (st.ok()) {
     // map pathname to cache dir
     std::string pathname = GetCachePath(Slice(fname));
     Log(InfoLogLevel::DEBUG_LEVEL, env_->info_log_,
-       "[%s] FileExists logfile %s %s", Name(), pathname.c_str(), "ok");
+        "[%s] FileExists logfile %s %s", Name(), pathname.c_str(), "ok");
 
     auto lambda = [this, pathname]() -> Status {
       return env_->GetBaseEnv()->FileExists(pathname);
@@ -360,8 +361,9 @@ Status CloudLogControllerImpl::FileExists(const std::string& fname) {
   }
   return st;
 }
-
-Status CloudLogControllerImpl::GetFileSize(const std::string& fname, uint64_t* size) {
+
+Status CloudLogControllerImpl::GetFileSize(const std::string& fname,
+                                           uint64_t* size) {
   Status st = status();
   if (st.ok()) {
     // map pathname to cache dir
@@ -370,7 +372,7 @@ Status CloudLogControllerImpl::GetFileSize(const std::string& fname, uint64_t* s
         "[%s] GetFileSize logfile %s %s", Name(), pathname.c_str(), "ok");
 
     auto lambda = [this, pathname, size]() -> Status {
-        return env_->GetBaseEnv()->GetFileSize(pathname, size);
+      return env_->GetBaseEnv()->GetFileSize(pathname, size);
     };
     st = Retry(lambda);
   }
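The reflowed call sites above all follow the same retry pattern: wrap a base-Env operation in a Status-returning lambda and hand it to Retry(), which keeps re-invoking it for up to kRetryPeriod (30 seconds). The actual Retry() implementation is not shown in this diff; the following standalone sketch only illustrates that retry-with-deadline idiom, using a stand-in Status type and an arbitrary 100 ms back-off:

#include <chrono>
#include <functional>
#include <thread>

// Stand-in for rocksdb::Status, just for this sketch.
struct Status {
  bool ok_;
  bool ok() const { return ok_; }
  static Status OK() { return Status{true}; }
};

// Keep re-invoking a Status-returning callable until it succeeds or the
// retry window elapses (mirroring kRetryPeriod = std::chrono::seconds(30)).
Status RetryFor(const std::function<Status()>& func,
                std::chrono::microseconds period = std::chrono::seconds(30)) {
  const auto deadline = std::chrono::steady_clock::now() + period;
  Status stat = func();
  while (!stat.ok() && std::chrono::steady_clock::now() < deadline) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // back-off
    stat = func();
  }
  return stat;
}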
